From 993383153e0b1159ea3f39bee50d83f696b14bf9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Tamisier?= Date: Sat, 30 May 2020 18:40:16 +0200 Subject: [PATCH] feat: Allow message handling of un-deserializable data MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: François Delbrayelle --- generators/app/index.js | 12 ++++ .../service/kafka/GenericConsumer.java.ejs | 14 ++-- .../kafka/consumer/EntityConsumer.java.ejs | 15 ++-- .../DeserializationError.java.ejs | 37 ++++++++++ .../deserializer/EntityDeserializer.java.ejs | 23 ++----- .../kafka/producer/EntityProducer.java.ejs | 2 +- .../serializer/EntitySerializer.java.ejs | 5 +- .../src/main/docker/akhq.yml | 13 ++++ .../myapp/config/KafkaProperties.java | 68 +++++++++++++++++++ .../myapp/service/kafka/GenericConsumer.java | 14 ++-- .../service/kafka/consumer/FooConsumer.java | 13 +++- .../deserializer/DeserializationError.java | 37 ++++++++++ .../kafka/deserializer/FooDeserializer.java | 23 ++----- .../service/kafka/producer/FooProducer.java | 2 +- .../kafka/serializer/FooSerializer.java | 5 +- 15 files changed, 224 insertions(+), 59 deletions(-) create mode 100644 generators/app/templates/src/main/java/package/service/kafka/deserializer/DeserializationError.java.ejs create mode 100644 test/templates/message-broker-with-entities-2nd-call/src/main/docker/akhq.yml create mode 100644 test/templates/message-broker-with-entities-2nd-call/src/main/java/com/mycompany/myapp/config/KafkaProperties.java create mode 100644 test/templates/message-broker-with-entities-2nd-call/src/main/java/com/mycompany/myapp/service/kafka/deserializer/DeserializationError.java diff --git a/generators/app/index.js b/generators/app/index.js index ef7ed19..61b4c0f 100644 --- a/generators/app/index.js +++ b/generators/app/index.js @@ -91,6 +91,12 @@ module.exports = class extends BaseGenerator { } writing() { + const vavrVersion = '0.10.3'; + this.addMavenProperty('vavr.version', vavrVersion); + this.addGradleProperty('vavr_version', vavrVersion); + this.addMavenDependency('io.vavr', 'vavr', '${vavr.version}'); // eslint-disable-line no-template-curly-in-string + this.addGradleDependency('implementation', 'io.vavr', 'vavr', '${vavr_version}'); // eslint-disable-line no-template-curly-in-string + // function generate kafka application properties this.generateKafkaProperties = function(enabled) { this.enabled = enabled; @@ -195,6 +201,12 @@ module.exports = class extends BaseGenerator { null, null ); + this.template( + 'src/main/java/package/service/kafka/deserializer/DeserializationError.java.ejs', + `${javaDir}service/kafka/deserializer/DeserializationError.java`, + null, + null + ); } if (this.mustGenerateComponent(entity, 'producer')) { diff --git a/generators/app/templates/src/main/java/package/service/kafka/GenericConsumer.java.ejs b/generators/app/templates/src/main/java/package/service/kafka/GenericConsumer.java.ejs index 134b5f6..897adf9 100644 --- a/generators/app/templates/src/main/java/package/service/kafka/GenericConsumer.java.ejs +++ b/generators/app/templates/src/main/java/package/service/kafka/GenericConsumer.java.ejs @@ -18,6 +18,7 @@ -%> package <%= packageName %>.service.kafka; +import io.vavr.control.Either; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -32,15 +33,16 @@ import java.util.Collections; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; -public abstract class GenericConsumer implements Runnable { +import <%= packageName %>.service.kafka.deserializer.DeserializationError; +public abstract class GenericConsumer implements Runnable { private final Logger log = LoggerFactory.getLogger(GenericConsumer.class); private final AtomicBoolean closed = new AtomicBoolean(false); - private final KafkaConsumer consumer; - private String topicName; + private final KafkaConsumer> consumer; + private final String topicName; private final int pollingTimeout; public GenericConsumer(final String topicName, final Map properties, final int pollingTimeout) { @@ -64,8 +66,8 @@ public abstract class GenericConsumer implements Runnable { try { consumer.subscribe(Collections.singleton(topicName)); while (!closed.get()) { - final ConsumerRecords records = consumer.poll(Duration.ofMillis(pollingTimeout)); - for (final ConsumerRecord record : records) { + final ConsumerRecords> records = consumer.poll(Duration.ofMillis(pollingTimeout)); + for (final ConsumerRecord> record : records) { handleMessage(record); } consumer.commitSync(); @@ -86,5 +88,5 @@ public abstract class GenericConsumer implements Runnable { consumer.wakeup(); } - protected abstract void handleMessage(ConsumerRecord record); + protected abstract void handleMessage(ConsumerRecord> record); } diff --git a/generators/app/templates/src/main/java/package/service/kafka/consumer/EntityConsumer.java.ejs b/generators/app/templates/src/main/java/package/service/kafka/consumer/EntityConsumer.java.ejs index 7b2277e..87043b5 100644 --- a/generators/app/templates/src/main/java/package/service/kafka/consumer/EntityConsumer.java.ejs +++ b/generators/app/templates/src/main/java/package/service/kafka/consumer/EntityConsumer.java.ejs @@ -18,6 +18,7 @@ -%> package <%= packageName %>.service.kafka.consumer; +import io.vavr.control.Either; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,6 +29,7 @@ import org.springframework.stereotype.Service; import <%= packageName %>.config.KafkaProperties; import <%= packageName %>.domain.<%= entityClass %>; import <%= packageName %>.service.kafka.GenericConsumer; +import <%= packageName %>.service.kafka.deserializer.DeserializationError; @Service public class <%= entityClass %>Consumer extends GenericConsumer<<%= entityClass %>> { @@ -39,10 +41,15 @@ public class <%= entityClass %>Consumer extends GenericConsumer<<%= entityClass } @Override - protected void handleMessage(final ConsumerRecord> record) { - log.info("Handling message for <%= entityClass %> record: {}", record.value()); - - // TODO: Here is where you can add specific code to handle your messages + protected void handleMessage(final ConsumerRecord>> record) { + final Either> value = record.value(); + if (value.isLeft()) { + log.error("Deserialization record failure: {}", value.getLeft()); + } else { + log.info("Handling message for <%= entityClass %> record: {}", value.get()); + } + + // TODO: Here is where you can handle your messages } @Bean diff --git a/generators/app/templates/src/main/java/package/service/kafka/deserializer/DeserializationError.java.ejs b/generators/app/templates/src/main/java/package/service/kafka/deserializer/DeserializationError.java.ejs new file mode 100644 index 0000000..8703f82 --- /dev/null +++ b/generators/app/templates/src/main/java/package/service/kafka/deserializer/DeserializationError.java.ejs @@ -0,0 +1,37 @@ +package <%= packageName %>.service.kafka.deserializer; + +public class DeserializationError { + + private byte[] data; + + private Exception exception; + + public DeserializationError(byte[] data, Exception exception) { + this.data = data; + this.exception = exception; + } + + public byte[] getData() { + return data; + } + + public void setData(byte[] data) { + this.data = data; + } + + public Exception getException() { + return exception; + } + + public void setException(Exception exception) { + this.exception = exception; + } + + @Override + public String toString() { + return "DeserializationError{" + + "data='" + new String(data) + "'" + + ", exception=" + exception + + '}'; + } +} diff --git a/generators/app/templates/src/main/java/package/service/kafka/deserializer/EntityDeserializer.java.ejs b/generators/app/templates/src/main/java/package/service/kafka/deserializer/EntityDeserializer.java.ejs index 0519a31..986cb22 100644 --- a/generators/app/templates/src/main/java/package/service/kafka/deserializer/EntityDeserializer.java.ejs +++ b/generators/app/templates/src/main/java/package/service/kafka/deserializer/EntityDeserializer.java.ejs @@ -26,22 +26,20 @@ import com.fasterxml.jackson.databind.util.StdDateFormat; import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.fasterxml.jackson.module.paramnames.ParameterNamesModule; -import io.jsonwebtoken.io.SerializationException; +import io.vavr.control.Either; import org.apache.kafka.common.serialization.Deserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import <%= packageName %>.domain.<%= entityClass %>; import java.io.IOException; -import java.io.UnsupportedEncodingException; -public class <%= entityClass %>Deserializer implements Deserializer<<%= entityClass %>> { +public class <%= entityClass %>Deserializer implements Deserializer>> { private final Logger log = LoggerFactory.getLogger(<%= entityClass %>Deserializer.class); - private String encoding = "UTF8"; - - private ObjectMapper objectMapper; + private final ObjectMapper objectMapper; public <%= entityClass %>Deserializer() { this.objectMapper = @@ -56,18 +54,11 @@ public class <%= entityClass %>Deserializer implements Deserializer<<%= entityCl } @Override - public <%= entityClass %> deserialize(final String topicName, final byte[] data) { - <%= entityClass %> <%= camelCaseEntityClass %> = null; + public Either> deserialize(final String topicName, final byte[] data) { try { - final String dataString = data == null ? null : new String(data, this.encoding); - <%= camelCaseEntityClass %> = objectMapper.readValue(dataString, <%= entityClass %>.class); - } catch (final UnsupportedEncodingException var4) { - throw new SerializationException( - "Error when deserializing byte[] to string due to unsupported encoding " + this.encoding - ); + return Either.right(objectMapper.readValue(data, <%= entityClass %>.class)); } catch (final IOException e) { - log.error("Cannot read value from " + topicName + " topic", e); + return Either.left(new DeserializationError(data, e)); } - return <%= camelCaseEntityClass %>; } } diff --git a/generators/app/templates/src/main/java/package/service/kafka/producer/EntityProducer.java.ejs b/generators/app/templates/src/main/java/package/service/kafka/producer/EntityProducer.java.ejs index b4a2c7c..8bdbcca 100644 --- a/generators/app/templates/src/main/java/package/service/kafka/producer/EntityProducer.java.ejs +++ b/generators/app/templates/src/main/java/package/service/kafka/producer/EntityProducer.java.ejs @@ -36,7 +36,7 @@ public class <%= entityClass %>Producer { private final KafkaProducer> kafkaProducer; - private String topicName; + private final String topicName; public <%= entityClass %>Producer(@Value("${kafka.producer.<%= camelCaseEntityClass %>.name}") final String topicName, final KafkaProperties kafkaProperties) { this.topicName = topicName; diff --git a/generators/app/templates/src/main/java/package/service/kafka/serializer/EntitySerializer.java.ejs b/generators/app/templates/src/main/java/package/service/kafka/serializer/EntitySerializer.java.ejs index 028d0a5..cb9b3d2 100644 --- a/generators/app/templates/src/main/java/package/service/kafka/serializer/EntitySerializer.java.ejs +++ b/generators/app/templates/src/main/java/package/service/kafka/serializer/EntitySerializer.java.ejs @@ -29,6 +29,7 @@ import com.fasterxml.jackson.module.paramnames.ParameterNamesModule; import org.apache.kafka.common.serialization.Serializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import <%= packageName %>.domain.<%= entityClass %>; import java.io.ByteArrayOutputStream; @@ -38,9 +39,7 @@ public class <%= entityClass %>Serializer implements Serializer<<%= entityClass private final Logger log = LoggerFactory.getLogger(<%= entityClass %>Serializer.class); - private String encoding = "UTF8"; - - private ObjectMapper objectMapper; + private final ObjectMapper objectMapper; public <%= entityClass %>Serializer() { this.objectMapper = diff --git a/test/templates/message-broker-with-entities-2nd-call/src/main/docker/akhq.yml b/test/templates/message-broker-with-entities-2nd-call/src/main/docker/akhq.yml new file mode 100644 index 0000000..3dfdd82 --- /dev/null +++ b/test/templates/message-broker-with-entities-2nd-call/src/main/docker/akhq.yml @@ -0,0 +1,13 @@ +version: '2' +services: + akhq: + image: tchiotludo/akhq + environment: + AKHQ_CONFIGURATION: | + akhq: + connections: + docker-kafka-server: + properties: + bootstrap.servers: "kafka:29092" + ports: + - 11817:8080 diff --git a/test/templates/message-broker-with-entities-2nd-call/src/main/java/com/mycompany/myapp/config/KafkaProperties.java b/test/templates/message-broker-with-entities-2nd-call/src/main/java/com/mycompany/myapp/config/KafkaProperties.java new file mode 100644 index 0000000..4866cb5 --- /dev/null +++ b/test/templates/message-broker-with-entities-2nd-call/src/main/java/com/mycompany/myapp/config/KafkaProperties.java @@ -0,0 +1,68 @@ +package com.mycompany.myapp.config; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; + +import javax.annotation.PostConstruct; +import java.util.HashMap; +import java.util.Map; + +@Configuration +@ConfigurationProperties(prefix = "kafka") +public class KafkaProperties { + + @Value("${kafka.bootstrap.servers:localhost:9092}") + private String bootstrapServers; + + @Value("${kafka.polling.timeout:10000}") + private Integer pollingTimeout; + + private Map> consumer = new HashMap<>(); + + private Map> producer = new HashMap<>(); + + @PostConstruct + public void init() { + + for (String consumerKey: consumer.keySet()) { + final Map properties = consumer.get(consumerKey); + if (! properties.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) { + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + } + } + + for (String consumerKey: producer.keySet()) { + final Map properties = producer.get(consumerKey); + if (! properties.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) { + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + } + } + } + + public Map> getConsumer() { + return this.consumer; + } + + public void setConsumer(Map> consumer) { + this.consumer = consumer; + } + + public Map> getProducer() { + return this.producer; + } + + public void setProducer(Map> producer) { + this.producer = producer; + } + + public Integer getPollingTimeout() { + return pollingTimeout; + } + + public void setPollingTimeout(Integer pollingTimeout) { + this.pollingTimeout = pollingTimeout; + } +} diff --git a/test/templates/message-broker-with-entities-2nd-call/src/main/java/com/mycompany/myapp/service/kafka/GenericConsumer.java b/test/templates/message-broker-with-entities-2nd-call/src/main/java/com/mycompany/myapp/service/kafka/GenericConsumer.java index e907938..e2510ed 100644 --- a/test/templates/message-broker-with-entities-2nd-call/src/main/java/com/mycompany/myapp/service/kafka/GenericConsumer.java +++ b/test/templates/message-broker-with-entities-2nd-call/src/main/java/com/mycompany/myapp/service/kafka/GenericConsumer.java @@ -1,5 +1,6 @@ package com.mycompany.myapp.service.kafka; +import io.vavr.control.Either; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -14,15 +15,16 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; -public abstract class GenericConsumer implements Runnable { +import com.mycompany.myapp.service.kafka.deserializer.DeserializationError; +public abstract class GenericConsumer implements Runnable { private final Logger log = LoggerFactory.getLogger(GenericConsumer.class); private final AtomicBoolean closed = new AtomicBoolean(false); - private final KafkaConsumer consumer; - private String topicName; + private final KafkaConsumer> consumer; + private final String topicName; private final int pollingTimeout; public GenericConsumer(final String topicName, final Map properties, final int pollingTimeout) { @@ -46,8 +48,8 @@ public void run() { try { consumer.subscribe(Collections.singleton(topicName)); while (!closed.get()) { - final ConsumerRecords records = consumer.poll(Duration.ofMillis(pollingTimeout)); - for (final ConsumerRecord record : records) { + final ConsumerRecords> records = consumer.poll(Duration.ofMillis(pollingTimeout)); + for (final ConsumerRecord> record : records) { handleMessage(record); } consumer.commitSync(); @@ -68,5 +70,5 @@ public void shutdown() { consumer.wakeup(); } - protected abstract void handleMessage(ConsumerRecord record); + protected abstract void handleMessage(ConsumerRecord> record); } diff --git a/test/templates/message-broker-with-entities-2nd-call/src/main/java/com/mycompany/myapp/service/kafka/consumer/FooConsumer.java b/test/templates/message-broker-with-entities-2nd-call/src/main/java/com/mycompany/myapp/service/kafka/consumer/FooConsumer.java index 948d42d..f4b1429 100644 --- a/test/templates/message-broker-with-entities-2nd-call/src/main/java/com/mycompany/myapp/service/kafka/consumer/FooConsumer.java +++ b/test/templates/message-broker-with-entities-2nd-call/src/main/java/com/mycompany/myapp/service/kafka/consumer/FooConsumer.java @@ -1,5 +1,6 @@ package com.mycompany.myapp.service.kafka.consumer; +import io.vavr.control.Either; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -10,6 +11,7 @@ import com.mycompany.myapp.config.KafkaProperties; import com.mycompany.myapp.domain.Foo; import com.mycompany.myapp.service.kafka.GenericConsumer; +import com.mycompany.myapp.service.kafka.deserializer.DeserializationError; @Service public class FooConsumer extends GenericConsumer { @@ -21,10 +23,15 @@ public FooConsumer(@Value("${kafka.consumer.foo.name}") final String topicName, } @Override - protected void handleMessage(final ConsumerRecord record) { - log.info("Handling message for Foo record: {}", record.value()); + protected void handleMessage(final ConsumerRecord> record) { + final Either value = record.value(); + if (value.isLeft()) { + log.error("Deserialization record failure: {}", value.getLeft()); + } else { + log.info("Handling message for Foo record: {}", value.get()); + } - // TODO: Here is where you can add specific code to handle your messages + // TODO: Here is where you can handle your messages } @Bean diff --git a/test/templates/message-broker-with-entities-2nd-call/src/main/java/com/mycompany/myapp/service/kafka/deserializer/DeserializationError.java b/test/templates/message-broker-with-entities-2nd-call/src/main/java/com/mycompany/myapp/service/kafka/deserializer/DeserializationError.java new file mode 100644 index 0000000..de367be --- /dev/null +++ b/test/templates/message-broker-with-entities-2nd-call/src/main/java/com/mycompany/myapp/service/kafka/deserializer/DeserializationError.java @@ -0,0 +1,37 @@ +package com.mycompany.myapp.service.kafka.deserializer; + +public class DeserializationError { + + private byte[] data; + + private Exception exception; + + public DeserializationError(byte[] data, Exception exception) { + this.data = data; + this.exception = exception; + } + + public byte[] getData() { + return data; + } + + public void setData(byte[] data) { + this.data = data; + } + + public Exception getException() { + return exception; + } + + public void setException(Exception exception) { + this.exception = exception; + } + + @Override + public String toString() { + return "DeserializationError{" + + "data='" + new String(data) + "'" + + ", exception=" + exception + + '}'; + } +} diff --git a/test/templates/message-broker-with-entities-2nd-call/src/main/java/com/mycompany/myapp/service/kafka/deserializer/FooDeserializer.java b/test/templates/message-broker-with-entities-2nd-call/src/main/java/com/mycompany/myapp/service/kafka/deserializer/FooDeserializer.java index fc3dddf..68f1ba0 100644 --- a/test/templates/message-broker-with-entities-2nd-call/src/main/java/com/mycompany/myapp/service/kafka/deserializer/FooDeserializer.java +++ b/test/templates/message-broker-with-entities-2nd-call/src/main/java/com/mycompany/myapp/service/kafka/deserializer/FooDeserializer.java @@ -8,22 +8,20 @@ import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.fasterxml.jackson.module.paramnames.ParameterNamesModule; -import io.jsonwebtoken.io.SerializationException; +import io.vavr.control.Either; import org.apache.kafka.common.serialization.Deserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import com.mycompany.myapp.domain.Foo; import java.io.IOException; -import java.io.UnsupportedEncodingException; -public class FooDeserializer implements Deserializer { +public class FooDeserializer implements Deserializer> { private final Logger log = LoggerFactory.getLogger(FooDeserializer.class); - private String encoding = "UTF8"; - - private ObjectMapper objectMapper; + private final ObjectMapper objectMapper; public FooDeserializer() { this.objectMapper = @@ -38,18 +36,11 @@ public FooDeserializer() { } @Override - public Foo deserialize(final String topicName, final byte[] data) { - Foo foo = null; + public Either deserialize(final String topicName, final byte[] data) { try { - final String dataString = data == null ? null : new String(data, this.encoding); - foo = objectMapper.readValue(dataString, Foo.class); - } catch (final UnsupportedEncodingException var4) { - throw new SerializationException( - "Error when deserializing byte[] to string due to unsupported encoding " + this.encoding - ); + return Either.right(objectMapper.readValue(data, Foo.class)); } catch (final IOException e) { - log.error("Cannot read value from " + topicName + " topic", e); + return Either.left(new DeserializationError(data, e)); } - return foo; } } diff --git a/test/templates/message-broker-with-entities-2nd-call/src/main/java/com/mycompany/myapp/service/kafka/producer/FooProducer.java b/test/templates/message-broker-with-entities-2nd-call/src/main/java/com/mycompany/myapp/service/kafka/producer/FooProducer.java index b3203ea..33b691e 100644 --- a/test/templates/message-broker-with-entities-2nd-call/src/main/java/com/mycompany/myapp/service/kafka/producer/FooProducer.java +++ b/test/templates/message-broker-with-entities-2nd-call/src/main/java/com/mycompany/myapp/service/kafka/producer/FooProducer.java @@ -18,7 +18,7 @@ public class FooProducer { private final KafkaProducer kafkaProducer; - private String topicName; + private final String topicName; public FooProducer(@Value("${kafka.producer.foo.name}") final String topicName, final KafkaProperties kafkaProperties) { this.topicName = topicName; diff --git a/test/templates/message-broker-with-entities-2nd-call/src/main/java/com/mycompany/myapp/service/kafka/serializer/FooSerializer.java b/test/templates/message-broker-with-entities-2nd-call/src/main/java/com/mycompany/myapp/service/kafka/serializer/FooSerializer.java index 599c847..ded9346 100644 --- a/test/templates/message-broker-with-entities-2nd-call/src/main/java/com/mycompany/myapp/service/kafka/serializer/FooSerializer.java +++ b/test/templates/message-broker-with-entities-2nd-call/src/main/java/com/mycompany/myapp/service/kafka/serializer/FooSerializer.java @@ -11,6 +11,7 @@ import org.apache.kafka.common.serialization.Serializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import com.mycompany.myapp.domain.Foo; import java.io.ByteArrayOutputStream; @@ -20,9 +21,7 @@ public class FooSerializer implements Serializer { private final Logger log = LoggerFactory.getLogger(FooSerializer.class); - private String encoding = "UTF8"; - - private ObjectMapper objectMapper; + private final ObjectMapper objectMapper; public FooSerializer() { this.objectMapper =