Skip to content

Commit

Permalink
Review Franz
Browse files Browse the repository at this point in the history
Modifications:

* `KafkaConsumer`:
    * rename `commitSync(_:)` -> `commit(_:)`
    * rename `commitAsync(_:)` -> `scheduleCommit(_:)`
* `RDKafkaClient`:
    * rename `commitSync(_:)` -> `commit(_:)`
    * rename `commitAsync(_:)` -> `scheduleCommit(_:)`
  • Loading branch information
felixschlegel committed Sep 20, 2023
1 parent 0fa9862 commit dd7e233
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 12 deletions.
13 changes: 9 additions & 4 deletions Sources/Kafka/KafkaConsumer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ public final class KafkaConsumer: Sendable, Service {
/// - Parameters:
/// - message: Last received message that shall be marked as read.
/// - Throws: A ``KafkaError`` if committing failed.
public func commitAsync(_ message: KafkaConsumerMessage) throws {
public func triggerCommit(_ message: KafkaConsumerMessage) throws {
let action = self.stateMachine.withLockedValue { $0.commit() }
switch action {
case .throwClosedError:
Expand All @@ -379,10 +379,15 @@ public final class KafkaConsumer: Sendable, Service {
throw KafkaError.config(reason: "Committing manually only works if isAutoCommitEnabled set to false")
}

try client.commitAsync(message)
try client.triggerCommit(message)
}
}

@available(*, deprecated, renamed: "commit")
public func commitSync(_ message: KafkaConsumerMessage) async throws {
try await self.commit(message)
}

/// Mark all messages up to the passed message in the topic as read.
/// Awaits until the commit succeeds or an error is encountered.
///
Expand All @@ -393,7 +398,7 @@ public final class KafkaConsumer: Sendable, Service {
/// - Parameters:
/// - message: Last received message that shall be marked as read.
/// - Throws: A ``KafkaError`` if committing failed.
public func commitSync(_ message: KafkaConsumerMessage) async throws {
public func commit(_ message: KafkaConsumerMessage) async throws {
let action = self.stateMachine.withLockedValue { $0.commit() }
switch action {
case .throwClosedError:
Expand All @@ -403,7 +408,7 @@ public final class KafkaConsumer: Sendable, Service {
throw KafkaError.config(reason: "Committing manually only works if isAutoCommitEnabled set to false")
}

try await client.commitSync(message)
try await client.commit(message)
}
}

Expand Down
6 changes: 3 additions & 3 deletions Sources/Kafka/RDKafka/RDKafkaClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ final class RDKafkaClient: Sendable {
///
/// - Parameter message: Last received message that shall be marked as read.
/// - Throws: A ``KafkaError`` if scheduling the commit failed.
func commitAsync(_ message: KafkaConsumerMessage) throws {
func triggerCommit(_ message: KafkaConsumerMessage) throws {
// The offset committed is always the offset of the next requested message.
// Thus, we increase the offset of the current message by one before committing it.
// See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945
Expand Down Expand Up @@ -566,12 +566,12 @@ final class RDKafkaClient: Sendable {
///
/// - Parameter message: Last received message that shall be marked as read.
/// - Throws: A ``KafkaError`` if the commit failed.
func commitSync(_ message: KafkaConsumerMessage) async throws {
func commit(_ message: KafkaConsumerMessage) async throws {
// Declare captured closure outside of withCheckedContinuation.
// We do that because do an unretained pass of the captured closure to
// librdkafka which means we have to keep a reference to the closure
// ourselves to make sure it does not get deallocated before
// commitSync returns.
// commit returns.
var capturedClosure: CapturedCommitCallback!
try await withCheckedThrowingContinuation { continuation in
capturedClosure = CapturedCommitCallback { result in
Expand Down
10 changes: 5 additions & 5 deletions Tests/IntegrationTests/KafkaTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ final class KafkaTests: XCTestCase {
}
}

func testProduceAndConsumeWithCommitAsync() async throws {
func testProduceAndConsumeWithTriggerCommit() async throws {
let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10)
let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest)

Expand Down Expand Up @@ -254,7 +254,7 @@ final class KafkaTests: XCTestCase {
var consumedMessages = [KafkaConsumerMessage]()
for try await message in consumer.messages {
consumedMessages.append(message)
try consumer.commitAsync(message)
try consumer.triggerCommit(message)

if consumedMessages.count >= testMessages.count {
break
Expand All @@ -272,7 +272,7 @@ final class KafkaTests: XCTestCase {
}
}

func testProduceAndConsumeWithCommitSync() async throws {
func testProduceAndConsumeWithCommit() async throws {
let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10)
let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest)

Expand Down Expand Up @@ -312,7 +312,7 @@ final class KafkaTests: XCTestCase {
var consumedMessages = [KafkaConsumerMessage]()
for try await message in consumer.messages {
consumedMessages.append(message)
try await consumer.commitSync(message)
try await consumer.commit(message)

if consumedMessages.count >= testMessages.count {
break
Expand Down Expand Up @@ -381,7 +381,7 @@ final class KafkaTests: XCTestCase {
continue
}
consumedMessages.append(message)
try await consumer.commitSync(message)
try await consumer.commit(message)

if consumedMessages.count >= testMessages.count {
break
Expand Down

0 comments on commit dd7e233

Please sign in to comment.