Skip to content

Commit

Permalink
move check for unschedulability into TaskSetManager
Browse files Browse the repository at this point in the history
  • Loading branch information
squito committed Jun 20, 2016
1 parent 6b80b28 commit 64ab7fb
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -280,52 +280,12 @@ private[spark] class TaskSchedulerImpl(
}
}
}
if (!launchedTask && isTaskSetCompletelyBlacklisted(taskSet)) {
taskSet.abort(s"Aborting ${taskSet.taskSet} because it has a task which cannot be scheduled" +
s" on any executor due to blacklists.")
if (!launchedTask) {
taskSet.abortIfTaskSetCompletelyBlacklisted(executorsByHost)
}
return launchedTask
}

/**
* Check whether the given task set has been blacklisted to the point that it can't run anywhere.
*
* It is possible that this taskset has become impossible to schedule *anywhere* due to the
* blacklist. The most common scenario would be if there are fewer executors than
* spark.task.maxFailures. We need to detect this so we can fail the task set, otherwise the job
* will hang.
*
* The check here is a balance between being sure to catch the issue, but not wasting
* too much time inside the scheduling loop. Just check if the last task is schedulable
* on any of the available executors. So this is O(numExecutors) worst-case, but it'll
* really be fast unless you've got a bunch of things blacklisted. Its possible it won't detect
* the unschedulable task immediately, but if it returns false, there is at least *some* task
* that is schedulable, and after scheduling all of those, we'll eventually find the unschedulable
* task.
*/
private[scheduler] def isTaskSetCompletelyBlacklisted(
taskSet: TaskSetManager): Boolean = {
if (executorsByHost.nonEmpty) {
// take any task that needs to be scheduled, and see if we can find some executor it *could*
// run on
taskSet.pollPendingTask.map { task =>
logInfo(s"checking ${executorsByHost}")
executorsByHost.foreach { case (host, execs) =>
execs.foreach { exec =>
logInfo(s"checking task ${task} on exec $exec")
if (!taskSet.executorIsBlacklisted(exec, task)) {
return false
}
}
}
true
}.getOrElse(false)
} else {
// no executors have registered yet, so don't abort the stage, just wait. We probably
// got here because a task set was added before the executors registered.
false
}
}

/**
* Called by cluster manager to offer resources on slaves. We respond by asking our active task
Expand Down
76 changes: 57 additions & 19 deletions core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -403,25 +403,6 @@ private[spark] class TaskSetManager(
case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
}

/**
* Return some task which is pending, but do not remove it from the list of pending tasks.
* Used as a simple way to test if this task set is schedulable anywhere, or if it has been
* completely blacklisted.
*/
private[scheduler] def pollPendingTask: Option[Int] = {
// usually this will just take the last pending task, but because of the lazy removal
// from each list, we may need to go deeper in the list
var indexOffset = allPendingTasks.size
while (indexOffset > 0) {
indexOffset -= 1
val index = allPendingTasks(indexOffset)
if (copiesRunning(index) == 0 && !successful(index)) {
return Some(index)
}
}
None
}

/**
* Respond to an offer of a single executor from the scheduler by finding a task
*
Expand Down Expand Up @@ -594,6 +575,63 @@ private[spark] class TaskSetManager(
index
}


/**
* Check whether the given task set has been blacklisted to the point that it can't run anywhere.
*
* It is possible that this taskset has become impossible to schedule *anywhere* due to the
* blacklist. The most common scenario would be if there are fewer executors than
* spark.task.maxFailures. We need to detect this so we can fail the task set, otherwise the job
* will hang.
*
* The check here is a balance between being sure to catch the issue, but not wasting
* too much time inside the scheduling loop. Just check if the last task is schedulable
* on any of the available executors. So this is O(numExecutors) worst-case, but it'll
* really be fast unless you've got a bunch of things blacklisted. Its possible it won't detect
* the unschedulable task immediately, but if it returns false, there is at least *some* task
* that is schedulable, and after scheduling all of those, we'll eventually find the unschedulable
* task.
*/
private[scheduler] def abortIfTaskSetCompletelyBlacklisted(
executorsByHost: HashMap[String, HashSet[String]]): Unit = {
// If no executors have registered yet, don't abort the stage, just wait. We probably
// got here because a task set was added before the executors registered.
if (executorsByHost.nonEmpty) {
// take any task that needs to be scheduled, and see if we can find some executor it *could*
// run on
pollPendingTask.foreach { task =>
executorsByHost.foreach { case (host, execs) =>
execs.foreach { exec =>
if (!executorIsBlacklisted(exec, task)) {
return
}
}
}
abort(s"Aborting ${taskSet} because it has a task which cannot be scheduled on any" +
s" executor due to blacklists.")
}
}
}

/**
* Return some task which is pending, but do not remove it from the list of pending tasks.
* Used as a simple way to test if this task set is schedulable anywhere, or if it has been
* completely blacklisted.
*/
private def pollPendingTask: Option[Int] = {
// usually this will just take the last pending task, but because of the lazy removal
// from each list, we may need to go deeper in the list
var indexOffset = allPendingTasks.size
while (indexOffset > 0) {
indexOffset -= 1
val index = allPendingTasks(indexOffset)
if (copiesRunning(index) == 0 && !successful(index)) {
return Some(index)
}
}
None
}

/**
* Marks the task as getting result and notifies the DAG Scheduler
*/
Expand Down

0 comments on commit 64ab7fb

Please sign in to comment.