Commit

review feedback
squito committed Jun 28, 2016
1 parent 60cd959 commit 9665029
Showing 4 changed files with 23 additions and 22 deletions.
@@ -35,7 +35,7 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.scheduler.TaskLocality.TaskLocality
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util._
+import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils}
 
 /**
  * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
@@ -280,7 +280,7 @@ private[spark] class TaskSchedulerImpl(
       }
     }
     if (!launchedTask) {
-      taskSet.abortIfTaskSetCompletelyBlacklisted(executorsByHost)
+      taskSet.abortIfCompletelyBlacklisted(executorIdToHost.keys)
     }
     return launchedTask
   }
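A side note on the call-site change just above: the scheduler now passes only the executor IDs it knows about instead of the whole host-to-executors map. Below is a minimal standalone sketch, with made-up values rather than the scheduler's real state, of how the keys of an executorId-to-host map supply the Iterable[String] the renamed method expects.

// Sketch only: toy values standing in for TaskSchedulerImpl's executorIdToHost map.
object ExecutorKeysSketch {
  def main(args: Array[String]): Unit = {
    val executorIdToHost = scala.collection.mutable.HashMap(
      "executor0" -> "host0",
      "executor1" -> "host1")
    // The keys are exactly the flat executor list the new signature asks for.
    val executors: Iterable[String] = executorIdToHost.keys
    println(executors.toSeq.sorted.mkString("(", ",", ")"))  // prints (executor0,executor1)
  }
}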
@@ -592,8 +592,7 @@ private[spark] class TaskSetManager(
    * failures (this is because the method picks on unscheduled task, and then iterates through each
    * executor until it finds one that the task hasn't failed on already).
    */
-  private[scheduler] def abortIfTaskSetCompletelyBlacklisted(
-      executorsByHost: HashMap[String, HashSet[String]]): Unit = {
+  private[scheduler] def abortIfCompletelyBlacklisted(executors: Iterable[String]): Unit = {
 
     def pendingTask: Option[Int] = {
       // usually this will just take the last pending task, but because of the lazy removal
@@ -613,19 +612,18 @@ private[spark] class TaskSetManager(
 
     // If no executors have registered yet, don't abort the stage, just wait. We probably
     // got here because a task set was added before the executors registered.
-    if (executorsByHost.nonEmpty) {
+    if (executors.nonEmpty) {
       // take any task that needs to be scheduled, and see if we can find some executor it *could*
       // run on
       pendingTask.foreach { taskId =>
-        executorsByHost.foreach { case (host, execs) =>
-          execs.foreach { exec =>
-            if (!executorIsBlacklisted(exec, taskId)) {
-              return
-            }
-          }
+        executors.foreach { exec =>
+          if (!executorIsBlacklisted(exec, taskId)) {
+            return
+          }
         }
-        abort(s"Aborting ${taskSet} because Task $taskId (partition " +
-          s"${tasks(taskId).partitionId}) cannot be scheduled on any executor due to blacklists.")
+        val execs = executors.toIndexedSeq.sorted.mkString("(", ",", ")")
+        abort(s"Aborting ${taskSet} because task $taskId (partition ${tasks(taskId).partitionId})" +
+          s" has already failed on executors $execs, and no other executors are available.")
       }
     }
   }
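The renamed method is small enough to restate on its own. The sketch below is a simplified restatement, not the TaskSetManager code: failedOn stands in for the per-task failure bookkeeping behind executorIsBlacklisted, and an exists check replaces the early return, but the decision and the message shape follow the diff above.

// Simplified sketch of the blacklist check; `failedOn` is a stand-in predicate.
object BlacklistCheckSketch {
  def shouldAbort(
      pendingTaskId: Int,
      executors: Iterable[String],
      failedOn: (String, Int) => Boolean): Option[String] = {
    // If some executor has not seen this task fail yet, the task is still schedulable.
    val schedulable = executors.exists(exec => !failedOn(exec, pendingTaskId))
    if (executors.nonEmpty && !schedulable) {
      val execs = executors.toIndexedSeq.sorted.mkString("(", ",", ")")
      Some(s"task $pendingTaskId has already failed on executors $execs, " +
        "and no other executors are available")
    } else {
      None
    }
  }

  def main(args: Array[String]): Unit = {
    val failures = Set("executor0" -> 3, "executor1" -> 3)
    // Task 3 has failed on every known executor, so the check reports an abort reason.
    println(shouldAbort(3, Seq("executor0", "executor1"), (e, t) => failures((e, t))))
  }
}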
@@ -110,9 +110,10 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
     withBackend(runBackend _) {
       val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
       Await.ready(jobFuture, duration)
-      val pattern = ("Aborting TaskSet 0.0 because Task .* " +
-        "cannot be scheduled on any executor due to blacklists").r
-      assert(pattern.findFirstIn(failure.getMessage).isDefined)
+      val pattern = ("Aborting TaskSet 0.0 because task .* " +
+        "already failed on executors \\(.*\\), and no other executors are available").r
+      assert(pattern.findFirstIn(failure.getMessage).isDefined,
+        s"Couldn't find $pattern in ${failure.getMessage()}")
     }
     assertDataStructuresEmpty(noFailure = false)
   }
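As a quick sanity check on the updated pattern, the escaped parentheses are there to match the executor list embedded in the new abort message. The sample string below is fabricated for illustration and only mirrors the shape of the abort() call in TaskSetManager.

// The pattern is copied from the test above; the sample message is made up for the sketch.
object AbortMessagePatternSketch {
  def main(args: Array[String]): Unit = {
    val pattern = ("Aborting TaskSet 0.0 because task .* " +
      "already failed on executors \\(.*\\), and no other executors are available").r
    val sample = "Aborting TaskSet 0.0 because task 6 (partition 6) has already failed on " +
      "executors (executor0,executor1), and no other executors are available."
    assert(pattern.findFirstIn(sample).isDefined)
    println(pattern.findFirstIn(sample).get)
  }
}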
@@ -282,12 +282,13 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
   }
 
   test("abort stage if executor loss results in unschedulability from previously failed tasks") {
-    // Make sure we can detect when a taskset becomes unschedulability from a blacklisting. This
+    // Make sure we can detect when a taskset becomes unschedulable from a blacklisting. This
     // test explores a particular corner case -- you may have one task fail, but still be
     // schedulable on another executor. However, that executor may fail later on, leaving the
     // first task with no place to run.
     val taskScheduler = setupScheduler(
-      // set this to something much longer than the test duration
+      // set this to something much longer than the test duration so that executors don't get
+      // removed from the blacklist during the test
       "spark.scheduler.executorTaskBlacklistTime" -> "10000000"
     )
 
@@ -327,8 +328,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(tsm.isZombie)
     assert(failedTaskSet)
     val idx = failedTask.index
-    assert(failedTaskSetReason.contains(s"Aborting TaskSet 0.0 because Task $idx (partition $idx)" +
-      s" cannot be scheduled on any executor due to blacklists."))
+    assert(failedTaskSetReason == s"Aborting TaskSet 0.0 because task $idx (partition $idx) has " +
+      s"already failed on executors (executor0), and no other executors are available.")
   }
 
   test("don't abort if there is an executor available, though it hasn't had scheduled tasks yet") {
@@ -338,7 +339,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     // available and not bail on the job
 
     val taskScheduler = setupScheduler(
-      // set this to something much longer than the test duration
+      // set this to something much longer than the test duration so that executors don't get
+      // removed from the blacklist during the test
       "spark.scheduler.executorTaskBlacklistTime" -> "10000000"
     )
 
@@ -347,8 +349,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     val tsm = taskScheduler.taskSetManagerForAttempt(taskSet.stageId, taskSet.stageAttemptId).get
 
     val offers = Seq(
-      // each offer has more than enough free cores for the entire task set, so we schedule
-      // all tasks on one executor
+      // each offer has more than enough free cores for the entire task set, so when combined
+      // with the locality preferences, we schedule all tasks on one executor
       new WorkerOffer("executor0", "host0", 4),
       new WorkerOffer("executor1", "host1", 4)
     )