Skip to content

Commit

Permalink
Merge pull request #2442 from yma96/master
Browse files Browse the repository at this point in the history
Add reconnect backoff config for kafka stream to avoid repeatedly connecting to host in a tight loop.
  • Loading branch information
yma96 authored Jul 23, 2024
2 parents 5b99d8c + d1ba41c commit 9fbdf7e
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ private Properties setKafkaProps()
props.putIfAbsent( StreamsConfig.APPLICATION_ID_CONFIG, config.getGroup() );
props.putIfAbsent( StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers() );
props.putIfAbsent( StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, config.getRecordsPerPartition() );
props.putIfAbsent( StreamsConfig.RECONNECT_BACKOFF_MS_CONFIG, config.getReconnectBackoff() );
props.putIfAbsent( StreamsConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, config.getReconnectBackoffMax() );

logger.info( "Kafka props: {}", props );
return props;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public class KafkaConfig

private static final Integer DEFAULT_RECORDS_PER_PARTITION = 1000;

private static final Long DEFAULT_RECONNECT_BACKOFF_MS = 60 * 1000L;

private static final Long DEFAULT_RECONNECT_BACKOFF_MAX_MS = 30 * 60 * 1000L;

private static final boolean DEFAULT_ENABLED = true;

private static final boolean DEFAULT_TRACE = false;
Expand All @@ -58,6 +62,10 @@ public class KafkaConfig

private Boolean tracing;

private Long reconnectBackoff;

private Long reconnectBackoffMax;

public KafkaConfig()
{
}
Expand Down Expand Up @@ -146,6 +154,28 @@ public void setTracing( boolean tracing )
this.tracing = tracing;
}

public Long getReconnectBackoff()
{
return reconnectBackoff == null ? DEFAULT_RECONNECT_BACKOFF_MS : reconnectBackoff;
}

@ConfigName( "kafka.reconnect.backoff" )
public void setReconnectBackoff( Long reconnectBackoff )
{
this.reconnectBackoff = reconnectBackoff;
}

public Long getReconnectBackoffMax()
{
return reconnectBackoffMax == null ? DEFAULT_RECONNECT_BACKOFF_MAX_MS : reconnectBackoffMax;
}

@ConfigName( "kafka.reconnect.backoff.max" )
public void setReconnectBackoffMax( Long reconnectBackoffMax )
{
this.reconnectBackoffMax = reconnectBackoffMax;
}

@Override
public String getDefaultConfigFileName()
{
Expand Down

0 comments on commit 9fbdf7e

Please sign in to comment.