Skip to content
This repository was archived by the owner on Nov 20, 2024. It is now read-only.

Commit 1b80cfc

Browse files
committed
Avoid unnecessary re-partitioning due to map calls.
Fixing streams get unnecessarily flagged for re-partitioning from map calls on KStream. Resolves #412
1 parent 0d5d092 commit 1b80cfc

File tree

3 files changed

+45
-42
lines changed

3 files changed

+45
-42
lines changed

spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KStreamStreamListenerParameterAdapter.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@
1616

1717
package org.springframework.cloud.stream.binder.kafka.streams;
1818

19-
import org.apache.kafka.streams.KeyValue;
2019
import org.apache.kafka.streams.kstream.KStream;
21-
import org.apache.kafka.streams.kstream.KeyValueMapper;
2220

2321
import org.springframework.cloud.stream.binding.StreamListenerParameterAdapter;
2422
import org.springframework.core.MethodParameter;
@@ -52,7 +50,7 @@ public KStream adapt(KStream<?, ?> bindingTarget, MethodParameter parameter) {
5250
final Class<?> valueClass = (resolvableType.getGeneric(1).getRawClass() != null)
5351
? (resolvableType.getGeneric(1).getRawClass()) : Object.class;
5452
if (this.KafkaStreamsBindingInformationCatalogue.isUseNativeDecoding(bindingTarget)) {
55-
return bindingTarget.map((KeyValueMapper) KeyValue::new);
53+
return bindingTarget;
5654
}
5755
else {
5856
return kafkaStreamsMessageConversionDelegate.deserializeOnInbound(valueClass, bindingTarget);

spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KStreamStreamListenerResultAdapter.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import java.io.Closeable;
2020
import java.io.IOException;
2121

22-
import org.apache.kafka.streams.KeyValue;
2322
import org.apache.kafka.streams.kstream.KStream;
2423

2524
import org.springframework.cloud.stream.binding.StreamListenerResultAdapter;
@@ -38,7 +37,7 @@ public boolean supports(Class<?> resultType, Class<?> boundElement) {
3837
@Override
3938
@SuppressWarnings("unchecked")
4039
public Closeable adapt(KStream streamListenerResult, KStreamBoundElementFactory.KStreamWrapper boundElement) {
41-
boundElement.wrap(streamListenerResult.map(KeyValue::new));
40+
boundElement.wrap(streamListenerResult);
4241
return new NoOpCloseable();
4342
}
4443

spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/StreamToTableJoinIntegrationTests.java

+43-37
Original file line numberDiff line numberDiff line change
@@ -129,31 +129,8 @@ public void testStreamToTable() throws Exception {
129129
"--spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId=StreamToTableJoinIntegrationTests-abc",
130130
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString(),
131131
"--spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString())) {
132-
// Input 1: Clicks per user (multiple records allowed per user).
133-
List<KeyValue<String, Long>> userClicks = Arrays.asList(
134-
new KeyValue<>("alice", 13L),
135-
new KeyValue<>("bob", 4L),
136-
new KeyValue<>("chao", 25L),
137-
new KeyValue<>("bob", 19L),
138-
new KeyValue<>("dave", 56L),
139-
new KeyValue<>("eve", 78L),
140-
new KeyValue<>("alice", 40L),
141-
new KeyValue<>("fang", 99L)
142-
);
143-
144-
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
145-
senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
146-
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
147132

148-
DefaultKafkaProducerFactory<String, Long> pf = new DefaultKafkaProducerFactory<>(senderProps);
149-
KafkaTemplate<String, Long> template = new KafkaTemplate<>(pf, true);
150-
template.setDefaultTopic("user-clicks-1");
151-
152-
for (KeyValue<String, Long> keyValue : userClicks) {
153-
template.sendDefault(keyValue.key, keyValue.value);
154-
}
155-
156-
// Input 2: Region per user (multiple records allowed per user).
133+
// Input 1: Region per user (multiple records allowed per user).
157134
List<KeyValue<String, String>> userRegions = Arrays.asList(
158135
new KeyValue<>("alice", "asia"), /* Alice lived in Asia originally... */
159136
new KeyValue<>("bob", "americas"),
@@ -176,6 +153,30 @@ public void testStreamToTable() throws Exception {
176153
template1.sendDefault(keyValue.key, keyValue.value);
177154
}
178155

156+
// Input 2: Clicks per user (multiple records allowed per user).
157+
List<KeyValue<String, Long>> userClicks = Arrays.asList(
158+
new KeyValue<>("alice", 13L),
159+
new KeyValue<>("bob", 4L),
160+
new KeyValue<>("chao", 25L),
161+
new KeyValue<>("bob", 19L),
162+
new KeyValue<>("dave", 56L),
163+
new KeyValue<>("eve", 78L),
164+
new KeyValue<>("alice", 40L),
165+
new KeyValue<>("fang", 99L)
166+
);
167+
168+
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
169+
senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
170+
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
171+
172+
DefaultKafkaProducerFactory<String, Long> pf = new DefaultKafkaProducerFactory<>(senderProps);
173+
KafkaTemplate<String, Long> template = new KafkaTemplate<>(pf, true);
174+
template.setDefaultTopic("user-clicks-1");
175+
176+
for (KeyValue<String, Long> keyValue : userClicks) {
177+
template.sendDefault(keyValue.key, keyValue.value);
178+
}
179+
179180
List<KeyValue<String, Long>> expectedClicksPerRegion = Arrays.asList(
180181
new KeyValue<>("americas", 101L),
181182
new KeyValue<>("europe", 109L),
@@ -267,19 +268,6 @@ public void testGlobalStartOffsetWithLatestAndIndividualBindingWthEarliest() thr
267268
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString(),
268269
"--spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString())) {
269270
Thread.sleep(1000L);
270-
// Input 1: Clicks per user (multiple records allowed per user).
271-
List<KeyValue<String, Long>> userClicks1 = Arrays.asList(
272-
new KeyValue<>("bob", 4L),
273-
new KeyValue<>("chao", 25L),
274-
new KeyValue<>("bob", 19L),
275-
new KeyValue<>("dave", 56L),
276-
new KeyValue<>("eve", 78L),
277-
new KeyValue<>("fang", 99L)
278-
);
279-
280-
for (KeyValue<String, Long> keyValue : userClicks1) {
281-
template.sendDefault(keyValue.key, keyValue.value);
282-
}
283271

284272
// Input 2: Region per user (multiple records allowed per user).
285273
List<KeyValue<String, String>> userRegions = Arrays.asList(
@@ -304,6 +292,24 @@ public void testGlobalStartOffsetWithLatestAndIndividualBindingWthEarliest() thr
304292
template1.sendDefault(keyValue.key, keyValue.value);
305293
}
306294

295+
296+
297+
// Input 1: Clicks per user (multiple records allowed per user).
298+
List<KeyValue<String, Long>> userClicks1 = Arrays.asList(
299+
new KeyValue<>("bob", 4L),
300+
new KeyValue<>("chao", 25L),
301+
new KeyValue<>("bob", 19L),
302+
new KeyValue<>("dave", 56L),
303+
new KeyValue<>("eve", 78L),
304+
new KeyValue<>("fang", 99L)
305+
);
306+
307+
for (KeyValue<String, Long> keyValue : userClicks1) {
308+
template.sendDefault(keyValue.key, keyValue.value);
309+
}
310+
311+
312+
307313
List<KeyValue<String, Long>> expectedClicksPerRegion = Arrays.asList(
308314
new KeyValue<>("americas", 101L),
309315
new KeyValue<>("europe", 56L),

0 commit comments

Comments
 (0)