Skip to content

Commit

Permalink
Merge branch 'master' into SKYEDEN-3020-ConsumerRetransmissionImprove…
Browse files Browse the repository at this point in the history
…ments
  • Loading branch information
MarcinBobinski authored Jan 9, 2025
2 parents 9f879a0 + da5a91f commit d686f7a
Show file tree
Hide file tree
Showing 34 changed files with 1,222 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public enum ErrorCode {
INVALID_QUERY(BAD_REQUEST),
IMPLEMENTATION_ABSENT(NOT_FOUND),
MOVING_SUBSCRIPTION_OFFSETS_VALIDATION_ERROR(BAD_REQUEST),
SENDING_TO_KAFKA_TIMEOUT(SERVICE_UNAVAILABLE);
SENDING_TO_KAFKA_TIMEOUT(SERVICE_UNAVAILABLE),
CONSUMER_GROUP_DELETION_ERROR(INTERNAL_SERVER_ERROR);

private final int httpCode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class ZookeeperPaths {
public static final String INACTIVE_TOPICS_PATH = "inactive-topics";
public static final String MANAGEMENT_PATH = "management";
public static final String MANAGEMENT_PATH_LEADER = "leader";
public static final String CONSUMER_GROUP_TO_DELETE = "consumer-group-to-delete";
public static final String CONSUMER_GROUP_TO_DELETE_TASKS = "tasks";

private final String basePath;

Expand Down Expand Up @@ -193,6 +195,16 @@ public String managementLeaderPath() {
return Joiner.on(URL_SEPARATOR).join(basePath, MANAGEMENT_PATH, MANAGEMENT_PATH_LEADER);
}

public String consumerGroupToDeletePath() {
return Joiner.on(URL_SEPARATOR)
.join(basePath, CONSUMER_GROUP_TO_DELETE, CONSUMER_GROUP_TO_DELETE_TASKS);
}

public String consumerGroupToDeletePath(String taskId) {
return Joiner.on(URL_SEPARATOR)
.join(basePath, CONSUMER_GROUP_TO_DELETE, CONSUMER_GROUP_TO_DELETE_TASKS, taskId);
}

public String join(String... parts) {
return Joiner.on(URL_SEPARATOR).join(parts);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,11 @@
import pl.allegro.tech.hermes.common.kafka.KafkaConsumerPoolConfig;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.offset.SubscriptionOffsetChangeIndicator;
import pl.allegro.tech.hermes.common.message.wrapper.CompositeMessageContentWrapper;
import pl.allegro.tech.hermes.management.config.SubscriptionProperties;
import pl.allegro.tech.hermes.management.config.TopicProperties;
import pl.allegro.tech.hermes.management.config.subscription.SubscriptionProperties;
import pl.allegro.tech.hermes.management.domain.dc.DatacenterBoundRepositoryHolder;
import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor;
import pl.allegro.tech.hermes.management.domain.subscription.ConsumerGroupManager;
import pl.allegro.tech.hermes.management.domain.subscription.consumergroup.ConsumerGroupManager;
import pl.allegro.tech.hermes.management.domain.topic.BrokerTopicManagement;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService;
Expand All @@ -58,8 +57,6 @@ public class KafkaConfiguration implements MultipleDcKafkaNamesMappersFactory {

@Autowired SubscriptionProperties subscriptionProperties;

@Autowired CompositeMessageContentWrapper compositeMessageContentWrapper;

@Autowired ZookeeperRepositoryManager zookeeperRepositoryManager;

@Autowired MultiDatacenterRepositoryCommandExecutor multiDcExecutor;
Expand Down Expand Up @@ -104,6 +101,7 @@ MultiDCAwareService multiDCAwareService(
new KafkaSingleMessageReader(
kafkaRawMessageReader, schemaRepository, jsonAvroConverter);
return new BrokersClusterService(
kafkaProperties.getDatacenter(),
kafkaProperties.getQualifiedClusterName(),
messageReader,
retransmissionService,
Expand All @@ -112,7 +110,8 @@ MultiDCAwareService multiDCAwareService(
new OffsetsAvailableChecker(consumerPool, storage),
new LogEndOffsetChecker(consumerPool),
brokerAdminClient,
createConsumerGroupManager(kafkaProperties, kafkaNamesMapper),
createConsumerGroupManager(
kafkaProperties, kafkaNamesMapper, brokerAdminClient),
createKafkaConsumerManager(kafkaProperties, kafkaNamesMapper));
})
.collect(toList());
Expand All @@ -126,13 +125,16 @@ MultiDCAwareService multiDCAwareService(
}

private ConsumerGroupManager createConsumerGroupManager(
KafkaProperties kafkaProperties, KafkaNamesMapper kafkaNamesMapper) {
KafkaProperties kafkaProperties,
KafkaNamesMapper kafkaNamesMapper,
AdminClient kafkaAdminClient) {
return subscriptionProperties.isCreateConsumerGroupManuallyEnabled()
? new KafkaConsumerGroupManager(
kafkaNamesMapper,
kafkaProperties.getQualifiedClusterName(),
kafkaProperties.getBrokerList(),
kafkaProperties)
kafkaProperties,
kafkaAdminClient)
: new NoOpConsumerGroupManager();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package pl.allegro.tech.hermes.management.config;
package pl.allegro.tech.hermes.management.config.subscription;

import static java.util.stream.Collectors.toList;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package pl.allegro.tech.hermes.management.config;
package pl.allegro.tech.hermes.management.config.subscription;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.time.Clock;
import java.util.concurrent.Executors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import pl.allegro.tech.hermes.domain.subscription.SubscriptionRepository;
import pl.allegro.tech.hermes.management.config.subscription.consumergroup.ConsumerGroupCleanUpProperties;
import pl.allegro.tech.hermes.management.domain.Auditor;
import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor;
import pl.allegro.tech.hermes.management.domain.dc.RepositoryManager;
Expand Down Expand Up @@ -123,11 +125,15 @@ public SubscriptionRemover subscriptionRemover(
Auditor auditor,
MultiDatacenterRepositoryCommandExecutor multiDatacenterRepositoryCommandExecutor,
SubscriptionOwnerCache subscriptionOwnerCache,
SubscriptionRepository subscriptionRepository) {
SubscriptionRepository subscriptionRepository,
ConsumerGroupCleanUpProperties consumerGroupCleanUpProperties,
Clock clock) {
return new SubscriptionRemover(
auditor,
multiDatacenterRepositoryCommandExecutor,
subscriptionOwnerCache,
subscriptionRepository);
subscriptionRepository,
consumerGroupCleanUpProperties.isEnabled(),
clock);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package pl.allegro.tech.hermes.management.config;
package pl.allegro.tech.hermes.management.config.subscription;

import org.springframework.boot.context.properties.ConfigurationProperties;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package pl.allegro.tech.hermes.management.config;
package pl.allegro.tech.hermes.management.config.subscription;

import java.util.ArrayList;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package pl.allegro.tech.hermes.management.config.subscription.consumergroup;

import java.time.Clock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import pl.allegro.tech.hermes.management.domain.subscription.SubscriptionService;
import pl.allegro.tech.hermes.management.domain.subscription.consumergroup.ConsumerGroupCleanUpScheduler;
import pl.allegro.tech.hermes.management.domain.subscription.consumergroup.ConsumerGroupToDeleteRepository;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService;
import pl.allegro.tech.hermes.management.infrastructure.leader.ManagementLeadership;
import pl.allegro.tech.hermes.management.infrastructure.zookeeper.ZookeeperRepositoryManager;

@Configuration
@EnableConfigurationProperties(ConsumerGroupCleanUpProperties.class)
public class ConsumerGroupCleanUpConfig {

@Autowired ZookeeperRepositoryManager zookeeperRepositoryManager;

@Bean
ConsumerGroupCleanUpScheduler consumerGroupCleanUpScheduler(
MultiDCAwareService multiDCAwareService,
SubscriptionService subscriptionService,
ConsumerGroupCleanUpProperties properties,
ManagementLeadership managementLeadership,
Clock clock) {
return new ConsumerGroupCleanUpScheduler(
multiDCAwareService,
zookeeperRepositoryManager.getRepositoriesByType(ConsumerGroupToDeleteRepository.class),
subscriptionService,
properties,
managementLeadership,
clock);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package pl.allegro.tech.hermes.management.config.subscription.consumergroup;

import java.time.Duration;
import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "consumer-group.clean-up")
public class ConsumerGroupCleanUpProperties {
private boolean enabled = true;
private Duration interval = Duration.ofMinutes(5);
private Duration initialDelay = Duration.ofMinutes(1);
private Duration timeout = Duration.ofHours(24);
private boolean removeTasksAfterTimeout = true;

public boolean isEnabled() {
return enabled;
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

public Duration getInterval() {
return interval;
}

public void setInterval(Duration interval) {
this.interval = interval;
}

public Duration getInitialDelay() {
return initialDelay;
}

public void setInitialDelay(Duration initialDelay) {
this.initialDelay = initialDelay;
}

public Duration getTimeout() {
return timeout;
}

public void setTimeout(Duration timeout) {
this.timeout = timeout;
}

public void setRemoveTasksAfterTimeout(boolean removeTasksAfterTimeout) {
this.removeTasksAfterTimeout = removeTasksAfterTimeout;
}

public boolean isRemoveTasksAfterTimeout() {
return removeTasksAfterTimeout;
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package pl.allegro.tech.hermes.management.domain.subscription;

import java.time.Clock;
import java.time.Instant;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.domain.subscription.SubscriptionRepository;
Expand All @@ -12,6 +15,7 @@
import pl.allegro.tech.hermes.management.domain.auth.RequestUser;
import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor;
import pl.allegro.tech.hermes.management.domain.subscription.commands.RemoveSubscriptionRepositoryCommand;
import pl.allegro.tech.hermes.management.domain.subscription.consumergroup.command.ScheduleConsumerGroupToDeleteCommand;

public class SubscriptionRemover {

Expand All @@ -20,16 +24,22 @@ public class SubscriptionRemover {
private final MultiDatacenterRepositoryCommandExecutor multiDcExecutor;
private final SubscriptionOwnerCache subscriptionOwnerCache;
private final SubscriptionRepository subscriptionRepository;
private final boolean scheduleConsumerGroupRemoval;
private final Clock clock;

public SubscriptionRemover(
Auditor auditor,
MultiDatacenterRepositoryCommandExecutor multiDcExecutor,
SubscriptionOwnerCache subscriptionOwnerCache,
SubscriptionRepository subscriptionRepository) {
SubscriptionRepository subscriptionRepository,
boolean scheduleConsumerGroupRemoval,
Clock clock) {
this.auditor = auditor;
this.multiDcExecutor = multiDcExecutor;
this.subscriptionOwnerCache = subscriptionOwnerCache;
this.subscriptionRepository = subscriptionRepository;
this.scheduleConsumerGroupRemoval = scheduleConsumerGroupRemoval;
this.clock = clock;
}

public void removeSubscription(
Expand All @@ -40,6 +50,14 @@ public void removeSubscription(
subscriptionRepository.getSubscriptionDetails(topicName, subscriptionName);
multiDcExecutor.executeByUser(
new RemoveSubscriptionRepositoryCommand(topicName, subscriptionName), removedBy);

if (scheduleConsumerGroupRemoval) {
multiDcExecutor.executeByUser(
new ScheduleConsumerGroupToDeleteCommand(
new SubscriptionName(subscriptionName, topicName), Instant.now(clock)),
removedBy);
}

auditor.objectRemoved(removedBy.getUsername(), subscription);
subscriptionOwnerCache.onRemovedSubscription(subscriptionName, topicName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,11 @@ private List<UnhealthySubscription> getUnhealthyList(
}
}

public boolean subscriptionExists(SubscriptionName subscriptionName) {
return subscriptionRepository.subscriptionExists(
subscriptionName.getTopicName(), subscriptionName.getName());
}

private List<CompletableFuture<UnhealthySubscription>> filterSubscriptions(
Collection<Subscription> subscriptions,
boolean respectMonitoringSeverity,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package pl.allegro.tech.hermes.management.domain.subscription.consumergroup;

import static java.lang.String.format;

public class ConsumerGroupAlreadyScheduledToDeleteException extends RuntimeException {

public ConsumerGroupAlreadyScheduledToDeleteException(
ConsumerGroupToDelete consumerGroupToDelete, Throwable e) {
super(
format(
"Consumer group already scheduled to delete, for subscription %s ",
consumerGroupToDelete.subscriptionName().getQualifiedName()),
e);
}
}
Loading

0 comments on commit d686f7a

Please sign in to comment.