Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SKYEDEN-3271 | consumer group cleanup #1933

Merged
merged 22 commits into from
Jan 8, 2025

Conversation

MarcinBobinski
Copy link
Collaborator

@MarcinBobinski MarcinBobinski commented Nov 28, 2024

After a Hermes subscription is removed, the associated Kafka consumer group is not deleted, causing increasing lag, as observed in Burrow.

This PR introduces a process to asynchronously delete consumer groups once a subscription is removed and all consumers have detached.

@MarcinBobinski MarcinBobinski marked this pull request as ready for review December 10, 2024 13:04
moscicky
moscicky previously approved these changes Dec 18, 2024
Copy link
Collaborator

@moscicky moscicky left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add some minor suggestions, overall really clean solution and great test coverage


private void logTaskRemoval(ConsumerGroupToDelete task) {
logger.info(
"Removed deletion task for subscription {} in datacenter {}",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"Removed deletion task for subscription {} in datacenter {}",
"Removed consumer group deletion task for subscription {} in datacenter {}",

import pl.allegro.tech.hermes.api.ErrorCode;
import pl.allegro.tech.hermes.management.domain.ManagementException;

public class ConsumerGroupAlreadyScheduledToDeleteException extends ManagementException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since this exception is always ignored and will never return 5xx to the user, perhaps we should simply extend RuntimeException instead of ManagementException?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure


@Override
public void execute(DatacenterBoundRepositoryHolder<ConsumerGroupToDeleteRepository> holder) {
consumerGroupToDelete =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we don't have to have this mutable property? for rollback we can get datacenter name from holder and we don't need to cache this

@@ -80,4 +90,40 @@ public void createConsumerGroup(Topic topic, Subscription subscription) {
e);
}
}

@Override
public void deleteConsumerGroup(SubscriptionName subscriptionName)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we log datacenter instead of / additionally to cluster? Unfortunately cluster name does not give full context in multi dc setup

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The clusterName property contains the value in the format {clustername}-{datacenter}.
I’ve renamed it to qualifiedClusterName, but if you think it would be clearer to separate them, I can adjust it accordingly.

and:
Subscription subscriptionToDelete = createTestSubscription(topic, "test-subscription-to-delete")
consumerGroupManager.createConsumerGroup(topic, subscriptionToDelete)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps add an assertion here to make sure that there were 2 groups before removal?

MarcinBobinski and others added 4 commits December 19, 2024 16:13
…onumerGroupDeletion-V2

# Conflicts:
#	hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperPaths.java
#	hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/zookeeper/ZookeeperRepositoryManager.java
#	hermes-management/src/main/resources/application.yaml
Copy link
Contributor

@szczygiel-m szczygiel-m left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice improvement 🎉

private boolean enabled = true;
private Duration interval = Duration.ofMinutes(5);
private Duration initialDelay = Duration.ofMinutes(1);
private Duration timeout = Duration.ofHours(24);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't 24h timeout too long? just asking

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this timeout indicates after what period of time it should ignore tthe removal of consumer group.
I might change it to lower value, but i'm wondering as well if your question is not brought from the ambigious naiming of this variable. Maybe i should change it to something like "skipAfter" ?

@@ -461,4 +461,9 @@ private List<SubscriptionNameWithMetrics> getSubscriptionsMetrics(
})
.collect(toList());
}

public boolean subscriptionExists(SubscriptionName subscriptionName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

public functions are meant to be at the top of the class, private one are at the bottom, lets not break the convention here

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

import pl.allegro.tech.hermes.api.Subscription
import pl.allegro.tech.hermes.api.SubscriptionMode
import pl.allegro.tech.hermes.api.Topic
import pl.allegro.tech.hermes.api.*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: * import

Copy link
Collaborator Author

@MarcinBobinski MarcinBobinski Jan 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh not configured intellij with java files
A little bit sad that it is not detected with linter

edit: grovy files :P

import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
import static org.apache.kafka.clients.producer.ProducerConfig.*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above: * import

@szczygiel-m szczygiel-m merged commit da5a91f into master Jan 8, 2025
13 checks passed
@szczygiel-m szczygiel-m deleted the SKYEDEN-3271-KafkaConumerGroupDeletion-V2 branch January 8, 2025 15:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants