-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: delete zombie consumer groups 🧟 #6160
Conversation
6163242
to
55d72a7
Compare
ksqldb-engine/src/main/java/io/confluent/ksql/engine/QueryCleanupService.java
Outdated
Show resolved
Hide resolved
|
||
@Override | ||
public void run() { | ||
tryRun( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want these three cleanups to happen atomically, i.e all or nothing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, if one of them fails we should still try to clean the others
() -> adminClient.get().deleteConsumerGroups(groups).all().get(), | ||
e -> (e instanceof RetriableException) | ||
|| (e instanceof GroupNotEmptyException && retryCount.getAndIncrement() < 5), | ||
() -> Duration.of(3, ChronoUnit.SECONDS) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not use the actual value of heartbeat.interval.ms
? Why hardcode it to 3 seconds?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's a consumer config so it can be different for each consumer (and unfortunately this isn't a constant in the kafka code, so I can't just use a constant from that class directly)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
fixes #1283
Description
Before this PR, we don't clean up consumer groups that are leftover which makes it really annoying for users to look into their existing consumer groups. This PR adds cleanup on close for persistent and transient queries.
The unfortunate part, is that the consumer group API takes a while to update when the last consumer has left the group (specifically, the
heartbeat.interval.ms
which is default to 3 seconds, needs to pass after the last consumer leaves the group). To account for this, I've taken the external resource cleanup out of the main execution path so that the UI doesn't hang when we terminate a query.Testing done
Unit testing, and manual testing to make sure the resources are cleaned up.
Reviewer checklist