From 1cd6f3d51e8bc6cd376d1baf28d0b2b2e71b0648 Mon Sep 17 00:00:00 2001 From: Dmitry Popkov <91957973+d-popkov@users.noreply.github.com> Date: Thu, 9 Dec 2021 13:35:47 +0200 Subject: [PATCH] MapR [SPARK-966] Streaming application with the latest offset read 1 message from mapr stream which was produced before application start (#924) --- .../streaming/kafka010/DirectKafkaInputDStream.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala index efca1fb47d113..8fc5592640e83 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala @@ -59,6 +59,8 @@ private[spark] class DirectKafkaInputDStream[K, V]( ppc: PerPartitionConfig ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets { + var isFirstCompute = true + val executorKafkaParams = { val ekp = new ju.HashMap[String, Object](consumerStrategy.executorKafkaParams) KafkaUtils.fixKafkaParams(ekp) @@ -267,7 +269,12 @@ private[spark] class DirectKafkaInputDStream[K, V]( override def compute(validTime: Time): Option[KafkaRDD[K, V]] = { val untilOffsets = clamp(latestOffsets()) val offsetRanges = untilOffsets.map { case (tp, uo) => - val fo = currentOffsets(tp) + var fo = currentOffsets(tp) + if (isFirstCompute && KafkaUtils.isStreams(untilOffsets) && uo.-(fo) == 1 + && consumerStrategy.executorKafkaParams.get("auto.offset.reset").toString.equals("latest")) { + fo = uo + isFirstCompute = false + } OffsetRange(tp.topic, tp.partition, fo, uo) } val useConsumerCache = context.conf.get(CONSUMER_CACHE_ENABLED)