diff --git a/subsys/kafka/src/main/java/org/commonjava/indy/subsys/kafka/boot/KafkaStreamBooter.java b/subsys/kafka/src/main/java/org/commonjava/indy/subsys/kafka/boot/KafkaStreamBooter.java index 4d83016207..e7b86bd7de 100644 --- a/subsys/kafka/src/main/java/org/commonjava/indy/subsys/kafka/boot/KafkaStreamBooter.java +++ b/subsys/kafka/src/main/java/org/commonjava/indy/subsys/kafka/boot/KafkaStreamBooter.java @@ -59,10 +59,6 @@ public class KafkaStreamBooter private static final long DEFAULT_KAFKA_STREAM_CLOSE_TIMEOUT = 5 * 60 * 3000L; - private static final long RECONNECT_BACKOFF_MS = 60 * 1000L; - - private static final long RECONNECT_BACKOFF_MAX_MS = 30 * 60 * 1000L; - private KafkaStreams streams; @Override @@ -137,8 +133,8 @@ private Properties setKafkaProps() props.putIfAbsent( StreamsConfig.APPLICATION_ID_CONFIG, config.getGroup() ); props.putIfAbsent( StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers() ); props.putIfAbsent( StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, config.getRecordsPerPartition() ); - props.putIfAbsent( StreamsConfig.RECONNECT_BACKOFF_MS_CONFIG, RECONNECT_BACKOFF_MS ); - props.putIfAbsent( StreamsConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, RECONNECT_BACKOFF_MAX_MS ); + props.putIfAbsent( StreamsConfig.RECONNECT_BACKOFF_MS_CONFIG, config.getReconnectBackoff() ); + props.putIfAbsent( StreamsConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, config.getReconnectBackoffMax() ); logger.info( "Kafka props: {}", props ); return props; diff --git a/subsys/kafka/src/main/java/org/commonjava/indy/subsys/kafka/conf/KafkaConfig.java b/subsys/kafka/src/main/java/org/commonjava/indy/subsys/kafka/conf/KafkaConfig.java index 84f2c29b6f..29b6b8639a 100644 --- a/subsys/kafka/src/main/java/org/commonjava/indy/subsys/kafka/conf/KafkaConfig.java +++ b/subsys/kafka/src/main/java/org/commonjava/indy/subsys/kafka/conf/KafkaConfig.java @@ -40,6 +40,10 @@ public class KafkaConfig private static final Integer DEFAULT_RECORDS_PER_PARTITION = 1000; + private static final Long DEFAULT_RECONNECT_BACKOFF_MS = 60 * 1000L; + + private static final Long DEFAULT_RECONNECT_BACKOFF_MAX_MS = 30 * 60 * 1000L; + private static final boolean DEFAULT_ENABLED = true; private static final boolean DEFAULT_TRACE = false; @@ -58,6 +62,10 @@ public class KafkaConfig private Boolean tracing; + private Long reconnectBackoff; + + private Long reconnectBackoffMax; + public KafkaConfig() { } @@ -140,6 +148,16 @@ public boolean isTracing() return tracing == null ? DEFAULT_TRACE : tracing; } + public Long getReconnectBackoff() + { + return reconnectBackoff == null ? DEFAULT_RECONNECT_BACKOFF_MS : reconnectBackoff; + } + + public Long getReconnectBackoffMax() + { + return reconnectBackoffMax == null ? DEFAULT_RECONNECT_BACKOFF_MAX_MS : reconnectBackoffMax; + } + @ConfigName( "kafka.trace" ) public void setTracing( boolean tracing ) {