Skip to content

Commit

Permalink
Added tests in DAGSchedulerSuite
Browse files Browse the repository at this point in the history
  • Loading branch information
venkata91 committed Jul 19, 2021
1 parent 060b17a commit b197cc5
Showing 1 changed file with 57 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3782,6 +3782,63 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
assert(mapOutputTracker.getNumAvailableOutputs(shuffleDep.shuffleId) == parts)
}

test("SPARK-32923: handle stage failure for indeterminate map stage with push-based shuffle") {
initPushBasedShuffleConfs(conf)
DAGSchedulerSuite.clearMergerLocs
DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
val (shuffleId1, shuffleId2) = constructIndeterminateStageFetchFailed()

// Check status for all failedStages
val failedStages = scheduler.failedStages.toSeq
assert(failedStages.map(_.id) == Seq(1, 2))
// Shuffle blocks of "hostC" is lost, so first task of the `shuffleMapRdd2` needs to retry.
assert(failedStages.collect {
case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId2 => stage
}.head.findMissingPartitions() == Seq(0))
// The result stage is still waiting for its 2 tasks to complete
assert(failedStages.collect {
case stage: ResultStage => stage
}.head.findMissingPartitions() == Seq(0, 1))

scheduler.resubmitFailedStages()

// The first task of the `shuffleMapRdd2` failed with fetch failure
runEvent(makeCompletionEvent(
taskSets(3).tasks(0),
FetchFailed(makeBlockManagerId("hostA"), shuffleId1, 0L, 0, 0, "ignored"),
null))

val newFailedStages = scheduler.failedStages.toSeq
assert(newFailedStages.map(_.id) == Seq(0, 1))

scheduler.resubmitFailedStages()

// First shuffle map stage resubmitted and reran all tasks.
assert(taskSets(4).stageId == 0)
assert(taskSets(4).stageAttemptId == 1)
assert(taskSets(4).tasks.length == 2)

// Finish all stage.
completeShuffleMapStageSuccessfully(0, 1, 2)
assert(mapOutputTracker.findMissingPartitions(shuffleId1) === Some(Seq.empty))
assert(mapOutputTracker.shuffleStatuses.get(shuffleId1).forall(
_.mergeStatuses.forall(x => x.shuffleSequenceId == 1)))
assert(mapOutputTracker.getNumAvailableMergeResults(shuffleId1) == 2)

completeShuffleMapStageSuccessfully(1, 2, 2, Seq("hostC", "hostD"))
assert(mapOutputTracker.findMissingPartitions(shuffleId2) === Some(Seq.empty))
assert(mapOutputTracker.shuffleStatuses.get(shuffleId2).forall(
_.mergeStatuses.forall(x => x.shuffleSequenceId == 2)))
assert(mapOutputTracker.getNumAvailableMergeResults(shuffleId2) == 2)

complete(taskSets(6), Seq((Success, 11), (Success, 12)))

// Job successful ended.
assert(results === Map(0 -> 11, 1 -> 12))
results.clear()
assertDataStructuresEmpty()
}

/**
* Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
* Note that this checks only the host and not the executor ID.
Expand Down

0 comments on commit b197cc5

Please sign in to comment.