From 8baa676abd04657b9931cbd08470d0576cb087ca Mon Sep 17 00:00:00 2001 From: lokeshAlamuri Date: Sat, 5 Oct 2024 19:45:02 +0530 Subject: [PATCH] Forbid fenced Container to stop ConcurContainer [DRAFT] Fixes #GH-3448 https://github.com/spring-projects/spring-kafka/issues/3448 Issue: Fenced Child Container could stop the running ConcurrentContainer Fix: Configure KafkaMessageListenerContainer (KMLC) to use ConcurrentMessagleListenerContainerRef instead ofConcurrentContainer. Internally, ConcurrentContainerRef checks if KMLC is fenced when stop operations are called on Concurrent Container. If KMLC is fenced, suppress the `stop` related operations. If KMLC is not fenced, delegate the stop call to ConcurrentContainer. --- .../AbstractMessageListenerContainer.java | 28 +- .../ConcurrentMessageListenerContainer.java | 10 +- ...ConcurrentMessageListenerContainerRef.java | 285 +++++++++++++++++ .../KafkaMessageListenerContainer.java | 33 +- ...rentMessageListenerContainerMockTests.java | 1 - ...ncurrentMessageListenerContainerTests.java | 296 +++++++++++++++++- 6 files changed, 629 insertions(+), 24 deletions(-) create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerRef.java diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java index 80c732f46e..32f46ba3fc 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java @@ -193,6 +193,14 @@ protected AbstractMessageListenerContainer(ConsumerFactory } } + /** + * To be used only with {@link ConcurrentMessageListenerContainerRef}. + */ + AbstractMessageListenerContainer() { + this.containerProperties = null; + this.consumerFactory = null; + } + @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; @@ -282,6 +290,10 @@ protected void setFenced(boolean fenced) { this.fenced = fenced; } + boolean isFenced() { + return this.fenced; + } + @Deprecated(since = "3.2", forRemoval = true) protected boolean isPaused() { return this.paused; @@ -722,7 +734,7 @@ public void onPartitionsLost(Collection partitions) { protected void publishContainerStoppedEvent() { ApplicationEventPublisher eventPublisher = getApplicationEventPublisher(); if (eventPublisher != null) { - eventPublisher.publishEvent(new ContainerStoppedEvent(this, parentOrThis())); + eventPublisher.publishEvent(new ContainerStoppedEvent(this, parentContainerOrThis())); } } @@ -735,6 +747,20 @@ protected void publishContainerStoppedEvent() { return this; } + /** + * Return the actual {@link ConcurrentMessageListenerContainer} if the parent is instance of + * {@link ConcurrentMessageListenerContainerRef}. + * + * @return the parent or this + * @since 3.3 + */ + AbstractMessageListenerContainer parentContainerOrThis() { + if (parentOrThis() instanceof ConcurrentMessageListenerContainerRef) { + return ((ConcurrentMessageListenerContainerRef) parentOrThis()).getConcurrentContainer(); + } + return parentOrThis(); + } + /** * Make any default consumer override properties explicit properties. * @return the properties. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java index 06b136f434..beb81380a4 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java @@ -305,13 +305,17 @@ private KafkaMessageListenerContainer constructContainer(ContainerProperti @Nullable TopicPartitionOffset[] topicPartitions, int i) { KafkaMessageListenerContainer container; + ConcurrentMessageListenerContainerRef concurrentMessageListenerContainerRef = + new ConcurrentMessageListenerContainerRef<>(this, this.lifecycleLock); if (topicPartitions == null) { - container = new KafkaMessageListenerContainer<>(this, this.consumerFactory, containerProperties); // NOSONAR + container = new KafkaMessageListenerContainer<>(concurrentMessageListenerContainerRef, this.consumerFactory, + containerProperties); // NOSONAR } else { - container = new KafkaMessageListenerContainer<>(this, this.consumerFactory, // NOSONAR - containerProperties, partitionSubset(containerProperties, i)); + container = new KafkaMessageListenerContainer<>(concurrentMessageListenerContainerRef, this.consumerFactory, + containerProperties, partitionSubset(containerProperties, i)); // NOSONAR } + concurrentMessageListenerContainerRef.setKafkaMessageListenerContainer(container); return container; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerRef.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerRef.java new file mode 100644 index 0000000000..dae076f7b6 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerRef.java @@ -0,0 +1,285 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.logging.LogFactory; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicPartition; + +import org.springframework.core.log.LogAccessor; +import org.springframework.kafka.event.ConsumerStoppedEvent; + +/** + * Reference of {@link ConcurrentMessageListenerContainer} to be passed to the {@link KafkaMessageListenerContainer}. + * This container is used for internal purpose. Detects if the {@link KafkaMessageListenerContainer} is fenced and + * forbids `stop` calls on {@link ConcurrentMessageListenerContainer} + * + * @param the key type. + * @param the value type. + * @author Lokesh Alamuri + */ +class ConcurrentMessageListenerContainerRef extends AbstractMessageListenerContainer { + + protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass())); // NOSONAR + + private final ConcurrentMessageListenerContainer concurrentMessageListenerContainer; + + private final ReentrantLock lifecycleLock; + + private KafkaMessageListenerContainer kafkaMessageListenerContainer; + + ConcurrentMessageListenerContainerRef(ConcurrentMessageListenerContainer concurrentMessageListenerContainer, + ReentrantLock lifecycleLock) { + super(); + this.concurrentMessageListenerContainer = concurrentMessageListenerContainer; + this.lifecycleLock = lifecycleLock; + } + + void setKafkaMessageListenerContainer(KafkaMessageListenerContainer kafkaMessageListenerContainer) { + this.kafkaMessageListenerContainer = kafkaMessageListenerContainer; + } + + @Override + public void setupMessageListener(Object messageListener) { + throw new UnsupportedOperationException("This container doesn't support setting up MessageListener"); + } + + @Override + public Map> metrics() { + return this.concurrentMessageListenerContainer.metrics(); + } + + @Override + public ContainerProperties getContainerProperties() { + return this.concurrentMessageListenerContainer.getContainerProperties(); + } + + @Override + public Collection getAssignedPartitions() { + return this.concurrentMessageListenerContainer.getAssignedPartitions(); + } + + @Override + public Map> getAssignmentsByClientId() { + return this.concurrentMessageListenerContainer.getAssignmentsByClientId(); + } + + @Override + public void enforceRebalance() { + this.concurrentMessageListenerContainer.enforceRebalance(); + } + + @Override + public void pause() { + this.concurrentMessageListenerContainer.pause(); + } + + @Override + public void resume() { + this.concurrentMessageListenerContainer.resume(); + } + + @Override + public void pausePartition(TopicPartition topicPartition) { + this.concurrentMessageListenerContainer.pausePartition(topicPartition); + } + + @Override + public void resumePartition(TopicPartition topicPartition) { + this.concurrentMessageListenerContainer.resumePartition(topicPartition); + } + + @Override + public boolean isPartitionPauseRequested(TopicPartition topicPartition) { + return this.concurrentMessageListenerContainer.isPartitionPauseRequested(topicPartition); + } + + @Override + public boolean isPartitionPaused(TopicPartition topicPartition) { + return this.concurrentMessageListenerContainer.isPartitionPaused(topicPartition); + } + + @Override + public boolean isPauseRequested() { + return this.concurrentMessageListenerContainer.isPauseRequested(); + } + + @Override + public boolean isContainerPaused() { + return this.concurrentMessageListenerContainer.isContainerPaused(); + } + + @Override + public String getGroupId() { + return this.concurrentMessageListenerContainer.getGroupId(); + } + + @Override + public String getListenerId() { + return this.concurrentMessageListenerContainer.getListenerId(); + } + + @Override + public String getMainListenerId() { + return this.concurrentMessageListenerContainer.getMainListenerId(); + } + + @Override + public byte[] getListenerInfo() { + return this.concurrentMessageListenerContainer.getListenerInfo(); + } + + @Override + public boolean isChildRunning() { + return this.concurrentMessageListenerContainer.isChildRunning(); + } + + @Override + public boolean isInExpectedState() { + return this.concurrentMessageListenerContainer.isInExpectedState(); + } + + @Override + public void stopAbnormally(Runnable callback) { + this.lifecycleLock.lock(); + try { + if (!this.kafkaMessageListenerContainer.isFenced()) { + // kafkaMessageListenerContainer is not fenced. Allow stopAbnormally call on + // concurrentMessageListenerContainer + this.concurrentMessageListenerContainer.stopAbnormally(callback); + } + else if (this.concurrentMessageListenerContainer.isFenced() && + !this.concurrentMessageListenerContainer.isRunning()) { + // kafkaMessageListenerContainer is fenced and concurrentMessageListenerContainer is not running. Allow + // callback to run + callback.run(); + } + else { + this.logger.error(() -> String.format("Suppressed `stopAbnormal` operation called by " + + "MessageListenerContainer [" + this.kafkaMessageListenerContainer.getBeanName() + "]")); + } + } + finally { + this.lifecycleLock.unlock(); + } + } + + @Override + protected void doStop(Runnable callback, boolean normal) { + this.lifecycleLock.lock(); + try { + if (!this.kafkaMessageListenerContainer.isFenced()) { + // kafkaMessageListenerContainer is not fenced. Allow doStop call on + // concurrentMessageListenerContainer + this.concurrentMessageListenerContainer.doStop(callback, normal); + } + else if (this.concurrentMessageListenerContainer.isFenced() && + !this.concurrentMessageListenerContainer.isRunning()) { + // kafkaMessageListenerContainer is fenced and concurrentMessageListenerContainer is not running. Allow + // callback to run + callback.run(); + } + else { + this.logger.error(() -> String.format("Suppressed `doStop` operation called by " + + "MessageListenerContainer [" + this.kafkaMessageListenerContainer.getBeanName() + "]")); + } + } + finally { + this.lifecycleLock.unlock(); + } + } + + @Override + public MessageListenerContainer getContainerFor(String topic, int partition) { + return this.concurrentMessageListenerContainer.getContainerFor(topic, partition); + } + + @Override + public void childStopped(MessageListenerContainer child, ConsumerStoppedEvent.Reason reason) { + this.concurrentMessageListenerContainer.childStopped(child, reason); + } + + @Override + public void childStarted(MessageListenerContainer child) { + this.concurrentMessageListenerContainer.childStarted(child); + } + + @Override + protected void doStart() { + this.concurrentMessageListenerContainer.doStart(); + } + + @Override + public boolean isRunning() { + return this.concurrentMessageListenerContainer.isRunning(); + } + + @Override + public boolean isAutoStartup() { + return this.concurrentMessageListenerContainer.isAutoStartup(); + } + + @Override + public void setAutoStartup(boolean autoStartup) { + throw new UnsupportedOperationException("This container doesn't support `setAutoStartup`"); + } + + @Override + public void stop(Runnable callback) { + this.lifecycleLock.lock(); + try { + if (!this.kafkaMessageListenerContainer.isFenced()) { + // kafkaMessageListenerContainer is not fenced. Allow stop call on + // concurrentMessageListenerContainer + this.concurrentMessageListenerContainer.stop(callback); + } + else if (this.concurrentMessageListenerContainer.isFenced() && + !this.concurrentMessageListenerContainer.isRunning()) { + // kafkaMessageListenerContainer is fenced and concurrentMessageListenerContainer is not running. Allow + // callback to run + callback.run(); + } + else { + this.logger.error(() -> String.format("Suppressed `stop` operation called by " + + "MessageListenerContainer [" + this.kafkaMessageListenerContainer.getBeanName() + "]")); + } + } + finally { + this.lifecycleLock.unlock(); + } + } + + AbstractMessageListenerContainer getConcurrentContainer() { + return this.concurrentMessageListenerContainer; + } + + @Override + public int hashCode() { + return this.concurrentMessageListenerContainer.hashCode(); + } + + @Override + public boolean equals(Object obj) { + return this.concurrentMessageListenerContainer.equals(obj); + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index bbc8ae8ccb..c75db94194 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -427,7 +427,7 @@ private void publishIdlePartitionEvent(long idleTime, TopicPartition topicPartit ApplicationEventPublisher publisher = getApplicationEventPublisher(); if (publisher != null) { publisher.publishEvent(new ListenerContainerPartitionIdleEvent(this, - this.thisOrParentContainer, idleTime, getBeanName(), topicPartition, consumer, paused)); + parentContainerOrThis(), idleTime, getBeanName(), topicPartition, consumer, paused)); } } @@ -435,7 +435,7 @@ private void publishNoLongerIdlePartitionEvent(long idleTime, Consumer con ApplicationEventPublisher publisher = getApplicationEventPublisher(); if (publisher != null) { publisher.publishEvent(new ListenerContainerPartitionNoLongerIdleEvent(this, - this.thisOrParentContainer, idleTime, getBeanName(), topicPartition, consumer)); + parentContainerOrThis(), idleTime, getBeanName(), topicPartition, consumer)); } } @@ -443,7 +443,7 @@ private void publishIdleContainerEvent(long idleTime, Consumer consumer, b ApplicationEventPublisher publisher = getApplicationEventPublisher(); if (publisher != null) { publisher.publishEvent(new ListenerContainerIdleEvent(this, - this.thisOrParentContainer, idleTime, getBeanName(), getAssignedPartitions(), consumer, paused)); + parentContainerOrThis(), idleTime, getBeanName(), getAssignedPartitions(), consumer, paused)); } } @@ -451,7 +451,7 @@ private void publishNoLongerIdleContainerEvent(long idleTime, Consumer con ApplicationEventPublisher publisher = getApplicationEventPublisher(); if (publisher != null) { publisher.publishEvent(new ListenerContainerNoLongerIdleEvent(this, - this.thisOrParentContainer, idleTime, getBeanName(), getAssignedPartitions(), consumer)); + parentContainerOrThis(), idleTime, getBeanName(), getAssignedPartitions(), consumer)); } } @@ -459,7 +459,7 @@ private void publishNonResponsiveConsumerEvent(long timeSinceLastPoll, Consumer< ApplicationEventPublisher publisher = getApplicationEventPublisher(); if (publisher != null) { publisher.publishEvent( - new NonResponsiveConsumerEvent(this, this.thisOrParentContainer, timeSinceLastPoll, + new NonResponsiveConsumerEvent(this, parentContainerOrThis(), timeSinceLastPoll, getBeanName(), getAssignedPartitions(), consumer)); } } @@ -468,7 +468,7 @@ private void publishNonResponsiveConsumerEvent(long timeSinceLastPoll, Consumer< public void publishConsumerPausedEvent(Collection partitions, String reason) { ApplicationEventPublisher publisher = getApplicationEventPublisher(); if (publisher != null) { - publisher.publishEvent(new ConsumerPausedEvent(this, this.thisOrParentContainer, + publisher.publishEvent(new ConsumerPausedEvent(this, parentContainerOrThis(), Collections.unmodifiableCollection(partitions), reason)); } } @@ -477,7 +477,7 @@ public void publishConsumerPausedEvent(Collection partitions, St public void publishConsumerResumedEvent(Collection partitions) { ApplicationEventPublisher publisher = getApplicationEventPublisher(); if (publisher != null) { - publisher.publishEvent(new ConsumerResumedEvent(this, this.thisOrParentContainer, + publisher.publishEvent(new ConsumerResumedEvent(this, parentContainerOrThis(), Collections.unmodifiableCollection(partitions))); } } @@ -485,7 +485,7 @@ public void publishConsumerResumedEvent(Collection partitions) { private void publishConsumerPartitionPausedEvent(TopicPartition partition) { ApplicationEventPublisher publisher = getApplicationEventPublisher(); if (publisher != null) { - publisher.publishEvent(new ConsumerPartitionPausedEvent(this, this.thisOrParentContainer, + publisher.publishEvent(new ConsumerPartitionPausedEvent(this, parentContainerOrThis(), partition)); } } @@ -493,7 +493,7 @@ private void publishConsumerPartitionPausedEvent(TopicPartition partition) { private void publishConsumerPartitionResumedEvent(TopicPartition partition) { ApplicationEventPublisher publisher = getApplicationEventPublisher(); if (publisher != null) { - publisher.publishEvent(new ConsumerPartitionResumedEvent(this, this.thisOrParentContainer, + publisher.publishEvent(new ConsumerPartitionResumedEvent(this, parentContainerOrThis(), partition)); } } @@ -503,7 +503,7 @@ private void publishConsumerStoppingEvent(Consumer consumer) { ApplicationEventPublisher publisher = getApplicationEventPublisher(); if (publisher != null) { publisher.publishEvent( - new ConsumerStoppingEvent(this, this.thisOrParentContainer, consumer, getAssignedPartitions())); + new ConsumerStoppingEvent(this, parentContainerOrThis(), consumer, getAssignedPartitions())); } } catch (Exception e) { @@ -530,8 +530,7 @@ else if (throwable instanceof NoOffsetForPartitionException) { else { reason = Reason.NORMAL; } - publisher.publishEvent(new ConsumerStoppedEvent(this, this.thisOrParentContainer, - reason)); + publisher.publishEvent(new ConsumerStoppedEvent(this, parentContainerOrThis(), reason)); this.thisOrParentContainer.childStopped(this, reason); } } @@ -540,21 +539,21 @@ private void publishConsumerStartingEvent() { this.startLatch.countDown(); ApplicationEventPublisher publisher = getApplicationEventPublisher(); if (publisher != null) { - publisher.publishEvent(new ConsumerStartingEvent(this, this.thisOrParentContainer)); + publisher.publishEvent(new ConsumerStartingEvent(this, parentContainerOrThis())); } } private void publishConsumerStartedEvent() { ApplicationEventPublisher publisher = getApplicationEventPublisher(); if (publisher != null) { - publisher.publishEvent(new ConsumerStartedEvent(this, this.thisOrParentContainer)); + publisher.publishEvent(new ConsumerStartedEvent(this, parentContainerOrThis())); } } private void publishConsumerFailedToStart() { ApplicationEventPublisher publisher = getApplicationEventPublisher(); if (publisher != null) { - publisher.publishEvent(new ConsumerFailedToStartEvent(this, this.thisOrParentContainer)); + publisher.publishEvent(new ConsumerFailedToStartEvent(this, parentContainerOrThis())); } } @@ -571,14 +570,14 @@ else if (throwable instanceof AuthorizationException) { else { throw new IllegalArgumentException("Only Authentication or Authorization Exceptions are allowed", throwable); } - publisher.publishEvent(new ConsumerRetryAuthEvent(this, this.thisOrParentContainer, reason)); + publisher.publishEvent(new ConsumerRetryAuthEvent(this, parentContainerOrThis(), reason)); } } private void publishRetryAuthSuccessfulEvent() { ApplicationEventPublisher publisher = getApplicationEventPublisher(); if (publisher != null) { - publisher.publishEvent(new ConsumerRetryAuthSuccessfulEvent(this, this.thisOrParentContainer)); + publisher.publishEvent(new ConsumerRetryAuthSuccessfulEvent(this, parentContainerOrThis())); } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java index 0a59a00b8b..1fcf136372 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java @@ -172,7 +172,6 @@ public void handleOtherException(Exception thrownException, Consumer consu }); container.start(); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(errorContainer.get()).isSameAs(container); container.stop(); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java index b699f68098..ceff1ffcc7 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java @@ -199,7 +199,8 @@ protected Consumer createKafkaConsumer(String groupId, String c assertThat(intercepted.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue(); assertThat(payloads).containsExactlyInAnyOrder("foo", "bar", "qux"); - assertThat(listenerThreadNames).contains("testAuto-0", "testAuto-1"); + assertThat(listenerThreadNames).containsAnyOf("testAuto-0-C-0", "testAuto-0-C-1", + "testAuto-1-C-0", "testAuto-1-C-1"); List> containers = KafkaTestUtils.getPropertyValue(container, "containers", List.class); assertThat(containers).hasSize(2); @@ -933,7 +934,8 @@ protected Consumer createKafkaConsumer(String groupId, String c firstLatch.countDown(); - assertThat(listenerThreadNames).containsAnyOf("testAuto-0", "testAuto-1"); + assertThat(listenerThreadNames).containsAnyOf("testAuto-0-C-0", "testAuto-0-C-1", + "testAuto-1-C-0", "testAuto-1-C-1"); assertThat(concurrentContainerStopLatch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(container.isInExpectedState()).isTrue(); @@ -1095,4 +1097,294 @@ protected Consumer createKafkaConsumer(String groupId, String c this.logger.info("Stop containerStartStop"); } + @Test + public void testFencedContainerFailed() throws Exception { + this.logger.info("Start testFencedContainerFailed"); + Map props = KafkaTestUtils.consumerProps("test1", "true", + embeddedKafka); + AtomicReference overrides = new AtomicReference<>(); + DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props) { + @Override + protected Consumer createKafkaConsumer(String groupId, String clientIdPrefix, + String clientIdSuffixArg, Properties properties) { + overrides.set(properties); + return super.createKafkaConsumer(groupId, clientIdPrefix, clientIdSuffixArg, properties); + } + }; + ContainerProperties containerProps = new ContainerProperties(topic1); + containerProps.setLogContainerConfig(true); + containerProps.setClientId("client"); + containerProps.setAckMode(ContainerProperties.AckMode.RECORD); + + final CountDownLatch secondRunLatch = new CountDownLatch(5); + final Set listenerThreadNames = new ConcurrentSkipListSet<>(); + final List payloads = new ArrayList<>(); + final CountDownLatch processingLatch = new CountDownLatch(1); + final CountDownLatch firstLatch = new CountDownLatch(1); + + AtomicBoolean first = new AtomicBoolean(true); + + containerProps.setMessageListener((MessageListener) message -> { + if (first.getAndSet(false)) { + try { + firstLatch.await(100, TimeUnit.SECONDS); + throw new NullPointerException(); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + ConcurrentMessageListenerContainerTests.this.logger.info("auto: " + message); + listenerThreadNames.add(Thread.currentThread().getName()); + payloads.add(message.value()); + secondRunLatch.countDown(); + processingLatch.countDown(); + }); + + ConcurrentMessageListenerContainer container = + new ConcurrentMessageListenerContainer<>(cf, containerProps); + container.setConcurrency(2); + container.setBeanName("testAuto"); + container.setChangeConsumerThreadName(true); + container.setCommonErrorHandler(new CommonContainerStoppingErrorHandler()); + + BlockingQueue events = new LinkedBlockingQueue<>(); + CountDownLatch concurrentContainerStopLatch = new CountDownLatch(1); + CountDownLatch consumerStoppedEventLatch = new CountDownLatch(1); + + container.setApplicationEventPublisher(e -> { + events.add((KafkaEvent) e); + if (e instanceof ConcurrentContainerStoppedEvent) { + concurrentContainerStopLatch.countDown(); + } + if (e instanceof ConsumerStoppedEvent) { + consumerStoppedEventLatch.countDown(); + } + }); + + container.start(); + + MessageListenerContainer childContainer0 = container.getContainers().get(0); + MessageListenerContainer childContainer1 = container.getContainers().get(1); + + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); + assertThat(container.getAssignedPartitions()).hasSize(2); + Map> assignments = container.getAssignmentsByClientId(); + assertThat(assignments).hasSize(2); + assertThat(assignments.get("client-0")).isNotNull(); + assertThat(assignments.get("client-1")).isNotNull(); + + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); + ProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); + KafkaTemplate template = new KafkaTemplate<>(pf); + template.setDefaultTopic(topic1); + template.sendDefault(0, 0, "foo"); + template.sendDefault(1, 2, "bar"); + template.sendDefault(0, 0, "baz"); + template.sendDefault(1, 2, "qux"); + template.flush(); + + assertThat(container.metrics()).isNotNull(); + assertThat(container.isInExpectedState()).isTrue(); + assertThat(childContainer0.isRunning()).isTrue(); + assertThat(childContainer1.isRunning()).isTrue(); + assertThat(container.isChildRunning()).isTrue(); + + assertThat(processingLatch.await(60, TimeUnit.SECONDS)).isTrue(); + + container.stop(); + + assertThat(container.isChildRunning()).isTrue(); + assertThat(container.isRunning()).isFalse(); + assertThat(childContainer0.isRunning()).isFalse(); + assertThat(childContainer1.isRunning()).isFalse(); + + assertThat(consumerStoppedEventLatch.await(30, TimeUnit.SECONDS)).isTrue(); + + assertThat(container.isChildRunning()).isTrue(); + + assertThat(listenerThreadNames).containsAnyOf("testAuto-0-C-0", "testAuto-0-C-1", + "testAuto-1-C-0", "testAuto-1-C-1"); + + assertThat(concurrentContainerStopLatch.await(30, TimeUnit.SECONDS)).isFalse(); + + + template.sendDefault(0, 0, "FOO"); + template.sendDefault(1, 2, "BAR"); + template.sendDefault(0, 0, "BAZ"); + template.sendDefault(1, 2, "QUX"); + template.flush(); + + // permitted since stop is called prior. + container.start(); + + assertThat(container.isRunning()).isTrue(); + assertThat(container.getContainers().stream().allMatch(containerL -> containerL.isRunning())) + .isTrue(); + + firstLatch.countDown(); + + assertThat(container.isRunning()).isTrue(); + assertThat(container.getContainers().stream().allMatch(containerL -> containerL.isRunning())) + .isTrue(); + assertThat(secondRunLatch.await(30, TimeUnit.SECONDS)).isTrue(); + container.stop(); + assertThat(container.isRunning()).isFalse(); + this.logger.info("Stop testFencedContainerFailed"); + } + + @Test + public void testFencedContainerFailedWithCustomErrorHandler() throws Exception { + this.logger.info("Start testFencedContainerFailed"); + Map props = KafkaTestUtils.consumerProps("test1", "true", + embeddedKafka); + AtomicReference overrides = new AtomicReference<>(); + DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props) { + @Override + protected Consumer createKafkaConsumer(String groupId, String clientIdPrefix, + String clientIdSuffixArg, Properties properties) { + overrides.set(properties); + return super.createKafkaConsumer(groupId, clientIdPrefix, clientIdSuffixArg, properties); + } + }; + ContainerProperties containerProps = new ContainerProperties(topic1); + containerProps.setLogContainerConfig(true); + containerProps.setClientId("client"); + containerProps.setAckMode(ContainerProperties.AckMode.RECORD); + + final CountDownLatch secondRunLatch = new CountDownLatch(5); + final Set listenerThreadNames = new ConcurrentSkipListSet<>(); + final List payloads = new ArrayList<>(); + final CountDownLatch processingLatch = new CountDownLatch(1); + final CountDownLatch firstLatch = new CountDownLatch(1); + + AtomicBoolean first = new AtomicBoolean(true); + + containerProps.setMessageListener((MessageListener) message -> { + if (first.getAndSet(false)) { + try { + firstLatch.await(100, TimeUnit.SECONDS); + throw new NullPointerException(); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + ConcurrentMessageListenerContainerTests.this.logger.info("auto: " + message); + listenerThreadNames.add(Thread.currentThread().getName()); + payloads.add(message.value()); + secondRunLatch.countDown(); + processingLatch.countDown(); + }); + + ConcurrentMessageListenerContainer container = + new ConcurrentMessageListenerContainer<>(cf, containerProps); + container.setConcurrency(2); + container.setBeanName("testAuto"); + container.setChangeConsumerThreadName(true); + + final CountDownLatch erroHandlerLatch = new CountDownLatch(1); + final CountDownLatch erroHandlerLatchInCallBack = new CountDownLatch(1); + + container.setCommonErrorHandler(new CommonErrorHandler() { + @Override + public boolean seeksAfterHandling() { + return true; + } + + @Override + public void handleRemaining(Exception thrownException, List> records, + Consumer consumer, MessageListenerContainer container) { + container.stop(() -> erroHandlerLatchInCallBack.countDown()); + erroHandlerLatch.countDown(); + } + }); + + BlockingQueue events = new LinkedBlockingQueue<>(); + CountDownLatch concurrentContainerStopLatch = new CountDownLatch(1); + CountDownLatch consumerStoppedEventLatch = new CountDownLatch(1); + + container.setApplicationEventPublisher(e -> { + events.add((KafkaEvent) e); + if (e instanceof ConcurrentContainerStoppedEvent) { + concurrentContainerStopLatch.countDown(); + } + if (e instanceof ConsumerStoppedEvent) { + consumerStoppedEventLatch.countDown(); + } + }); + + container.start(); + + MessageListenerContainer childContainer0 = container.getContainers().get(0); + MessageListenerContainer childContainer1 = container.getContainers().get(1); + + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); + assertThat(container.getAssignedPartitions()).hasSize(2); + Map> assignments = container.getAssignmentsByClientId(); + assertThat(assignments).hasSize(2); + assertThat(assignments.get("client-0")).isNotNull(); + assertThat(assignments.get("client-1")).isNotNull(); + + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); + ProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); + KafkaTemplate template = new KafkaTemplate<>(pf); + template.setDefaultTopic(topic1); + template.sendDefault(0, 0, "foo"); + template.sendDefault(1, 2, "bar"); + template.sendDefault(0, 0, "baz"); + template.sendDefault(1, 2, "qux"); + template.flush(); + + assertThat(container.metrics()).isNotNull(); + assertThat(container.isInExpectedState()).isTrue(); + assertThat(childContainer0.isRunning()).isTrue(); + assertThat(childContainer1.isRunning()).isTrue(); + assertThat(container.isChildRunning()).isTrue(); + + assertThat(processingLatch.await(60, TimeUnit.SECONDS)).isTrue(); + + container.stop(); + + assertThat(container.isChildRunning()).isTrue(); + assertThat(container.isRunning()).isFalse(); + assertThat(childContainer0.isRunning()).isFalse(); + assertThat(childContainer1.isRunning()).isFalse(); + + assertThat(consumerStoppedEventLatch.await(30, TimeUnit.SECONDS)).isTrue(); + + assertThat(container.isChildRunning()).isTrue(); + + assertThat(listenerThreadNames).containsAnyOf("testAuto-0-C-0", "testAuto-0-C-1", + "testAuto-1-C-0", "testAuto-1-C-1"); + + assertThat(concurrentContainerStopLatch.await(30, TimeUnit.SECONDS)).isFalse(); + + template.sendDefault(0, 0, "FOO"); + template.sendDefault(1, 2, "BAR"); + template.sendDefault(0, 0, "BAZ"); + template.sendDefault(1, 2, "QUX"); + template.flush(); + + // permitted since stop is called prior. + container.start(); + + assertThat(container.isRunning()).isTrue(); + assertThat(container.getContainers().stream().allMatch(containerL -> containerL.isRunning())) + .isTrue(); + + firstLatch.countDown(); + + assertThat(erroHandlerLatch.await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(erroHandlerLatchInCallBack.await(30, TimeUnit.SECONDS)).isFalse(); + + assertThat(container.isRunning()).isTrue(); + assertThat(container.getContainers().stream().allMatch(containerL -> containerL.isRunning())) + .isTrue(); + assertThat(secondRunLatch.await(30, TimeUnit.SECONDS)).isTrue(); + container.stop(); + assertThat(container.isRunning()).isFalse(); + this.logger.info("Stop testFencedContainerFailed"); + } + }