Skip to content

Commit

Permalink
[FLINK-22479[Kinesis][Consumer] Potential lock-up under error condition
Browse files Browse the repository at this point in the history
  • Loading branch information
dannycranmer committed Apr 27, 2021
1 parent 2b7bb05 commit 23ab254
Show file tree
Hide file tree
Showing 10 changed files with 383 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ public enum EFORegistrationType {
public static final String SUBSCRIBE_TO_SHARD_RETRIES =
"flink.shard.subscribetoshard.maxretries";

/** A timeout when waiting for a shard subscription to be established. */
public static final String SUBSCRIBE_TO_SHARD_TIMEOUT_SECONDS =
"flink.shard.subscribetoshard.timeout";

/** The base backoff time between each subscribeToShard attempt. */
public static final String SUBSCRIBE_TO_SHARD_BACKOFF_BASE =
"flink.shard.subscribetoshard.backoff.base";
Expand Down Expand Up @@ -363,6 +367,8 @@ public enum EFORegistrationType {

public static final int DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES = 10;

public static final Duration DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT = Duration.ofSeconds(60);

public static final long DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_BASE = 1000L;

public static final long DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_MAX = 2000L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -800,34 +800,59 @@ public HashMap<StreamShardMetadata, SequenceNumber> snapshotState() {
* executed and all shard consuming threads will be interrupted.
*/
public void shutdownFetcher() {
if (LOG.isInfoEnabled()) {
LOG.info(
"Starting shutdown of shard consumer threads and AWS SDK resources of subtask {} ...",
indexOfThisConsumerSubtask);
}
LOG.info(
"Starting shutdown of shard consumer threads and AWS SDK resources of subtask {} ...",
indexOfThisConsumerSubtask,
error.get());

running = false;
try {
try {
deregisterStreamConsumer();
} catch (Exception e) {
LOG.warn("Encountered exception deregistering stream consumers", e);
}

StreamConsumerRegistrarUtil.deregisterStreamConsumers(configProps, streams);

recordPublisherFactory.close();
try {
closeRecordPublisherFactory();
} catch (Exception e) {
LOG.warn("Encountered exception closing record publisher factory", e);
}
} finally {
shardConsumersExecutor.shutdownNow();

shardConsumersExecutor.shutdownNow();
if (mainThread != null) {
mainThread
.interrupt(); // the main thread may be sleeping for the discovery interval
}

if (mainThread != null) {
mainThread.interrupt(); // the main thread may be sleeping for the discovery interval
if (watermarkTracker != null) {
watermarkTracker.close();
}
this.recordEmitter.stop();
}

if (watermarkTracker != null) {
watermarkTracker.close();
}
this.recordEmitter.stop();
LOG.info(
"Shutting down the shard consumer threads of subtask {} ...",
indexOfThisConsumerSubtask);
}

if (LOG.isInfoEnabled()) {
LOG.info(
"Shutting down the shard consumer threads of subtask {} ...",
indexOfThisConsumerSubtask);
}
/**
* Closes recordRecordPublisherFactory. Allows test to override this to simulate exception for
* shutdown logic.
*/
@VisibleForTesting
protected void closeRecordPublisherFactory() {
recordPublisherFactory.close();
}

/**
* Deregisters stream consumers. Allows test to override this to simulate exception for shutdown
* logic.
*/
@VisibleForTesting
protected void deregisterStreamConsumer() {
StreamConsumerRegistrarUtil.deregisterStreamConsumers(configProps, streams);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,10 @@ private RecordPublisherRunResult runWithBackoff(
final Consumer<SubscribeToShardEvent> eventConsumer) throws InterruptedException {
FanOutShardSubscriber fanOutShardSubscriber =
new FanOutShardSubscriber(
consumerArn, subscribedShard.getShard().getShardId(), kinesisProxy);
consumerArn,
subscribedShard.getShard().getShardId(),
kinesisProxy,
configuration.getSubscribeToShardTimeout());
boolean complete;

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ public class FanOutRecordPublisherConfiguration {
/** Base backoff millis for the deregister stream operation. */
private final int subscribeToShardMaxRetries;

/** A timeout when waiting for a shard subscription to be established. */
private final Duration subscribeToShardTimeout;

/** Maximum backoff millis for the subscribe to shard operation. */
private final long subscribeToShardMaxBackoffMillis;

Expand Down Expand Up @@ -156,6 +159,13 @@ public FanOutRecordPublisherConfiguration(
ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_RETRIES))
.map(Integer::parseInt)
.orElse(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES);
this.subscribeToShardTimeout =
Optional.ofNullable(
configProps.getProperty(
ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_TIMEOUT_SECONDS))
.map(Integer::parseInt)
.map(Duration::ofSeconds)
.orElse(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT);
this.subscribeToShardBaseBackoffMillis =
Optional.ofNullable(
configProps.getProperty(
Expand Down Expand Up @@ -319,6 +329,11 @@ public int getSubscribeToShardMaxRetries() {
return subscribeToShardMaxRetries;
}

/** Get timeout when waiting for a shard subscription to be established. */
public Duration getSubscribeToShardTimeout() {
return subscribeToShardTimeout;
}

/** Get maximum backoff millis for the subscribe to shard operation. */
public long getSubscribeToShardMaxBackoffMillis() {
return subscribeToShardMaxBackoffMillis;
Expand Down
Loading

0 comments on commit 23ab254

Please sign in to comment.