Skip to content

Commit d141ad3

Browse files
danishgargsobychacko
authored andcommitted
Changed occurances of map calls on kafka streams to mapValues
Resolves spring-attic#357
1 parent 75dd5f2 commit d141ad3

File tree

2 files changed

+13
-14
lines changed

2 files changed

+13
-14
lines changed

spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsMessageConversionDelegate.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -73,17 +73,17 @@ public KStream serializeOnOutbound(KStream<?,?> outboundBindTarget) {
7373
String contentType = this.kstreamBindingInformationCatalogue.getContentType(outboundBindTarget);
7474
MessageConverter messageConverter = compositeMessageConverterFactory.getMessageConverterForAllRegistered();
7575

76-
return outboundBindTarget.map((k, v) -> {
76+
return outboundBindTarget.mapValues((v) -> {
7777
Message<?> message = v instanceof Message<?> ? (Message<?>) v :
7878
MessageBuilder.withPayload(v).build();
7979
Map<String, Object> headers = new HashMap<>(message.getHeaders());
8080
if (!StringUtils.isEmpty(contentType)) {
8181
headers.put(MessageHeaders.CONTENT_TYPE, contentType);
8282
}
8383
MessageHeaders messageHeaders = new MessageHeaders(headers);
84-
return new KeyValue<>(k,
84+
return
8585
messageConverter.toMessage(message.getPayload(),
86-
messageHeaders).getPayload());
86+
messageHeaders).getPayload();
8787
});
8888
}
8989

@@ -137,10 +137,10 @@ else if (o2 instanceof String || o2 instanceof byte[]) {
137137
processErrorFromDeserialization(bindingTarget, branch[1]);
138138

139139
//first branch above is the branch where the messages are converted, let it go through further processing.
140-
return branch[0].map((o, o2) -> {
141-
KeyValue<Object, Object> objectObjectKeyValue = keyValueThreadLocal.get();
140+
return branch[0].mapValues((o2) -> {
141+
Object objectValue = keyValueThreadLocal.get().value;
142142
keyValueThreadLocal.remove();
143-
return objectObjectKeyValue;
143+
return objectValue;
144144
});
145145
}
146146

spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsStreamListenerSetupMethodOrchestrator.java

+7-8
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.kafka.common.serialization.Serde;
2828
import org.apache.kafka.common.utils.Bytes;
2929
import org.apache.kafka.streams.Consumed;
30-
import org.apache.kafka.streams.KeyValue;
3130
import org.apache.kafka.streams.StreamsBuilder;
3231
import org.apache.kafka.streams.StreamsConfig;
3332
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
@@ -300,18 +299,18 @@ private <K,V> KTable<K,V> materializedAs(StreamsBuilder streamsBuilder, String d
300299
else {
301300
LOG.info("Native decoding is disabled for " + inboundName + ". Inbound message conversion done by Spring Cloud Stream.");
302301
}
303-
stream = stream.map((key, value) -> {
304-
KeyValue<Object, Object> keyValue;
302+
303+
stream = stream.mapValues(value -> {
304+
Object returnValue;
305305
String contentType = bindingProperties.getContentType();
306306
if (!StringUtils.isEmpty(contentType) && !nativeDecoding) {
307307
Message<?> message = MessageBuilder.withPayload(value)
308308
.setHeader(MessageHeaders.CONTENT_TYPE, contentType).build();
309-
keyValue = new KeyValue<>(key, message);
310-
}
311-
else {
312-
keyValue = new KeyValue<>(key, value);
309+
returnValue = message;
310+
} else {
311+
returnValue = value;
313312
}
314-
return keyValue;
313+
return returnValue;
315314
});
316315
return stream;
317316
}

0 commit comments

Comments
 (0)