Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SKYEDEN-3271 | consumer group cleanup #1933

Merged
merged 22 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
e57248b
SKYEDEN-3271 | consumer group cleanup
MarcinBobinski Nov 28, 2024
3246fd7
SKYEDEN-3271 | lint
MarcinBobinski Dec 2, 2024
ac4a550
Merge remote-tracking branch 'origin/master' into SKYEDEN-3271-KafkaC…
MarcinBobinski Dec 2, 2024
f308826
SKYEDEN-3271 | tests
MarcinBobinski Dec 2, 2024
e6039cf
SKYEDEN-3271 | remove println
MarcinBobinski Dec 2, 2024
93f92b6
SKYEDEN-3271 | refactor + lint
MarcinBobinski Dec 3, 2024
380a992
SKYEDEN-3271 | integration tests
MarcinBobinski Dec 4, 2024
1350b25
SKYEDEN-3271 | lint
MarcinBobinski Dec 4, 2024
9a54adf
SKYEDEN-3271 | remove consumerGroupDeletionTasks after expiration
MarcinBobinski Dec 4, 2024
9fad1ad
SKYEDEN-3271 | added TODOs
MarcinBobinski Dec 10, 2024
e58a279
SKYEDEN-3271 | Management Leader
MarcinBobinski Dec 16, 2024
664e6ef
Merge branch 'master' into SKYEDEN-3271-KafkaConumerGroupDeletion-V2
MarcinBobinski Dec 19, 2024
13889a4
Merge remote-tracking branch 'origin/master' into SKYEDEN-3271-KafkaC…
MarcinBobinski Dec 19, 2024
0864abd
SKYEDEN-3271 | changes related to PR
MarcinBobinski Dec 19, 2024
8bdc29e
Merge remote-tracking branch 'origin/SKYEDEN-3271-KafkaConumerGroupDe…
MarcinBobinski Dec 19, 2024
6391fbb
SKYEDEN-3271 | changes related to PR
MarcinBobinski Dec 19, 2024
9f97071
Merge remote-tracking branch 'origin/master' into SKYEDEN-3271-KafkaC…
MarcinBobinski Dec 20, 2024
45259d0
Merge branch 'master' into SKYEDEN-3271-KafkaConumerGroupDeletion-V2
MarcinBobinski Dec 23, 2024
8c280bf
Merge branch 'master' into SKYEDEN-3271-KafkaConumerGroupDeletion-V2
MarcinBobinski Dec 23, 2024
3e3b4ec
SKYEDEN-3271 | Changes related to CR
MarcinBobinski Jan 3, 2025
effb792
Merge branch 'master' into SKYEDEN-3271-KafkaConumerGroupDeletion-V2
MarcinBobinski Jan 8, 2025
71964ce
Merge branch 'master' into SKYEDEN-3271-KafkaConumerGroupDeletion-V2
szczygiel-m Jan 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public enum ErrorCode {
INVALID_QUERY(BAD_REQUEST),
IMPLEMENTATION_ABSENT(NOT_FOUND),
MOVING_SUBSCRIPTION_OFFSETS_VALIDATION_ERROR(BAD_REQUEST),
SENDING_TO_KAFKA_TIMEOUT(SERVICE_UNAVAILABLE);
SENDING_TO_KAFKA_TIMEOUT(SERVICE_UNAVAILABLE),
CONSUMER_GROUP_DELETION_ERROR(INTERNAL_SERVER_ERROR);

private final int httpCode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class ZookeeperPaths {
public static final String INACTIVE_TOPICS_PATH = "inactive-topics";
public static final String MANAGEMENT_PATH = "management";
public static final String MANAGEMENT_PATH_LEADER = "leader";
public static final String CONSUMER_GROUP_TO_DELETE = "consumer-group-to-delete";
public static final String CONSUMER_GROUP_TO_DELETE_TASKS = "tasks";

private final String basePath;

Expand Down Expand Up @@ -193,6 +195,16 @@ public String managementLeaderPath() {
return Joiner.on(URL_SEPARATOR).join(basePath, MANAGEMENT_PATH, MANAGEMENT_PATH_LEADER);
}

public String consumerGroupToDeletePath() {
return Joiner.on(URL_SEPARATOR)
.join(basePath, CONSUMER_GROUP_TO_DELETE, CONSUMER_GROUP_TO_DELETE_TASKS);
}

public String consumerGroupToDeletePath(String taskId) {
return Joiner.on(URL_SEPARATOR)
.join(basePath, CONSUMER_GROUP_TO_DELETE, CONSUMER_GROUP_TO_DELETE_TASKS, taskId);
}

public String join(String... parts) {
return Joiner.on(URL_SEPARATOR).join(parts);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,11 @@
import pl.allegro.tech.hermes.common.kafka.KafkaConsumerPoolConfig;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.offset.SubscriptionOffsetChangeIndicator;
import pl.allegro.tech.hermes.common.message.wrapper.CompositeMessageContentWrapper;
import pl.allegro.tech.hermes.management.config.SubscriptionProperties;
import pl.allegro.tech.hermes.management.config.TopicProperties;
import pl.allegro.tech.hermes.management.config.subscription.SubscriptionProperties;
import pl.allegro.tech.hermes.management.domain.dc.DatacenterBoundRepositoryHolder;
import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor;
import pl.allegro.tech.hermes.management.domain.subscription.ConsumerGroupManager;
import pl.allegro.tech.hermes.management.domain.subscription.consumergroup.ConsumerGroupManager;
import pl.allegro.tech.hermes.management.domain.topic.BrokerTopicManagement;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService;
Expand All @@ -58,8 +57,6 @@ public class KafkaConfiguration implements MultipleDcKafkaNamesMappersFactory {

@Autowired SubscriptionProperties subscriptionProperties;

@Autowired CompositeMessageContentWrapper compositeMessageContentWrapper;

@Autowired ZookeeperRepositoryManager zookeeperRepositoryManager;

@Autowired MultiDatacenterRepositoryCommandExecutor multiDcExecutor;
Expand Down Expand Up @@ -104,6 +101,7 @@ MultiDCAwareService multiDCAwareService(
new KafkaSingleMessageReader(
kafkaRawMessageReader, schemaRepository, jsonAvroConverter);
return new BrokersClusterService(
kafkaProperties.getDatacenter(),
kafkaProperties.getQualifiedClusterName(),
messageReader,
retransmissionService,
Expand All @@ -112,7 +110,8 @@ MultiDCAwareService multiDCAwareService(
new OffsetsAvailableChecker(consumerPool, storage),
new LogEndOffsetChecker(consumerPool),
brokerAdminClient,
createConsumerGroupManager(kafkaProperties, kafkaNamesMapper),
createConsumerGroupManager(
kafkaProperties, kafkaNamesMapper, brokerAdminClient),
createKafkaConsumerManager(kafkaProperties, kafkaNamesMapper));
})
.collect(toList());
Expand All @@ -126,13 +125,16 @@ MultiDCAwareService multiDCAwareService(
}

private ConsumerGroupManager createConsumerGroupManager(
KafkaProperties kafkaProperties, KafkaNamesMapper kafkaNamesMapper) {
KafkaProperties kafkaProperties,
KafkaNamesMapper kafkaNamesMapper,
AdminClient kafkaAdminClient) {
return subscriptionProperties.isCreateConsumerGroupManuallyEnabled()
? new KafkaConsumerGroupManager(
kafkaNamesMapper,
kafkaProperties.getQualifiedClusterName(),
kafkaProperties.getBrokerList(),
kafkaProperties)
kafkaProperties,
kafkaAdminClient)
: new NoOpConsumerGroupManager();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package pl.allegro.tech.hermes.management.config;
package pl.allegro.tech.hermes.management.config.subscription;

import static java.util.stream.Collectors.toList;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package pl.allegro.tech.hermes.management.config;
package pl.allegro.tech.hermes.management.config.subscription;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.time.Clock;
import java.util.concurrent.Executors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import pl.allegro.tech.hermes.domain.subscription.SubscriptionRepository;
import pl.allegro.tech.hermes.management.config.subscription.consumergroup.ConsumerGroupCleanUpProperties;
import pl.allegro.tech.hermes.management.domain.Auditor;
import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor;
import pl.allegro.tech.hermes.management.domain.dc.RepositoryManager;
Expand Down Expand Up @@ -123,11 +125,15 @@ public SubscriptionRemover subscriptionRemover(
Auditor auditor,
MultiDatacenterRepositoryCommandExecutor multiDatacenterRepositoryCommandExecutor,
SubscriptionOwnerCache subscriptionOwnerCache,
SubscriptionRepository subscriptionRepository) {
SubscriptionRepository subscriptionRepository,
ConsumerGroupCleanUpProperties consumerGroupCleanUpProperties,
Clock clock) {
return new SubscriptionRemover(
auditor,
multiDatacenterRepositoryCommandExecutor,
subscriptionOwnerCache,
subscriptionRepository);
subscriptionRepository,
consumerGroupCleanUpProperties.isEnabled(),
clock);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package pl.allegro.tech.hermes.management.config;
package pl.allegro.tech.hermes.management.config.subscription;

import org.springframework.boot.context.properties.ConfigurationProperties;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package pl.allegro.tech.hermes.management.config;
package pl.allegro.tech.hermes.management.config.subscription;

import java.util.ArrayList;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package pl.allegro.tech.hermes.management.config.subscription.consumergroup;

import java.time.Clock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import pl.allegro.tech.hermes.management.domain.subscription.SubscriptionService;
import pl.allegro.tech.hermes.management.domain.subscription.consumergroup.ConsumerGroupCleanUpScheduler;
import pl.allegro.tech.hermes.management.domain.subscription.consumergroup.ConsumerGroupToDeleteRepository;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService;
import pl.allegro.tech.hermes.management.infrastructure.leader.ManagementLeadership;
import pl.allegro.tech.hermes.management.infrastructure.zookeeper.ZookeeperRepositoryManager;

@Configuration
@EnableConfigurationProperties(ConsumerGroupCleanUpProperties.class)
public class ConsumerGroupCleanUpConfig {

@Autowired ZookeeperRepositoryManager zookeeperRepositoryManager;

@Bean
ConsumerGroupCleanUpScheduler consumerGroupCleanUpScheduler(
MultiDCAwareService multiDCAwareService,
SubscriptionService subscriptionService,
ConsumerGroupCleanUpProperties properties,
ManagementLeadership managementLeadership,
Clock clock) {
return new ConsumerGroupCleanUpScheduler(
multiDCAwareService,
zookeeperRepositoryManager.getRepositoriesByType(ConsumerGroupToDeleteRepository.class),
subscriptionService,
properties,
managementLeadership,
clock);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package pl.allegro.tech.hermes.management.config.subscription.consumergroup;

import java.time.Duration;
import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "consumer-group.clean-up")
public class ConsumerGroupCleanUpProperties {
private boolean enabled = true;
private Duration interval = Duration.ofMinutes(5);
private Duration initialDelay = Duration.ofMinutes(1);
private Duration timeout = Duration.ofHours(24);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't 24h timeout too long? just asking

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this timeout indicates after what period of time it should ignore tthe removal of consumer group.
I might change it to lower value, but i'm wondering as well if your question is not brought from the ambigious naiming of this variable. Maybe i should change it to something like "skipAfter" ?

private boolean removeTasksAfterTimeout = true;

public boolean isEnabled() {
return enabled;
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

public Duration getInterval() {
return interval;
}

public void setInterval(Duration interval) {
this.interval = interval;
}

public Duration getInitialDelay() {
return initialDelay;
}

public void setInitialDelay(Duration initialDelay) {
this.initialDelay = initialDelay;
}

public Duration getTimeout() {
return timeout;
}

public void setTimeout(Duration timeout) {
this.timeout = timeout;
}

public void setRemoveTasksAfterTimeout(boolean removeTasksAfterTimeout) {
this.removeTasksAfterTimeout = removeTasksAfterTimeout;
}

public boolean isRemoveTasksAfterTimeout() {
return removeTasksAfterTimeout;
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package pl.allegro.tech.hermes.management.domain.subscription;

import java.time.Clock;
import java.time.Instant;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.domain.subscription.SubscriptionRepository;
Expand All @@ -12,6 +15,7 @@
import pl.allegro.tech.hermes.management.domain.auth.RequestUser;
import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor;
import pl.allegro.tech.hermes.management.domain.subscription.commands.RemoveSubscriptionRepositoryCommand;
import pl.allegro.tech.hermes.management.domain.subscription.consumergroup.command.ScheduleConsumerGroupToDeleteCommand;

public class SubscriptionRemover {

Expand All @@ -20,16 +24,22 @@ public class SubscriptionRemover {
private final MultiDatacenterRepositoryCommandExecutor multiDcExecutor;
private final SubscriptionOwnerCache subscriptionOwnerCache;
private final SubscriptionRepository subscriptionRepository;
private final boolean scheduleConsumerGroupRemoval;
private final Clock clock;

public SubscriptionRemover(
Auditor auditor,
MultiDatacenterRepositoryCommandExecutor multiDcExecutor,
SubscriptionOwnerCache subscriptionOwnerCache,
SubscriptionRepository subscriptionRepository) {
SubscriptionRepository subscriptionRepository,
boolean scheduleConsumerGroupRemoval,
Clock clock) {
this.auditor = auditor;
this.multiDcExecutor = multiDcExecutor;
this.subscriptionOwnerCache = subscriptionOwnerCache;
this.subscriptionRepository = subscriptionRepository;
this.scheduleConsumerGroupRemoval = scheduleConsumerGroupRemoval;
this.clock = clock;
}

public void removeSubscription(
Expand All @@ -40,6 +50,14 @@ public void removeSubscription(
subscriptionRepository.getSubscriptionDetails(topicName, subscriptionName);
multiDcExecutor.executeByUser(
new RemoveSubscriptionRepositoryCommand(topicName, subscriptionName), removedBy);

if (scheduleConsumerGroupRemoval) {
multiDcExecutor.executeByUser(
new ScheduleConsumerGroupToDeleteCommand(
new SubscriptionName(subscriptionName, topicName), Instant.now(clock)),
removedBy);
}

auditor.objectRemoved(removedBy.getUsername(), subscription);
subscriptionOwnerCache.onRemovedSubscription(subscriptionName, topicName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,4 +461,9 @@ private List<SubscriptionNameWithMetrics> getSubscriptionsMetrics(
})
.collect(toList());
}

public boolean subscriptionExists(SubscriptionName subscriptionName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

public functions are meant to be at the top of the class, private one are at the bottom, lets not break the convention here

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

return subscriptionRepository.subscriptionExists(
subscriptionName.getTopicName(), subscriptionName.getName());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package pl.allegro.tech.hermes.management.domain.subscription.consumergroup;

import static java.lang.String.format;

public class ConsumerGroupAlreadyScheduledToDeleteException extends RuntimeException {

public ConsumerGroupAlreadyScheduledToDeleteException(
ConsumerGroupToDelete consumerGroupToDelete, Throwable e) {
super(
format(
"Consumer group already scheduled to delete, for subscription %s ",
consumerGroupToDelete.subscriptionName().getQualifiedName()),
e);
}
}
Loading
Loading