
[SPARK-15865][CORE] Blacklist should not result in job hanging with less than 4 executors #13603

Closed · wants to merge 36 commits

Changes from 5 commits

Commits (36)
3f46275
if all executors have been blacklisted for a task, abort stage (instea…
squito Jun 10, 2016
f870bde
Merge branch 'master' into progress_w_few_execs_and_blacklist
squito Jun 20, 2016
6b80b28
another test case
squito Jun 20, 2016
64ab7fb
move check for unschedulability into TaskSetManager
squito Jun 20, 2016
3edb6fe
overlooked cleanup
squito Jun 20, 2016
6362b28
rename taskId -> partitionId for failedExecutors
squito Jun 21, 2016
3da14ee
review feedback
squito Jun 21, 2016
060dbfe
fix partitionId in FakeTask
squito Jun 21, 2016
fd48403
need to stop taskscheduler and dagscheduler
squito Jun 21, 2016
c1aabea
update tests per suggestions
squito Jun 21, 2016
9efd95d
update comment
squito Jun 21, 2016
158775c
asserts for not-serializable tasks
squito Jun 21, 2016
e880495
fix assertion
squito Jun 21, 2016
39d18ed
refactor and fix tests
squito Jun 21, 2016
195408b
make sure executorsByHost and localityLevels updated on new executors…
squito Jun 21, 2016
40b8ee7
slightly expand test
squito Jun 21, 2016
7ca39a0
Revert "rename taskId -> partitionId for failedExecutors"
squito Jun 22, 2016
39fbf72
fix the taskId / partitionId mixup
squito Jun 22, 2016
87d6185
Merge branch 'master' into progress_w_few_execs_and_blacklist
squito Jun 22, 2016
c606dc3
Merge branch 'master' into progress_w_few_execs_and_blacklist
squito Jun 22, 2016
2f83927
dont start revive thread in constructor, needs to wait until executor…
squito Jun 22, 2016
635edec
add test for blacklist + locality + schedulability
squito Jun 23, 2016
2d52962
Merge branch 'SPARK-16106_executorByHosts' into progress_w_few_execs_…
squito Jun 23, 2016
9684eb7
update comment
squito Jun 27, 2016
28a00db
Merge branch 'master' into progress_w_few_execs_and_blacklist
squito Jun 27, 2016
e8e4fe5
fix merge
squito Jun 27, 2016
a5f7eb4
Merge branch 'master' into progress_w_few_execs_and_blacklist
squito Jun 28, 2016
b5b0f3f
fix merge
squito Jun 28, 2016
6dd93af
update comment
squito Jun 28, 2016
f4e95c6
remove clock stuff, not necessary in these tests
squito Jun 28, 2016
860ee1a
unused import
squito Jun 28, 2016
60cd959
simplify now that SPARK-16106 is in
squito Jun 28, 2016
9665029
review feedback
squito Jun 28, 2016
96049cd
review feedback
squito Jun 29, 2016
ed71c99
change pendingTask to a val
squito Jun 29, 2016
ed413ce
simplify
squito Jun 29, 2016
@@ -280,6 +280,9 @@ private[spark] class TaskSchedulerImpl(
}
}
}
if (!launchedTask) {
taskSet.abortIfTaskSetCompletelyBlacklisted(executorsByHost)
Contributor:
Based on our discussion yesterday, should this use executorIdToHost instead of executorsByHost? (that would also be marginally easier to parse in abortIfTaskSetCompletelyBlacklisted)

}
return launchedTask
}
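
For reference, a minimal standalone sketch of the reviewer's suggestion above (hypothetical helper and names, not part of the patch): with a flat executorId -> host map like TaskSchedulerImpl's executorIdToHost, the "can this task run anywhere?" check becomes a single pass over executor ids, with no per-host sets to unpack.

import scala.collection.mutable.HashMap

// Hypothetical helper: true if at least one known executor is not blacklisted for the
// given task index. `isBlacklisted` stands in for TaskSetManager.executorIsBlacklisted.
def taskIsSchedulableSomewhere(
    taskIndex: Int,
    executorIdToHost: HashMap[String, String],
    isBlacklisted: (String, Int) => Boolean): Boolean = {
  executorIdToHost.keys.exists(exec => !isBlacklisted(exec, taskIndex))
}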

@@ -270,7 +270,7 @@ private[spark] class TaskSetManager(
* Is this re-execution of a failed task on an executor it already failed in before
* EXECUTOR_TASK_BLACKLIST_TIMEOUT has elapsed ?
*/
private def executorIsBlacklisted(execId: String, taskId: Int): Boolean = {
private[scheduler] def executorIsBlacklisted(execId: String, taskId: Int): Boolean = {
if (failedExecutors.contains(taskId)) {
val failed = failedExecutors.get(taskId).get

@@ -575,6 +575,62 @@ private[spark] class TaskSetManager(
index
}

/**
* Check whether the given task set has been blacklisted to the point that it can't run anywhere.
*
* It is possible that this taskset has become impossible to schedule *anywhere* due to the
* blacklist. The most common scenario would be if there are fewer executors than
* spark.task.maxFailures. We need to detect this so we can fail the task set, otherwise the job
* will hang.
*
Contributor:
The next paragraph is a little hard to understand. How about something like the following (also, is the text below correct? It's a little different from what you had):

There's a tradeoff here: we could make sure all tasks in the task set are schedulable, but that would add extra time to each iteration of the scheduling loop. Here, we take the approach of making sure at least one of the unscheduled tasks is schedulable. This means we may not detect the hang as quickly as we could have, but we'll always detect the hang eventually, and the method is faster in the typical case. In the worst case, this method can take O(maxTaskFailures) time, but it will be faster when there haven't been any task failures (this is because the method picks one unscheduled task, and then iterates through each executor until it finds one that the task hasn't failed on already).

Contributor (author):
yes, much better. Also good point that it's only O(maxTaskFailures), I hadn't realized that.

Contributor:
After further thought I think this is actually O(maxTaskFailures + # tasks) right? Since you might need to first iterate through all of the tasks to find one that hasn't started yet, and then iterate through some failed executors?

Contributor (author):
ah right, I forgot about the cost of pollPendingTask

* The check here is a balance between being sure to catch the issue and not wasting
* too much time inside the scheduling loop. We just check whether the last pending task is
* schedulable on any of the available executors. So this is O(numExecutors) worst-case, but
* it will usually be fast unless a lot of executors have been blacklisted. It's possible it
* won't detect the unschedulable task immediately, but if it returns false, there is at least
* *some* schedulable task, and after scheduling all of those, we'll eventually find the
* unschedulable task.
*/
private[scheduler] def abortIfTaskSetCompletelyBlacklisted(
Contributor:
you could shorten this name to eliminate "TaskSet" (which I think is leftover from when it was in TaskSchedulerImpl)

executorsByHost: HashMap[String, HashSet[String]]): Unit = {
// If no executors have registered yet, don't abort the stage, just wait. We probably
// got here because a task set was added before the executors registered.
if (executorsByHost.nonEmpty) {
// take any task that needs to be scheduled, and see if we can find some executor it *could*
// run on
pollPendingTask.foreach { task =>
executorsByHost.foreach { case (host, execs) =>
execs.foreach { exec =>
if (!executorIsBlacklisted(exec, task)) {
return
}
}
}
abort(s"Aborting ${taskSet} because it has a task which cannot be scheduled on any" +
Contributor:
Can you include the task ID here?

s" executor due to blacklists.")
}
}
}

/**
* Return some task which is pending, but do not remove it from the list of pending tasks.
* Used as a simple way to test if this task set is schedulable anywhere, or if it has been
* completely blacklisted.
*/
private def pollPendingTask: Option[Int] = {
Contributor:
Can you inline this in abortIfTaskSetCompletelyBlacklisted? It's a little confusing and I'd rather help prevent it from being used in other places (I think that also makes some of the commenting about why it's necessary unneeded)

// usually this will just take the last pending task, but because of the lazy removal
// from each list, we may need to go deeper in the list
var indexOffset = allPendingTasks.size
Contributor:
Why do you start at the end here? Is the idea to find the task that's least likely to have been scheduled? (Can you add a brief comment to this effect?)

Contributor (author):
to be honest, I was mostly just copying dequeueTaskFromList. But now that I'm thinking about it, there is a tradeoff: If we check from the end of the list, we're more likely to find the unschedulable task (since failed tasks are put back on the end). If we check from the beginning, we'll find a task which passes copiesRunning(index) == 0 && !successful(index) more quickly, since the stacks of pending tasks are only lazily updated.

Given that we only run this check when we've failed to schedule any tasks, I think this version is fine, though I don't feel very strongly about it.

while (indexOffset > 0) {
indexOffset -= 1
val index = allPendingTasks(indexOffset)
if (copiesRunning(index) == 0 && !successful(index)) {
return Some(index)
}
Contributor (author):
I'm pretty sure that we could add

else {
  // this task has already been scheduled from one of our other task queues, so remove it 
  // from this one as well, even though we're not actually scheduling anything here.
  allPendingTasks.remove(indexOffset)
}

But it shouldn't be necessary to do here, and I'm just nervous enough about adding it that I opted not to.

}
None
}
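
A rough sketch of the inlined form the reviewer asks for above, with the pending-task scan folded directly into the abort check (it also includes the task index in the abort message, as requested earlier). The member names mirror the surrounding diff (allPendingTasks, copiesRunning, successful, executorIsBlacklisted, abort), but this is only an illustration of the suggestion, not the final patch:

private[scheduler] def abortIfTaskSetCompletelyBlacklisted(
    executorsByHost: HashMap[String, HashSet[String]]): Unit = {
  if (executorsByHost.nonEmpty) {
    // Scan from the end of the pending list for some task that still needs to run;
    // because of lazy removal, entries may already be running or finished.
    var indexOffset = allPendingTasks.size
    while (indexOffset > 0) {
      indexOffset -= 1
      val index = allPendingTasks(indexOffset)
      if (copiesRunning(index) == 0 && !successful(index)) {
        // Found a pending task: if no executor can still run it, the whole set is stuck.
        val schedulable = executorsByHost.values.exists { execs =>
          execs.exists(exec => !executorIsBlacklisted(exec, index))
        }
        if (!schedulable) {
          abort(s"Aborting ${taskSet} because task $index cannot be scheduled on any" +
            " executor due to blacklists.")
        }
        return
      }
    }
  }
}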

/**
* Marks the task as getting result and notifies the DAG Scheduler
*/
@@ -24,6 +24,7 @@ import org.apache.spark._
class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorMockBackend]{

val badHost = "host-0"
val duration = Duration(10, SECONDS)

/**
* This backend just always fails if the task is executed on a bad host, but otherwise succeeds
@@ -93,6 +94,30 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
assertDataStructuresEmpty(noFailure = true)
}

// Make sure that if we've failed on all executors, but haven't hit task.maxFailures yet, the job
// doesn't hang
testScheduler(
"SPARK-15865 Progress with fewer executors than maxTaskFailures",
Contributor (author):
I'm pretty sure the previous flakiness in this test suite is unrelated to the test I'm adding here, but we should probably wait to merge this until after #13565 just to be safe. (At least, it's unrelated to the delay-scheduling-revive-offers issue, which was the major source of the previous flakiness -- it potentially still suffers from the DAGScheduler / assertDataStructuresEmpty race, but that is quite rare.) I did run this test 5K times on my laptop and it passed (though that hasn't always been enough to discover some of the races).

extraConfs = Seq(
// set this to something much longer than the test duration
Contributor:
add "so that executors don't get removed from the blacklist during the test"

Contributor:
Looks like you added this in the other test -- I know it's redundant here but still helpful for reference I think

Contributor (author):
doh, I thought I had changed it everywhere earlier, sorry, done now

"spark.scheduler.executorTaskBlacklistTime" -> "10000000",
"spark.testing.nHosts" -> "2",
"spark.testing.nExecutorsPerHost" -> "1",
"spark.testing.nCoresPerExecutor" -> "1"
)
) {
def runBackend(): Unit = {
val (taskDescription, _) = backend.beginTask()
backend.taskFailed(taskDescription, new RuntimeException("test task failure"))
}
withBackend(runBackend _) {
val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
Await.ready(jobFuture, duration)
assert(failure.getMessage.contains("Aborting TaskSet 0.0 because it has a task which " +
"cannot be scheduled on any executor due to blacklists"))
}
assertDataStructuresEmpty(noFailure = false)
}
}

class MultiExecutorMockBackend(
@@ -17,6 +17,8 @@

package org.apache.spark.scheduler

import org.scalatest.BeforeAndAfterEach

import org.apache.spark._
import org.apache.spark.internal.Logging

@@ -27,18 +29,48 @@ class FakeSchedulerBackend extends SchedulerBackend {
def defaultParallelism(): Int = 1
}

class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with Logging {
class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterEach
with Logging {

test("Scheduler does not always schedule tasks on the same workers") {

var failedTaskSetExc: Option[Throwable] = None
var failedTaskSetReason: String = null
var failedTaskSet = false

override def beforeEach(): Unit = {
super.beforeEach()
failedTaskSet = false
failedTaskSetExc = None
Contributor:
super nit but could you write this out as "Exception"? I kept mentally reading it as "Executor" -- and it doesn't look like it will throw any lines over 100

failedTaskSetReason = null
}

def setupScheduler(confs: (String, String)*): TaskSchedulerImpl = {
Contributor:
👍 for eliminating this redundancy

sc = new SparkContext("local", "TaskSchedulerImplSuite")
confs.foreach { case (k, v) =>
sc.conf.set(k, v)
}
val taskScheduler = new TaskSchedulerImpl(sc)
taskScheduler.initialize(new FakeSchedulerBackend)
// Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
new DAGScheduler(sc, taskScheduler) {
override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
override def executorAdded(execId: String, host: String) {}
override def taskStarted(task: Task[_], taskInfo: TaskInfo): Unit = {}
override def executorAdded(execId: String, host: String): Unit = {}
override def taskSetFailed(
taskSet: TaskSet,
reason: String,
exception: Option[Throwable]): Unit = {
// Normally the DAGScheduler puts this in the event loop, which will eventually fail
// dependent jobs
failedTaskSet = true
failedTaskSetReason = reason
failedTaskSetExc = exception
}
}
taskScheduler
}

test("Scheduler does not always schedule tasks on the same workers") {
val taskScheduler = setupScheduler()
val numFreeCores = 1
val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores),
new WorkerOffer("executor1", "host1", numFreeCores))
@@ -58,20 +90,12 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L
val count = selectedExecutorIds.count(_ == workerOffers(0).executorId)
assert(count > 0)
assert(count < numTrials)
assert(!failedTaskSet)
}

test("Scheduler correctly accounts for multiple CPUs per task") {
sc = new SparkContext("local", "TaskSchedulerImplSuite")
val taskCpus = 2

sc.conf.set("spark.task.cpus", taskCpus.toString)
val taskScheduler = new TaskSchedulerImpl(sc)
taskScheduler.initialize(new FakeSchedulerBackend)
// Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
new DAGScheduler(sc, taskScheduler) {
override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
override def executorAdded(execId: String, host: String) {}
}
val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
// Give zero core offers. Should not generate any tasks
val zeroCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", 0),
new WorkerOffer("executor1", "host1", 0))
@@ -96,22 +120,13 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L
taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
assert(1 === taskDescriptions.length)
assert("executor0" === taskDescriptions(0).executorId)
assert(!failedTaskSet)
}

test("Scheduler does not crash when tasks are not serializable") {
sc = new SparkContext("local", "TaskSchedulerImplSuite")
val taskCpus = 2

sc.conf.set("spark.task.cpus", taskCpus.toString)
val taskScheduler = new TaskSchedulerImpl(sc)
taskScheduler.initialize(new FakeSchedulerBackend)
// Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
val dagScheduler = new DAGScheduler(sc, taskScheduler) {
override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
override def executorAdded(execId: String, host: String) {}
}
val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
val numFreeCores = 1
taskScheduler.setDAGScheduler(dagScheduler)
val taskSet = new TaskSet(
Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
val multiCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", taskCpus),
@@ -121,24 +136,16 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L
assert(0 === taskDescriptions.length)

// Now check that we can still submit tasks
// Even if one of the tasks has not-serializable tasks, the other task set should
// Even if one of the task sets has not-serializable tasks, the other task set should
// still be processed without error
taskScheduler.submitTasks(taskSet)
taskScheduler.submitTasks(FakeTask.createTaskSet(1))
taskScheduler.submitTasks(taskSet)
taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
assert(taskDescriptions.map(_.executorId) === Seq("executor0"))
}

test("refuse to schedule concurrent attempts for the same stage (SPARK-8103)") {
sc = new SparkContext("local", "TaskSchedulerImplSuite")
val taskScheduler = new TaskSchedulerImpl(sc)
taskScheduler.initialize(new FakeSchedulerBackend)
// Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
val dagScheduler = new DAGScheduler(sc, taskScheduler) {
override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
override def executorAdded(execId: String, host: String) {}
}
taskScheduler.setDAGScheduler(dagScheduler)
val taskScheduler = setupScheduler()
val attempt1 = FakeTask.createTaskSet(1, 0)
val attempt2 = FakeTask.createTaskSet(1, 1)
taskScheduler.submitTasks(attempt1)
@@ -153,17 +160,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L
taskScheduler.taskSetManagerForAttempt(attempt2.stageId, attempt2.stageAttemptId)
.get.isZombie = true
taskScheduler.submitTasks(attempt3)
assert(!failedTaskSet)
}

test("don't schedule more tasks after a taskset is zombie") {
sc = new SparkContext("local", "TaskSchedulerImplSuite")
val taskScheduler = new TaskSchedulerImpl(sc)
taskScheduler.initialize(new FakeSchedulerBackend)
// Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
new DAGScheduler(sc, taskScheduler) {
override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
override def executorAdded(execId: String, host: String) {}
}
val taskScheduler = setupScheduler()

val numFreeCores = 1
val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores))
@@ -191,17 +192,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L
assert(1 === taskDescriptions3.length)
val mgr = taskScheduler.taskIdToTaskSetManager.get(taskDescriptions3(0).taskId).get
assert(mgr.taskSet.stageAttemptId === 1)
assert(!failedTaskSet)
}

test("if a zombie attempt finishes, continue scheduling tasks for non-zombie attempts") {
sc = new SparkContext("local", "TaskSchedulerImplSuite")
val taskScheduler = new TaskSchedulerImpl(sc)
taskScheduler.initialize(new FakeSchedulerBackend)
// Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
new DAGScheduler(sc, taskScheduler) {
override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
override def executorAdded(execId: String, host: String) {}
}
val taskScheduler = setupScheduler()

val numFreeCores = 10
val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores))
@@ -236,17 +231,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L
val mgr = taskScheduler.taskIdToTaskSetManager.get(task.taskId).get
assert(mgr.taskSet.stageAttemptId === 1)
}
assert(!failedTaskSet)
}

test("tasks are not re-scheduled while executor loss reason is pending") {
sc = new SparkContext("local", "TaskSchedulerImplSuite")
val taskScheduler = new TaskSchedulerImpl(sc)
taskScheduler.initialize(new FakeSchedulerBackend)
// Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
new DAGScheduler(sc, taskScheduler) {
override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
override def executorAdded(execId: String, host: String) {}
}
val taskScheduler = setupScheduler()

val e0Offers = Seq(new WorkerOffer("executor0", "host0", 1))
val e1Offers = Seq(new WorkerOffer("executor1", "host0", 1))
@@ -272,6 +261,55 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L
val taskDescriptions3 = taskScheduler.resourceOffers(e1Offers).flatten
assert(1 === taskDescriptions3.length)
assert("executor1" === taskDescriptions3(0).executorId)
assert(!failedTaskSet)
}

test("abort stage if executor loss results in unschedulability from previously failed tasks") {
// Make sure we can detect when a taskset becomes unschedulability from a blacklisting. This
Contributor:
unschedulability -> unschedulable

// test explores a particular corner case -- you may have one task fail, but still be
// schedulable on another executor. However, that executor may fail later on, leaving the
// first task with no place to run.
val taskScheduler = setupScheduler(
// set this to something much longer than the test duration
"spark.scheduler.executorTaskBlacklistTime" -> "10000000"
)

val taskSet = FakeTask.createTaskSet(2)
taskScheduler.submitTasks(taskSet)
val tsm = taskScheduler.taskSetManagerForAttempt(taskSet.stageId, taskSet.stageAttemptId).get

val firstTasks = taskScheduler.resourceOffers(Seq(
Contributor:
firstTaskAttempts?

new WorkerOffer("executor0", "host0", 1),
new WorkerOffer("executor1", "host1", 1)
)).flatten
assert(Set("executor0", "executor1") === firstTasks.map{_.executorId}.toSet)

// fail one of the tasks, but leave the other running
val failedTask = firstTasks.find(_.executorId == "executor0").get
taskScheduler.handleFailedTask(tsm, failedTask.taskId, TaskState.FAILED, TaskResultLost)
// at this point, our failed task could run on the other executor, so don't give up the task
// set yet.
Contributor:
does it make sense to verify !failedTaskSet here?
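// Hypothetical addition per the review question above (not part of the original diff):
// at this point the task set should not have been marked as failed yet.
assert(!failedTaskSet)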


// but now we fail our second executor. The other task still has a failed executor it can
// run on, but our first failed task does not. So we should fail the task set.
Contributor:
It's a little confusing that the comment here says "So we should fail the task set", because what happens in the next few lines is that the task set doesn't get failed. Maybe instead: "The other task can still run on executor1, so make an offer on that executor, and make sure that the other task (not the failed one) is assigned to run there"?

taskScheduler.executorLost("executor1", SlaveLost("oops"))
val nextTasks =
Contributor:
nextTaskAttempts? Also fix indentation

taskScheduler.resourceOffers(Seq(new WorkerOffer("executor0", "host0", 1))).flatten
// Note: It's OK if some future change makes this already realize the taskset has become
// unschedulable at this point (though in the current implementation, we're sure it will not)
assert(nextTasks.size === 1)
assert(nextTasks.head.executorId === "executor0")
assert(nextTasks.head.attemptNumber === 1)
assert(nextTasks.head.index != failedTask.index)

// now we should definitely realize that our task set is unschedulable, because the only
// task left can't be scheduled on any executors due to the blacklist
taskScheduler.resourceOffers(Seq(new WorkerOffer("executor0", "host0", 1)))
sc.listenerBus.waitUntilEmpty(100000)
assert(tsm.isZombie)
assert(failedTaskSet)
assert(failedTaskSetReason.contains("Aborting TaskSet 0.0 because it has a task which cannot " +
"be scheduled on any executor due to blacklists"))
}

}