Skip to content

Commit

Permalink
GH-224: More customization for KCL
Browse files Browse the repository at this point in the history
Fixes: #224
Issue link: #224
  • Loading branch information
artembilan committed Sep 24, 2024
1 parent 086199a commit ffc0f2e
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 6 deletions.
10 changes: 5 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
<properties>
<java.version>17</java.version>
<spring-cloud-stream.version>4.0.5</spring-cloud-stream.version>
<spring-cloud-aws.version>3.0.4</spring-cloud-aws.version>
<spring-integration-aws.version>3.0.6</spring-integration-aws.version>
<amazon-kinesis-client.version>2.5.7</amazon-kinesis-client.version>
<amazon-kinesis-producer.version>0.15.10</amazon-kinesis-producer.version>
<testcontainers.version>1.19.7</testcontainers.version>
<spring-cloud-aws.version>3.0.5</spring-cloud-aws.version>
<spring-integration-aws.version>3.0.8-SNAPSHOT</spring-integration-aws.version>
<amazon-kinesis-client.version>2.5.8</amazon-kinesis-client.version>
<amazon-kinesis-producer.version>0.15.11</amazon-kinesis-producer.version>
<testcontainers.version>1.20.1</testcontainers.version>
</properties>

<modules>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,36 @@ Whether to extract headers and payload from Kinesis record data.
+
Default: `false`

emptyRecordList::
Whether to emit an empty list of records into a consumer.
Works only in `listenerMode.batch`.
+
Default: `false`

Starting with version `4.0.4` (basically since `spring-integration-aws-3.0.8`), the `KclMessageDrivenChannelAdapter` can be customized programmatically for the `ConfigsBuilder` parts.
For example, to set a custom value for the `LeaseManagementConfig.maxLeasesForWorker` property, the `ConsumerEndpointCustomizer<KclMessageDrivenChannelAdapter>` bean has to be provided:

[source,java]
----
@Bean
ConsumerEndpointCustomizer<KclMessageDrivenChannelAdapter> consumerEndpointCustomizer() {
return (endpoint, destinationName, group) ->
endpoint.setLeaseManagementConfigCustomizer(leaseManagementConfig ->
leaseManagementConfig.maxLeasesForWorker(10));
}
----

Other similar setters on the `KclMessageDrivenChannelAdapter` are:

[source,java]
----
setCoordinatorConfigCustomizer(Consumer<CoordinatorConfig> coordinatorConfigCustomizer);
setLifecycleConfigCustomizer(Consumer<LifecycleConfig> lifecycleConfigCustomizer);
setMetricsConfigCustomizer(Consumer<MetricsConfig> metricsConfigCustomizer);
----

=== Kinesis Producer Properties

The following properties are available for Kinesis producers only and must be prefixed with `spring.cloud.stream.kinesis.bindings.<bindingTarget>.producer.`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ private MessageProducerSupport createKclConsumerEndpoint(ConsumerDestination des
adapter.setListenerMode(kinesisConsumerProperties.getListenerMode());
adapter.setFanOut(kinesisConsumerProperties.isFanOut());
adapter.setMetricsLevel(kinesisConsumerProperties.getMetricsLevel());
adapter.setEmptyRecordList(kinesisConsumerProperties.isEmptyRecordList());
if (properties.getExtension().isEmbedHeaders()) {
adapter.setEmbeddedHeadersMapper(this.embeddedHeadersMapper);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2023 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -67,6 +67,11 @@ public class KinesisConsumerProperties {
*/
private boolean fanOut = true;

/**
* The KCL emptyRecordList option for batch listener mode.
*/
private boolean emptyRecordList = false;

private boolean embedHeaders;

/**
Expand Down Expand Up @@ -178,4 +183,12 @@ public void setMetricsLevel(MetricsLevel metricsLevel) {
this.metricsLevel = metricsLevel;
}

public boolean isEmptyRecordList() {
return this.emptyRecordList;
}

public void setEmptyRecordList(boolean emptyRecordList) {
this.emptyRecordList = emptyRecordList;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binding.BindingService;
import org.springframework.cloud.stream.config.ConsumerEndpointCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter;
import org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter;
import org.springframework.integration.aws.inbound.kinesis.KinesisShardOffset;
import org.springframework.integration.aws.support.AwsHeaders;
Expand Down

0 comments on commit ffc0f2e

Please sign in to comment.