From b0dadb3d1601517b304804ea36dc35cda09fb9dd Mon Sep 17 00:00:00 2001 From: Gavin Chen Date: Fri, 3 Sep 2021 10:39:23 -0400 Subject: [PATCH] model and config change for kafka-sidecar issue #19 implemetation --- .../kafka/common/KafkaConsumerConfig.java | 9 ++ .../kafka/consumer/KafkaConsumerManager.java | 13 +++ .../entity/DeadLetterQueueReplayResponse.java | 105 ++++++++++++++++++ .../kafka/entity/RecordProcessedResult.java | 10 ++ 4 files changed, 137 insertions(+) create mode 100644 kafka-entity/src/main/java/com/networknt/kafka/entity/DeadLetterQueueReplayResponse.java diff --git a/kafka-common/src/main/java/com/networknt/kafka/common/KafkaConsumerConfig.java b/kafka-common/src/main/java/com/networknt/kafka/common/KafkaConsumerConfig.java index 193bef9..2e77294 100644 --- a/kafka-common/src/main/java/com/networknt/kafka/common/KafkaConsumerConfig.java +++ b/kafka-common/src/main/java/com/networknt/kafka/common/KafkaConsumerConfig.java @@ -22,6 +22,7 @@ public class KafkaConsumerConfig { private String backendApiPath; private boolean deadLetterEnabled; private String deadLetterTopicExt; + private int deadLetterMessageRetry; private boolean auditEnabled; private String auditTarget; private String auditTopic; @@ -200,4 +201,12 @@ public boolean isUseNoWrappingAvro() { public void setUseNoWrappingAvro(boolean useNoWrappingAvro) { this.useNoWrappingAvro = useNoWrappingAvro; } + + public int getDeadLetterMessageRetry() { + return deadLetterMessageRetry; + } + + public void setDeadLetterMessageRetry(int deadLetterMessageRetry) { + this.deadLetterMessageRetry = deadLetterMessageRetry; + } } diff --git a/kafka-consumer/src/main/java/com/networknt/kafka/consumer/KafkaConsumerManager.java b/kafka-consumer/src/main/java/com/networknt/kafka/consumer/KafkaConsumerManager.java index 09d69d2..2e474fa 100644 --- a/kafka-consumer/src/main/java/com/networknt/kafka/consumer/KafkaConsumerManager.java +++ b/kafka-consumer/src/main/java/com/networknt/kafka/consumer/KafkaConsumerManager.java @@ -650,6 +650,19 @@ public void shutdown() { } } + public synchronized KafkaConsumerState getExistingConsumerInstance( + String group, + String instance + ) { + ConsumerInstanceId id = new ConsumerInstanceId(group, instance); + final KafkaConsumerState state = consumers.get(id); + if (state != null) { + state.updateExpiration(); + } + return state; + } + + /** * Gets the specified consumer instance or throws a not found exception. Also removes the * consumer's expiration timeout so it is not cleaned up mid-operation. diff --git a/kafka-entity/src/main/java/com/networknt/kafka/entity/DeadLetterQueueReplayResponse.java b/kafka-entity/src/main/java/com/networknt/kafka/entity/DeadLetterQueueReplayResponse.java new file mode 100644 index 0000000..e08953e --- /dev/null +++ b/kafka-entity/src/main/java/com/networknt/kafka/entity/DeadLetterQueueReplayResponse.java @@ -0,0 +1,105 @@ +package com.networknt.kafka.entity; + +import java.util.Objects; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class DeadLetterQueueReplayResponse { + + private String description; + private Long records; + private java.util.List topics; + private String group; + private String instance; + + public DeadLetterQueueReplayResponse () { + } + + @JsonProperty("description") + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + @JsonProperty("records") + public Long getRecords() { + return records; + } + + public void setRecords(Long records) { + this.records = records; + } + + @JsonProperty("topics") + public java.util.List getTopics() { + return topics; + } + + public void setTopics(java.util.List topics) { + this.topics = topics; + } + + @JsonProperty("group") + public String getGroup() { + return group; + } + + public void setGroup(String group) { + this.group = group; + } + + @JsonProperty("instance") + public String getInstance() { + return instance; + } + + public void setInstance(String instance) { + this.instance = instance; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + DeadLetterQueueReplayResponse DeadLetterQueueReplayResponse = (DeadLetterQueueReplayResponse) o; + + return Objects.equals(description, DeadLetterQueueReplayResponse.description) && + Objects.equals(records, DeadLetterQueueReplayResponse.records) && + Objects.equals(topics, DeadLetterQueueReplayResponse.topics) && + Objects.equals(group, DeadLetterQueueReplayResponse.group); + } + + @Override + public int hashCode() { + return Objects.hash(description, records, topics, group); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class DeadLetterQueueReplayResponse {\n"); + sb.append(" description: ").append(toIndentedString(description)).append("\n"); sb.append(" records: ").append(toIndentedString(records)).append("\n"); sb.append(" topics: ").append(toIndentedString(topics)).append("\n"); sb.append(" group: ").append(toIndentedString(group)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + /** + * Convert the given object to string with each line indented by 4 spaces + * (except the first line). + */ + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } +} diff --git a/kafka-entity/src/main/java/com/networknt/kafka/entity/RecordProcessedResult.java b/kafka-entity/src/main/java/com/networknt/kafka/entity/RecordProcessedResult.java index db321ce..ec40dfb 100644 --- a/kafka-entity/src/main/java/com/networknt/kafka/entity/RecordProcessedResult.java +++ b/kafka-entity/src/main/java/com/networknt/kafka/entity/RecordProcessedResult.java @@ -14,6 +14,8 @@ public class RecordProcessedResult { // the values from the value to identify the transaction. String key; + Integer retried = 0; + public RecordProcessedResult() { } @@ -73,4 +75,12 @@ public String getKey() { public void setKey(String key) { this.key = key; } + + public Integer getRetried() { + return retried; + } + + public void setRetried(Integer retried) { + this.retried = retried; + } }