From 28fe9f1e9f7402672eb26a7bce786d3734c1c8b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Tamisier?= Date: Sat, 30 May 2020 18:40:16 +0200 Subject: [PATCH] feat: Allow message handling of un-deserializable data MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: François Delbrayelle --- generators/app/index.js | 12 ++++++ .../service/kafka/GenericConsumer.java.ejs | 11 +++--- .../kafka/consumer/EntityConsumer.java.ejs | 14 +++++-- .../deserializer/DeserializerError.java.ejs | 37 +++++++++++++++++++ .../deserializer/EntityDeserializer.java.ejs | 22 +++-------- 5 files changed, 71 insertions(+), 25 deletions(-) create mode 100644 generators/app/templates/src/main/java/package/service/kafka/deserializer/DeserializerError.java.ejs diff --git a/generators/app/index.js b/generators/app/index.js index ef7ed19..450b956 100644 --- a/generators/app/index.js +++ b/generators/app/index.js @@ -91,6 +91,12 @@ module.exports = class extends BaseGenerator { } writing() { + const vavrVersion = '0.10.3'; + this.addMavenProperty('vavr.version', vavrVersion); + this.addGradleProperty('vavr_version', vavrVersion); + this.addMavenDependency('io.vavr', 'vavr', '${vavr.version}'); // eslint-disable-line no-template-curly-in-string + this.addGradleDependency('implementation', 'io.vavr', 'vavr', '${vavr_version}'); // eslint-disable-line no-template-curly-in-string + // function generate kafka application properties this.generateKafkaProperties = function(enabled) { this.enabled = enabled; @@ -195,6 +201,12 @@ module.exports = class extends BaseGenerator { null, null ); + this.template( + 'src/main/java/package/service/kafka/deserializer/DeserializerError.java.ejs', + `${javaDir}service/kafka/deserializer/DeserializerError.java`, + null, + null + ); } if (this.mustGenerateComponent(entity, 'producer')) { diff --git a/generators/app/templates/src/main/java/package/service/kafka/GenericConsumer.java.ejs b/generators/app/templates/src/main/java/package/service/kafka/GenericConsumer.java.ejs index 134b5f6..a173514 100644 --- a/generators/app/templates/src/main/java/package/service/kafka/GenericConsumer.java.ejs +++ b/generators/app/templates/src/main/java/package/service/kafka/GenericConsumer.java.ejs @@ -18,6 +18,8 @@ -%> package <%= packageName %>.service.kafka; +import <%= packageName %>.service.kafka.deserializer.DeserializerError; +import io.vavr.control.Either; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -34,12 +36,11 @@ import java.util.concurrent.atomic.AtomicBoolean; public abstract class GenericConsumer implements Runnable { - private final Logger log = LoggerFactory.getLogger(GenericConsumer.class); private final AtomicBoolean closed = new AtomicBoolean(false); - private final KafkaConsumer consumer; + private final KafkaConsumer> consumer; private String topicName; private final int pollingTimeout; @@ -64,8 +65,8 @@ public abstract class GenericConsumer implements Runnable { try { consumer.subscribe(Collections.singleton(topicName)); while (!closed.get()) { - final ConsumerRecords records = consumer.poll(Duration.ofMillis(pollingTimeout)); - for (final ConsumerRecord record : records) { + final ConsumerRecords> records = consumer.poll(Duration.ofMillis(pollingTimeout)); + for (final ConsumerRecord> record : records) { handleMessage(record); } consumer.commitSync(); @@ -86,5 +87,5 @@ public abstract class GenericConsumer implements Runnable { consumer.wakeup(); } - protected abstract void handleMessage(ConsumerRecord record); + protected abstract void handleMessage(ConsumerRecord> record); } diff --git a/generators/app/templates/src/main/java/package/service/kafka/consumer/EntityConsumer.java.ejs b/generators/app/templates/src/main/java/package/service/kafka/consumer/EntityConsumer.java.ejs index 7b2277e..1d0ad28 100644 --- a/generators/app/templates/src/main/java/package/service/kafka/consumer/EntityConsumer.java.ejs +++ b/generators/app/templates/src/main/java/package/service/kafka/consumer/EntityConsumer.java.ejs @@ -18,6 +18,8 @@ -%> package <%= packageName %>.service.kafka.consumer; +import <%= packageName %>.service.kafka.deserializer.DeserializerError; +import io.vavr.control.Either; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,10 +41,14 @@ public class <%= entityClass %>Consumer extends GenericConsumer<<%= entityClass } @Override - protected void handleMessage(final ConsumerRecord> record) { - log.info("Handling message for <%= entityClass %> record: {}", record.value()); - - // TODO: Here is where you can add specific code to handle your messages + protected void handleMessage(final ConsumerRecord>> record) { + Either> value = record.value(); + if (value.isLeft()) { + log.error("Deserialization record failure: {}", value.getLeft()); + } else { + log.info("Handling message for Foo record: {}", value.get()); + } + // TODO: Here is where you can handle your messages } @Bean diff --git a/generators/app/templates/src/main/java/package/service/kafka/deserializer/DeserializerError.java.ejs b/generators/app/templates/src/main/java/package/service/kafka/deserializer/DeserializerError.java.ejs new file mode 100644 index 0000000..b0d3e54 --- /dev/null +++ b/generators/app/templates/src/main/java/package/service/kafka/deserializer/DeserializerError.java.ejs @@ -0,0 +1,37 @@ +package <%= packageName %>.service.kafka.deserializer; + +public class DeserializerError { + + private byte[] data; + + private Exception exception; + + public DeserializerError(byte[] data, Exception exception) { + this.data = data; + this.exception = exception; + } + + public byte[] getData() { + return data; + } + + public void setData(byte[] data) { + this.data = data; + } + + public Exception getException() { + return exception; + } + + public void setException(Exception exception) { + this.exception = exception; + } + + @Override + public String toString() { + return "DeserializerError{" + + "data='" + new String(data) + "'" + + ", exception=" + exception + + '}'; + } +} diff --git a/generators/app/templates/src/main/java/package/service/kafka/deserializer/EntityDeserializer.java.ejs b/generators/app/templates/src/main/java/package/service/kafka/deserializer/EntityDeserializer.java.ejs index 0519a31..029016a 100644 --- a/generators/app/templates/src/main/java/package/service/kafka/deserializer/EntityDeserializer.java.ejs +++ b/generators/app/templates/src/main/java/package/service/kafka/deserializer/EntityDeserializer.java.ejs @@ -26,21 +26,18 @@ import com.fasterxml.jackson.databind.util.StdDateFormat; import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.fasterxml.jackson.module.paramnames.ParameterNamesModule; -import io.jsonwebtoken.io.SerializationException; +import io.vavr.control.Either; import org.apache.kafka.common.serialization.Deserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import <%= packageName %>.domain.<%= entityClass %>; import java.io.IOException; -import java.io.UnsupportedEncodingException; -public class <%= entityClass %>Deserializer implements Deserializer<<%= entityClass %>> { +public class <%= entityClass %>Deserializer implements Deserializer>> { private final Logger log = LoggerFactory.getLogger(<%= entityClass %>Deserializer.class); - private String encoding = "UTF8"; - private ObjectMapper objectMapper; public <%= entityClass %>Deserializer() { @@ -56,18 +53,11 @@ public class <%= entityClass %>Deserializer implements Deserializer<<%= entityCl } @Override - public <%= entityClass %> deserialize(final String topicName, final byte[] data) { - <%= entityClass %> <%= camelCaseEntityClass %> = null; + public Either> deserialize(final String topicName, final byte[] data) { try { - final String dataString = data == null ? null : new String(data, this.encoding); - <%= camelCaseEntityClass %> = objectMapper.readValue(dataString, <%= entityClass %>.class); - } catch (final UnsupportedEncodingException var4) { - throw new SerializationException( - "Error when deserializing byte[] to string due to unsupported encoding " + this.encoding - ); - } catch (final IOException e) { - log.error("Cannot read value from " + topicName + " topic", e); + return Either.right(objectMapper.readValue(data, <%= entityClass %>.class)); + } catch (IOException e) { + return Either.left(new DeserializerError(data, e)); } - return <%= camelCaseEntityClass %>; } }