diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala index 0cfb0d55e8164..a7cd48ba7989f 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala @@ -73,8 +73,6 @@ class DeterministicKafkaInputDStream[ } } - // TODO based on the design of InputDStream's lastValidTime, it appears there isn't a - // thread safety concern with private mutable state, but is this certain? private var currentOffsets = fromOffsets @tailrec @@ -99,9 +97,7 @@ class DeterministicKafkaInputDStream[ maxMessagesPerPartition.map { mmp => leaderOffsets.map { kv => val (k, v) = kv - val curr = currentOffsets(k) - val diff = v - curr - if (diff > mmp) (k, curr + mmp) else (k, v) + k -> Math.min(currentOffsets(k) + mmp, v) } }.getOrElse(leaderOffsets) }