From f551c5ba4ab770e62f98fede2aa9c066e1c72756 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcin=20Bobi=C5=84ski?= <49727204+MarcinBobinski@users.noreply.github.com> Date: Fri, 10 Jan 2025 09:39:33 +0100 Subject: [PATCH] Skyeden-3020 | retransimission with detached consumers (#1938) * SKYEDEN-3020 | Retransmission with detached consumers * SKYEDEN-3020 | Refactor * SKYEDEN-3020 | Refactor * SKYEDEN-3271 | comment + removed move offsets to the end from the subscription diagnostic tab + improved retransmission UI * SKYEDEN-3020 | changes related to PR + frontend tests * SKYEDEN-3020 | resolve conflicts * SKYEDEN-3020 | changes related to CR - added toString for PartitionOffset class --- .../tech/hermes/api/ConsumerGroup.java | 8 ++ .../common/kafka/offset/PartitionOffset.java | 12 +++ hermes-console/json-server/server.ts | 8 +- hermes-console/src/api/hermes-client/index.ts | 9 --- .../useConsumerGroups.spec.ts | 57 ------------- .../use-consumer-groups/useConsumerGroups.ts | 35 +------- .../use-subscription/useSubscription.spec.ts | 69 +++++++++++++++- .../use-subscription/useSubscription.ts | 13 ++- hermes-console/src/i18n/en-US/index.ts | 4 - hermes-console/src/mocks/handlers.ts | 25 ++---- .../consumer-groups/ConsumerGroupsView.vue | 10 +-- .../subscription/SubscriptionView.spec.ts | 2 + .../views/subscription/SubscriptionView.vue | 10 ++- .../ManageMessagesCard.spec.ts | 45 +++++++++++ .../ManageMessagesCard.vue | 15 +++- .../management/api/SubscriptionsEndpoint.java | 17 +--- .../config/kafka/KafkaConfiguration.java | 10 +-- .../domain/message/RetransmissionService.java | 11 ++- .../subscription/SubscriptionService.java | 40 ++++++++++ .../TopicContentTypeMigrationService.java | 9 ++- .../kafka/MultiDCAwareService.java | 80 +++++++++++++------ .../kafka/service/BrokersClusterService.java | 74 ++++++++++------- .../KafkaRetransmissionService.java | 72 ++++++++++------- .../retransmit/OffsetNotFoundException.java | 2 +- .../client/integration/HermesTestClient.java | 12 +-- .../integration/ManagementTestClient.java | 14 ---- .../KafkaRetransmissionServiceTest.java | 18 ++++- .../SubscriptionManagementTest.java | 51 ------------ 28 files changed, 407 insertions(+), 325 deletions(-) diff --git a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/ConsumerGroup.java b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/ConsumerGroup.java index 61528237df..223c7132be 100644 --- a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/ConsumerGroup.java +++ b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/ConsumerGroup.java @@ -41,6 +41,14 @@ public Set getMembers() { return members; } + public boolean isStable() { + return state.equals("Stable"); + } + + public boolean isEmpty() { + return state.equals("Empty"); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/offset/PartitionOffset.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/offset/PartitionOffset.java index 459770e508..10aa1f3caf 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/offset/PartitionOffset.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/offset/PartitionOffset.java @@ -52,4 +52,16 @@ public boolean equals(Object obj) { public int hashCode() { return Objects.hash(topic, partition, offset); } + + @Override + public String toString() { + return "PartitionOffset{" + + "topic=" + + topic + + ", partition=" + + partition + + ", offset=" + + offset + + '}'; + } } diff --git a/hermes-console/json-server/server.ts b/hermes-console/json-server/server.ts index 0a47cb6361..14790ef81f 100644 --- a/hermes-console/json-server/server.ts +++ b/hermes-console/json-server/server.ts @@ -20,10 +20,6 @@ server.post('/query/subscriptions', (req, res) => { res.jsonp(subscriptions); }); -server.post('/topics/*/subscriptions/*/moveOffsetsToTheEnd', (req, res) => { - res.sendStatus(200); -}); - server.post('/topicSubscriptions', (req, res) => { res.sendStatus(200); }); @@ -83,7 +79,9 @@ server.post('/offline-retransmission/tasks', (req, res) => { server.put( '/topics/:topic/subscriptions/:subscroption/retransmission', (req, res) => { - res.sendStatus(200); + setTimeout(() => { + res.sendStatus(200); + }, 2000); }, ); diff --git a/hermes-console/src/api/hermes-client/index.ts b/hermes-console/src/api/hermes-client/index.ts index c98cefc70f..ecd99622fc 100644 --- a/hermes-console/src/api/hermes-client/index.ts +++ b/hermes-console/src/api/hermes-client/index.ts @@ -310,15 +310,6 @@ export function fetchDashboardUrl(path: string): ResponsePromise { return axios.get(path); } -export function moveSubscriptionOffsets( - topicName: string, - subscription: string, -): ResponsePromise { - return axios.post( - `/topics/${topicName}/subscriptions/${subscription}/moveOffsetsToTheEnd`, - ); -} - export function removeTopic(topic: String): ResponsePromise { return axios.delete(`/topics/${topic}`); } diff --git a/hermes-console/src/composables/consumer-groups/use-consumer-groups/useConsumerGroups.spec.ts b/hermes-console/src/composables/consumer-groups/use-consumer-groups/useConsumerGroups.spec.ts index 0da6ef7da9..f6a2dc05c4 100644 --- a/hermes-console/src/composables/consumer-groups/use-consumer-groups/useConsumerGroups.spec.ts +++ b/hermes-console/src/composables/consumer-groups/use-consumer-groups/useConsumerGroups.spec.ts @@ -3,14 +3,9 @@ import { createTestingPinia } from '@pinia/testing'; import { dummyConsumerGroups } from '@/dummy/consumerGroups'; import { dummySubscription } from '@/dummy/subscription'; import { dummyTopic } from '@/dummy/topic'; -import { - expectNotificationDispatched, - notificationStoreSpy, -} from '@/utils/test-utils'; import { fetchConsumerGroupsErrorHandler, fetchConsumerGroupsHandler, - moveSubscriptionOffsetsHandler, } from '@/mocks/handlers'; import { setActivePinia } from 'pinia'; import { setupServer } from 'msw/node'; @@ -81,56 +76,4 @@ describe('useConsumerGroups', () => { expect(error.value.fetchConsumerGroups).not.toBeNull(); }); }); - - it('should show message that moving offsets was successful', async () => { - // given - server.use( - moveSubscriptionOffsetsHandler({ - topicName, - subscriptionName, - statusCode: 200, - }), - ); - server.listen(); - const notificationStore = notificationStoreSpy(); - - const { moveOffsets } = useConsumerGroups(topicName, subscriptionName); - - // when - moveOffsets(); - - // then - await waitFor(() => { - expectNotificationDispatched(notificationStore, { - type: 'success', - title: 'notifications.subscriptionOffsets.move.success', - }); - }); - }); - - it('should show message that moving offsets was unsuccessful', async () => { - // given - server.use( - moveSubscriptionOffsetsHandler({ - topicName, - subscriptionName, - statusCode: 500, - }), - ); - server.listen(); - - const notificationStore = notificationStoreSpy(); - const { moveOffsets } = useConsumerGroups(topicName, subscriptionName); - - // when - moveOffsets(); - - // then - await waitFor(() => { - expectNotificationDispatched(notificationStore, { - type: 'error', - title: 'notifications.subscriptionOffsets.move.failure', - }); - }); - }); }); diff --git a/hermes-console/src/composables/consumer-groups/use-consumer-groups/useConsumerGroups.ts b/hermes-console/src/composables/consumer-groups/use-consumer-groups/useConsumerGroups.ts index 0d5557c4a3..8956243cef 100644 --- a/hermes-console/src/composables/consumer-groups/use-consumer-groups/useConsumerGroups.ts +++ b/hermes-console/src/composables/consumer-groups/use-consumer-groups/useConsumerGroups.ts @@ -1,17 +1,10 @@ -import { dispatchErrorNotification } from '@/utils/notification-utils'; -import { - fetchConsumerGroups as getConsumerGroups, - moveSubscriptionOffsets, -} from '@/api/hermes-client'; +import { fetchConsumerGroups as getConsumerGroups } from '@/api/hermes-client'; import { ref } from 'vue'; -import { useGlobalI18n } from '@/i18n'; -import { useNotificationsStore } from '@/store/app-notifications/useAppNotifications'; import type { ConsumerGroup } from '@/api/consumer-group'; import type { Ref } from 'vue'; export interface UseConsumerGroups { consumerGroups: Ref; - moveOffsets: () => void; loading: Ref; error: Ref; } @@ -43,36 +36,10 @@ export function useConsumerGroups( } }; - const moveOffsets = async () => { - const notificationsStore = useNotificationsStore(); - try { - await moveSubscriptionOffsets(topicName, subscriptionName); - await notificationsStore.dispatchNotification({ - title: useGlobalI18n().t( - 'notifications.subscriptionOffsets.move.success', - { - subscriptionName, - }, - ), - text: '', - type: 'success', - }); - } catch (e: any) { - await dispatchErrorNotification( - e, - notificationsStore, - useGlobalI18n().t('notifications.subscriptionOffsets.move.failure', { - subscriptionName, - }), - ); - } - }; - fetchConsumerGroups(); return { consumerGroups, - moveOffsets, loading, error, }; diff --git a/hermes-console/src/composables/subscription/use-subscription/useSubscription.spec.ts b/hermes-console/src/composables/subscription/use-subscription/useSubscription.spec.ts index d4bfdc6f82..6a615eace7 100644 --- a/hermes-console/src/composables/subscription/use-subscription/useSubscription.spec.ts +++ b/hermes-console/src/composables/subscription/use-subscription/useSubscription.spec.ts @@ -1,4 +1,4 @@ -import { afterEach } from 'vitest'; +import { afterEach, expect } from 'vitest'; import { createRetransmissionHandler, fetchSubscriptionErrorHandler, @@ -18,7 +18,6 @@ import { dummySubscriptionHealth, dummySubscriptionMetrics, } from '@/dummy/subscription'; -import { expect } from 'vitest'; import { expectNotificationDispatched, notificationStoreSpy, @@ -391,6 +390,39 @@ describe('useSubscription', () => { }); }); + [200, 500].forEach((statusCode) => { + it(`should correctly manage the state of retransmission with status code ${statusCode}`, async () => { + // given + server.use( + createRetransmissionHandler({ + topicName: dummySubscription.topicName, + subscriptionName: dummySubscription.name, + statusCode, + delayMs: 100, + }), + ); + server.listen(); + + const { retransmitMessages, retransmitting } = useSubscription( + dummySubscription.topicName, + dummySubscription.name, + ); + + expect(retransmitting.value).toBeFalsy(); + + // when + retransmitMessages(new Date().toISOString()); + + // then + await waitFor(() => { + expect(retransmitting.value).toBeTruthy(); + }); + await waitFor(() => { + expect(retransmitting.value).toBeFalsy(); + }); + }); + }); + it('should show message that skipping all messages was successful', async () => { // given server.use( @@ -448,6 +480,39 @@ describe('useSubscription', () => { }); }); }); + + [200, 500].forEach((statusCode) => { + it(`should correctly manage the state of skipping all messages with status code ${statusCode}`, async () => { + // given + server.use( + createRetransmissionHandler({ + topicName: dummySubscription.topicName, + subscriptionName: dummySubscription.name, + statusCode, + delayMs: 100, + }), + ); + server.listen(); + + const { skipAllMessages, skippingAllMessages } = useSubscription( + dummySubscription.topicName, + dummySubscription.name, + ); + + expect(skippingAllMessages.value).toBeFalsy(); + + // when + skipAllMessages(); + + // then + await waitFor(() => { + expect(skippingAllMessages.value).toBeTruthy(); + }); + await waitFor(() => { + expect(skippingAllMessages.value).toBeFalsy(); + }); + }); + }); }); function expectErrors( diff --git a/hermes-console/src/composables/subscription/use-subscription/useSubscription.ts b/hermes-console/src/composables/subscription/use-subscription/useSubscription.ts index 7b4581bda1..149ac21a1c 100644 --- a/hermes-console/src/composables/subscription/use-subscription/useSubscription.ts +++ b/hermes-console/src/composables/subscription/use-subscription/useSubscription.ts @@ -32,6 +32,8 @@ export interface UseSubscription { subscriptionLastUndeliveredMessage: Ref; trackingUrls: Ref; loading: Ref; + retransmitting: Ref; + skippingAllMessages: Ref; error: Ref; removeSubscription: () => Promise; suspendSubscription: () => Promise; @@ -73,7 +75,8 @@ export function useSubscription( fetchSubscriptionLastUndeliveredMessage: null, getSubscriptionTrackingUrls: null, }); - + const retransmitting = ref(false); + const skippingAllMessages = ref(false); const fetchSubscription = async () => { try { loading.value = true; @@ -233,6 +236,7 @@ export function useSubscription( }; const retransmitMessages = async (from: string): Promise => { + retransmitting.value = true; try { await retransmitSubscriptionMessages(topicName, subscriptionName, { retransmissionDate: from, @@ -257,10 +261,13 @@ export function useSubscription( }), ); return false; + } finally { + retransmitting.value = false; } }; const skipAllMessages = async (): Promise => { + skippingAllMessages.value = true; const tomorrowDate = new Date(); tomorrowDate.setDate(tomorrowDate.getDate() + 1); try { @@ -290,6 +297,8 @@ export function useSubscription( ), ); return false; + } finally { + skippingAllMessages.value = false; } }; @@ -305,6 +314,8 @@ export function useSubscription( subscriptionLastUndeliveredMessage, trackingUrls, loading, + retransmitting, + skippingAllMessages, error, removeSubscription, suspendSubscription, diff --git a/hermes-console/src/i18n/en-US/index.ts b/hermes-console/src/i18n/en-US/index.ts index e189bac9df..2af47cb606 100644 --- a/hermes-console/src/i18n/en-US/index.ts +++ b/hermes-console/src/i18n/en-US/index.ts @@ -625,10 +625,6 @@ const en_US = { reason: 'Reason', timestamp: 'Timestamp', }, - moveOffsets: { - tooltip: 'Move subscription offsets to the end', - button: 'MOVE OFFSETS', - }, }, search: { collection: { diff --git a/hermes-console/src/mocks/handlers.ts b/hermes-console/src/mocks/handlers.ts index 335b4d152f..02927fad47 100644 --- a/hermes-console/src/mocks/handlers.ts +++ b/hermes-console/src/mocks/handlers.ts @@ -817,24 +817,6 @@ export const switchReadinessErrorHandler = ({ }); }); -export const moveSubscriptionOffsetsHandler = ({ - topicName, - subscriptionName, - statusCode, -}: { - topicName: string; - subscriptionName: string; - statusCode: number; -}) => - http.post( - `${url}/topics/${topicName}/subscriptions/${subscriptionName}/moveOffsetsToTheEnd`, - () => { - return new HttpResponse(undefined, { - status: statusCode, - }); - }, - ); - export const upsertTopicConstraintHandler = ({ statusCode, }: { @@ -974,14 +956,19 @@ export const createRetransmissionHandler = ({ statusCode, topicName, subscriptionName, + delayMs, }: { statusCode: number; topicName: string; subscriptionName: string; + delayMs?: number; }) => http.put( `${url}/topics/${topicName}/subscriptions/${subscriptionName}/retransmission`, - () => { + async () => { + if (delayMs && delayMs > 0) { + await new Promise((resolve) => setTimeout(resolve, delayMs)); + } return new HttpResponse(undefined, { status: statusCode, }); diff --git a/hermes-console/src/views/admin/consumer-groups/ConsumerGroupsView.vue b/hermes-console/src/views/admin/consumer-groups/ConsumerGroupsView.vue index e6149517dc..1da2a768ce 100644 --- a/hermes-console/src/views/admin/consumer-groups/ConsumerGroupsView.vue +++ b/hermes-console/src/views/admin/consumer-groups/ConsumerGroupsView.vue @@ -12,7 +12,7 @@ const params = route.params as Record; const { subscriptionId, topicId, groupId } = params; - const { consumerGroups, moveOffsets, loading, error } = useConsumerGroups( + const { consumerGroups, loading, error } = useConsumerGroups( topicId, subscriptionId, ); @@ -64,14 +64,6 @@ {{ $t('consumerGroups.title') }}

- - - {{ $t('subscription.moveOffsets.button') }} - {{ - $t('subscription.moveOffsets.tooltip') - }} - - diff --git a/hermes-console/src/views/subscription/SubscriptionView.spec.ts b/hermes-console/src/views/subscription/SubscriptionView.spec.ts index 5cf315fa29..06b5372ab4 100644 --- a/hermes-console/src/views/subscription/SubscriptionView.spec.ts +++ b/hermes-console/src/views/subscription/SubscriptionView.spec.ts @@ -37,6 +37,8 @@ const useSubscriptionStub: ReturnType = { subscriptionUndeliveredMessages: ref(dummyUndeliveredMessages), subscriptionLastUndeliveredMessage: ref(dummyUndeliveredMessage), trackingUrls: ref(dummyTrackingUrls), + retransmitting: computed(() => false), + skippingAllMessages: computed(() => false), error: ref({ fetchSubscription: null, fetchOwner: null, diff --git a/hermes-console/src/views/subscription/SubscriptionView.vue b/hermes-console/src/views/subscription/SubscriptionView.vue index 728ae5e0d0..e314757030 100644 --- a/hermes-console/src/views/subscription/SubscriptionView.vue +++ b/hermes-console/src/views/subscription/SubscriptionView.vue @@ -36,6 +36,8 @@ subscriptionUndeliveredMessages, subscriptionLastUndeliveredMessage, trackingUrls, + retransmitting, + skippingAllMessages, error, loading, removeSubscription, @@ -108,6 +110,10 @@ await retransmitMessages(fromDate); }; + const onSkipAllMessages = async () => { + await skipAllMessages(); + }; + const breadcrumbsItems = [ { title: t('subscription.subscriptionBreadcrumbs.home'), @@ -234,8 +240,10 @@ v-if="isSubscriptionOwnerOrAdmin(roles)" :topic="topicId" :subscription="subscriptionId" + :retransmitting="retransmitting" + :skippingAllMessages="skippingAllMessages" @retransmit="onRetransmit" - @skipAllMessages="skipAllMessages" + @skipAllMessages="onSkipAllMessages" /> - {{ $t('subscription.manageMessagesCard.retransmitButton') }} + + + {{ $t('subscription.manageMessagesCard.retransmitButton') }} + @@ -128,12 +135,16 @@ {{ $t('subscription.manageMessagesCard.skipAllMessagesTitle') }}

- {{ $t('subscription.manageMessagesCard.skipAllMessagesButton') }} + + + {{ $t('subscription.manageMessagesCard.skipAllMessagesButton') }} + diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/SubscriptionsEndpoint.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/SubscriptionsEndpoint.java index 4c92836529..faa1b7a5b4 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/SubscriptionsEndpoint.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/SubscriptionsEndpoint.java @@ -35,7 +35,6 @@ import pl.allegro.tech.hermes.api.Subscription; import pl.allegro.tech.hermes.api.SubscriptionHealth; import pl.allegro.tech.hermes.api.SubscriptionMetrics; -import pl.allegro.tech.hermes.api.SubscriptionName; import pl.allegro.tech.hermes.api.Topic; import pl.allegro.tech.hermes.api.TopicName; import pl.allegro.tech.hermes.management.api.auth.HermesSecurityAwareRequestUser; @@ -273,7 +272,7 @@ public Response retransmit( @Context ContainerRequestContext requestContext) { MultiDCOffsetChangeSummary summary = - multiDCAwareService.retransmit( + subscriptionService.retransmit( topicService.getTopicDetails(TopicName.fromQualifiedName(qualifiedTopicName)), subscriptionName, offsetRetransmissionDate.getRetransmissionDate().toInstant().toEpochMilli(), @@ -283,20 +282,6 @@ public Response retransmit( return Response.status(OK).entity(summary).build(); } - @POST - @Consumes(APPLICATION_JSON) - @Produces(APPLICATION_JSON) - @RolesAllowed({Roles.ADMIN}) - @Path("/{subscriptionName}/moveOffsetsToTheEnd") - public Response moveOffsetsToTheEnd( - @PathParam("topicName") String qualifiedTopicName, - @PathParam("subscriptionName") String subscriptionName) { - TopicName topicName = fromQualifiedName(qualifiedTopicName); - multiDCAwareService.moveOffsetsToTheEnd( - topicService.getTopicDetails(topicName), new SubscriptionName(subscriptionName, topicName)); - return responseStatus(OK); - } - @GET @Produces(APPLICATION_JSON) @Path("/{subscriptionName}/events/{messageId}/trace") diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java index 995c6b2370..0ec19b12a9 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java @@ -36,7 +36,6 @@ import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService; import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaBrokerTopicManagement; import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaConsumerGroupManager; -import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaConsumerManager; import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaRawMessageReader; import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaSingleMessageReader; import pl.allegro.tech.hermes.management.infrastructure.kafka.service.LogEndOffsetChecker; @@ -111,8 +110,7 @@ MultiDCAwareService multiDCAwareService( new LogEndOffsetChecker(consumerPool), brokerAdminClient, createConsumerGroupManager( - kafkaProperties, kafkaNamesMapper, brokerAdminClient), - createKafkaConsumerManager(kafkaProperties, kafkaNamesMapper)); + kafkaProperties, kafkaNamesMapper, brokerAdminClient)); }) .collect(toList()); @@ -138,12 +136,6 @@ private ConsumerGroupManager createConsumerGroupManager( : new NoOpConsumerGroupManager(); } - private KafkaConsumerManager createKafkaConsumerManager( - KafkaProperties kafkaProperties, KafkaNamesMapper kafkaNamesMapper) { - return new KafkaConsumerManager( - kafkaProperties, kafkaNamesMapper, kafkaProperties.getBrokerList()); - } - private SubscriptionOffsetChangeIndicator getRepository( List> repositories, KafkaProperties kafkaProperties) { diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/message/RetransmissionService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/message/RetransmissionService.java index 5ad8f6fc35..76e8e64546 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/message/RetransmissionService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/message/RetransmissionService.java @@ -6,8 +6,15 @@ public interface RetransmissionService { - List indicateOffsetChange( - Topic topic, String subscription, String brokersClusterName, long timestamp, boolean dryRun); + List fetchTopicOffsetsAt(Topic topic, Long timestamp); + + List fetchTopicEndOffsets(Topic topic); + + void indicateOffsetChange( + Topic topic, + String subscription, + String brokersClusterName, + List partitionOffsets); boolean areOffsetsMoved(Topic topic, String subscriptionName, String brokersClusterName); } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionService.java index c1fdd819c1..cac619a239 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionService.java @@ -51,7 +51,9 @@ import pl.allegro.tech.hermes.management.domain.subscription.health.SubscriptionHealthChecker; import pl.allegro.tech.hermes.management.domain.subscription.validator.SubscriptionValidator; import pl.allegro.tech.hermes.management.domain.topic.TopicService; +import pl.allegro.tech.hermes.management.infrastructure.kafka.MovingSubscriptionOffsetsValidationException; import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService; +import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCOffsetChangeSummary; import pl.allegro.tech.hermes.tracker.management.LogRepository; public class SubscriptionService { @@ -466,4 +468,42 @@ private List getSubscriptionsMetrics( }) .collect(toList()); } + + public MultiDCOffsetChangeSummary retransmit( + Topic topic, String subscriptionName, Long timestamp, boolean dryRun, RequestUser requester) { + Subscription subscription = getSubscriptionDetails(topic.getName(), subscriptionName); + + MultiDCOffsetChangeSummary multiDCOffsetChangeSummary = + multiDCAwareService.fetchTopicOffsetsAt(topic, timestamp); + + if (dryRun) return multiDCOffsetChangeSummary; + + /* + * The subscription state is used to determine how to move the offsets. + * When the subscription is ACTIVE, the management instance notifies consumers to change offsets. + * The consumers are responsible for moving their local offsets(KafkaConsumer::seek method) as well as committed ones on Kafka (KafkaConsumer::commitSync method). + * When the subscription is SUSPENDED, the management instance changes the commited offsets on kafka on its own (AdminClient::alterConsumerGroupOffsets). + * There is no active consumer to notify in that case. + */ + switch (subscription.getState()) { + case ACTIVE: + multiDCAwareService.moveOffsetsForActiveConsumers( + topic, + subscriptionName, + multiDCOffsetChangeSummary.getPartitionOffsetListPerBrokerName(), + requester); + break; + case SUSPENDED: + multiDCAwareService.moveOffsets( + topic, + subscriptionName, + multiDCOffsetChangeSummary.getPartitionOffsetListPerBrokerName()); + break; + case PENDING: + throw new MovingSubscriptionOffsetsValidationException( + "Cannot retransmit messages for subscription in PENDING state"); + } + + return multiDCOffsetChangeSummary; + } } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicContentTypeMigrationService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicContentTypeMigrationService.java index 44117189d9..1ab6675422 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicContentTypeMigrationService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicContentTypeMigrationService.java @@ -63,8 +63,13 @@ void waitUntilAllSubscriptionsHasConsumersAssigned( private void notifySingleSubscription( Topic topic, Instant beforeMigrationInstant, String subscriptionName, RequestUser requester) { - multiDCAwareService.retransmit( - topic, subscriptionName, beforeMigrationInstant.toEpochMilli(), false, requester); + multiDCAwareService.moveOffsetsForActiveConsumers( + topic, + subscriptionName, + multiDCAwareService + .fetchTopicOffsetsAt(topic, beforeMigrationInstant.toEpochMilli()) + .getPartitionOffsetListPerBrokerName(), + requester); } private void waitUntilOffsetsAvailableOnAllKafkaTopics( diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java index 474c99d202..3b1833a5ed 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java @@ -6,8 +6,10 @@ import java.time.Duration; import java.time.Instant; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.function.Consumer; @@ -19,6 +21,7 @@ import pl.allegro.tech.hermes.api.SubscriptionName; import pl.allegro.tech.hermes.api.Topic; import pl.allegro.tech.hermes.common.exception.InternalProcessingException; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; import pl.allegro.tech.hermes.management.domain.auth.RequestUser; import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor; import pl.allegro.tech.hermes.management.domain.retransmit.RetransmitCommand; @@ -62,36 +65,67 @@ public String readMessageFromPrimary( .readMessageFromPrimary(topic, partition, offset); } - public MultiDCOffsetChangeSummary retransmit( - Topic topic, String subscriptionName, Long timestamp, boolean dryRun, RequestUser requester) { + public MultiDCOffsetChangeSummary fetchTopicOffsetsAt(Topic topic, Long timestamp) { MultiDCOffsetChangeSummary multiDCOffsetChangeSummary = new MultiDCOffsetChangeSummary(); clusters.forEach( cluster -> multiDCOffsetChangeSummary.addPartitionOffsetList( - cluster.getClusterName(), - cluster.indicateOffsetChange(topic, subscriptionName, timestamp, dryRun))); - - if (!dryRun) { - logger.info( - "Starting retransmission for subscription {}. Requested by {}. Retransmission timestamp: {}", - topic.getQualifiedName() + "$" + subscriptionName, - requester.getUsername(), - timestamp); - multiDcExecutor.executeByUser( - new RetransmitCommand(new SubscriptionName(subscriptionName, topic.getName())), - requester); - clusters.forEach(clusters -> waitUntilOffsetsAreMoved(topic, subscriptionName)); - logger.info( - "Successfully moved offsets for retransmission of subscription {}. Requested by user: {}. Retransmission timestamp: {}", - topic.getQualifiedName() + "$" + subscriptionName, - requester.getUsername(), - timestamp); - } + cluster.getClusterName(), cluster.fetchTopicOffsetsAt(topic, timestamp))); return multiDCOffsetChangeSummary; } + public void moveOffsets( + Topic topic, + String subscriptionName, + Map> brokerPartitionOffsets) { + clusters.forEach( + cluster -> + cluster.validateIfOffsetsCanBeMoved( + topic, new SubscriptionName(subscriptionName, topic.getName()))); + + clusters.forEach( + cluster -> + cluster.moveOffsets( + new SubscriptionName(subscriptionName, topic.getName()), + brokerPartitionOffsets.getOrDefault( + cluster.getClusterName(), Collections.emptyList()))); + } + + public void moveOffsetsForActiveConsumers( + Topic topic, + String subscriptionName, + Map> brokerPartitionOffsets, + RequestUser requester) { + clusters.forEach( + cluster -> + cluster.validateIfOffsetsCanBeMovedByConsumers( + topic, new SubscriptionName(subscriptionName, topic.getName()))); + + clusters.forEach( + cluster -> + Optional.ofNullable(brokerPartitionOffsets.get(cluster.getClusterName())) + .ifPresent( + offsets -> cluster.indicateOffsetChange(topic, subscriptionName, offsets))); + + logger.info( + "Starting moving offsets for subscription {}. Requested by {}. Retransmission offsets: {}", + topic.getQualifiedName() + "$" + subscriptionName, + requester.getUsername(), + brokerPartitionOffsets); + + multiDcExecutor.executeByUser( + new RetransmitCommand(new SubscriptionName(subscriptionName, topic.getName())), requester); + clusters.forEach(clusters -> waitUntilOffsetsAreMoved(topic, subscriptionName)); + + logger.info( + "Successfully moved offsets for retransmission of subscription {}. Requested by user: {}. Retransmission offsets: {}", + topic.getQualifiedName() + "$" + subscriptionName, + requester.getUsername(), + brokerPartitionOffsets); + } + public boolean areOffsetsAvailableOnAllKafkaTopics(Topic topic) { return clusters.stream() .allMatch(cluster -> cluster.areOffsetsAvailableOnAllKafkaTopics(topic)); @@ -166,10 +200,6 @@ public List describeConsumerGroups(Topic topic, String subscripti .collect(toList()); } - public void moveOffsetsToTheEnd(Topic topic, SubscriptionName subscription) { - clusters.forEach(c -> c.moveOffsetsToTheEnd(topic, subscription)); - } - public void deleteConsumerGroupForDatacenter( SubscriptionName subscriptionName, String datacenter) { clusters.stream() diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/BrokersClusterService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/BrokersClusterService.java index 85752100eb..185048d93f 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/BrokersClusterService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/BrokersClusterService.java @@ -29,6 +29,7 @@ import pl.allegro.tech.hermes.api.Subscription; import pl.allegro.tech.hermes.api.SubscriptionName; import pl.allegro.tech.hermes.api.Topic; +import pl.allegro.tech.hermes.common.kafka.ConsumerGroupId; import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper; import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; import pl.allegro.tech.hermes.management.domain.message.RetransmissionService; @@ -51,7 +52,6 @@ public class BrokersClusterService { private final ConsumerGroupsDescriber consumerGroupsDescriber; private final AdminClient adminClient; private final ConsumerGroupManager consumerGroupManager; - private final KafkaConsumerManager kafkaConsumerManager; public BrokersClusterService( String datacenter, @@ -63,8 +63,7 @@ public BrokersClusterService( OffsetsAvailableChecker offsetsAvailableChecker, LogEndOffsetChecker logEndOffsetChecker, AdminClient adminClient, - ConsumerGroupManager consumerGroupManager, - KafkaConsumerManager kafkaConsumerManager) { + ConsumerGroupManager consumerGroupManager) { this.datacenter = datacenter; this.clusterName = clusterName; this.singleMessageReader = singleMessageReader; @@ -77,7 +76,6 @@ public BrokersClusterService( kafkaNamesMapper, adminClient, logEndOffsetChecker, clusterName); this.adminClient = adminClient; this.consumerGroupManager = consumerGroupManager; - this.kafkaConsumerManager = kafkaConsumerManager; } public String getClusterName() { @@ -97,10 +95,14 @@ public String readMessageFromPrimary(Topic topic, Integer partition, Long offset topic, kafkaNamesMapper.toKafkaTopics(topic).getPrimary(), partition, offset); } - public List indicateOffsetChange( - Topic topic, String subscriptionName, Long timestamp, boolean dryRun) { - return retransmissionService.indicateOffsetChange( - topic, subscriptionName, clusterName, timestamp, dryRun); + public void indicateOffsetChange( + Topic topic, String subscriptionName, List partitionOffsets) { + retransmissionService.indicateOffsetChange( + topic, subscriptionName, clusterName, partitionOffsets); + } + + public List fetchTopicOffsetsAt(Topic topic, Long timestamp) { + return retransmissionService.fetchTopicOffsetsAt(topic, timestamp); } public boolean areOffsetsAvailableOnAllKafkaTopics(Topic topic) { @@ -162,23 +164,16 @@ public Optional describeConsumerGroup(Topic topic, String subscri return consumerGroupsDescriber.describeConsumerGroup(topic, subscriptionName); } - public void moveOffsetsToTheEnd(Topic topic, SubscriptionName subscription) { - validateIfOffsetsCanBeMoved(topic, subscription); - - KafkaConsumer consumer = kafkaConsumerManager.createConsumer(subscription); - String kafkaTopicName = kafkaNamesMapper.toKafkaTopics(topic).getPrimary().name().asString(); - Set topicPartitions = getTopicPartitions(consumer, kafkaTopicName); - consumer.assign(topicPartitions); - - Map endOffsets = consumer.endOffsets(topicPartitions); - Map endOffsetsMetadata = buildOffsetsMetadata(endOffsets); - consumer.commitSync(endOffsetsMetadata); - consumer.close(); + public void moveOffsets(SubscriptionName subscription, List offsets) { + ConsumerGroupId consumerGroupId = kafkaNamesMapper.toConsumerGroupId(subscription); + Map offsetAndMetadata = buildOffsetsMetadata(offsets); + adminClient.alterConsumerGroupOffsets(consumerGroupId.asString(), offsetAndMetadata).all(); logger.info( - "Successfully moved offset to the end position for subscription {} and consumer group {}", + "Successfully moved offsets for subscription {} and consumer group {} to {}", subscription.getQualifiedName(), - kafkaNamesMapper.toConsumerGroupId(subscription)); + kafkaNamesMapper.toConsumerGroupId(subscription), + offsetAndMetadata.toString()); } private int numberOfAssignmentsForConsumersGroups(List consumerGroupsIds) @@ -193,11 +188,32 @@ private int numberOfAssignmentsForConsumersGroups(List consumerGroupsIds .size(); } - private void validateIfOffsetsCanBeMoved(Topic topic, SubscriptionName subscription) { + public void validateIfOffsetsCanBeMovedByConsumers(Topic topic, SubscriptionName subscription) { + describeConsumerGroup(topic, subscription.getName()) + .ifPresentOrElse( + group -> { + if (!group.isStable()) { + String s = + format( + "Consumer group %s for subscription %s is not stable.", + group.getGroupId(), subscription.getQualifiedName()); + throw new MovingSubscriptionOffsetsValidationException(s); + } + }, + () -> { + String s = + format( + "No consumer group for subscription %s exists.", + subscription.getQualifiedName()); + throw new MovingSubscriptionOffsetsValidationException(s); + }); + } + + public void validateIfOffsetsCanBeMoved(Topic topic, SubscriptionName subscription) { describeConsumerGroup(topic, subscription.getName()) .ifPresentOrElse( group -> { - if (!group.getMembers().isEmpty()) { + if (!group.isEmpty()) { String s = format( "Consumer group %s for subscription %s has still active members.", @@ -234,9 +250,13 @@ private Set getTopicPartitions( } private Map buildOffsetsMetadata( - Map offsets) { - return offsets.entrySet().stream() - .map(entry -> ImmutablePair.of(entry.getKey(), new OffsetAndMetadata(entry.getValue()))) + List offsets) { + return offsets.stream() + .map( + offset -> + ImmutablePair.of( + new TopicPartition(offset.getTopic().asString(), offset.getPartition()), + new OffsetAndMetadata(offset.getOffset()))) .collect(toMap(Pair::getKey, Pair::getValue)); } } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/KafkaRetransmissionService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/KafkaRetransmissionService.java index d805d639c7..8c8cc04182 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/KafkaRetransmissionService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/KafkaRetransmissionService.java @@ -37,63 +37,79 @@ public KafkaRetransmissionService( } @Override - public List indicateOffsetChange( - Topic topic, String subscription, String brokersClusterName, long timestamp, boolean dryRun) { + public void indicateOffsetChange( + Topic topic, + String subscription, + String brokersClusterName, + List partitionOffsets) { + for (PartitionOffset partitionOffset : partitionOffsets) { + subscriptionOffsetChange.setSubscriptionOffset( + topic.getName(), subscription, brokersClusterName, partitionOffset); + } + } + @Override + public boolean areOffsetsMoved(Topic topic, String subscriptionName, String brokersClusterName) { + return kafkaNamesMapper + .toKafkaTopics(topic) + .allMatch( + kafkaTopic -> { + List partitionIds = + brokerStorage.readPartitionsIds(kafkaTopic.name().asString()); + return subscriptionOffsetChange.areOffsetsMoved( + topic.getName(), subscriptionName, brokersClusterName, kafkaTopic, partitionIds); + }); + } + + private KafkaConsumer createKafkaConsumer(KafkaTopic kafkaTopic, int partition) { + return consumerPool.get(kafkaTopic, partition); + } + + public List fetchTopicEndOffsets(Topic topic) { + return fetchTopicOffsetsAt(topic, null); + } + + public List fetchTopicOffsetsAt(Topic topic, Long timestamp) { List partitionOffsetList = new ArrayList<>(); kafkaNamesMapper .toKafkaTopics(topic) .forEach( k -> { List partitionsIds = brokerStorage.readPartitionsIds(k.name().asString()); - for (Integer partitionId : partitionsIds) { KafkaConsumer consumer = createKafkaConsumer(k, partitionId); - long offset = - findClosestOffsetJustBeforeTimestamp(consumer, k, partitionId, timestamp); + long offset = getOffsetForTimestampOrEnd(timestamp, k, partitionId, consumer); PartitionOffset partitionOffset = new PartitionOffset(k.name(), offset, partitionId); partitionOffsetList.add(partitionOffset); - if (!dryRun) { - subscriptionOffsetChange.setSubscriptionOffset( - topic.getName(), subscription, brokersClusterName, partitionOffset); - } } }); return partitionOffsetList; } - @Override - public boolean areOffsetsMoved(Topic topic, String subscriptionName, String brokersClusterName) { - return kafkaNamesMapper - .toKafkaTopics(topic) - .allMatch( - kafkaTopic -> { - List partitionIds = - brokerStorage.readPartitionsIds(kafkaTopic.name().asString()); - return subscriptionOffsetChange.areOffsetsMoved( - topic.getName(), subscriptionName, brokersClusterName, kafkaTopic, partitionIds); - }); - } - - private KafkaConsumer createKafkaConsumer(KafkaTopic kafkaTopic, int partition) { - return consumerPool.get(kafkaTopic, partition); + private long getOffsetForTimestampOrEnd( + Long timestamp, + KafkaTopic kafkaTopic, + Integer partitionId, + KafkaConsumer consumer) { + long endOffset = getEndingOffset(consumer, kafkaTopic, partitionId); + return Optional.ofNullable(timestamp) + .flatMap(ts -> findClosestOffsetJustBeforeTimestamp(consumer, kafkaTopic, partitionId, ts)) + .orElse(endOffset); } - private long findClosestOffsetJustBeforeTimestamp( + private Optional findClosestOffsetJustBeforeTimestamp( KafkaConsumer consumer, KafkaTopic kafkaTopic, int partition, long timestamp) { - long endOffset = getEndingOffset(consumer, kafkaTopic, partition); TopicPartition topicPartition = new TopicPartition(kafkaTopic.name().asString(), partition); return Optional.ofNullable( consumer .offsetsForTimes(Collections.singletonMap(topicPartition, timestamp)) .get(topicPartition)) - .orElse(new OffsetAndTimestamp(endOffset, timestamp)) - .offset(); + .map(OffsetAndTimestamp::offset); } private long getEndingOffset( diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/OffsetNotFoundException.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/OffsetNotFoundException.java index 6220c31f3b..616dfa8b69 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/OffsetNotFoundException.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/OffsetNotFoundException.java @@ -7,7 +7,7 @@ class OffsetNotFoundException extends ManagementException { - OffsetNotFoundException(String message) { + public OffsetNotFoundException(String message) { super(message); } diff --git a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/HermesTestClient.java b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/HermesTestClient.java index c9a8293b0b..b4c3bb02b0 100644 --- a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/HermesTestClient.java +++ b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/HermesTestClient.java @@ -119,6 +119,13 @@ public WebTestClient.ResponseSpec suspendSubscription(Topic topic, String subscr .is2xxSuccessful(); } + public WebTestClient.ResponseSpec activateSubscription(Topic topic, String subscription) { + return managementTestClient + .updateSubscriptionState(topic, subscription, Subscription.State.ACTIVE) + .expectStatus() + .is2xxSuccessful(); + } + public void waitUntilSubscriptionActivated(String topicQualifiedName, String subscriptionName) { waitAtMost(Duration.ofSeconds(10)) .untilAsserted( @@ -550,9 +557,4 @@ public WebTestClient.ResponseSpec updateGroup(String groupName, Group group) { public List getGroups() { return managementTestClient.getGroups(); } - - public WebTestClient.ResponseSpec moveOffsetsToTheEnd( - String topicQualifiedName, String subscriptionName) { - return managementTestClient.moveOffsetsToTheEnd(topicQualifiedName, subscriptionName); - } } diff --git a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/ManagementTestClient.java b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/ManagementTestClient.java index 2735b5f710..9a36ee9d31 100644 --- a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/ManagementTestClient.java +++ b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/ManagementTestClient.java @@ -59,9 +59,6 @@ public class ManagementTestClient { private static final String TOPIC_PREVIEW_OFFSET = "/topics/{topicName}/preview/cluster/{brokersClusterName}/partition/{partition}/offset/{offset}"; - private static final String MOVE_SUBSCRIPTION_OFFSETS = - "/topics/{topicName}/subscriptions/{subscriptionName}/moveOffsetsToTheEnd"; - private static final String SET_READINESS = "/readiness/datacenters/{dc}"; private static final String GET_READINESS = "/readiness/datacenters"; @@ -804,15 +801,4 @@ public WebTestClient.ResponseSpec updateGroup(String groupName, Group group) { .body(Mono.just(group), Group.class) .exchange(); } - - public WebTestClient.ResponseSpec moveOffsetsToTheEnd( - String topicQualifiedName, String subscriptionName) { - return webTestClient - .post() - .uri( - UriBuilder.fromUri(managementContainerUrl) - .path(MOVE_SUBSCRIPTION_OFFSETS) - .build(topicQualifiedName, subscriptionName)) - .exchange(); - } } diff --git a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java index 94c7eb8f43..77a2b9c93f 100644 --- a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java +++ b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java @@ -14,6 +14,8 @@ import java.util.stream.Collectors; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.springframework.test.web.reactive.server.WebTestClient; import pl.allegro.tech.hermes.api.ContentType; import pl.allegro.tech.hermes.api.OffsetRetransmissionDate; @@ -53,8 +55,10 @@ public class KafkaRetransmissionServiceTest { @RegisterExtension public static final TestSubscribersExtension subscribers = new TestSubscribersExtension(); - @Test - public void shouldMoveOffsetNearGivenTimestamp() throws InterruptedException { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void shouldMoveOffsetNearGivenTimestamp(boolean suspendedSubscription) + throws InterruptedException { // given final TestSubscriber subscriber = subscribers.createSubscriber(); final Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build()); @@ -71,6 +75,11 @@ public void shouldMoveOffsetNearGivenTimestamp() throws InterruptedException { publishAndConsumeMessages(messages2, topic, subscriber); hermes.api().waitUntilConsumerCommitsOffset(topic.getQualifiedName(), subscription.getName()); + if (suspendedSubscription) { + hermes.api().suspendSubscription(topic, subscription.getName()); + hermes.api().waitUntilSubscriptionSuspended(topic.getQualifiedName(), subscription.getName()); + } + // when WebTestClient.ResponseSpec response = hermes @@ -78,6 +87,11 @@ public void shouldMoveOffsetNearGivenTimestamp() throws InterruptedException { .retransmit( topic.getQualifiedName(), subscription.getName(), retransmissionDate, false); + if (suspendedSubscription) { + hermes.api().activateSubscription(topic, subscription.getName()); + hermes.api().waitUntilSubscriptionActivated(topic.getQualifiedName(), subscription.getName()); + } + // then response.expectStatus().isOk(); messages2.forEach(subscriber::waitUntilReceived); diff --git a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/SubscriptionManagementTest.java b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/SubscriptionManagementTest.java index 3cf0ff68b5..c09b4cf7cd 100644 --- a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/SubscriptionManagementTest.java +++ b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/SubscriptionManagementTest.java @@ -9,7 +9,6 @@ import static pl.allegro.tech.hermes.integrationtests.prometheus.SubscriptionMetrics.subscriptionMetrics; import static pl.allegro.tech.hermes.integrationtests.prometheus.TopicMetrics.topicMetrics; import static pl.allegro.tech.hermes.integrationtests.setup.HermesExtension.auditEvents; -import static pl.allegro.tech.hermes.integrationtests.setup.HermesExtension.brokerOperations; import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscription; import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscriptionWithRandomName; import static pl.allegro.tech.hermes.test.helper.builder.TopicBuilder.topicWithRandomName; @@ -18,7 +17,6 @@ import java.time.Duration; import java.util.Arrays; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.stream.Stream; import org.junit.jupiter.api.AfterEach; @@ -39,7 +37,6 @@ import pl.allegro.tech.hermes.api.TopicName; import pl.allegro.tech.hermes.api.TopicPartition; import pl.allegro.tech.hermes.api.TrackingMode; -import pl.allegro.tech.hermes.env.BrokerOperations; import pl.allegro.tech.hermes.integrationtests.prometheus.PrometheusExtension; import pl.allegro.tech.hermes.integrationtests.setup.HermesExtension; import pl.allegro.tech.hermes.integrationtests.subscriber.TestSubscriber; @@ -760,52 +757,4 @@ public void shouldReturnInflightSizeWhenSetToNonNullValue() { // then assertThat(response.getSerialSubscriptionPolicy().getInflightSize()).isEqualTo(42); } - - @Test - public void shouldMoveOffsetsToTheEnd() { - // given - TestSubscriber subscriber = subscribers.createSubscriber(503); - Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build()); - Subscription subscription = - hermes - .initHelper() - .createSubscription( - subscriptionWithRandomName(topic.getName(), subscriber.getEndpoint()) - .withSubscriptionPolicy(SubscriptionPolicy.create(Map.of("messageTtl", 3600))) - .build()); - List messages = List.of(MESSAGE.body(), MESSAGE.body(), MESSAGE.body(), MESSAGE.body()); - - // prevents from moving offsets during messages sending - messages.forEach( - message -> { - hermes.api().publishUntilSuccess(topic.getQualifiedName(), message); - subscriber.waitUntilReceived(message); - }); - - assertThat(allConsumerGroupOffsetsMovedToTheEnd(subscription)).isFalse(); - - hermes.api().deleteSubscription(topic.getQualifiedName(), subscription.getName()); - - // when - waitAtMost(Duration.ofSeconds(10)) - .untilAsserted( - () -> - hermes - .api() - .moveOffsetsToTheEnd(topic.getQualifiedName(), subscription.getName()) - .expectStatus() - .isOk()); - - // then - waitAtMost(Duration.ofSeconds(10)) - .untilAsserted( - () -> assertThat(allConsumerGroupOffsetsMovedToTheEnd(subscription)).isTrue()); - } - - private boolean allConsumerGroupOffsetsMovedToTheEnd(Subscription subscription) { - List partitionsOffsets = - brokerOperations.getTopicPartitionsOffsets(subscription.getQualifiedName()); - return !partitionsOffsets.isEmpty() - && partitionsOffsets.stream().allMatch(BrokerOperations.ConsumerGroupOffset::movedToEnd); - } }