[SPARK-14649][CORE] DagScheduler should not run duplicate tasks on fe… #17297

Closed
wants to merge 11 commits
18 changes: 18 additions & 0 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -292,6 +292,9 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf,
protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala

// HashMap for storing the epoch for the mapStatuses on the driver. This will be used to
// detect and ignore any bogus fetch failures
private val epochForMapStatus = new ConcurrentHashMap[Int, Array[Long]]().asScala
private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)

// Kept in sync with cachedSerializedStatuses explicitly
@@ -370,6 +373,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf,
if (mapStatuses.put(shuffleId, new Array[MapStatus](numMaps)).isDefined) {
throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice")
}
epochForMapStatus.put(shuffleId, new Array[Long](numMaps))
// add in advance
shuffleIdLocks.putIfAbsent(shuffleId, new Object())
}
@@ -378,6 +382,8 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf,
val array = mapStatuses(shuffleId)
array.synchronized {
array(mapId) = status
val epochs = epochForMapStatus(shuffleId)
epochs(mapId) = epoch
}
}

@@ -418,6 +424,18 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf,
cachedSerializedStatuses.contains(shuffleId) || mapStatuses.contains(shuffleId)
}

/** Get the epoch for map output for a shuffle, if it is available */
def getEpochForMapOutput(shuffleId: Int, mapId: Int): Option[Long] = {
if (mapId < 0) {
return None
}
for {
mapStatus <- mapStatuses.get(shuffleId).flatMap { mapStatusArray =>
Option(mapStatusArray(mapId))
}
} yield epochForMapStatus(shuffleId)(mapId)
}

/**
* Return the preferred hosts on which to run the given map output partition in a given shuffle,
* i.e. the nodes that the most outputs for that partition are on.
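The DAGScheduler side of this change is collapsed below, but the intent of getEpochForMapOutput is to let the scheduler compare the epoch recorded when a map output was registered with the epoch seen by a failed task, and ignore fetch failures that refer to an output that has since been re-registered. A minimal sketch of that comparison follows; the helper name, parameters, and exact rule are assumptions, not the PR's actual code:

// Sketch only: decide whether a FetchFailed report is stale, given the epoch recorded for
// the map output (from MapOutputTrackerMaster.getEpochForMapOutput) and the epoch at which
// the failed task attempt was launched.
def isStaleFetchFailure(registeredEpoch: Option[Long], taskEpoch: Long): Boolean = {
  // If the output was registered at a later epoch than the failed task saw, the failure
  // refers to a location that has already been replaced and can be ignored.
  registeredEpoch.exists(_ > taskEpoch)
}
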
201 changes: 109 additions & 92 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Large diffs are not rendered by default.

@@ -90,4 +90,7 @@ private[scheduler]
case class TaskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Throwable])
extends DAGSchedulerEvent

private[scheduler]
case class TasksAborted(stageId: Int, tasks: Seq[Task[_]]) extends DAGSchedulerEvent

private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent
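
Because the DAGScheduler.scala diff above is collapsed, the following is a rough sketch of how the new TasksAborted event could flow through the scheduler. The tasksAborted method is called from the TaskSetManager changes further below; its body here, and the handleTasksAborted handler, are assumptions for illustration rather than the PR's actual code:

// Assumed entry point called by the TaskSetManager: post the event to the event loop.
def tasksAborted(stageId: Int, tasks: Seq[Task[_]]): Unit = {
  eventProcessLoop.post(TasksAborted(stageId, tasks))
}

// Assumed handling on the event-loop side: make the aborted partitions pending again,
// so that the next attempt of the stage resubmits exactly those tasks and nothing
// that is still running in an older attempt.
private def handleTasksAborted(stageId: Int, tasks: Seq[Task[_]]): Unit = {
  stageIdToStage.get(stageId).foreach { stage =>
    tasks.foreach(task => stage.pendingPartitions -= task.partitionId)
  }
}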
@@ -59,20 +59,12 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
* exclusive lock on committing task output for that partition, as well as any known failed
* attempts in the stage.
*
* Entries are added to the top-level map when stages start and are removed when they finish
* (either successfully or unsuccessfully).
* Entries are added to the top-level map when stages start.
*
* Access to this map should be guarded by synchronizing on the OutputCommitCoordinator instance.
*/
private val stageStates = mutable.Map[StageId, StageState]()

/**
* Returns whether the OutputCommitCoordinator's internal data structures are all empty.
*/
def isEmpty: Boolean = {
stageStates.isEmpty
}

/**
* Called by tasks to ask whether they can commit their output to HDFS.
*
@@ -109,13 +101,13 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
* @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e.
* the maximum possible value of `context.partitionId`).
*/
private[scheduler] def stageStart(stage: StageId, maxPartitionId: Int): Unit = synchronized {
stageStates(stage) = new StageState(maxPartitionId + 1)
}

// Called by DAGScheduler
private[scheduler] def stageEnd(stage: StageId): Unit = synchronized {
stageStates.remove(stage)
private[scheduler] def stageStart(stage: StageId,
partitionsToCompute: Seq[Int],
maxPartitionId: Int): Unit = synchronized {
val stageState = stageStates.getOrElseUpdate(stage, new StageState(maxPartitionId + 1))
for (i <- partitionsToCompute) {
stageState.authorizedCommitters(i) = NO_AUTHORIZED_COMMITTER
}
}

// Called by DAGScheduler
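As a usage note (not part of the diff), the new signature lets a retry of a stage reset commit authorization only for the partitions it is actually going to recompute, while getOrElseUpdate preserves the state, including already-authorized committers, for everything else. A hypothetical call sequence; the coordinator instance and the stage/partition numbers are made up:

// First attempt of stage 3 computes all 10 partitions.
outputCommitCoordinator.stageStart(stage = 3, partitionsToCompute = 0 until 10, maxPartitionId = 9)
// A later attempt recomputes only partitions 2 and 7; authorization already granted for
// the remaining partitions is left untouched.
outputCommitCoordinator.stageStart(stage = 3, partitionsToCompute = Seq(2, 7), maxPartitionId = 9)
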
@@ -49,17 +49,6 @@ private[spark] class ShuffleMapStage(

private[this] var _numAvailableOutputs: Int = 0

/**
* Partitions that either haven't yet been computed, or that were computed on an executor
* that has since been lost, so should be re-computed. This variable is used by the
* DAGScheduler to determine when a stage has completed. Task successes in either the active
* attempt for the stage or in earlier attempts for this stage can cause partition ids to get
* removed from pendingPartitions. As a result, this variable may be inconsistent with the pending
* tasks in the TaskSetManager for the active attempt for the stage (the partitions stored here
* will always be a subset of the partitions that the TaskSetManager thinks are pending).
*/
val pendingPartitions = new HashSet[Int]

/**
* List of [[MapStatus]] for each partition. The index of the array is the map partition id,
* and each value in the array is the list of possible [[MapStatus]] for a partition
7 changes: 7 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -17,6 +17,7 @@

package org.apache.spark.scheduler

import scala.collection.mutable
import scala.collection.mutable.HashSet

import org.apache.spark.executor.TaskMetrics
@@ -67,6 +68,12 @@ private[scheduler] abstract class Stage(
/** Set of jobs that this stage belongs to. */
val jobIds = new HashSet[Int]

/**
* Set of partitions that have been submitted to the lower-level scheduler and
* should not be resubmitted when the stage is rerun.
*/
val pendingPartitions = new HashSet[Int]

/** The ID to use for the next new attempt for this stage. */
private var nextAttemptId: Int = 0

@@ -195,13 +195,6 @@ private[spark] class TaskSchedulerImpl private[scheduler](
val stageTaskSets =
taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
stageTaskSets(taskSet.stageAttemptId) = manager
val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
ts.taskSet != taskSet && !ts.isZombie
}
if (conflictingTaskSet) {
throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
}
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
Author

Please note that this check is not needed anymore, because the DagScheduler already keeps track of running tasks and does not submit duplicate tasks.

Contributor

Actually, that is not really the point of this check. It's just checking whether one stage has two tasksets (aka stage attempts) where both are in the "non-zombie" state. It doesn't do any checks at all on what tasks are actually in those tasksets.

This is just checking an invariant which we believe to always be true, but we figure it's better to fail fast if we hit this condition, rather than proceed with some inconsistent state. This check was added because behavior gets really confusing when the invariant is violated, and though we think it should always be true, we've still hit cases where it happens.

Author

@squito - That's correct; this is checking that we should not have more than one non-zombie attempt of a stage running. But in the scenario (d) you described below, we will end up having more than two non-zombie attempts.

However, my point is that there is no reason we should not allow multiple concurrent attempts of a stage to run; the only thing we should guarantee is that those attempts run mutually exclusive sets of tasks. With this change, since the DAG scheduler already keeps track of submitted/running tasks, it can guarantee that it will not resubmit duplicate tasks for a stage.
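
To make that guarantee concrete: with pendingPartitions tracked on the Stage, the scheduler can pick the tasks for a new attempt so that they never overlap with tasks still running in older (zombie) attempts. A hedged sketch of that selection, using names introduced in this PR (pendingPartitions, findMissingPartitions) but not its literal DAGScheduler code:

// Partitions needing a task in the new attempt: not yet finished (findMissingPartitions
// already excludes finished ones) and not still pending in an earlier attempt.
val partitionsToCompute: Seq[Int] =
  stage.findMissingPartitions().filterNot(stage.pendingPartitions.contains)
// Record every partition about to be launched so that a concurrent attempt will not
// submit a duplicate task for it.
stage.pendingPartitions ++= partitionsToCompute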


if (!isLocal && !hasReceivedTask) {
@@ -109,8 +109,8 @@ private[spark] class TaskSetManager(
// the zombie state once at least one attempt of each task has completed successfully, or if the
// task set is aborted (for example, because it was killed). TaskSetManagers remain in the zombie
// state until all tasks have finished running; we keep TaskSetManagers that are in the zombie
// state in order to continue to track and account for the running tasks.
// TODO: We should kill any running task attempts when the task set manager becomes a zombie.
// state in order to continue to track and account for the running tasks. The tasks running in the
// zombie TaskSetManagers are not rerun by the DagScheduler unless they fail.
private[scheduler] var isZombie = false

// Set of pending tasks for each executor. These collections are actually
@@ -767,12 +767,29 @@ private[spark] class TaskSetManager(
s" executor ${info.executorId}): ${reason.toErrorString}"
val failureException: Option[Throwable] = reason match {
case fetchFailed: FetchFailed =>
if (!isZombie) {
// Only on the first occurrence of the fetch failure, collect all tasks that are
// neither running nor successful and notify the DagScheduler that they have been
// aborted, so that they can be rescheduled in the retry of the stage. Note that
// this does not include the fetch-failed task itself, because that is handled
// separately by the DagScheduler.
val abortedTasks = new ArrayBuffer[Task[_]]
for (i <- 0 until numTasks) {
if (i != index && !successful(i) && copiesRunning(i) == 0) {
abortedTasks += taskSet.tasks(i)
}
}
if (!abortedTasks.isEmpty) {
sched.dagScheduler.tasksAborted(abortedTasks(0).stageId, abortedTasks)
}
isZombie = true
}

logWarning(failureReason)
if (!successful(index)) {
successful(index) = true
tasksSuccessful += 1
}
isZombie = true
None

case ef: ExceptionFailure =>
@@ -825,6 +842,14 @@

sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)

if (reason != Success && !reason.isInstanceOf[FetchFailed] && isZombie) {
// If the TaskSetManager is in zombie mode, inform the DAGScheduler that this failed
// task has been aborted, so that the DAGScheduler can rerun it in the retry of the
// stage. Note that fetch-failed tasks are excluded, because they are handled by the
// DAGScheduler separately.
sched.dagScheduler.tasksAborted(tasks(index).stageId, Array(tasks(index)))
}

if (successful(index)) {
logInfo(s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, but the task will not" +
s" be re-executed (either because the task failed with a shuffle data fetch failure," +