[SPARK-15865][CORE] Blacklist should not result in job hanging with less than 4 executors #13603
Changes from 33 commits
TaskSetManager.scala
@@ -83,7 +83,7 @@ private[spark] class TaskSetManager(
   val copiesRunning = new Array[Int](numTasks)
   val successful = new Array[Boolean](numTasks)
   private val numFailures = new Array[Int](numTasks)
-  // key is taskId, value is a Map of executor id to when it failed
+  // key is taskId (aka TaskInfo.index), value is a Map of executor id to when it failed
   private val failedExecutors = new HashMap[Int, HashMap[String, Long]]()

   val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
@@ -270,7 +270,7 @@ private[spark] class TaskSetManager(
    * Is this re-execution of a failed task on an executor it already failed in before
    * EXECUTOR_TASK_BLACKLIST_TIMEOUT has elapsed ?
    */
-  private def executorIsBlacklisted(execId: String, taskId: Int): Boolean = {
+  private[scheduler] def executorIsBlacklisted(execId: String, taskId: Int): Boolean = {
     if (failedExecutors.contains(taskId)) {
       val failed = failedExecutors.get(taskId).get
@@ -575,6 +575,59 @@ private[spark] class TaskSetManager(
     index
   }

  /**
   * Check whether the given task set has been blacklisted to the point that it can't run anywhere.
   *
   * It is possible that this taskset has become impossible to schedule *anywhere* due to the
   * blacklist. The most common scenario would be if there are fewer executors than
   * spark.task.maxFailures. We need to detect this so we can fail the task set, otherwise the job
   * will hang.
   *

Review comment: The next paragraph is a little hard to understand. How about something like the following (also, is the version below correct? It's a little different from what you had): "There's a tradeoff here: we could make sure all tasks in the task set are schedulable, but that would add extra time to each iteration of the scheduling loop. Here, we take the approach of making sure at least one of the unscheduled tasks is schedulable. This means we may not detect the hang as quickly as we could have, but we'll always detect the hang eventually, and the method is faster in the typical case. In the worst case, this method can take O(maxTaskFailures) time, but it will be faster when there haven't been any task failures (this is because the method picks one unscheduled task, and then iterates through each executor until it finds one that the task hasn't failed on already)."

Reply: Yes, much better. Also a good point that it's only O(maxTaskFailures); I hadn't realized that.

Reply: After further thought, I think this is actually O(maxTaskFailures + # tasks), right? Since you might need to first iterate through all of the tasks to find one that hasn't started yet, and then iterate through some failed executors?

Reply: Ah right, I forgot about the cost of pollPendingTask.

   * There's a tradeoff here: we could make sure all tasks in the task set are schedulable, but
   * that would add extra time to each iteration of the scheduling loop. Here, we take the
   * approach of making sure at least one of the unscheduled tasks is schedulable. This means we
   * may not detect the hang as quickly as we could have, but we'll always detect the hang
   * eventually, and the method is faster in the typical case. In the worst case, this method can
   * take O(maxTaskFailures + numTasks) time, but it will be faster when there haven't been any
   * task failures (this is because the method picks one unscheduled task, and then iterates
   * through each executor until it finds one that the task hasn't failed on already).
   */
  private[scheduler] def abortIfCompletelyBlacklisted(executors: Iterable[String]): Unit = {
    def pendingTask: Option[Int] = {
      // usually this will just take the last pending task, but because of the lazy removal
      // from each list, we may need to go deeper in the list. We poll from the end because
      // failed tasks are put back at the end of allPendingTasks, so we're more likely to find
      // an unschedulable task this way.
      var indexOffset = allPendingTasks.size
      while (indexOffset > 0) {
        indexOffset -= 1
        val indexInTaskSet = allPendingTasks(indexOffset)
        if (copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet)) {
          return Some(indexInTaskSet)
        }
      }
      None
    }

Review comment: Gah, sorry, one more tiny thing: can this just be a val?

Reply: You can't just change this to a …

Reply: I pushed a commit changing it to a val, so you can see both options. Easy enough to back out that last commit.

Reply: What about "val pendingTask: Option[Int] = allPendingTasks.lastIndexWhere { indexInTaskSet => …"? (I realize we're really in the weeds here, so whatever you prefer is fine.)

Reply: Oh, I didn't even know about lastIndexWhere! Thanks, simpler, and despite being a minor point I appreciate learning something new :) Sorry, I think I just got on the wrong track here while thinking about doing the lazy removal as well, and when I decided against it I never stepped back to simplify this.
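For reference, a minimal, self-contained sketch of the val-plus-lastIndexWhere shape the reviewer suggests above. The function signature and the sample data are hypothetical stand-ins for the TaskSetManager fields (allPendingTasks, copiesRunning, successful), not the PR's final code. Note that lastIndexWhere scans from the end, matching the existing comment about failed tasks being re-queued at the end of allPendingTasks, but it returns a position into allPendingTasks (or -1), so that position still has to be mapped back to the task index it points at.

import scala.collection.mutable.ArrayBuffer

object PendingTaskSketch {
  // Hypothetical stand-in for TaskSetManager.pendingTask, written as a pure function
  // over the three pieces of state the real code reads.
  def pendingTask(
      allPendingTasks: ArrayBuffer[Int],
      copiesRunning: Array[Int],
      successful: Array[Boolean]): Option[Int] = {
    // lastIndexWhere returns the position (within allPendingTasks) of the last entry whose
    // task has no running copies and hasn't succeeded, or -1 if there is no such entry.
    val offset = allPendingTasks.lastIndexWhere { indexInTaskSet =>
      copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet)
    }
    if (offset == -1) None else Some(allPendingTasks(offset))
  }

  def main(args: Array[String]): Unit = {
    // Task 0 is running and task 2 already succeeded, so task 1 is the one to probe.
    println(pendingTask(ArrayBuffer(0, 1, 2), Array(1, 0, 0), Array(false, false, true)))
    // prints Some(1)
  }
}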
      // If no executors have registered yet, don't abort the stage, just wait. We probably
      // got here because a task set was added before the executors registered.
      if (executors.nonEmpty) {
        // take any task that needs to be scheduled, and see if we can find some executor it
        // *could* run on
        pendingTask.foreach { taskId =>
          executors.foreach { exec =>
            if (!executorIsBlacklisted(exec, taskId)) {
              return
            }
          }
          val execs = executors.toIndexedSeq.sorted.mkString("(", ",", ")")
          abort(s"Aborting ${taskSet} because task $taskId (partition ${tasks(taskId).partitionId})" +
            s" has already failed on executors $execs, and no other executors are available.")
        }
      }
    }

Review comment: Now that executors is an Iterable, just do "if (executors.find(executorIsBlacklisted(_, taskId)).isEmpty) { .. abort ...}" here?

Reply: Good point, in fact I can just use …
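As an aside, here is a minimal sketch of the single-predicate style the reviewer suggests, reduced to a standalone function. The names offeredExecutors and taskAlreadyFailedOn are illustrative stand-ins (the real code would consult executorIsBlacklisted), and the check keeps the original abort condition: abort only when every offered executor is already blacklisted for the probed task.

object BlacklistCheckSketch {
  // True when no offered executor remains that the probed task could still run on, i.e.
  // every executor is already blacklisted for it. The nonEmpty guard mirrors the code
  // above: an empty offer list (no executors registered yet) should not trigger an abort.
  def completelyBlacklisted(
      offeredExecutors: Iterable[String],
      taskAlreadyFailedOn: Set[String]): Boolean = {
    offeredExecutors.nonEmpty &&
      offeredExecutors.find(exec => !taskAlreadyFailedOn.contains(exec)).isEmpty
  }

  def main(args: Array[String]): Unit = {
    // The probed task already failed on both live executors: the stage should abort.
    println(completelyBlacklisted(Seq("executor0", "executor1"), Set("executor0", "executor1")))
    // One executor remains that the task hasn't failed on: keep waiting instead.
    println(completelyBlacklisted(Seq("executor0", "executor1"), Set("executor0")))
  }
}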
  /**
   * Marks the task as getting result and notifies the DAG Scheduler
   */
BlacklistIntegrationSuite.scala
@@ -91,6 +91,32 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorMockBackend] {
     assertDataStructuresEmpty(noFailure = true)
   }

Review comment: I'm pretty sure the previous flakiness in this test suite is unrelated to the test I'm adding here, but we should probably wait to merge this until after #13565, just to be safe. (At least, it's unrelated to the delay-scheduling-revive-offers issue, which was the major previous flakiness -- it potentially still suffers from the DAGScheduler / assertDataStructuresEmpty race, but that is quite rare.) I did run this test 5K times on my laptop and it passed (though that hasn't always been enough to discover some of the races).

  // Make sure that if we've failed on all executors, but haven't hit task.maxFailures yet, the job
  // doesn't hang
  testScheduler(
    "SPARK-15865 Progress with fewer executors than maxTaskFailures",
    extraConfs = Seq(
      // set this to something much longer than the test duration

Review comment: Add "so that executors don't get removed from the blacklist during the test".

Reply: Looks like you added this in the other test; I know it's redundant here, but still helpful for reference, I think.

Reply: Doh, I thought I had changed it everywhere earlier, sorry, done now.

      "spark.scheduler.executorTaskBlacklistTime" -> "10000000",
      "spark.testing.nHosts" -> "2",
      "spark.testing.nExecutorsPerHost" -> "1",
      "spark.testing.nCoresPerExecutor" -> "1"
    )
  ) {
    def runBackend(): Unit = {
      val (taskDescription, _) = backend.beginTask()
      backend.taskFailed(taskDescription, new RuntimeException("test task failure"))
    }
    withBackend(runBackend _) {
      val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
      Await.ready(jobFuture, duration)
      val pattern = ("Aborting TaskSet 0.0 because task .* " +
        "already failed on executors \\(.*\\), and no other executors are available").r
      assert(pattern.findFirstIn(failure.getMessage).isDefined,
        s"Couldn't find $pattern in ${failure.getMessage()}")
    }
    assertDataStructuresEmpty(noFailure = false)
  }
}

class MultiExecutorMockBackend(
TaskSchedulerImplSuite.scala
@@ -281,6 +281,98 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(!failedTaskSet)
   }

  test("abort stage if executor loss results in unschedulability from previously failed tasks") {
    // Make sure we can detect when a taskset becomes unschedulable from a blacklisting. This
    // test explores a particular corner case -- you may have one task fail, but still be
    // schedulable on another executor. However, that executor may fail later on, leaving the
    // first task with no place to run.
    val taskScheduler = setupScheduler(
      // set this to something much longer than the test duration so that executors don't get
      // removed from the blacklist during the test
      "spark.scheduler.executorTaskBlacklistTime" -> "10000000"
    )

    val taskSet = FakeTask.createTaskSet(2)
    taskScheduler.submitTasks(taskSet)
    val tsm = taskScheduler.taskSetManagerForAttempt(taskSet.stageId, taskSet.stageAttemptId).get

    val firstTaskAttempts = taskScheduler.resourceOffers(Seq(
      new WorkerOffer("executor0", "host0", 1),
      new WorkerOffer("executor1", "host1", 1)
    )).flatten
    assert(Set("executor0", "executor1") === firstTaskAttempts.map(_.executorId).toSet)

    // fail one of the tasks, but leave the other running
    val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get
    taskScheduler.handleFailedTask(tsm, failedTask.taskId, TaskState.FAILED, TaskResultLost)
    // at this point, our failed task could run on the other executor, so don't give up the task
    // set yet.

Review comment: Does it make sense to verify !failedTaskSet here?

    assert(!failedTaskSet)

    // Now we fail our second executor. The other task can still run on executor1, so make an offer
    // on that executor, and make sure that the other task (not the failed one) is assigned there.
    taskScheduler.executorLost("executor1", SlaveLost("oops"))
    val nextTaskAttempts =
      taskScheduler.resourceOffers(Seq(new WorkerOffer("executor0", "host0", 1))).flatten
    // Note: it's OK if some future change makes this already realize the taskset has become
    // unschedulable at this point (though in the current implementation, we're sure it will not)
    assert(nextTaskAttempts.size === 1)
    assert(nextTaskAttempts.head.executorId === "executor0")
    assert(nextTaskAttempts.head.attemptNumber === 1)
    assert(nextTaskAttempts.head.index != failedTask.index)

    // now we should definitely realize that our task set is unschedulable, because the only
    // task left can't be scheduled on any executors due to the blacklist
    taskScheduler.resourceOffers(Seq(new WorkerOffer("executor0", "host0", 1)))
    sc.listenerBus.waitUntilEmpty(100000)
    assert(tsm.isZombie)
    assert(failedTaskSet)
    val idx = failedTask.index
    assert(failedTaskSetReason == s"Aborting TaskSet 0.0 because task $idx (partition $idx) has " +
      s"already failed on executors (executor0), and no other executors are available.")
  }

  test("don't abort if there is an executor available, though it hasn't had scheduled tasks yet") {
    // interaction of SPARK-15865 & SPARK-16106
    // if we have a small number of tasks, we might be able to schedule them all on the first
    // executor. But if those tasks fail, we should still realize there is another executor
    // available and not bail on the job

    val taskScheduler = setupScheduler(
      // set this to something much longer than the test duration so that executors don't get
      // removed from the blacklist during the test
      "spark.scheduler.executorTaskBlacklistTime" -> "10000000"
    )

    val taskSet = FakeTask.createTaskSet(2, (0 until 2).map { _ => Seq(TaskLocation("host0")) }: _*)
    taskScheduler.submitTasks(taskSet)
    val tsm = taskScheduler.taskSetManagerForAttempt(taskSet.stageId, taskSet.stageAttemptId).get

    val offers = Seq(
      // each offer has more than enough free cores for the entire task set, so when combined
      // with the locality preferences, we schedule all tasks on one executor
      new WorkerOffer("executor0", "host0", 4),
      new WorkerOffer("executor1", "host1", 4)
    )
    val firstTaskAttempts = taskScheduler.resourceOffers(offers).flatten
    assert(firstTaskAttempts.size == 2)
    firstTaskAttempts.foreach { taskAttempt => assert("executor0" === taskAttempt.executorId) }

    // fail all the tasks on the bad executor
    firstTaskAttempts.foreach { taskAttempt =>
      taskScheduler.handleFailedTask(tsm, taskAttempt.taskId, TaskState.FAILED, TaskResultLost)
    }

    // Here is the main check of this test -- we have the same offers again, and we schedule the
    // tasks successfully. Because the scheduler first tries to schedule with locality in mind, at
    // first it won't schedule anything on executor1. But despite that, we don't abort the job.
    // Then the scheduler tries for ANY locality, and successfully schedules tasks on executor1.
    val secondTaskAttempts = taskScheduler.resourceOffers(offers).flatten
    assert(secondTaskAttempts.size == 2)
    secondTaskAttempts.foreach { taskAttempt => assert("executor1" === taskAttempt.executorId) }
    assert(!failedTaskSet)
  }

  test("SPARK-16106 locality levels updated if executor added to existing host") {
    val taskScheduler = setupScheduler()
Review comment: Here (and executorIsBlacklisted) is what got me confused about the meaning of these identifiers. I knew it was definitely not the globally unique taskId, aka TID.
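To make that distinction concrete, here is a small, hypothetical illustration of the failedExecutors structure from the first hunk above (the index, TID, and timestamp are made up): the outer key is the task's index within its task set (TaskInfo.index), not the globally unique task id (TID), and the inner map records which executors that task has failed on and when.

import scala.collection.mutable.HashMap

object FailedExecutorsSketch {
  def main(args: Array[String]): Unit = {
    // Same shape as TaskSetManager.failedExecutors: task index -> (executor id -> failure time)
    val failedExecutors = new HashMap[Int, HashMap[String, Long]]()

    // The task at index 3 of this task set failed on executor0 at the given (made up) time;
    // its global TID (say, 17) is not used as a key anywhere in this structure.
    failedExecutors.getOrElseUpdate(3, new HashMap()).put("executor0", 1466000000000L)

    // The question executorIsBlacklisted answers: has the task at index 3 already failed
    // on executor0 (within the blacklist timeout)?
    val blacklisted = failedExecutors.get(3).exists(_.contains("executor0"))
    println(blacklisted) // true
  }
}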