Skip to content

Commit

Permalink
fixes #44 add kafka ConsumerRecord header to the response
Browse files Browse the repository at this point in the history
  • Loading branch information
stevehu committed Apr 11, 2021
1 parent ef90d8f commit 3af41f0
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public ConsumerRecordAndSize<ByteString, ByteString> createConsumerRecord(
record.topic(),
record.key() != null ? ByteString.copyFrom(record.key()) : null,
record.value() != null ? ByteString.copyFrom(record.value()) : null,
record.headers() != null ? convertHeaders(record.headers()) : null,
record.partition(),
record.offset()),
approxSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ public ConsumerRecordAndSize<Object, Object> createConsumerRecord(

return new ConsumerRecordAndSize<>(
com.networknt.kafka.entity.ConsumerRecord.create(
record.topic(), key, value, record.partition(), record.offset()), approxSize);
record.topic(), key, value,
record.headers() != null ? convertHeaders(record.headers()) : null,
record.partition(), record.offset()), approxSize);
}

private Object deserialize(byte[] data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

import javax.ws.rs.InternalServerErrorException;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -422,5 +425,15 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
}

protected Map<String, String> convertHeaders(Headers headers) {
Map<String, String> headerMap = new HashMap<>();
Iterator<Header> headerIterator = headers.iterator();
while(headerIterator.hasNext()) {
Header header = headerIterator.next();
headerMap.put(header.key(), new String(header.value(), StandardCharsets.UTF_8));
}
return headerMap;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public ConsumerRecordAndSize<JsonNode, JsonNode> createConsumerRecord(
record.topic(),
keyNode.getJson(),
valueNode.getJson(),
record.headers() != null ? convertHeaders(record.headers()) : null,
record.partition(),
record.offset()),
keyNode.getSize() + valueNode.getSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,75 +18,64 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.protobuf.ByteString;
import com.networknt.config.JsonMapper;

import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.PositiveOrZero;
import java.util.Arrays;
import java.util.Base64;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.*;

public final class BinaryConsumerRecord {

@NotNull
@Nullable
private final String topic;

@Nullable
private final byte[] key;

@Nullable
private final byte[] value;

@PositiveOrZero
@Nullable
private final Map<String, String> headers;

private final Integer partition;

@PositiveOrZero
@Nullable
private final Long offset;

@JsonCreator
private BinaryConsumerRecord(
@JsonProperty("topic") @Nullable String topic,
@JsonProperty("key") @Nullable byte[] key,
@JsonProperty("value") @Nullable byte[] value,
@JsonProperty("partition") @Nullable Integer partition,
@JsonProperty("offset") @Nullable Long offset) {
@JsonProperty("topic") String topic,
@JsonProperty("key") byte[] key,
@JsonProperty("value") byte[] value,
@JsonProperty("headers") Map<String, String> headers,
@JsonProperty("partition") Integer partition,
@JsonProperty("offset") Long offset) {
this.topic = topic;
this.key = key;
this.value = value;
this.headers = headers;
this.partition = partition;
this.offset = offset;
}

@JsonProperty
@Nullable
public String getTopic() {
return topic;
}

@JsonProperty
@Nullable
public String getKey() {
return key != null ? new String(Base64.getEncoder().encode(key)) : null;
}

@JsonProperty
@Nullable
public String getValue() {
return value != null ? new String(Base64.getEncoder().encode(value)) : null;
}

@JsonProperty
@Nullable
public Map<String, String> getHeaders() { return headers; }

@JsonProperty
public Integer getPartition() {
return partition;
}

@JsonProperty
@Nullable
public Long getOffset() {
return offset;
}
Expand All @@ -103,6 +92,7 @@ public static BinaryConsumerRecord fromConsumerRecord(
Objects.requireNonNull(record.getTopic()),
record.getKey() != null ? record.getKey().toByteArray() : null,
record.getValue() != null ? record.getValue().toByteArray() : null,
record.getHeaders(),
record.getPartition(),
record.getOffset());
}
Expand All @@ -121,6 +111,7 @@ public ConsumerRecord<ByteString, ByteString> toConsumerRecord() {
topic,
key != null ? ByteString.copyFrom(key) : null,
value != null ? ByteString.copyFrom(value) : null,
headers,
partition,
offset);
}
Expand All @@ -137,13 +128,14 @@ public boolean equals(Object o) {
return Objects.equals(topic, that.topic)
&& Arrays.equals(key, that.key)
&& Arrays.equals(value, that.value)
&& Objects.equals(headers, that.headers)
&& Objects.equals(partition, that.partition)
&& Objects.equals(offset, that.offset);
}

@Override
public int hashCode() {
int result = Objects.hash(topic, partition, offset);
int result = Objects.hash(topic, headers, partition, offset);
result = 31 * result + Arrays.hashCode(key);
result = 31 * result + Arrays.hashCode(value);
return result;
Expand All @@ -155,6 +147,7 @@ public String toString() {
.add("topic='" + topic + "'")
.add("key=" + Arrays.toString(key))
.add("value=" + Arrays.toString(value))
.add("headers" + JsonMapper.toJson(headers))
.add("partition=" + partition)
.add("offset=" + offset)
.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,25 @@

package com.networknt.kafka.entity;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;

public class ConsumerRecord<K, V> {
String topic;
K key;
V value;
int partition;
long offset;
Map<String, String> headers;

ConsumerRecord() {
}

public ConsumerRecord(String topic, K key, V value, int partition, long offset) {
public ConsumerRecord(String topic, K key, V value, Map<String, String> headers, int partition, long offset) {
this.topic = topic;
this.key = key;
this.value = value;
this.headers = headers;
this.partition = partition;
this.offset = offset;
}
Expand All @@ -39,16 +42,16 @@ public String getTopic() {
return topic;
}

@Nullable
public K getKey() {
return key;
}

@Nullable
public V getValue() {
return value;
}

public Map<String, String> getHeaders() { return headers; }

public int getPartition() {
return partition;
}
Expand All @@ -58,7 +61,7 @@ public long getOffset() {
}

public static <K, V> ConsumerRecord<K, V> create(
String topic, @Nullable K key, @Nullable V value, int partition, long offset) {
return new ConsumerRecord<>(topic, key, value, partition, offset);
String topic, K key, V value, Map<String, String> headers, int partition, long offset) {
return new ConsumerRecord<>(topic, key, value, headers, partition, offset);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,73 +17,66 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.networknt.config.JsonMapper;

import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.PositiveOrZero;
import java.util.Map;
import java.util.Objects;
import java.util.StringJoiner;

public final class JsonConsumerRecord {

@NotNull
@Nullable
private final String topic;

@Nullable
private final Object key;

@Nullable
private final Object value;

@PositiveOrZero
@Nullable
private final Map<String, String> headers;

private final Integer partition;

@PositiveOrZero
@Nullable
private final Long offset;

@JsonCreator
private JsonConsumerRecord(
@JsonProperty("topic") @Nullable String topic,
@JsonProperty("key") @Nullable Object key,
@JsonProperty("value") @Nullable Object value,
@JsonProperty("partition") @Nullable Integer partition,
@JsonProperty("offset") @Nullable Long offset) {
@JsonProperty("topic") String topic,
@JsonProperty("key") Object key,
@JsonProperty("value") Object value,
@JsonProperty("headers") Map<String, String> headers,
@JsonProperty("partition") Integer partition,
@JsonProperty("offset") Long offset) {
this.topic = topic;
this.key = key;
this.value = value;
this.headers = headers;
this.partition = partition;
this.offset = offset;
}

@JsonProperty
@Nullable
public String getTopic() {
return topic;
}

@JsonProperty
@Nullable
public Object getKey() {
return key;
}

@JsonProperty
@Nullable
public Object getValue() {
return value;
}

@JsonProperty
@Nullable
public Map<String, String> getHeaders() { return headers; }

@JsonProperty
public Integer getPartition() {
return partition;
}

@JsonProperty
@Nullable
public Long getOffset() {
return offset;
}
Expand All @@ -99,6 +92,7 @@ public static JsonConsumerRecord fromConsumerRecord(ConsumerRecord<Object, Objec
Objects.requireNonNull(record.getTopic()),
record.getKey(),
record.getValue(),
record.getHeaders(),
record.getPartition(),
record.getOffset());
}
Expand All @@ -113,7 +107,7 @@ public ConsumerRecord<Object, Object> toConsumerRecord() {
if (offset == null || offset < 0) {
throw new IllegalStateException();
}
return ConsumerRecord.create(topic, key, value, partition, offset);
return ConsumerRecord.create(topic, key, value, headers, partition, offset);
}

@Override
Expand All @@ -128,13 +122,14 @@ public boolean equals(Object o) {
return Objects.equals(topic, that.topic)
&& Objects.equals(key, that.key)
&& Objects.equals(value, that.value)
&& Objects.equals(headers, that.headers)
&& Objects.equals(partition, that.partition)
&& Objects.equals(offset, that.offset);
}

@Override
public int hashCode() {
return Objects.hash(topic, key, value, partition, offset);
return Objects.hash(topic, key, value, headers, partition, offset);
}

@Override
Expand All @@ -143,6 +138,7 @@ public String toString() {
.add("topic='" + topic + "'")
.add("key=" + key)
.add("value=" + value)
.add("headers=" + JsonMapper.toJson(headers))
.add("partition=" + partition)
.add("offset=" + offset)
.toString();
Expand Down
Loading

0 comments on commit 3af41f0

Please sign in to comment.