KafkaConsumer.commitAsync #126

Merged

felixschlegel
Contributor

Motivation:

Having KafkaConsumer.commitSync be async is not always convenient as
it suspends the KafkaConsumer.messages read loop and can therefore
lower throughput.

This PR introduces a new method KafkaConsumer.commitAsync that allows
users who don't care about the result of the commit to commit in a
"fire-and-forget" manner.

Modifications:

  • new method KafkaConsumer.commitAsync
  • rename KafkaConsumer.StateMachine.commitSync to
    KafkaConsumer.StateMachine.commit to serve both commitSync and
    commitAsync
  • add new test for KafkaConsumer.commitAsync

@blindspotbounty
Collaborator

It makes sense to have an async method; however, this is mainly a problem with a fixed poll interval. If commits are frequent and the poll interval is adaptive, that should not be a problem (unless it is a rare operation).
We ran into exactly this problem (#128) and made polls adaptive, which helps a lot and in most cases removes the need for an async commit.

@FranzBusch
Contributor

@felixschlegel Can you elaborate the use-case for the async commit here a bit more? When would you want to use this while doing manual commit management?

felixschlegel force-pushed the fs-kafka-consumer-commit-async branch from 0324457 to 0fa9862 on September 4, 2023 15:07
@felixschlegel
Contributor Author

> @felixschlegel Can you elaborate the use-case for the async commit here a bit more? When would you want to use this while doing manual commit management?

I don't know if this is the answer you're looking for, but generally manual commits allow the user to decide when a message is marked as processed.

The problem with commitSync is that it adds a lot of latency to the read loop as it waits for the commit to be complete before moving on.

With commitAsync you just trigger a commit without handling the result of the commit (although we could add this with a callback-based approach) to get higher throughput while still having the freedom to decide when a message is considered processed.

In the worst case, if commitAsync fails, the offset of message i is either committed implicitly by a later commit of the offset of message i + 1, or not committed at all, in which case another consumer could read the message again.
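
To illustrate the worst case described above, here is a minimal sketch of cumulative offset commits. `ToyOffsetStore` and its members are hypothetical names invented for this illustration and are not part of the library; the model also simplifies Kafka by treating the stored value as the last processed offset.

```swift
/// Hypothetical toy model of a consumer group's committed offset (illustration only).
struct ToyOffsetStore {
    /// Highest offset committed so far; nil until the first successful commit.
    private(set) var committed: Int? = nil

    /// Records a commit attempt. A failed attempt leaves the stored offset untouched,
    /// and a fire-and-forget caller never learns that it failed.
    mutating func commit(offset: Int, succeeds: Bool) {
        guard succeeds else { return }
        committed = max(committed ?? offset, offset)
    }

    /// First offset a consumer would read after a restart or rebalance.
    var nextOffsetToRead: Int { (committed ?? -1) + 1 }
}

var store = ToyOffsetStore()
store.commit(offset: 0, succeeds: true)
store.commit(offset: 1, succeeds: false) // the commit for message 1 is lost
print(store.nextOffsetToRead)            // message 1 would be read again at this point
store.commit(offset: 2, succeeds: true)  // a later commit also covers message 1
print(store.nextOffsetToRead)
```

A lost commit therefore only matters if the consumer stops or rebalances before a later commit succeeds.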

Contributor

FranzBusch left a comment

Thanks for explaining, @felixschlegel. One last comment:

What about naming these two methods commit() async throws and triggerCommit() throws?

Contributor

FranzBusch left a comment

@felixschlegel Can you check why the CI is failing?

///
/// - Parameter message: Last received message that shall be marked as read.
/// - Throws: A ``KafkaError`` if scheduling the commit failed.
func commitAsync(_ message: KafkaConsumerMessage) throws {

Can we align the name here with triggerCommit and commit, please?

felixschlegel force-pushed the fs-kafka-consumer-commit-async branch from 46098d3 to f294751 on September 6, 2023 12:33
@@ -367,17 +369,46 @@ public final class KafkaConsumer: Sendable, Service {
/// - Parameters:
/// - message: Last received message that shall be marked as read.
/// - Throws: A ``KafkaError`` if committing failed.
public func triggerCommit(_ message: KafkaConsumerMessage) throws {

Okay, sorry for the bikeshedding, but we should probably call this scheduleCommit, which has slightly better semantics. Can you rename the public and internal methods?

felixschlegel force-pushed the fs-kafka-consumer-commit-async branch from f294751 to dd7e233 on September 20, 2023 15:55
Modifications:

* `KafkaConsumer`:
    * rename `commitSync(_:)` -> `commit(_:)`
    * rename `commitAsync(_:)` -> `scheduleCommit(_:)`
* `RDKafkaClient`:
    * rename `commitSync(_:)` -> `commit(_:)`
    * rename `commitAsync(_:)` -> `scheduleCommit(_:)`
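
The split between the two public methods after the rename can be sketched as follows. This is a minimal, single-threaded sketch under stated assumptions: `ToyConsumer`, `ToyCommitError`, `enqueueCommit`, the `offset:` parameter, and the `failNextCommit`/`isClosed` flags are hypothetical stand-ins. Only the method names `commit` and `scheduleCommit`, and the idea of one internal path serving both, come from this PR.

```swift
/// Hypothetical error type for this sketch.
struct ToyCommitError: Error {}

/// Hypothetical, single-threaded stand-in for a consumer (not the library's implementation).
final class ToyConsumer {
    private(set) var committedOffsets: [Int] = []
    /// Hook for experimenting: makes the next commit report a failure.
    var failNextCommit = false
    var isClosed = false

    /// Single internal path serving both public entry points.
    private func enqueueCommit(offset: Int, completion: ((Result<Void, Error>) -> Void)?) throws {
        guard !isClosed else { throw ToyCommitError() } // scheduling itself failed
        if failNextCommit {
            failNextCommit = false
            completion?(.failure(ToyCommitError()))
        } else {
            committedOffsets.append(offset)
            completion?(.success(()))
        }
    }

    /// Suspends the caller until the commit result is known and rethrows failures.
    func commit(offset: Int) async throws {
        try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
            do {
                try self.enqueueCommit(offset: offset) { continuation.resume(with: $0) }
            } catch {
                continuation.resume(throwing: error)
            }
        }
    }

    /// Fire-and-forget: throws only if scheduling fails; the commit result is dropped.
    func scheduleCommit(offset: Int) throws {
        try enqueueCommit(offset: offset, completion: nil)
    }
}

let consumer = ToyConsumer()
try consumer.scheduleCommit(offset: 1) // returns without waiting for a result
try await consumer.commit(offset: 2)   // suspends until the result is known
```

The awaited variant pays for error visibility with a suspension of the read loop, while the scheduled variant trades that visibility for throughput.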
felixschlegel force-pushed the fs-kafka-consumer-commit-async branch from dd7e233 to 6847dda on September 20, 2023 15:56
FranzBusch merged commit 89ac807 into swift-server:main on Sep 21, 2023