fixes #141 retry seek as we might have illegal state exception during… #142

Merged
merged 1 commit into from
Feb 17, 2022
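Context for the diff below: KafkaConsumer.seek() throws IllegalStateException when the target partition is not currently assigned to the consumer, for example while a group rebalance is still in progress. The fix retries the seek: when IllegalStateException is caught, the consumer logs the event, sleeps for one second, and tries again, handling the InterruptedException that Thread.sleep() can throw. Minimal, self-contained sketches of the method's three pieces (the rollback bookkeeping, the retry loop, and the timestamp-based seek) follow the diff.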
@@ -21,8 +21,8 @@
import com.networknt.kafka.entity.*;
import com.networknt.utility.Constants;
import com.networknt.utility.Util;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Header;
@@ -270,6 +270,7 @@ public synchronized void seekToEnd(ConsumerSeekToRequest seekToRequest) {

/**
* Overrides the fetch offsets that the consumer will use on the next poll(timeout).
+ *
* @param request the consumer seek request
*/
public synchronized void seek(ConsumerSeekRequest request) {
@@ -282,69 +283,72 @@ public synchronized void seek(ConsumerSeekRequest request) {
for (ConsumerSeekRequest.PartitionOffset partition : request.getOffsets()) {
topicPartitionMap.put(partition.getTopic() + ":" + partition.getPartition(), partition);
}
-ArrayDeque dequeue= (ArrayDeque)consumerRecords;
-for (Iterator itr = dequeue.iterator(); itr.hasNext();) {
+ArrayDeque dequeue = (ArrayDeque) consumerRecords;
+for (Iterator itr = dequeue.iterator(); itr.hasNext(); ) {
// iterate all records in the queue and add any topic/partition keys that are not in the topicPartitionMap to rollback.
ConsumerRecord record = (ConsumerRecord) itr.next();
ConsumerSeekRequest.PartitionOffset partitionOffset = topicPartitionMap.get(record.topic() + ":" + record.partition());
-if(partitionOffset == null) {
+if (partitionOffset == null) {
partitionOffset = new ConsumerSeekRequest.PartitionOffset(record.topic(), record.partition(), record.offset(), null);
topicPartitionMap.put(record.topic() + ":" + record.partition(), partitionOffset);
-if(logger.isDebugEnabled()) logger.debug("A new seek request is added for topic = " + record.topic() + " partition = " + record.partition());
+if (logger.isDebugEnabled())
+logger.debug("A new seek request is added for topic = " + record.topic() + " partition = " + record.partition());
} else {
// found the record in the map, set the offset if the current offset is smaller.
-if(partitionOffset.getOffset() > record.offset()) {
+if (partitionOffset.getOffset() > record.offset()) {
partitionOffset.setOffset(record.offset());
}
}
}
List<ConsumerSeekRequest.PartitionOffset> pos = topicPartitionMap.values().stream()
.collect(Collectors.toList());
-if(pos.size() > request.getOffsets().size()) request.setOffsets(pos);
+if (pos.size() > request.getOffsets().size()) request.setOffsets(pos);

for (ConsumerSeekRequest.PartitionOffset partition : request.getOffsets()) {
-if(logger.isDebugEnabled()) {
+if (logger.isDebugEnabled()) {
logger.debug("seek to topic = " + partition.getTopic() + " partition = " + partition.getPartition() + " offset = " + partition.getOffset());
}
-while(true){
-try{
-consumer.seek(
-new TopicPartition(partition.getTopic(), partition.getPartition()),
-new OffsetAndMetadata(partition.getOffset(), partition.getMetadata()));
-break;
-}
-catch(IllegalStateException ie){
-logger.info("seeking to topic = " + partition.getTopic() + " partition = " + partition.getPartition() + " offset = " + partition.getOffset() + " caught illegal state exception will sleep for 1 sec and retry again.");
-Thread.sleep(1000);
-}
-}
+while (true) {
+try {
+consumer.seek(
+new TopicPartition(partition.getTopic(), partition.getPartition()),
+new OffsetAndMetadata(partition.getOffset(), partition.getMetadata()));
+break;
+} catch (IllegalStateException ie) {
+logger.info("seeking to topic = " + partition.getTopic() + " partition = " + partition.getPartition() + " offset = " + partition.getOffset() + " caught illegal state exception will sleep for 1 sec and retry again.");
+try {
+Thread.sleep(1000);
+} catch (InterruptedException e) {
+}
+}
+}
}

Map<TopicPartition, String> metadata =
request.getTimestamps().stream()
.collect(
Collectors.toMap(
partition ->
new TopicPartition(partition.getTopic(), partition.getPartition()),
ConsumerSeekRequest.PartitionTimestamp::getMetadata));

Map<TopicPartition, OffsetAndTimestamp> offsets =
consumer.offsetsForTimes(
request.getTimestamps().stream()
.collect(
Collectors.toMap(
partition ->
new TopicPartition(partition.getTopic(), partition.getPartition()),
partition -> partition.getTimestamp().toEpochMilli())));

for (Map.Entry<TopicPartition, OffsetAndTimestamp> offset : offsets.entrySet()) {
consumer.seek(
offset.getKey(),
new OffsetAndMetadata(
offset.getValue().offset(), metadata.get(offset.getKey())));
}
// clear the consumerRecords so that the Kafka record will be retrieved again based on the seek offset.
-((ArrayDeque)consumerRecords).clear();
+((ArrayDeque) consumerRecords).clear();
}
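The first half of seek() walks the buffered consumerRecords deque and lowers each partition's seek target to the smallest offset still sitting in the buffer, adding entries for partitions the request did not mention, so that undelivered records are fetched again after the queue is cleared. Below is a minimal sketch of that bookkeeping with plain Kafka types; the helper name minBufferedOffsets and the generic map shape are illustrative, not this module's API.

import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

public class RollbackOffsets {

    // For each topic/partition with records still buffered but not yet delivered,
    // find the smallest buffered offset; seeking there makes the consumer
    // re-fetch everything that was sitting in the queue.
    static Map<TopicPartition, Long> minBufferedOffsets(ArrayDeque<ConsumerRecord<?, ?>> buffered) {
        Map<TopicPartition, Long> result = new HashMap<>();
        for (ConsumerRecord<?, ?> record : buffered) {
            TopicPartition tp = new TopicPartition(record.topic(), record.partition());
            result.merge(tp, record.offset(), Math::min);
        }
        return result;
    }
}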

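Next, the retry loop itself, extracted into a standalone program. This is a sketch under assumptions: the broker address, topic name, and group id are placeholders, and unlike the patch it restores the interrupt flag instead of swallowing InterruptedException.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class RetrySeekExample {

    // Keep retrying consumer.seek() until the partition is assigned.
    // KafkaConsumer.seek() throws IllegalStateException when the consumer does
    // not currently own the partition, e.g. while a rebalance is in progress.
    static void seekWithRetry(KafkaConsumer<?, ?> consumer, TopicPartition tp, OffsetAndMetadata offset) {
        while (true) {
            try {
                consumer.seek(tp, offset);
                break;
            } catch (IllegalStateException ie) {
                try {
                    Thread.sleep(1000); // wait for the assignment to settle, then retry
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the interrupt flag and give up
                    return;
                }
            }
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "retry-seek-demo");          // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic")); // placeholder topic
            consumer.poll(Duration.ofSeconds(1)); // kick off the initial assignment
            seekWithRetry(consumer, new TopicPartition("test-topic", 0), new OffsetAndMetadata(0L, "rewind"));
        }
    }
}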
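Finally, the second half of seek() converts the request's timestamps into offsets with consumer.offsetsForTimes() and seeks to the result. A minimal standalone sketch with the same placeholder broker and topic; note that offsetsForTimes() maps a partition to null when no record at or after the timestamp exists, so the sketch checks for that before seeking.

import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class SeekByTimestampExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "seek-by-time-demo");        // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("test-topic", 0); // placeholder topic
            consumer.assign(Collections.singletonList(tp)); // manual assignment: no rebalance, so seek() is legal at once

            // Ask the broker for the earliest offset whose record timestamp is at or
            // after the given instant. The value is null when no such record exists.
            long tenMinutesAgo = Instant.now().minus(Duration.ofMinutes(10)).toEpochMilli();
            Map<TopicPartition, OffsetAndTimestamp> offsets =
                    consumer.offsetsForTimes(Collections.singletonMap(tp, tenMinutesAgo));

            OffsetAndTimestamp found = offsets.get(tp);
            if (found != null) {
                consumer.seek(tp, found.offset()); // the next poll() starts from here
            }
        }
    }
}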