Skip to content

Commit

Permalink
feat: Allow message handling of un-deserializable data
Browse files Browse the repository at this point in the history
Co-authored-by: François Delbrayelle <fdelbrayelle@gmail.com>
  • Loading branch information
ctamisier and fdelbrayelle committed May 31, 2020
1 parent ea21510 commit 28fe9f1
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 25 deletions.
12 changes: 12 additions & 0 deletions generators/app/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ module.exports = class extends BaseGenerator {
}

writing() {
const vavrVersion = '0.10.3';
this.addMavenProperty('vavr.version', vavrVersion);
this.addGradleProperty('vavr_version', vavrVersion);
this.addMavenDependency('io.vavr', 'vavr', '${vavr.version}'); // eslint-disable-line no-template-curly-in-string
this.addGradleDependency('implementation', 'io.vavr', 'vavr', '${vavr_version}'); // eslint-disable-line no-template-curly-in-string

// function generate kafka application properties
this.generateKafkaProperties = function(enabled) {
this.enabled = enabled;
Expand Down Expand Up @@ -195,6 +201,12 @@ module.exports = class extends BaseGenerator {
null,
null
);
this.template(
'src/main/java/package/service/kafka/deserializer/DeserializerError.java.ejs',
`${javaDir}service/kafka/deserializer/DeserializerError.java`,
null,
null
);
}

if (this.mustGenerateComponent(entity, 'producer')) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
-%>
package <%= packageName %>.service.kafka;

import <%= packageName %>.service.kafka.deserializer.DeserializerError;
import io.vavr.control.Either;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
Expand All @@ -34,12 +36,11 @@ import java.util.concurrent.atomic.AtomicBoolean;

public abstract class GenericConsumer<T> implements Runnable {


private final Logger log = LoggerFactory.getLogger(GenericConsumer.class);

private final AtomicBoolean closed = new AtomicBoolean(false);

private final KafkaConsumer<String, T> consumer;
private final KafkaConsumer<String, Either<DeserializerError, T>> consumer;
private String topicName;
private final int pollingTimeout;

Expand All @@ -64,8 +65,8 @@ public abstract class GenericConsumer<T> implements Runnable {
try {
consumer.subscribe(Collections.singleton(topicName));
while (!closed.get()) {
final ConsumerRecords<String, T> records = consumer.poll(Duration.ofMillis(pollingTimeout));
for (final ConsumerRecord<String, T> record : records) {
final ConsumerRecords<String, Either<DeserializerError, T>> records = consumer.poll(Duration.ofMillis(pollingTimeout));
for (final ConsumerRecord<String, Either<DeserializerError, T>> record : records) {
handleMessage(record);
}
consumer.commitSync();
Expand All @@ -86,5 +87,5 @@ public abstract class GenericConsumer<T> implements Runnable {
consumer.wakeup();
}

protected abstract void handleMessage(ConsumerRecord<String, T> record);
protected abstract void handleMessage(ConsumerRecord<String, Either<DeserializerError, T>> record);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
-%>
package <%= packageName %>.service.kafka.consumer;

import <%= packageName %>.service.kafka.deserializer.DeserializerError;
import io.vavr.control.Either;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -39,10 +41,14 @@ public class <%= entityClass %>Consumer extends GenericConsumer<<%= entityClass
}

@Override
protected void handleMessage(final ConsumerRecord<String, <%= entityClass %>> record) {
log.info("Handling message for <%= entityClass %> record: {}", record.value());

// TODO: Here is where you can add specific code to handle your messages
protected void handleMessage(final ConsumerRecord<String, Either<DeserializerError, <%= entityClass %>>> record) {
Either<DeserializerError, <%= entityClass %>> value = record.value();
if (value.isLeft()) {
log.error("Deserialization record failure: {}", value.getLeft());
} else {
log.info("Handling message for Foo record: {}", value.get());
}
// TODO: Here is where you can handle your messages
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package <%= packageName %>.service.kafka.deserializer;

public class DeserializerError {

private byte[] data;

private Exception exception;

public DeserializerError(byte[] data, Exception exception) {
this.data = data;
this.exception = exception;
}

public byte[] getData() {
return data;
}

public void setData(byte[] data) {
this.data = data;
}

public Exception getException() {
return exception;
}

public void setException(Exception exception) {
this.exception = exception;
}

@Override
public String toString() {
return "DeserializerError{" +
"data='" + new String(data) + "'" +
", exception=" + exception +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,18 @@ import com.fasterxml.jackson.databind.util.StdDateFormat;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.module.paramnames.ParameterNamesModule;
import io.jsonwebtoken.io.SerializationException;
import io.vavr.control.Either;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import <%= packageName %>.domain.<%= entityClass %>;

import java.io.IOException;
import java.io.UnsupportedEncodingException;

public class <%= entityClass %>Deserializer implements Deserializer<<%= entityClass %>> {
public class <%= entityClass %>Deserializer implements Deserializer<Either<DeserializerError, <%= entityClass %>>> {

private final Logger log = LoggerFactory.getLogger(<%= entityClass %>Deserializer.class);

private String encoding = "UTF8";

private ObjectMapper objectMapper;

public <%= entityClass %>Deserializer() {
Expand All @@ -56,18 +53,11 @@ public class <%= entityClass %>Deserializer implements Deserializer<<%= entityCl
}

@Override
public <%= entityClass %> deserialize(final String topicName, final byte[] data) {
<%= entityClass %> <%= camelCaseEntityClass %> = null;
public Either<DeserializerError, <%= entityClass %>> deserialize(final String topicName, final byte[] data) {
try {
final String dataString = data == null ? null : new String(data, this.encoding);
<%= camelCaseEntityClass %> = objectMapper.readValue(dataString, <%= entityClass %>.class);
} catch (final UnsupportedEncodingException var4) {
throw new SerializationException(
"Error when deserializing byte[] to string due to unsupported encoding " + this.encoding
);
} catch (final IOException e) {
log.error("Cannot read value from " + topicName + " topic", e);
return Either.right(objectMapper.readValue(data, <%= entityClass %>.class));
} catch (IOException e) {
return Either.left(new DeserializerError(data, e));
}
return <%= camelCaseEntityClass %>;
}
}

0 comments on commit 28fe9f1

Please sign in to comment.