diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java index 3140e6eb1f47b..4ea0bd82af687 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java @@ -217,6 +217,10 @@ public enum EFORegistrationType { public static final String SUBSCRIBE_TO_SHARD_RETRIES = "flink.shard.subscribetoshard.maxretries"; + /** A timeout when waiting for a shard subscription to be established. */ + public static final String SUBSCRIBE_TO_SHARD_TIMEOUT_SECONDS = + "flink.shard.subscribetoshard.timeout"; + /** The base backoff time between each subscribeToShard attempt. */ public static final String SUBSCRIBE_TO_SHARD_BACKOFF_BASE = "flink.shard.subscribetoshard.backoff.base"; @@ -363,6 +367,8 @@ public enum EFORegistrationType { public static final int DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES = 10; + public static final Duration DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT = Duration.ofSeconds(60); + public static final long DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_BASE = 1000L; public static final long DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_MAX = 2000L; diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java index 163cd046bfdab..9aee3e47a5a4e 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java @@ -800,34 +800,59 @@ public HashMap snapshotState() { * executed and all shard consuming threads will be interrupted. */ public void shutdownFetcher() { - if (LOG.isInfoEnabled()) { - LOG.info( - "Starting shutdown of shard consumer threads and AWS SDK resources of subtask {} ...", - indexOfThisConsumerSubtask); - } + LOG.info( + "Starting shutdown of shard consumer threads and AWS SDK resources of subtask {} ...", + indexOfThisConsumerSubtask, + error.get()); running = false; + try { + try { + deregisterStreamConsumer(); + } catch (Exception e) { + LOG.warn("Encountered exception deregistering stream consumers", e); + } - StreamConsumerRegistrarUtil.deregisterStreamConsumers(configProps, streams); - - recordPublisherFactory.close(); + try { + closeRecordPublisherFactory(); + } catch (Exception e) { + LOG.warn("Encountered exception closing record publisher factory", e); + } + } finally { + shardConsumersExecutor.shutdownNow(); - shardConsumersExecutor.shutdownNow(); + if (mainThread != null) { + mainThread + .interrupt(); // the main thread may be sleeping for the discovery interval + } - if (mainThread != null) { - mainThread.interrupt(); // the main thread may be sleeping for the discovery interval + if (watermarkTracker != null) { + watermarkTracker.close(); + } + this.recordEmitter.stop(); } - if (watermarkTracker != null) { - watermarkTracker.close(); - } - this.recordEmitter.stop(); + LOG.info( + "Shutting down the shard consumer threads of subtask {} ...", + indexOfThisConsumerSubtask); + } - if (LOG.isInfoEnabled()) { - LOG.info( - "Shutting down the shard consumer threads of subtask {} ...", - indexOfThisConsumerSubtask); - } + /** + * Closes recordRecordPublisherFactory. Allows test to override this to simulate exception for + * shutdown logic. + */ + @VisibleForTesting + protected void closeRecordPublisherFactory() { + recordPublisherFactory.close(); + } + + /** + * Deregisters stream consumers. Allows test to override this to simulate exception for shutdown + * logic. + */ + @VisibleForTesting + protected void deregisterStreamConsumer() { + StreamConsumerRegistrarUtil.deregisterStreamConsumers(configProps, streams); } /** diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java index d3a177aa95575..c322362c8e262 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java @@ -143,7 +143,10 @@ private RecordPublisherRunResult runWithBackoff( final Consumer eventConsumer) throws InterruptedException { FanOutShardSubscriber fanOutShardSubscriber = new FanOutShardSubscriber( - consumerArn, subscribedShard.getShard().getShardId(), kinesisProxy); + consumerArn, + subscribedShard.getShard().getShardId(), + kinesisProxy, + configuration.getSubscribeToShardTimeout()); boolean complete; try { diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java index 36679c371379d..cd46876da2ba1 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java @@ -52,6 +52,9 @@ public class FanOutRecordPublisherConfiguration { /** Base backoff millis for the deregister stream operation. */ private final int subscribeToShardMaxRetries; + /** A timeout when waiting for a shard subscription to be established. */ + private final Duration subscribeToShardTimeout; + /** Maximum backoff millis for the subscribe to shard operation. */ private final long subscribeToShardMaxBackoffMillis; @@ -156,6 +159,13 @@ public FanOutRecordPublisherConfiguration( ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_RETRIES)) .map(Integer::parseInt) .orElse(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES); + this.subscribeToShardTimeout = + Optional.ofNullable( + configProps.getProperty( + ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_TIMEOUT_SECONDS)) + .map(Integer::parseInt) + .map(Duration::ofSeconds) + .orElse(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT); this.subscribeToShardBaseBackoffMillis = Optional.ofNullable( configProps.getProperty( @@ -319,6 +329,11 @@ public int getSubscribeToShardMaxRetries() { return subscribeToShardMaxRetries; } + /** Get timeout when waiting for a shard subscription to be established. */ + public Duration getSubscribeToShardTimeout() { + return subscribeToShardTimeout; + } + /** Get maximum backoff millis for the subscribe to shard operation. */ public long getSubscribeToShardMaxBackoffMillis() { return subscribeToShardMaxBackoffMillis; diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java index e06923fd1e6d1..be4df59c6b4ec 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2; import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface; import org.apache.flink.util.Preconditions; @@ -35,15 +36,18 @@ import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; +import java.time.Duration; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletionException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; -import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.concurrent.TimeUnit.MILLISECONDS; /** * This class is responsible for acquiring an Enhanced Fan Out subscription and consuming records @@ -88,12 +92,10 @@ public class FanOutShardSubscriber { /** * Read timeout will occur after 30 seconds, a sanity timeout to prevent lockup in unexpected - * error states. If the consumer does not receive a new event within the DEQUEUE_WAIT_SECONDS it - * will backoff and resubscribe. Under normal conditions heartbeat events are received even when - * there are no records to consume, so it is not expected for this timeout to occur under normal - * conditions. + * error states. If the consumer does not receive a new event within the QUEUE_TIMEOUT_SECONDS + * it will backoff and resubscribe. */ - private static final int DEQUEUE_WAIT_SECONDS = 35; + private static final Duration DEFAULT_QUEUE_TIMEOUT = Duration.ofSeconds(35); private final BlockingQueue queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY); @@ -107,18 +109,49 @@ public class FanOutShardSubscriber { private final String shardId; + private final Duration subscribeToShardTimeout; + + private final Duration queueWaitTimeout; + /** - * Create a new Fan Out subscriber. + * Create a new Fan Out Shard subscriber. * * @param consumerArn the stream consumer ARN * @param shardId the shard ID to subscribe to * @param kinesis the Kinesis Proxy used to communicate via AWS SDK v2 + * @param subscribeToShardTimeout A timeout when waiting for a shard subscription to be + * established */ FanOutShardSubscriber( - final String consumerArn, final String shardId, final KinesisProxyV2Interface kinesis) { + final String consumerArn, + final String shardId, + final KinesisProxyV2Interface kinesis, + final Duration subscribeToShardTimeout) { + this(consumerArn, shardId, kinesis, subscribeToShardTimeout, DEFAULT_QUEUE_TIMEOUT); + } + + /** + * Create a new Fan Out Shard Subscriber. + * + * @param consumerArn the stream consumer ARN + * @param shardId the shard ID to subscribe to + * @param kinesis the Kinesis Proxy used to communicate via AWS SDK v2 + * @param subscribeToShardTimeout A timeout when waiting for a shard subscription to be + * established + * @param queueWaitTimeout A timeout when enqueuing/de-queueing + */ + @VisibleForTesting + FanOutShardSubscriber( + final String consumerArn, + final String shardId, + final KinesisProxyV2Interface kinesis, + final Duration subscribeToShardTimeout, + final Duration queueWaitTimeout) { this.kinesis = Preconditions.checkNotNull(kinesis); this.consumerArn = Preconditions.checkNotNull(consumerArn); this.shardId = Preconditions.checkNotNull(shardId); + this.subscribeToShardTimeout = subscribeToShardTimeout; + this.queueWaitTimeout = queueWaitTimeout; } /** @@ -139,8 +172,9 @@ boolean subscribeToShardAndConsumeRecords( throws InterruptedException, FanOutSubscriberException { LOG.debug("Subscribing to shard {} ({})", shardId, consumerArn); + final FanOutShardSubscription subscription; try { - openSubscriptionToShard(startingPosition); + subscription = openSubscriptionToShard(startingPosition); } catch (FanOutSubscriberException ex) { // The only exception that should cause a failure is a ResourceNotFoundException // Rethrow the exception to trigger the application to terminate @@ -151,7 +185,7 @@ boolean subscribeToShardAndConsumeRecords( throw ex; } - return consumeAllRecordsFromKinesisShard(eventConsumer); + return consumeAllRecordsFromKinesisShard(eventConsumer, subscription); } /** @@ -163,7 +197,7 @@ boolean subscribeToShardAndConsumeRecords( * @param startingPosition the position in which to start consuming from * @throws FanOutSubscriberException when an exception is propagated from the networking stack */ - private void openSubscriptionToShard(final StartingPosition startingPosition) + private FanOutShardSubscription openSubscriptionToShard(final StartingPosition startingPosition) throws FanOutSubscriberException, InterruptedException { SubscribeToShardRequest request = SubscribeToShardRequest.builder() @@ -196,7 +230,18 @@ private void openSubscriptionToShard(final StartingPosition startingPosition) kinesis.subscribeToShard(request, responseHandler); - waitForSubscriptionLatch.await(); + boolean subscriptionEstablished = + waitForSubscriptionLatch.await( + subscribeToShardTimeout.toMillis(), TimeUnit.MILLISECONDS); + + if (!subscriptionEstablished) { + final String errorMessage = + "Timed out acquiring subscription - " + shardId + " (" + consumerArn + ")"; + LOG.error(errorMessage); + subscription.cancelSubscription(); + handleError( + new RecoverableFanOutSubscriberException(new TimeoutException(errorMessage))); + } Throwable throwable = exception.get(); if (throwable != null) { @@ -208,6 +253,8 @@ private void openSubscriptionToShard(final StartingPosition startingPosition) // Request the first record to kick off consumption // Following requests are made by the FanOutShardSubscription on the netty thread subscription.requestRecord(); + + return subscription; } /** @@ -234,6 +281,8 @@ private void handleError(final Throwable throwable) throws FanOutSubscriberExcep if (isInterrupted(throwable)) { throw new FanOutSubscriberInterruptedException(throwable); + } else if (cause instanceof FanOutSubscriberException) { + throw (FanOutSubscriberException) cause; } else if (cause instanceof ReadTimeoutException) { // ReadTimeoutException occurs naturally under backpressure scenarios when full batches // take longer to @@ -271,24 +320,26 @@ private boolean isInterrupted(final Throwable throwable) { * while consuming records, indicated by a {@link SubscriptionErrorEvent} * * @param eventConsumer the event consumer to deliver records to + * @param subscription the subscription we are subscribed to * @return true if there are no more messages (complete), false if a subsequent subscription * should be obtained * @throws FanOutSubscriberException when an exception is propagated from the networking stack * @throws InterruptedException when the thread is interrupted */ private boolean consumeAllRecordsFromKinesisShard( - final Consumer eventConsumer) + final Consumer eventConsumer, + final FanOutShardSubscription subscription) throws InterruptedException, FanOutSubscriberException { String continuationSequenceNumber; + boolean result = true; do { FanOutSubscriptionEvent subscriptionEvent; - if (queue.isEmpty() && subscriptionErrorEvent.get() != null) { + if (subscriptionErrorEvent.get() != null) { subscriptionEvent = subscriptionErrorEvent.get(); } else { - // Read timeout will occur after 30 seconds, add a sanity timeout here to prevent - // lockup - subscriptionEvent = queue.poll(DEQUEUE_WAIT_SECONDS, SECONDS); + // Read timeout occurs after 30 seconds, add a sanity timeout to prevent lockup + subscriptionEvent = queue.poll(queueWaitTimeout.toMillis(), MILLISECONDS); } if (subscriptionEvent == null) { @@ -296,7 +347,8 @@ private boolean consumeAllRecordsFromKinesisShard( "Timed out polling events from network, reacquiring subscription - {} ({})", shardId, consumerArn); - return false; + result = false; + break; } else if (subscriptionEvent.isSubscribeToShardEvent()) { SubscribeToShardEvent event = subscriptionEvent.getSubscribeToShardEvent(); continuationSequenceNumber = event.continuationSequenceNumber(); @@ -304,19 +356,18 @@ private boolean consumeAllRecordsFromKinesisShard( eventConsumer.accept(event); } } else if (subscriptionEvent.isSubscriptionComplete()) { - if (subscriptionErrorEvent.get() != null) { - handleError(subscriptionErrorEvent.get().getThrowable()); - } - // The subscription is complete, but the shard might not be, so we return incomplete - return false; + result = false; + break; } else { handleError(subscriptionEvent.getThrowable()); - return false; + result = false; + break; } } while (continuationSequenceNumber != null); - return true; + subscription.cancelSubscription(); + return result; } /** @@ -376,15 +427,19 @@ public void onError(Throwable throwable) { consumerArn, throwable); + SubscriptionErrorEvent subscriptionErrorEvent = new SubscriptionErrorEvent(throwable); + if (FanOutShardSubscriber.this.subscriptionErrorEvent.get() == null) { + FanOutShardSubscriber.this.subscriptionErrorEvent.set(subscriptionErrorEvent); + } else { + LOG.warn("Error already queued. Ignoring subsequent exception.", throwable); + } + // Cancel the subscription to signal the onNext to stop requesting data cancelSubscription(); - if (subscriptionErrorEvent.get() == null) { - subscriptionErrorEvent.set(new SubscriptionErrorEvent(throwable)); - } else { - LOG.warn( - "Previous error passed to consumer for processing. Ignoring subsequent exception.", - throwable); + // If there is space in the queue, insert the error to wake up blocked thread + if (queue.isEmpty()) { + queue.offer(subscriptionErrorEvent); } } @@ -395,8 +450,12 @@ public void onComplete() { } private void cancelSubscription() { - if (!cancelled) { - cancelled = true; + if (cancelled) { + return; + } + cancelled = true; + + if (subscription != null) { subscription.cancel(); } } @@ -407,8 +466,25 @@ private void cancelSubscription() { * @param event the event to enqueue */ private void enqueueEvent(final FanOutSubscriptionEvent event) { + if (cancelled) { + return; + } + try { - queue.put(event); + if (!queue.offer(event, queueWaitTimeout.toMillis(), TimeUnit.MILLISECONDS)) { + final String errorMessage = + "Timed out enqueuing event " + + event.getClass().getSimpleName() + + " - " + + shardId + + " (" + + consumerArn + + ")"; + LOG.error(errorMessage); + onError( + new RecoverableFanOutSubscriberException( + new TimeoutException(errorMessage))); + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java index 2cd5f33c9e9dd..aea01b013bf05 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java @@ -72,10 +72,6 @@ import java.util.stream.Collectors; import static java.util.Collections.singletonList; -import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType.NONE; -import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_REGISTRATION_TYPE; -import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RECORD_PUBLISHER_TYPE; -import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType.EFO; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -96,12 +92,13 @@ public void testIsRunning() { assertTrue(fetcher.isRunning()); } - @Test - public void testIsRunningFalseAfterShutDown() { + @Test(timeout = 10000) + public void testIsRunningFalseAfterShutDown() throws InterruptedException { KinesisDataFetcher fetcher = createTestDataFetcherWithNoShards(10, 2, "test-stream"); fetcher.shutdownFetcher(); + fetcher.awaitTermination(); assertFalse(fetcher.isRunning()); } @@ -990,19 +987,15 @@ public void go() throws Exception { fetcher.wasInterrupted); } - @Test - public void testRecordPublisherFactoryIsTornDown() { - Properties config = TestUtils.getStandardProperties(); - config.setProperty(RECORD_PUBLISHER_TYPE, EFO.name()); - config.setProperty(EFO_REGISTRATION_TYPE, NONE.name()); - + @Test(timeout = 1000L) + public void testRecordPublisherFactoryIsTornDown() throws InterruptedException { KinesisProxyV2Interface kinesisV2 = mock(KinesisProxyV2Interface.class); TestableKinesisDataFetcher fetcher = new TestableKinesisDataFetcher( singletonList("fakeStream1"), new TestSourceContext<>(), - config, + TestUtils.efoProperties(), new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()), 10, 2, @@ -1014,7 +1007,64 @@ public void testRecordPublisherFactoryIsTornDown() { fetcher.shutdownFetcher(); + fetcher.awaitTermination(); + } + + @Test(timeout = 10000) + public void testRecordPublisherFactoryIsTornDownWhenDeregisterStreamConsumerThrowsException() + throws InterruptedException { + KinesisProxyV2Interface kinesisV2 = mock(KinesisProxyV2Interface.class); + + TestableKinesisDataFetcher fetcher = + new TestableKinesisDataFetcher( + singletonList("fakeStream1"), + new TestSourceContext<>(), + TestUtils.efoProperties(), + new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()), + 10, + 2, + new AtomicReference<>(), + new LinkedList<>(), + new HashMap<>(), + mock(KinesisProxyInterface.class), + kinesisV2) { + @Override + protected void deregisterStreamConsumer() { + throw new RuntimeException(); + } + }; + + fetcher.shutdownFetcher(); + verify(kinesisV2).close(); + fetcher.awaitTermination(); + } + + @Test(timeout = 10000) + public void testExecutorServiceShutDownWhenCloseRecordPublisherFactoryThrowsException() + throws InterruptedException { + TestableKinesisDataFetcher fetcher = + new TestableKinesisDataFetcher( + singletonList("fakeStream1"), + new TestSourceContext<>(), + TestUtils.efoProperties(), + new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()), + 10, + 2, + new AtomicReference<>(), + new LinkedList<>(), + new HashMap<>(), + mock(KinesisProxyInterface.class), + mock(KinesisProxyV2Interface.class)) { + @Override + protected void closeRecordPublisherFactory() { + throw new RuntimeException(); + } + }; + + fetcher.shutdownFetcher(); + + fetcher.awaitTermination(); } private KinesisDataFetcher createTestDataFetcherWithNoShards( diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfigurationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfigurationTest.java index a4395c2ebe69f..a558ffbdba511 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfigurationTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfigurationTest.java @@ -24,9 +24,6 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; import java.time.Duration; import java.util.ArrayList; @@ -46,18 +43,18 @@ import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RECORD_PUBLISHER_TYPE; import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.REGISTER_STREAM_TIMEOUT_SECONDS; import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType.EFO; +import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_TIMEOUT_SECONDS; import static org.junit.Assert.assertEquals; /** Tests for {@link FanOutRecordPublisherConfiguration}. */ -@RunWith(PowerMockRunner.class) -@PrepareForTest(FanOutRecordPublisherConfiguration.class) public class FanOutRecordPublisherConfigurationTest extends TestLogger { - @Rule private ExpectedException exception = ExpectedException.none(); + + @Rule public ExpectedException thrown = ExpectedException.none(); @Test public void testPollingRecordPublisher() { - exception.expect(IllegalArgumentException.class); - exception.expectMessage("Only efo record publisher can register a FanOutProperties."); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Only efo record publisher can register a FanOutProperties."); Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(RECORD_PUBLISHER_TYPE, RecordPublisherType.POLLING.toString()); @@ -82,8 +79,8 @@ public void testEagerStrategyWithConsumerName() { public void testEagerStrategyWithNoConsumerName() { String msg = "No valid enhanced fan-out consumer name is set through " + EFO_CONSUMER_NAME; - exception.expect(IllegalArgumentException.class); - exception.expectMessage(msg); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage(msg); Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(RECORD_PUBLISHER_TYPE, EFO.toString()); @@ -115,8 +112,8 @@ public void testNoneStrategyWithNoStreams() { String msg = "Invalid efo consumer arn settings for not providing consumer arns: flink.stream.efo.consumerarn.fakedstream1, flink.stream.efo.consumerarn.fakedstream2"; - exception.expect(IllegalArgumentException.class); - exception.expectMessage(msg); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage(msg); Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(RECORD_PUBLISHER_TYPE, EFO.toString()); @@ -131,8 +128,8 @@ public void testNoneStrategyWithNotEnoughStreams() { String msg = "Invalid efo consumer arn settings for not providing consumer arns: flink.stream.efo.consumerarn.fakedstream2"; - exception.expect(IllegalArgumentException.class); - exception.expectMessage(msg); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage(msg); Properties testConfig = TestUtils.getStandardProperties(); testConfig.setProperty(RECORD_PUBLISHER_TYPE, EFO.toString()); @@ -169,4 +166,29 @@ public void testParseDeregisterStreamConsumerTimeout() { assertEquals(Duration.ofSeconds(60), configuration.getRegisterStreamConsumerTimeout()); assertEquals(Duration.ofSeconds(240), configuration.getDeregisterStreamConsumerTimeout()); } + + @Test + public void testParseSubscribeToShardTimeout() { + Properties testConfig = TestUtils.getStandardProperties(); + testConfig.setProperty(RECORD_PUBLISHER_TYPE, EFO.toString()); + testConfig.setProperty(EFO_CONSUMER_NAME, "name"); + testConfig.setProperty(SUBSCRIBE_TO_SHARD_TIMEOUT_SECONDS, "123"); + + FanOutRecordPublisherConfiguration configuration = + new FanOutRecordPublisherConfiguration(testConfig, Collections.emptyList()); + + assertEquals(Duration.ofSeconds(123), configuration.getSubscribeToShardTimeout()); + } + + @Test + public void testDefaultSubscribeToShardTimeout() { + Properties testConfig = TestUtils.getStandardProperties(); + testConfig.setProperty(RECORD_PUBLISHER_TYPE, EFO.toString()); + testConfig.setProperty(EFO_CONSUMER_NAME, "name"); + + FanOutRecordPublisherConfiguration configuration = + new FanOutRecordPublisherConfiguration(testConfig, Collections.emptyList()); + + assertEquals(Duration.ofSeconds(60), configuration.getSubscribeToShardTimeout()); + } } diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java index 0061cdc80d832..c5962df49a62a 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface; import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory; import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory.SubscriptionErrorKinesisV2; @@ -27,6 +28,10 @@ import org.junit.rules.ExpectedException; import software.amazon.awssdk.services.kinesis.model.StartingPosition; +import java.time.Duration; + +import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT; + /** Tests for {@link FanOutShardSubscriber}. */ public class FanOutShardSubscriberTest { @@ -42,7 +47,11 @@ public void testRecoverableErrorThrownToConsumer() throws Exception { ReadTimeoutException.INSTANCE); FanOutShardSubscriber subscriber = - new FanOutShardSubscriber("consumerArn", "shardId", errorKinesisV2); + new FanOutShardSubscriber( + "consumerArn", + "shardId", + errorKinesisV2, + DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT); software.amazon.awssdk.services.kinesis.model.StartingPosition startingPosition = software.amazon.awssdk.services.kinesis.model.StartingPosition.builder().build(); @@ -59,7 +68,11 @@ public void testRetryableErrorThrownToConsumer() throws Exception { FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(error); FanOutShardSubscriber subscriber = - new FanOutShardSubscriber("consumerArn", "shardId", errorKinesisV2); + new FanOutShardSubscriber( + "consumerArn", + "shardId", + errorKinesisV2, + DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT); software.amazon.awssdk.services.kinesis.model.StartingPosition startingPosition = software.amazon.awssdk.services.kinesis.model.StartingPosition.builder().build(); @@ -75,7 +88,11 @@ public void testInterruptedErrorThrownToConsumer() throws Exception { FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(error); FanOutShardSubscriber subscriber = - new FanOutShardSubscriber("consumerArn", "shardId", errorKinesisV2); + new FanOutShardSubscriber( + "consumerArn", + "shardId", + errorKinesisV2, + DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT); software.amazon.awssdk.services.kinesis.model.StartingPosition startingPosition = software.amazon.awssdk.services.kinesis.model.StartingPosition.builder().build(); @@ -93,9 +110,56 @@ public void testMultipleErrorsThrownPassesFirstErrorToConsumer() throws Exceptio FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(error1, error2); FanOutShardSubscriber subscriber = - new FanOutShardSubscriber("consumerArn", "shardId", errorKinesisV2); + new FanOutShardSubscriber( + "consumerArn", + "shardId", + errorKinesisV2, + DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT); StartingPosition startingPosition = StartingPosition.builder().build(); subscriber.subscribeToShardAndConsumeRecords(startingPosition, event -> {}); } + + @Test + public void testTimeoutSubscribingToShard() throws Exception { + thrown.expect(FanOutShardSubscriber.RecoverableFanOutSubscriberException.class); + thrown.expectMessage("Timed out acquiring subscription"); + + KinesisProxyV2Interface kinesis = + FakeKinesisFanOutBehavioursFactory.failsToAcquireSubscription(); + + FanOutShardSubscriber subscriber = + new FanOutShardSubscriber("consumerArn", "shardId", kinesis, Duration.ofMillis(1)); + + StartingPosition startingPosition = StartingPosition.builder().build(); + subscriber.subscribeToShardAndConsumeRecords(startingPosition, event -> {}); + } + + @Test + public void testTimeoutEnqueuingEvent() throws Exception { + thrown.expect(FanOutShardSubscriber.RecoverableFanOutSubscriberException.class); + thrown.expectMessage("Timed out enqueuing event SubscriptionNextEvent"); + + KinesisProxyV2Interface kinesis = + FakeKinesisFanOutBehavioursFactory.boundedShard().withBatchCount(5).build(); + + FanOutShardSubscriber subscriber = + new FanOutShardSubscriber( + "consumerArn", + "shardId", + kinesis, + DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT, + Duration.ofMillis(100)); + + StartingPosition startingPosition = StartingPosition.builder().build(); + subscriber.subscribeToShardAndConsumeRecords( + startingPosition, + event -> { + try { + Thread.sleep(120); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + } } diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java index 87cc518630ca8..1107efb88a7a1 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java @@ -96,6 +96,10 @@ public static SubscriptionErrorKinesisV2 alternatingSuccessErrorDuringSubscripti return new AlternatingSubscriptionErrorKinesisV2(LimitExceededException.builder().build()); } + public static KinesisProxyV2Interface failsToAcquireSubscription() { + return new FailsToAcquireSubscriptionKinesis(); + } + // ------------------------------------------------------------------------ // Behaviours related to describing streams // ------------------------------------------------------------------------ @@ -122,6 +126,18 @@ public static StreamConsumerFakeKinesis registerExistingConsumerAndWaitToBecomeA return new StreamConsumerFakeKinesis.Builder().withStreamConsumerStatus(CREATING).build(); } + /** A dummy EFO implementation that fails to acquire subscription (no response). */ + private static class FailsToAcquireSubscriptionKinesis extends KinesisProxyV2InterfaceAdapter { + + @Override + public CompletableFuture subscribeToShard( + final SubscribeToShardRequest request, + final SubscribeToShardResponseHandler responseHandler) { + + return CompletableFuture.supplyAsync(() -> null); + } + } + public static AbstractSingleShardFanOutKinesisV2 emptyBatchFollowedBySingleRecord() { return new AbstractSingleShardFanOutKinesisV2(2) { private int subscription = 0; @@ -189,6 +205,12 @@ private SubscriptionErrorKinesisV2(final Throwable... throwables) { @Override void sendEvents(Subscriber subscriber) { sendEventBatch(subscriber); + try { + // Add an artificial delay to allow records to flush + Thread.sleep(200); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } for (Throwable throwable : throwables) { subscriber.onError(throwable); } diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java index aadb3fa12cba1..d517cc352734e 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java @@ -28,6 +28,7 @@ import org.mockito.invocation.InvocationOnMock; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -49,6 +50,7 @@ public class TestableKinesisDataFetcher extends KinesisDataFetcher { private final OneShotLatch shutdownWaiter; private volatile boolean running; + private volatile boolean executorServiceShutdownNowCalled; public TestableKinesisDataFetcher( List fakeStreams, @@ -128,14 +130,22 @@ public void waitUntilShutdown(long timeout, TimeUnit timeUnit) throws Exception @Override protected ExecutorService createShardConsumersThreadPool(String subtaskName) { // this is just a dummy fetcher, so no need to create a thread pool for shard consumers - ExecutorService mockExecutor = mock(ExecutorService.class); - when(mockExecutor.isTerminated()).thenAnswer((InvocationOnMock invocation) -> !running); + ExecutorService mockExecutorService = mock(ExecutorService.class); + when(mockExecutorService.isTerminated()) + .thenAnswer((InvocationOnMock invocation) -> !running); + when(mockExecutorService.shutdownNow()) + .thenAnswer( + invocationOnMock -> { + executorServiceShutdownNowCalled = true; + return Collections.emptyList(); + }); try { - when(mockExecutor.awaitTermination(anyLong(), any())).thenReturn(!running); + when(mockExecutorService.awaitTermination(anyLong(), any())) + .thenAnswer(invocationOnMock -> !running && executorServiceShutdownNowCalled); } catch (InterruptedException e) { // We're just trying to stub the method. Must acknowledge the checked exception. } - return mockExecutor; + return mockExecutorService; } @Override