[SPARK-19326] Speculated task attempts do not get launched in few scenarios

## What changes were proposed in this pull request?

Add a new listener event that is posted when a speculative task is created, and notify the ExecutorAllocationManager about it so that it can request more executors.
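
For consumers outside the allocation manager, the new event can be observed through an ordinary `SparkListener`. Below is a minimal sketch (the listener class and the logging are illustrative and not part of this patch); it assumes a `SparkContext` named `sc` is already available:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerSpeculativeTaskSubmitted}

// Illustrative listener: reacts to the new event added by this patch.
class SpeculationLoggingListener extends SparkListener {
  override def onSpeculativeTaskSubmitted(
      speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit = {
    // The event carries only the id of the stage that gained a speculatable task.
    println(s"Speculative task submitted for stage ${speculativeTask.stageId}")
  }
}

// sc.addSparkListener(new SpeculationLoggingListener)
```

Registering such a listener would log each time the scheduler marks a task as speculatable, which is the same signal ExecutorAllocationManager now uses to request extra executors.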

## How was this patch tested?

- Added unit tests.
- For the test snippet in the JIRA:
val n = 100
val someRDD = sc.parallelize(1 to n, n)
someRDD.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => {
  if (index == 1) {
    Thread.sleep(Long.MaxValue) // fake long running task(s)
  }
  it.toList.map(x => index + ", " + x).iterator
}).collect
With this code change, Spark shows 101 tasks in total (99 succeeded, 2 running, one of which is the speculative attempt).
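
For context, reproducing this behaviour requires speculation and dynamic allocation to be enabled; the sketch below shows one way to set that up (the values are illustrative, only the property names come from the standard Spark configuration):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative setup: speculation triggers the new event, and dynamic allocation
// lets ExecutorAllocationManager request extra executors in response to it.
val conf = new SparkConf()
  .setAppName("speculation-repro")
  .set("spark.speculation", "true")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true") // needed when dynamic allocation is on
val sc = new SparkContext(conf)
```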

Author: Jane Wang <janewang@fb.com>

Closes #18492 from janewangfb/speculated_task_not_launched.
janewangfb authored and cloud-fan committed Aug 23, 2017
1 parent 3429619 commit d58a350
Showing 9 changed files with 144 additions and 11 deletions.

@@ -139,6 +139,11 @@ public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) {
onEvent(blockUpdated);
}

@Override
public void onSpeculativeTaskSubmitted(SparkListenerSpeculativeTaskSubmitted speculativeTask) {
onEvent(speculativeTask);
}

@Override
public void onOtherEvent(SparkListenerEvent event) {
onEvent(event);

@@ -373,8 +373,14 @@ private[spark] class ExecutorAllocationManager(
// If our target has not changed, do not send a message
// to the cluster manager and reset our exponential growth
if (delta == 0) {
numExecutorsToAdd = 1
return 0
// Check if there is any speculative jobs pending
if (listener.pendingTasks == 0 && listener.pendingSpeculativeTasks > 0) {
numExecutorsTarget =
math.max(math.min(maxNumExecutorsNeeded + 1, maxNumExecutors), minNumExecutors)
} else {
numExecutorsToAdd = 1
return 0
}
}

val addRequestAcknowledged = try {
@@ -588,17 +594,22 @@ private[spark] class ExecutorAllocationManager(
* A listener that notifies the given allocation manager of when to add and remove executors.
*
* This class is intentionally conservative in its assumptions about the relative ordering
* and consistency of events returned by the listener. For simplicity, it does not account
* for speculated tasks.
* and consistency of events returned by the listener.
*/
private class ExecutorAllocationListener extends SparkListener {

private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
// Number of tasks currently running on the cluster. Should be 0 when no stages are active.
// Number of tasks currently running on the cluster including speculative tasks.
// Should be 0 when no stages are active.
private var numRunningTasks: Int = _

// Number of speculative tasks to be scheduled in each stage
private val stageIdToNumSpeculativeTasks = new mutable.HashMap[Int, Int]
// The speculative tasks started in each stage
private val stageIdToSpeculativeTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]

// stageId to tuple (the number of task with locality preferences, a map where each pair is a
// node and the number of tasks that would like to be scheduled on that node) map,
// maintain the executor placement hints for each stage Id used by resource framework to better
@@ -637,15 +648,17 @@ private[spark] class ExecutorAllocationManager(
val stageId = stageCompleted.stageInfo.stageId
allocationManager.synchronized {
stageIdToNumTasks -= stageId
stageIdToNumSpeculativeTasks -= stageId
stageIdToTaskIndices -= stageId
stageIdToSpeculativeTaskIndices -= stageId
stageIdToExecutorPlacementHints -= stageId

// Update the executor placement hints
updateExecutorPlacementHints()

// If this is the last stage with pending tasks, mark the scheduler queue as empty
// This is needed in case the stage is aborted for any reason
if (stageIdToNumTasks.isEmpty) {
if (stageIdToNumTasks.isEmpty && stageIdToNumSpeculativeTasks.isEmpty) {
allocationManager.onSchedulerQueueEmpty()
if (numRunningTasks != 0) {
logWarning("No stages are running, but numRunningTasks != 0")
@@ -671,7 +684,12 @@ private[spark] class ExecutorAllocationManager(
}

// If this is the last pending task, mark the scheduler queue as empty
stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex
if (taskStart.taskInfo.speculative) {
stageIdToSpeculativeTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) +=
taskIndex
} else {
stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex
}
if (totalPendingTasks() == 0) {
allocationManager.onSchedulerQueueEmpty()
}
@@ -705,7 +723,11 @@ private[spark] class ExecutorAllocationManager(
if (totalPendingTasks() == 0) {
allocationManager.onSchedulerBacklogged()
}
stageIdToTaskIndices.get(stageId).foreach { _.remove(taskIndex) }
if (taskEnd.taskInfo.speculative) {
stageIdToSpeculativeTaskIndices.get(stageId).foreach {_.remove(taskIndex)}
} else {
stageIdToTaskIndices.get(stageId).foreach {_.remove(taskIndex)}
}
}
}
}
@@ -726,18 +748,39 @@ private[spark] class ExecutorAllocationManager(
allocationManager.onExecutorRemoved(executorRemoved.executorId)
}

override def onSpeculativeTaskSubmitted(speculativeTask: SparkListenerSpeculativeTaskSubmitted)
: Unit = {
val stageId = speculativeTask.stageId

allocationManager.synchronized {
stageIdToNumSpeculativeTasks(stageId) =
stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1
allocationManager.onSchedulerBacklogged()
}
}

/**
* An estimate of the total number of pending tasks remaining for currently running stages. Does
* not account for tasks which may have failed and been resubmitted.
*
* Note: This is not thread-safe without the caller owning the `allocationManager` lock.
*/
def totalPendingTasks(): Int = {
def pendingTasks(): Int = {
stageIdToNumTasks.map { case (stageId, numTasks) =>
numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0)
}.sum
}

def pendingSpeculativeTasks(): Int = {
stageIdToNumSpeculativeTasks.map { case (stageId, numTasks) =>
numTasks - stageIdToSpeculativeTaskIndices.get(stageId).map(_.size).getOrElse(0)
}.sum
}

def totalPendingTasks(): Int = {
pendingTasks + pendingSpeculativeTasks
}

/**
* The number of tasks currently running across all stages.
*/
14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -281,6 +281,13 @@ class DAGScheduler(
eventProcessLoop.post(TaskSetFailed(taskSet, reason, exception))
}

/**
* Called by the TaskSetManager when it decides a speculative task is needed.
*/
def speculativeTaskSubmitted(task: Task[_]): Unit = {
eventProcessLoop.post(SpeculativeTaskSubmitted(task))
}

private[scheduler]
def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized {
// Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times
@@ -812,6 +819,10 @@ class DAGScheduler(
listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
}

private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_]): Unit = {
listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId))
}

private[scheduler] def handleTaskSetFailed(
taskSet: TaskSet,
reason: String,
@@ -1778,6 +1789,9 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
case BeginEvent(task, taskInfo) =>
dagScheduler.handleBeginEvent(task, taskInfo)

case SpeculativeTaskSubmitted(task) =>
dagScheduler.handleSpeculativeTaskSubmitted(task)

case GettingResultEvent(taskInfo) =>
dagScheduler.handleGetTaskResult(taskInfo)

@@ -94,3 +94,7 @@ case class TaskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Thr
extends DAGSchedulerEvent

private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent

private[scheduler]
case class SpeculativeTaskSubmitted(task: Task[_]) extends DAGSchedulerEvent

11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -52,6 +52,9 @@ case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: T
@DeveloperApi
case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerSpeculativeTaskSubmitted(stageId: Int) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerTaskEnd(
stageId: Int,
@@ -290,6 +293,11 @@ private[spark] trait SparkListenerInterface {
*/
def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit

/**
* Called when a speculative task is submitted
*/
def onSpeculativeTaskSubmitted(speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit

/**
* Called when other events like SQL-specific events are posted.
*/
@@ -354,5 +362,8 @@ abstract class SparkListener extends SparkListenerInterface {

override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { }

override def onSpeculativeTaskSubmitted(
speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit = { }

override def onOtherEvent(event: SparkListenerEvent): Unit = { }
}

@@ -71,6 +71,8 @@ private[spark] trait SparkListenerBus
listener.onNodeUnblacklisted(nodeUnblacklisted)
case blockUpdated: SparkListenerBlockUpdated =>
listener.onBlockUpdated(blockUpdated)
case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
case _ => listener.onOtherEvent(event)
}
}

@@ -966,6 +966,7 @@ private[spark] class TaskSetManager(
"Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms"
.format(index, taskSet.id, info.host, threshold))
speculatableTasks += index
sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
foundTasks = true
}
}

@@ -188,6 +188,40 @@ class ExecutorAllocationManagerSuite
assert(numExecutorsTarget(manager) === 10)
}

test("add executors when speculative tasks added") {
sc = createSparkContext(0, 10, 0)
val manager = sc.executorAllocationManager.get

// Verify that we're capped at number of tasks including the speculative ones in the stage
sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
assert(numExecutorsTarget(manager) === 0)
assert(numExecutorsToAdd(manager) === 1)
assert(addExecutors(manager) === 1)
sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 2)))
assert(numExecutorsTarget(manager) === 1)
assert(numExecutorsToAdd(manager) === 2)
assert(addExecutors(manager) === 2)
assert(numExecutorsTarget(manager) === 3)
assert(numExecutorsToAdd(manager) === 4)
assert(addExecutors(manager) === 2)
assert(numExecutorsTarget(manager) === 5)
assert(numExecutorsToAdd(manager) === 1)

// Verify that running a task doesn't affect the target
sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1")))
assert(numExecutorsTarget(manager) === 5)
assert(addExecutors(manager) === 0)
assert(numExecutorsToAdd(manager) === 1)

// Verify that running a speculative task doesn't affect the target
sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-2", true)))
assert(numExecutorsTarget(manager) === 5)
assert(addExecutors(manager) === 0)
assert(numExecutorsToAdd(manager) === 1)
}

test("cancel pending executors when no longer needed") {
sc = createSparkContext(0, 10, 0)
val manager = sc.executorAllocationManager.get
@@ -1031,10 +1065,15 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
taskLocalityPreferences = taskLocalityPreferences)
}

private def createTaskInfo(taskId: Int, taskIndex: Int, executorId: String): TaskInfo = {
new TaskInfo(taskId, taskIndex, 0, 0, executorId, "", TaskLocality.ANY, speculative = false)
private def createTaskInfo(
taskId: Int,
taskIndex: Int,
executorId: String,
speculative: Boolean = false): TaskInfo = {
new TaskInfo(taskId, taskIndex, 0, 0, executorId, "", TaskLocality.ANY, speculative)
}


/* ------------------------------------------------------- *
| Helper methods for accessing private methods and fields |
* ------------------------------------------------------- */
@@ -1061,6 +1100,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
private val _onExecutorBusy = PrivateMethod[Unit]('onExecutorBusy)
private val _localityAwareTasks = PrivateMethod[Int]('localityAwareTasks)
private val _hostToLocalTaskCount = PrivateMethod[Map[String, Int]]('hostToLocalTaskCount)
private val _onSpeculativeTaskSubmitted = PrivateMethod[Unit]('onSpeculativeTaskSubmitted)

private def numExecutorsToAdd(manager: ExecutorAllocationManager): Int = {
manager invokePrivate _numExecutorsToAdd()
@@ -1136,6 +1176,10 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
manager invokePrivate _onExecutorBusy(id)
}

private def onSpeculativeTaskSubmitted(manager: ExecutorAllocationManager, id: String) : Unit = {
manager invokePrivate _onSpeculativeTaskSubmitted(id)
}

private def localityAwareTasks(manager: ExecutorAllocationManager): Int = {
manager invokePrivate _localityAwareTasks()
}

@@ -60,6 +60,10 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
exception: Option[Throwable]): Unit = {
taskScheduler.taskSetsFailed += taskSet.id
}

override def speculativeTaskSubmitted(task: Task[_]): Unit = {
taskScheduler.speculativeTasks += task.partitionId
}
}

// Get the rack for a given host
@@ -92,6 +96,7 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
val endedTasks = new mutable.HashMap[Long, TaskEndReason]
val finishedManagers = new ArrayBuffer[TaskSetManager]
val taskSetsFailed = new ArrayBuffer[String]
val speculativeTasks = new ArrayBuffer[Int]

val executors = new mutable.HashMap[String, String]
for ((execId, host) <- liveExecutors) {
@@ -139,6 +144,7 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
}
}


override def getRackForHost(value: String): Option[String] = FakeRackUtil.getRackForHost(value)
}

@@ -929,6 +935,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
// > 0ms, so advance the clock by 1ms here.
clock.advance(1)
assert(manager.checkSpeculatableTasks(0))
assert(sched.speculativeTasks.toSet === Set(3))

// Offer resource to start the speculative attempt for the running task
val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)
assert(taskOption5.isDefined)
@@ -1016,6 +1024,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
// > 0ms, so advance the clock by 1ms here.
clock.advance(1)
assert(manager.checkSpeculatableTasks(0))
assert(sched.speculativeTasks.toSet === Set(3, 4))
// Offer resource to start the speculative attempt for the running task
val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)
assert(taskOption5.isDefined)