Skip to content

Commit

Permalink
review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
squito committed Jun 29, 2016
1 parent 9665029 commit 96049cd
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -616,14 +616,12 @@ private[spark] class TaskSetManager(
// take any task that needs to be scheduled, and see if we can find some executor it *could*
// run on
pendingTask.foreach { taskId =>
executors.foreach { exec =>
if (!executorIsBlacklisted(exec, taskId)) {
return
}
if (executors.forall(executorIsBlacklisted(_, taskId))) {
val execs = executors.toIndexedSeq.sorted.mkString("(", ",", ")")
val partition = tasks(taskId).partitionId
abort(s"Aborting ${taskSet} because task $taskId (partition $partition)" +
s" has already failed on executors $execs, and no other executors are available.")
}
val execs = executors.toIndexedSeq.sorted.mkString("(", ",", ")")
abort(s"Aborting ${taskSet} because task $taskId (partition ${tasks(taskId).partitionId})" +
s" has already failed on executors $execs, and no other executors are available.")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
testScheduler(
"With blacklist on, job will still fail if there are too many bad executors on bad host",
extraConfs = Seq(
// just set this to something much longer than the test duration
// set this to something much longer than the test duration so that executors don't get
// removed from the blacklist during the test
("spark.scheduler.executorTaskBlacklistTime", "10000000")
)
) {
Expand All @@ -74,7 +75,8 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
testScheduler(
"Bad node with multiple executors, job will still succeed with the right confs",
extraConfs = Seq(
// just set this to something much longer than the test duration
// set this to something much longer than the test duration so that executors don't get
// removed from the blacklist during the test
("spark.scheduler.executorTaskBlacklistTime", "10000000"),
// this has to be higher than the number of executors on the bad host
("spark.task.maxFailures", "5"),
Expand All @@ -96,7 +98,8 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
testScheduler(
"SPARK-15865 Progress with fewer executors than maxTaskFailures",
extraConfs = Seq(
// set this to something much longer than the test duration
// set this to something much longer than the test duration so that executors don't get
// removed from the blacklist during the test
"spark.scheduler.executorTaskBlacklistTime" -> "10000000",
"spark.testing.nHosts" -> "2",
"spark.testing.nExecutorsPerHost" -> "1",
Expand Down

0 comments on commit 96049cd

Please sign in to comment.