From b197cc57255fa14984a65be6efc5aa1af8bfd9b8 Mon Sep 17 00:00:00 2001
From: Venkata krishnan Sowrirajan
Date: Mon, 19 Jul 2021 10:53:45 -0700
Subject: [PATCH] Added tests in DAGSchedulerSuite

---
 .../spark/scheduler/DAGSchedulerSuite.scala | 57 +++++++++++++++++++
 1 file changed, 57 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index ad942cfe58f48..78bca3b391566 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -3782,6 +3782,63 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assert(mapOutputTracker.getNumAvailableOutputs(shuffleDep.shuffleId) == parts)
   }
 
+  test("SPARK-32923: handle stage failure for indeterminate map stage with push-based shuffle") {
+    initPushBasedShuffleConfs(conf)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+    val (shuffleId1, shuffleId2) = constructIndeterminateStageFetchFailed()
+
+    // Check the status of all failed stages.
+    val failedStages = scheduler.failedStages.toSeq
+    assert(failedStages.map(_.id) == Seq(1, 2))
+    // Shuffle blocks on "hostC" are lost, so the first task of `shuffleMapRdd2` needs to retry.
+    assert(failedStages.collect {
+      case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId2 => stage
+    }.head.findMissingPartitions() == Seq(0))
+    // The result stage is still waiting for its 2 tasks to complete.
+    assert(failedStages.collect {
+      case stage: ResultStage => stage
+    }.head.findMissingPartitions() == Seq(0, 1))
+
+    scheduler.resubmitFailedStages()
+
+    // The first task of `shuffleMapRdd2` failed with a fetch failure.
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(0),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleId1, 0L, 0, 0, "ignored"),
+      null))
+
+    val newFailedStages = scheduler.failedStages.toSeq
+    assert(newFailedStages.map(_.id) == Seq(0, 1))
+
+    scheduler.resubmitFailedStages()
+
+    // The first shuffle map stage is resubmitted and reruns all of its tasks.
+    assert(taskSets(4).stageId == 0)
+    assert(taskSets(4).stageAttemptId == 1)
+    assert(taskSets(4).tasks.length == 2)
+
+    // Finish all stages.
+    completeShuffleMapStageSuccessfully(0, 1, 2)
+    assert(mapOutputTracker.findMissingPartitions(shuffleId1) === Some(Seq.empty))
+    assert(mapOutputTracker.shuffleStatuses.get(shuffleId1).forall(
+      _.mergeStatuses.forall(x => x.shuffleSequenceId == 1)))
+    assert(mapOutputTracker.getNumAvailableMergeResults(shuffleId1) == 2)
+
+    completeShuffleMapStageSuccessfully(1, 2, 2, Seq("hostC", "hostD"))
+    assert(mapOutputTracker.findMissingPartitions(shuffleId2) === Some(Seq.empty))
+    assert(mapOutputTracker.shuffleStatuses.get(shuffleId2).forall(
+      _.mergeStatuses.forall(x => x.shuffleSequenceId == 2)))
+    assert(mapOutputTracker.getNumAvailableMergeResults(shuffleId2) == 2)
+
+    complete(taskSets(6), Seq((Success, 11), (Success, 12)))
+
+    // The job ended successfully.
+    assert(results === Map(0 -> 11, 1 -> 12))
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.
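
Note (not part of the patch): the test above builds on the
constructIndeterminateStageFetchFailed() helper that already exists in
DAGSchedulerSuite. For readers without the full file at hand, a minimal
sketch of that helper, reconstructed from the upstream suite (exact
comments and formatting may differ from the real file), looks like this:

    private def constructIndeterminateStageFetchFailed(): (Int, Int) = {
      // Two shuffle map stages feeding a result stage; the first map RDD is
      // indeterminate, so a retry forces the downstream stages to roll back.
      val shuffleMapRdd1 = new MyRDD(sc, 2, Nil, indeterminate = true)
      val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(2))
      val shuffleId1 = shuffleDep1.shuffleId
      val shuffleMapRdd2 = new MyRDD(sc, 2, List(shuffleDep1), tracker = mapOutputTracker)
      val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new HashPartitioner(2))
      val shuffleId2 = shuffleDep2.shuffleId
      val finalRdd = new MyRDD(sc, 2, List(shuffleDep2), tracker = mapOutputTracker)

      submit(finalRdd, Array(0, 1))

      // Finish both shuffle map stages; the second one runs on hostC and hostD.
      completeShuffleMapStageSuccessfully(0, 0, 2)
      completeShuffleMapStageSuccessfully(1, 0, 2, Seq("hostC", "hostD"))

      // Fail the first result task with a fetch failure from hostC. This
      // marks stage 1 (the mapper stage for shuffleId2) and stage 2 (the
      // result stage) as failed, which is the state the new test inspects
      // through scheduler.failedStages.
      runEvent(makeCompletionEvent(
        taskSets(2).tasks(0),
        FetchFailed(makeBlockManagerId("hostC"), shuffleId2, 0L, 0, 0, "ignored"),
        null))
      (shuffleId1, shuffleId2)
    }

With push-based shuffle enabled by initPushBasedShuffleConfs(conf), the new
assertions additionally check that merge results are re-registered with a
bumped shuffleSequenceId after each indeterminate-stage retry.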