Skip to content

Commit

Permalink
[SPARK-4964] code cleanup per tdas
Browse files Browse the repository at this point in the history
  • Loading branch information
koeninger committed Dec 30, 2014
1 parent 8bfd6c0 commit 1d50749
Showing 1 changed file with 1 addition and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ class DeterministicKafkaInputDStream[
}
}

// TODO based on the design of InputDStream's lastValidTime, it appears there isn't a
// thread safety concern with private mutable state, but is this certain?
private var currentOffsets = fromOffsets

@tailrec
Expand All @@ -99,9 +97,7 @@ class DeterministicKafkaInputDStream[
maxMessagesPerPartition.map { mmp =>
leaderOffsets.map { kv =>
val (k, v) = kv
val curr = currentOffsets(k)
val diff = v - curr
if (diff > mmp) (k, curr + mmp) else (k, v)
k -> Math.min(currentOffsets(k) + mmp, v)
}
}.getOrElse(leaderOffsets)
}
Expand Down

0 comments on commit 1d50749

Please sign in to comment.