-
Notifications
You must be signed in to change notification settings - Fork 217
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SKYEDEN-3020 | consumers retransmission improvements #1941
SKYEDEN-3020 | consumers retransmission improvements #1941
Conversation
9981d34
to
b304b45
Compare
…erRetransmissionImprovements # Conflicts: # hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/MessageReceiver.java
if (!movedOffsets.isEmpty()) { | ||
// Incrementing assignment term ensures that currently committed offsets won't be overwritten | ||
// by the events from the past which are concurrently processed by the consumer | ||
partitionAssignmentState.incrementTerm(subscriptionName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are You sure that this is the correct order to commit first and then increment assignment term?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Order doesn't matter in this case. Even though the events are processed concurrently, the commitment of offsets is synchronous. The method commitIfReady can only be executed when the entire process of retransmission is complete.
hermes/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/SerialConsumer.java
Lines 122 to 123 in 9f879a0
signalsInterrupt.run(); | |
commitIfReady(); |
…erRetransmissionImprovements # Conflicts: # integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java
4e2865e
In the current retransmission process, offsets are updated only locally in Hermes consumers via
KafkaConsumer::seek()
.This pull request introduces a change to also commit offsets to Kafka using
KafkaConsumer::commitSync()
.Furthermore, it increments the partitionAssignmentTerm to ensure Kafka-committed offsets are not overwritten by concurrently processed events created before retransmission.