[SPARK-22956][SS] Bug fix for 2 streams union failover scenario
## What changes were proposed in this pull request?

This problem was reported by yanlin-Lynn, ivoson, and LiangchangZ. Thanks!

When we union two streams from Kafka or other sources, and one of them has no continuous data coming in while a task restart happens at the same time, the query fails with an `IllegalStateException`. The root cause is the code in [MicroBatchExecution](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L190): when one stream has no continuous data, its committedOffset is the same as its availableOffset during `populateStartOffsets`, and `currentPartitionOffsets` is not handled properly in KafkaSource. We should probably consider this scenario in other Sources as well.
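
For context, a minimal sketch of the failover scenario (the broker address, topic names, and checkpoint path below are placeholders, not taken from this patch):

```scala
// Sketch only: two Kafka streams are unioned, one topic keeps producing while the
// other goes idle, and the query is stopped and restarted from its checkpoint.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("spark-22956-sketch").getOrCreate()

def kafkaStream(topic: String) = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092") // placeholder address
  .option("subscribe", topic)
  .load()

val busy = kafkaStream("busy-topic") // keeps receiving new records
val idle = kafkaStream("idle-topic") // receives no new records

// On restart, populateStartOffsets replays the last batch, so the idle source's
// getBatch is called with start == end; before this fix, KafkaSource left
// currentPartitionOffsets unset in that case and hit an IllegalStateException.
val query = busy.union(idle).writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/spark-22956-checkpoint") // placeholder path
  .start()
```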

## How was this patch tested?

Added a unit test in KafkaSourceSuite.scala.

Author: Yuanjian Li <xyliyuanjian@gmail.com>

Closes #20150 from xuanyuanking/SPARK-22956.
xuanyuanking authored and zsxwing committed on Jan 16, 2018 · 1 parent c7572b7 · commit 07ae39d
Showing 4 changed files with 81 additions and 9 deletions.

KafkaSource.scala
@@ -223,6 +223,14 @@ private[kafka010] class KafkaSource(

     logInfo(s"GetBatch called with start = $start, end = $end")
     val untilPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(end)
+    // On recovery, getBatch will get called before getOffset
+    if (currentPartitionOffsets.isEmpty) {
+      currentPartitionOffsets = Some(untilPartitionOffsets)
+    }
+    if (start.isDefined && start.get == end) {
+      return sqlContext.internalCreateDataFrame(
+        sqlContext.sparkContext.emptyRDD, schema, isStreaming = true)
+    }
     val fromPartitionOffsets = start match {
       case Some(prevBatchEndOffset) =>
         KafkaSourceOffset.getPartitionOffsets(prevBatchEndOffset)
@@ -305,11 +313,6 @@ private[kafka010] class KafkaSource(
     logInfo("GetBatch generating RDD of offset range: " +
       offsetRanges.sortBy(_.topicPartition.toString).mkString(", "))

-    // On recovery, getBatch will get called before getOffset
-    if (currentPartitionOffsets.isEmpty) {
-      currentPartitionOffsets = Some(untilPartitionOffsets)
-    }
-
     sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
   }

KafkaSourceSuite.scala
@@ -318,6 +318,71 @@ class KafkaSourceSuite extends KafkaSourceTest {
     )
   }

+  test("SPARK-22956: currentPartitionOffsets should be set when no new data comes in") {
+    def getSpecificDF(range: Range.Inclusive): org.apache.spark.sql.Dataset[Int] = {
+      val topic = newTopic()
+      testUtils.createTopic(topic, partitions = 1)
+      testUtils.sendMessages(topic, range.map(_.toString).toArray, Some(0))
+
+      val reader = spark
+        .readStream
+        .format("kafka")
+        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+        .option("kafka.metadata.max.age.ms", "1")
+        .option("maxOffsetsPerTrigger", 5)
+        .option("subscribe", topic)
+        .option("startingOffsets", "earliest")
+
+      reader.load()
+        .selectExpr("CAST(value AS STRING)")
+        .as[String]
+        .map(k => k.toInt)
+    }
+
+    val df1 = getSpecificDF(0 to 9)
+    val df2 = getSpecificDF(100 to 199)
+
+    val kafka = df1.union(df2)
+
+    val clock = new StreamManualClock
+
+    val waitUntilBatchProcessed = AssertOnQuery { q =>
+      eventually(Timeout(streamingTimeout)) {
+        if (!q.exception.isDefined) {
+          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+        }
+      }
+      if (q.exception.isDefined) {
+        throw q.exception.get
+      }
+      true
+    }
+
+    testStream(kafka)(
+      StartStream(ProcessingTime(100), clock),
+      waitUntilBatchProcessed,
+      // 5 from smaller topic, 5 from bigger one
+      CheckLastBatch((0 to 4) ++ (100 to 104): _*),
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      // 5 from smaller topic, 5 from bigger one
+      CheckLastBatch((5 to 9) ++ (105 to 109): _*),
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      // smaller topic empty, 5 from bigger one
+      CheckLastBatch(110 to 114: _*),
+      StopStream,
+      StartStream(ProcessingTime(100), clock),
+      waitUntilBatchProcessed,
+      // smallest now empty, 5 from bigger one
+      CheckLastBatch(115 to 119: _*),
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      // smallest now empty, 5 from bigger one
+      CheckLastBatch(120 to 124: _*)
+    )
+  }
+
   test("cannot stop Kafka stream") {
     val topic = newTopic()
     testUtils.createTopic(topic, partitions = 5)

MicroBatchExecution.scala
@@ -208,10 +208,8 @@ class MicroBatchExecution(
                * batch will be executed before getOffset is called again. */
              availableOffsets.foreach {
                case (source: Source, end: Offset) =>
-                 if (committedOffsets.get(source).map(_ != end).getOrElse(true)) {
-                   val start = committedOffsets.get(source)
-                   source.getBatch(start, end)
-                 }
+                 val start = committedOffsets.get(source)
+                 source.getBatch(start, end)
                case nonV1Tuple =>
                  // The V2 API does not have the same edge case requiring getBatch to be called
                  // here, so we do nothing here.

MemoryStream (memory.scala)
@@ -119,9 +119,15 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
     val newBlocks = synchronized {
       val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
       val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
+      assert(sliceStart <= sliceEnd, s"sliceStart: $sliceStart sliceEnd: $sliceEnd")
       batches.slice(sliceStart, sliceEnd)
     }

+    if (newBlocks.isEmpty) {
+      return sqlContext.internalCreateDataFrame(
+        sqlContext.sparkContext.emptyRDD, schema, isStreaming = true)
+    }
+
     logDebug(generateDebugString(newBlocks, startOrdinal, endOrdinal))

     newBlocks