diff --git a/kafka-consumer/src/main/java/com/networknt/kafka/consumer/KafkaConsumerState.java b/kafka-consumer/src/main/java/com/networknt/kafka/consumer/KafkaConsumerState.java
index eecea19..aeaf85c 100644
--- a/kafka-consumer/src/main/java/com/networknt/kafka/consumer/KafkaConsumerState.java
+++ b/kafka-consumer/src/main/java/com/networknt/kafka/consumer/KafkaConsumerState.java
@@ -21,8 +21,8 @@
 import com.networknt.kafka.entity.*;
 import com.networknt.utility.Constants;
 import com.networknt.utility.Util;
-import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.header.Header;
@@ -270,6 +270,7 @@ public synchronized void seekToEnd(ConsumerSeekToRequest seekToRequest) {
 
     /**
      * Overrides the fetch offsets that the consumer will use on the next poll(timeout).
+     *
      * @param request the consumer seek request
      */
     public synchronized void seek(ConsumerSeekRequest request) {
@@ -282,69 +283,72 @@ public synchronized void seek(ConsumerSeekRequest request) {
         for (ConsumerSeekRequest.PartitionOffset partition : request.getOffsets()) {
             topicPartitionMap.put(partition.getTopic() + ":" + partition.getPartition(), partition);
         }
-        ArrayDeque dequeue= (ArrayDeque)consumerRecords;
-        for (Iterator itr = dequeue.iterator(); itr.hasNext();) {
+        ArrayDeque dequeue = (ArrayDeque) consumerRecords;
+        for (Iterator itr = dequeue.iterator(); itr.hasNext(); ) {
             // iterate all records in the queue and add any topic/partition keys that are not in the topicPartitionMap to rollback.
             ConsumerRecord record = (ConsumerRecord) itr.next();
             ConsumerSeekRequest.PartitionOffset partitionOffset = topicPartitionMap.get(record.topic() + ":" + record.partition());
-            if(partitionOffset == null) {
+            if (partitionOffset == null) {
                 partitionOffset = new ConsumerSeekRequest.PartitionOffset(record.topic(), record.partition(), record.offset(), null);
                 topicPartitionMap.put(record.topic() + ":" + record.partition(), partitionOffset);
-                if(logger.isDebugEnabled()) logger.debug("A new seek request is added for topic = " + record.topic() + " partition = " + record.partition());
+                if (logger.isDebugEnabled())
+                    logger.debug("A new seek request is added for topic = " + record.topic() + " partition = " + record.partition());
             } else {
                 // found the record in the map, set the offset if the current offset is smaller.
-                if(partitionOffset.getOffset() > record.offset()) {
+                if (partitionOffset.getOffset() > record.offset()) {
                     partitionOffset.setOffset(record.offset());
                 }
             }
         }
         List pos = topicPartitionMap.values().stream()
                 .collect(Collectors.toList());
-        if(pos.size() > request.getOffsets().size()) request.setOffsets(pos);
-
+        if (pos.size() > request.getOffsets().size()) request.setOffsets(pos);
+
         for (ConsumerSeekRequest.PartitionOffset partition : request.getOffsets()) {
-            if(logger.isDebugEnabled()) {
+            if (logger.isDebugEnabled()) {
                 logger.debug("seek to topic = " + partition.getTopic() + " partition = " + partition.getPartition() + " offset = " + partition.getOffset());
             }
-            while(true){
-                try{
-                    consumer.seek(
-                            new TopicPartition(partition.getTopic(), partition.getPartition()),
-                            new OffsetAndMetadata(partition.getOffset(), partition.getMetadata()));
-                    break;
-                }
-                catch(IllegalStateException ie){
-                    logger.info("seeking to topic = " + partition.getTopic() + " partition = " + partition.getPartition() + " offset = " + partition.getOffset() + " caught illegal state exception will sleep for 1 sec and retry again.")
-                    Thread.sleep(1000);
-                }
-            }
+            while (true) {
+                try {
+                    consumer.seek(
+                            new TopicPartition(partition.getTopic(), partition.getPartition()),
+                            new OffsetAndMetadata(partition.getOffset(), partition.getMetadata()));
+                    break;
+                } catch (IllegalStateException ie) {
+                    logger.info("seeking to topic = " + partition.getTopic() + " partition = " + partition.getPartition() + " offset = " + partition.getOffset() + " caught illegal state exception will sleep for 1 sec and retry again.");
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+                    }
+                }
+            }
         }
 
         Map metadata =
-            request.getTimestamps().stream()
-                .collect(
-                    Collectors.toMap(
-                        partition ->
-                            new TopicPartition(partition.getTopic(), partition.getPartition()),
-                        ConsumerSeekRequest.PartitionTimestamp::getMetadata));
+                request.getTimestamps().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        partition ->
+                                                new TopicPartition(partition.getTopic(), partition.getPartition()),
+                                        ConsumerSeekRequest.PartitionTimestamp::getMetadata));
 
         Map offsets =
-            consumer.offsetsForTimes(
-                request.getTimestamps().stream()
-                    .collect(
-                        Collectors.toMap(
-                            partition ->
-                                new TopicPartition(partition.getTopic(), partition.getPartition()),
-                            partition -> partition.getTimestamp().toEpochMilli())));
+                consumer.offsetsForTimes(
+                        request.getTimestamps().stream()
+                                .collect(
+                                        Collectors.toMap(
+                                                partition ->
+                                                        new TopicPartition(partition.getTopic(), partition.getPartition()),
+                                                partition -> partition.getTimestamp().toEpochMilli())));
 
         for (Map.Entry offset : offsets.entrySet()) {
             consumer.seek(
-                offset.getKey(),
-                new OffsetAndMetadata(
-                    offset.getValue().offset(), metadata.get(offset.getKey())));
+                    offset.getKey(),
+                    new OffsetAndMetadata(
+                            offset.getValue().offset(), metadata.get(offset.getKey())));
         }
         // clear the consumerRecords so that the Kafka record will be retrieved again based on the seek offset.
-        ((ArrayDeque)consumerRecords).clear();
+        ((ArrayDeque) consumerRecords).clear();
     }
 
     /**
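
For reference, a minimal standalone sketch of the retry-around-seek pattern this hunk introduces: `KafkaConsumer.seek()` throws `IllegalStateException` when the target partition is not yet assigned to the consumer, so the patch retries after a one-second sleep until the seek succeeds. The class and method names below are illustrative, not part of the repository.

```java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class SeekRetrySketch {
    /**
     * Seeks the consumer to the given offset, retrying while the partition
     * assignment has not completed yet (seek throws IllegalStateException
     * for partitions that are not currently assigned to this consumer).
     */
    static void seekWithRetry(KafkaConsumer<?, ?> consumer, TopicPartition tp, long offset, String metadata) {
        while (true) {
            try {
                consumer.seek(tp, new OffsetAndMetadata(offset, metadata));
                break;
            } catch (IllegalStateException ie) {
                try {
                    Thread.sleep(1000); // wait for the assignment to complete, then retry
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}
```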
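
Similarly, a small sketch of the timestamp-based seek at the end of the hunk, which resolves each requested timestamp to an offset via `offsetsForTimes` and then seeks to it. The helper below is simplified to a single partition and omits the metadata handling; the names are illustrative assumptions, not code from the patch.

```java
import java.time.Instant;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class SeekByTimestampSketch {
    /** Seeks to the earliest offset whose record timestamp is at or after the given instant. */
    static void seekToTimestamp(KafkaConsumer<?, ?> consumer, TopicPartition tp, Instant timestamp) {
        // offsetsForTimes maps each partition to the first offset with timestamp >= the requested one.
        Map<TopicPartition, OffsetAndTimestamp> offsets =
                consumer.offsetsForTimes(Collections.singletonMap(tp, timestamp.toEpochMilli()));
        OffsetAndTimestamp found = offsets.get(tp);
        if (found != null) { // null when no record with such a timestamp exists in the partition
            consumer.seek(tp, found.offset());
        }
    }
}
```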