Skip to content

Commit

Permalink
feat: Allow message handling of un-deserializable data
Browse files Browse the repository at this point in the history
Co-authored-by: François Delbrayelle <fdelbrayelle@gmail.com>
  • Loading branch information
ctamisier and fdelbrayelle committed Jun 1, 2020
1 parent 101f743 commit 9933831
Show file tree
Hide file tree
Showing 15 changed files with 224 additions and 59 deletions.
12 changes: 12 additions & 0 deletions generators/app/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ module.exports = class extends BaseGenerator {
}

writing() {
const vavrVersion = '0.10.3';
this.addMavenProperty('vavr.version', vavrVersion);
this.addGradleProperty('vavr_version', vavrVersion);
this.addMavenDependency('io.vavr', 'vavr', '${vavr.version}'); // eslint-disable-line no-template-curly-in-string
this.addGradleDependency('implementation', 'io.vavr', 'vavr', '${vavr_version}'); // eslint-disable-line no-template-curly-in-string

// function generate kafka application properties
this.generateKafkaProperties = function(enabled) {
this.enabled = enabled;
Expand Down Expand Up @@ -195,6 +201,12 @@ module.exports = class extends BaseGenerator {
null,
null
);
this.template(
'src/main/java/package/service/kafka/deserializer/DeserializationError.java.ejs',
`${javaDir}service/kafka/deserializer/DeserializationError.java`,
null,
null
);
}

if (this.mustGenerateComponent(entity, 'producer')) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
-%>
package <%= packageName %>.service.kafka;

import io.vavr.control.Either;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
Expand All @@ -32,15 +33,16 @@ import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

public abstract class GenericConsumer<T> implements Runnable {
import <%= packageName %>.service.kafka.deserializer.DeserializationError;

public abstract class GenericConsumer<T> implements Runnable {

private final Logger log = LoggerFactory.getLogger(GenericConsumer.class);

private final AtomicBoolean closed = new AtomicBoolean(false);

private final KafkaConsumer<String, T> consumer;
private String topicName;
private final KafkaConsumer<String, Either<DeserializationError, T>> consumer;
private final String topicName;
private final int pollingTimeout;

public GenericConsumer(final String topicName, final Map<String, Object> properties, final int pollingTimeout) {
Expand All @@ -64,8 +66,8 @@ public abstract class GenericConsumer<T> implements Runnable {
try {
consumer.subscribe(Collections.singleton(topicName));
while (!closed.get()) {
final ConsumerRecords<String, T> records = consumer.poll(Duration.ofMillis(pollingTimeout));
for (final ConsumerRecord<String, T> record : records) {
final ConsumerRecords<String, Either<DeserializationError, T>> records = consumer.poll(Duration.ofMillis(pollingTimeout));
for (final ConsumerRecord<String, Either<DeserializationError, T>> record : records) {
handleMessage(record);
}
consumer.commitSync();
Expand All @@ -86,5 +88,5 @@ public abstract class GenericConsumer<T> implements Runnable {
consumer.wakeup();
}

protected abstract void handleMessage(ConsumerRecord<String, T> record);
protected abstract void handleMessage(ConsumerRecord<String, Either<DeserializationError, T>> record);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
-%>
package <%= packageName %>.service.kafka.consumer;

import io.vavr.control.Either;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -28,6 +29,7 @@ import org.springframework.stereotype.Service;
import <%= packageName %>.config.KafkaProperties;
import <%= packageName %>.domain.<%= entityClass %>;
import <%= packageName %>.service.kafka.GenericConsumer;
import <%= packageName %>.service.kafka.deserializer.DeserializationError;

@Service
public class <%= entityClass %>Consumer extends GenericConsumer<<%= entityClass %>> {
Expand All @@ -39,10 +41,15 @@ public class <%= entityClass %>Consumer extends GenericConsumer<<%= entityClass
}

@Override
protected void handleMessage(final ConsumerRecord<String, <%= entityClass %>> record) {
log.info("Handling message for <%= entityClass %> record: {}", record.value());

// TODO: Here is where you can add specific code to handle your messages
protected void handleMessage(final ConsumerRecord<String, Either<DeserializationError, <%= entityClass %>>> record) {
final Either<DeserializationError, <%= entityClass %>> value = record.value();
if (value.isLeft()) {
log.error("Deserialization record failure: {}", value.getLeft());
} else {
log.info("Handling message for <%= entityClass %> record: {}", value.get());
}

// TODO: Here is where you can handle your messages
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package <%= packageName %>.service.kafka.deserializer;

public class DeserializationError {

private byte[] data;

private Exception exception;

public DeserializationError(byte[] data, Exception exception) {
this.data = data;
this.exception = exception;
}

public byte[] getData() {
return data;
}

public void setData(byte[] data) {
this.data = data;
}

public Exception getException() {
return exception;
}

public void setException(Exception exception) {
this.exception = exception;
}

@Override
public String toString() {
return "DeserializationError{" +
"data='" + new String(data) + "'" +
", exception=" + exception +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,20 @@ import com.fasterxml.jackson.databind.util.StdDateFormat;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.module.paramnames.ParameterNamesModule;
import io.jsonwebtoken.io.SerializationException;
import io.vavr.control.Either;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import <%= packageName %>.domain.<%= entityClass %>;

import java.io.IOException;
import java.io.UnsupportedEncodingException;

public class <%= entityClass %>Deserializer implements Deserializer<<%= entityClass %>> {
public class <%= entityClass %>Deserializer implements Deserializer<Either<DeserializationError, <%= entityClass %>>> {

private final Logger log = LoggerFactory.getLogger(<%= entityClass %>Deserializer.class);

private String encoding = "UTF8";

private ObjectMapper objectMapper;
private final ObjectMapper objectMapper;

public <%= entityClass %>Deserializer() {
this.objectMapper =
Expand All @@ -56,18 +54,11 @@ public class <%= entityClass %>Deserializer implements Deserializer<<%= entityCl
}

@Override
public <%= entityClass %> deserialize(final String topicName, final byte[] data) {
<%= entityClass %> <%= camelCaseEntityClass %> = null;
public Either<DeserializationError, <%= entityClass %>> deserialize(final String topicName, final byte[] data) {
try {
final String dataString = data == null ? null : new String(data, this.encoding);
<%= camelCaseEntityClass %> = objectMapper.readValue(dataString, <%= entityClass %>.class);
} catch (final UnsupportedEncodingException var4) {
throw new SerializationException(
"Error when deserializing byte[] to string due to unsupported encoding " + this.encoding
);
return Either.right(objectMapper.readValue(data, <%= entityClass %>.class));
} catch (final IOException e) {
log.error("Cannot read value from " + topicName + " topic", e);
return Either.left(new DeserializationError(data, e));
}
return <%= camelCaseEntityClass %>;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class <%= entityClass %>Producer {

private final KafkaProducer<String, <%= entityClass %>> kafkaProducer;

private String topicName;
private final String topicName;

public <%= entityClass %>Producer(@Value("${kafka.producer.<%= camelCaseEntityClass %>.name}") final String topicName, final KafkaProperties kafkaProperties) {
this.topicName = topicName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import com.fasterxml.jackson.module.paramnames.ParameterNamesModule;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import <%= packageName %>.domain.<%= entityClass %>;

import java.io.ByteArrayOutputStream;
Expand All @@ -38,9 +39,7 @@ public class <%= entityClass %>Serializer implements Serializer<<%= entityClass

private final Logger log = LoggerFactory.getLogger(<%= entityClass %>Serializer.class);

private String encoding = "UTF8";

private ObjectMapper objectMapper;
private final ObjectMapper objectMapper;

public <%= entityClass %>Serializer() {
this.objectMapper =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
version: '2'
services:
akhq:
image: tchiotludo/akhq
environment:
AKHQ_CONFIGURATION: |
akhq:
connections:
docker-kafka-server:
properties:
bootstrap.servers: "kafka:29092"
ports:
- 11817:8080
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.mycompany.myapp.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;

@Configuration
@ConfigurationProperties(prefix = "kafka")
public class KafkaProperties {

@Value("${kafka.bootstrap.servers:localhost:9092}")
private String bootstrapServers;

@Value("${kafka.polling.timeout:10000}")
private Integer pollingTimeout;

private Map<String, Map<String, Object>> consumer = new HashMap<>();

private Map<String, Map<String, Object>> producer = new HashMap<>();

@PostConstruct
public void init() {

for (String consumerKey: consumer.keySet()) {
final Map<String, Object> properties = consumer.get(consumerKey);
if (! properties.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
}
}

for (String consumerKey: producer.keySet()) {
final Map<String, Object> properties = producer.get(consumerKey);
if (! properties.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
}
}
}

public Map<String, Map<String, Object>> getConsumer() {
return this.consumer;
}

public void setConsumer(Map<String, Map<String, Object>> consumer) {
this.consumer = consumer;
}

public Map<String, Map<String, Object>> getProducer() {
return this.producer;
}

public void setProducer(Map<String, Map<String, Object>> producer) {
this.producer = producer;
}

public Integer getPollingTimeout() {
return pollingTimeout;
}

public void setPollingTimeout(Integer pollingTimeout) {
this.pollingTimeout = pollingTimeout;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.mycompany.myapp.service.kafka;

import io.vavr.control.Either;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
Expand All @@ -14,15 +15,16 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

public abstract class GenericConsumer<T> implements Runnable {
import com.mycompany.myapp.service.kafka.deserializer.DeserializationError;

public abstract class GenericConsumer<T> implements Runnable {

private final Logger log = LoggerFactory.getLogger(GenericConsumer.class);

private final AtomicBoolean closed = new AtomicBoolean(false);

private final KafkaConsumer<String, T> consumer;
private String topicName;
private final KafkaConsumer<String, Either<DeserializationError, T>> consumer;
private final String topicName;
private final int pollingTimeout;

public GenericConsumer(final String topicName, final Map<String, Object> properties, final int pollingTimeout) {
Expand All @@ -46,8 +48,8 @@ public void run() {
try {
consumer.subscribe(Collections.singleton(topicName));
while (!closed.get()) {
final ConsumerRecords<String, T> records = consumer.poll(Duration.ofMillis(pollingTimeout));
for (final ConsumerRecord<String, T> record : records) {
final ConsumerRecords<String, Either<DeserializationError, T>> records = consumer.poll(Duration.ofMillis(pollingTimeout));
for (final ConsumerRecord<String, Either<DeserializationError, T>> record : records) {
handleMessage(record);
}
consumer.commitSync();
Expand All @@ -68,5 +70,5 @@ public void shutdown() {
consumer.wakeup();
}

protected abstract void handleMessage(ConsumerRecord<String, T> record);
protected abstract void handleMessage(ConsumerRecord<String, Either<DeserializationError, T>> record);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.mycompany.myapp.service.kafka.consumer;

import io.vavr.control.Either;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -10,6 +11,7 @@
import com.mycompany.myapp.config.KafkaProperties;
import com.mycompany.myapp.domain.Foo;
import com.mycompany.myapp.service.kafka.GenericConsumer;
import com.mycompany.myapp.service.kafka.deserializer.DeserializationError;

@Service
public class FooConsumer extends GenericConsumer<Foo> {
Expand All @@ -21,10 +23,15 @@ public FooConsumer(@Value("${kafka.consumer.foo.name}") final String topicName,
}

@Override
protected void handleMessage(final ConsumerRecord<String, Foo> record) {
log.info("Handling message for Foo record: {}", record.value());
protected void handleMessage(final ConsumerRecord<String, Either<DeserializationError, Foo>> record) {
final Either<DeserializationError, Foo> value = record.value();
if (value.isLeft()) {
log.error("Deserialization record failure: {}", value.getLeft());
} else {
log.info("Handling message for Foo record: {}", value.get());
}

// TODO: Here is where you can add specific code to handle your messages
// TODO: Here is where you can handle your messages
}

@Bean
Expand Down
Loading

0 comments on commit 9933831

Please sign in to comment.