Skip to content

Commit

Permalink
MapR [SPARK-966] Streaming application with the latest offset read 1 …
Browse files Browse the repository at this point in the history
…message from mapr stream which was produced before application start (apache#924)
  • Loading branch information
d-popkov authored and ekrivokonmapr committed Nov 6, 2023
1 parent 0556083 commit 1cd6f3d
Showing 1 changed file with 8 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ private[spark] class DirectKafkaInputDStream[K, V](
ppc: PerPartitionConfig
) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets {

var isFirstCompute = true

val executorKafkaParams = {
val ekp = new ju.HashMap[String, Object](consumerStrategy.executorKafkaParams)
KafkaUtils.fixKafkaParams(ekp)
Expand Down Expand Up @@ -267,7 +269,12 @@ private[spark] class DirectKafkaInputDStream[K, V](
override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
val untilOffsets = clamp(latestOffsets())
val offsetRanges = untilOffsets.map { case (tp, uo) =>
val fo = currentOffsets(tp)
var fo = currentOffsets(tp)
if (isFirstCompute && KafkaUtils.isStreams(untilOffsets) && uo.-(fo) == 1
&& consumerStrategy.executorKafkaParams.get("auto.offset.reset").toString.equals("latest")) {
fo = uo
isFirstCompute = false
}
OffsetRange(tp.topic, tp.partition, fo, uo)
}
val useConsumerCache = context.conf.get(CONSUMER_CACHE_ENABLED)
Expand Down

0 comments on commit 1cd6f3d

Please sign in to comment.