Skyeden-3020 | retransmission with detached consumers #1938
Conversation
…scription diagnostic tab + improved retransmission UI
@@ -4,6 +4,7 @@
import { useDialog } from '@/composables/dialog/use-dialog/useDialog';
import { useI18n } from 'vue-i18n';
import ConfirmationDialog from '@/components/confirmation-dialog/ConfirmationDialog.vue';
import LoadingSpinner from '@/components/loading-spinner/LoadingSpinner.vue';
❤️
-retransmit: [fromDate: string];
-skipAllMessages: [];
+retransmit: [fromDate: string, onComplete: () => void];
+skipAllMessages: [onComplete: () => void];
Is it necessary to have onComplete here?
Actually, now that I think about it, a cleaner way would be to eliminate that and lift the state to the parent component.
* The consumers are responsible for moving their local offsets (KafkaConsumer::seek method) as well as the committed ones on Kafka (KafkaConsumer::commitSync method).
* When the subscription is SUSPENDED, the management instance changes the committed offsets on Kafka on its own (AdminClient::alterConsumerGroupOffsets).
* There is no active consumer to notify in that case.
*/
thanks for the explanation 🙇
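For reference, a minimal, self-contained sketch of the SUSPENDED path described in that comment, assuming a plain Kafka AdminClient; the group id, topic name, and target offsets below are placeholders, not values from this PR:

```java
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Illustration only: with the subscription SUSPENDED and no active consumers,
// the committed offsets of the consumer group can be rewound directly on Kafka.
public class DetachedRetransmissionSketch {

  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    try (AdminClient admin = AdminClient.create(props)) {
      // Target offsets, e.g. resolved from a retransmission timestamp elsewhere.
      Map<TopicPartition, OffsetAndMetadata> newOffsets = Map.of(
          new TopicPartition("example_group.example_topic", 0), new OffsetAndMetadata(1000L),
          new TopicPartition("example_group.example_topic", 1), new OffsetAndMetadata(950L));

      // Succeeds only while the consumer group has no active members.
      admin.alterConsumerGroupOffsets("example-consumer-group", newOffsets)
          .all()
          .get();
    }
  }
}
```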
cluster.getClusterName(),
cluster.indicateOffsetChange(topic, subscriptionName, timestamp, dryRun)));

if (!dryRun) {
Do you know why the dryRun flag was used, and is it still needed?
I have no clue at all. I decided not to remove it because I don't want to break anything :P
offsets -> cluster.indicateOffsetChange(topic, subscriptionName, offsets)));

logger.info(
"Starting offsets move for subscription {}. Requested by {}. Retransmission offsets: {}",
minor: maybe "Starting moving offsets" would sound better? Let me know what you think about it.
-OffsetNotFoundException(String message) {
+public OffsetNotFoundException(String message) {
Does this need to be public?
…simissionWithDetachedConsumers # Conflicts: # hermes-console/src/views/subscription/SubscriptionView.spec.ts # hermes-console/src/views/subscription/SubscriptionView.vue
…simissionWithDetachedConsumers # Conflicts: # hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java # hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java # hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/BrokersClusterService.java
"Starting moving offsets for subscription {}. Requested by {}. Retransmission offsets: {}", | ||
topic.getQualifiedName() + "$" + subscriptionName, | ||
requester.getUsername(), | ||
brokerPartitionOffsets); |
I know it was there before your change, but this will not print the partitions, only a Java object reference: PartitionOffset has no toString() method overridden ;) Could you fix that in the PR? ;)
example from tests
Successfully moved offsets for retransmission of subscription TopicBuilderGroup1.TopicBuilderTopic2$subscription1. Requested by user: test-user. Retransmission offsets: {primary-dc=[pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset@e057ca5d, pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset@e057ca7c]}
good catch 👍
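A minimal sketch of such an override, assuming the class carries a topic, partition, and offset; the class and field names here are illustrative and not copied from the real PartitionOffset:

```java
// Illustrative value class only; the real PartitionOffset lives in
// pl.allegro.tech.hermes.common.kafka.offset and may have different fields.
public final class PartitionOffsetExample {

  private final String topic;
  private final int partition;
  private final long offset;

  public PartitionOffsetExample(String topic, int partition, long offset) {
    this.topic = topic;
    this.partition = partition;
    this.offset = offset;
  }

  // With an explicit toString(), log lines show partition and offset details
  // instead of the default class-name@hash representation seen above.
  @Override
  public String toString() {
    return "PartitionOffset{topic=" + topic
        + ", partition=" + partition
        + ", offset=" + offset + '}';
  }
}
```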
5780ef7
The current retransmission solution requires the Hermes subscription to be active and to have consumers attached to the consumer group. If this condition is not met, retransmission will not occur.
This PR introduces the ability to perform retransmission by management instances (via offset changes on the consumer group) when the subscription is suspended and no consumers are attached to the group. Additionally, it adds validation to check the Hermes subscription state and the consumer group itself before performing retransmission.
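As a rough illustration of the consumer group check mentioned above, a hedged sketch using the Kafka AdminClient; the group id and bootstrap address are placeholders, and this is not the PR's actual validation code:

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;

// Illustration only: verify that no consumers are attached to the group before
// the management instance alters its committed offsets.
public class ConsumerGroupValidationSketch {

  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    try (AdminClient admin = AdminClient.create(props)) {
      ConsumerGroupDescription description = admin
          .describeConsumerGroups(List.of("example-consumer-group"))
          .describedGroups()
          .get("example-consumer-group")
          .get();

      if (!description.members().isEmpty()) {
        throw new IllegalStateException(
            "Offset-based retransmission requires a consumer group with no active members");
      }
      // Safe to proceed with AdminClient::alterConsumerGroupOffsets here.
    }
  }
}
```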