Skip to content

Commit

Permalink
model and config change for kafka-sidecar issue #19 implemetation
Browse files Browse the repository at this point in the history
  • Loading branch information
GavinChenYan committed Sep 3, 2021
1 parent 0561fc9 commit b0dadb3
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class KafkaConsumerConfig {
private String backendApiPath;
private boolean deadLetterEnabled;
private String deadLetterTopicExt;
private int deadLetterMessageRetry;
private boolean auditEnabled;
private String auditTarget;
private String auditTopic;
Expand Down Expand Up @@ -200,4 +201,12 @@ public boolean isUseNoWrappingAvro() {
public void setUseNoWrappingAvro(boolean useNoWrappingAvro) {
this.useNoWrappingAvro = useNoWrappingAvro;
}

public int getDeadLetterMessageRetry() {
return deadLetterMessageRetry;
}

public void setDeadLetterMessageRetry(int deadLetterMessageRetry) {
this.deadLetterMessageRetry = deadLetterMessageRetry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,19 @@ public void shutdown() {
}
}

public synchronized KafkaConsumerState<?, ?, ?, ?> getExistingConsumerInstance(
String group,
String instance
) {
ConsumerInstanceId id = new ConsumerInstanceId(group, instance);
final KafkaConsumerState state = consumers.get(id);
if (state != null) {
state.updateExpiration();
}
return state;
}


/**
* Gets the specified consumer instance or throws a not found exception. Also removes the
* consumer's expiration timeout so it is not cleaned up mid-operation.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package com.networknt.kafka.entity;

import java.util.Objects;

import com.fasterxml.jackson.annotation.JsonProperty;

public class DeadLetterQueueReplayResponse {

private String description;
private Long records;
private java.util.List<String> topics;
private String group;
private String instance;

public DeadLetterQueueReplayResponse () {
}

@JsonProperty("description")
public String getDescription() {
return description;
}

public void setDescription(String description) {
this.description = description;
}

@JsonProperty("records")
public Long getRecords() {
return records;
}

public void setRecords(Long records) {
this.records = records;
}

@JsonProperty("topics")
public java.util.List<String> getTopics() {
return topics;
}

public void setTopics(java.util.List<String> topics) {
this.topics = topics;
}

@JsonProperty("group")
public String getGroup() {
return group;
}

public void setGroup(String group) {
this.group = group;
}

@JsonProperty("instance")
public String getInstance() {
return instance;
}

public void setInstance(String instance) {
this.instance = instance;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}

if (o == null || getClass() != o.getClass()) {
return false;
}

DeadLetterQueueReplayResponse DeadLetterQueueReplayResponse = (DeadLetterQueueReplayResponse) o;

return Objects.equals(description, DeadLetterQueueReplayResponse.description) &&
Objects.equals(records, DeadLetterQueueReplayResponse.records) &&
Objects.equals(topics, DeadLetterQueueReplayResponse.topics) &&
Objects.equals(group, DeadLetterQueueReplayResponse.group);
}

@Override
public int hashCode() {
return Objects.hash(description, records, topics, group);
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("class DeadLetterQueueReplayResponse {\n");
sb.append(" description: ").append(toIndentedString(description)).append("\n"); sb.append(" records: ").append(toIndentedString(records)).append("\n"); sb.append(" topics: ").append(toIndentedString(topics)).append("\n"); sb.append(" group: ").append(toIndentedString(group)).append("\n");
sb.append("}");
return sb.toString();
}

/**
* Convert the given object to string with each line indented by 4 spaces
* (except the first line).
*/
private String toIndentedString(Object o) {
if (o == null) {
return "null";
}
return o.toString().replace("\n", "\n ");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ public class RecordProcessedResult {
// the values from the value to identify the transaction.
String key;

Integer retried = 0;

public RecordProcessedResult() {
}

Expand Down Expand Up @@ -73,4 +75,12 @@ public String getKey() {
public void setKey(String key) {
this.key = key;
}

public Integer getRetried() {
return retried;
}

public void setRetried(Integer retried) {
this.retried = retried;
}
}

0 comments on commit b0dadb3

Please sign in to comment.