
[SPARK-15865][CORE] Blacklist should not result in job hanging with less than 4 executors #13603

Closed
wants to merge 36 commits

Conversation

squito
Contributor

@squito squito commented Jun 10, 2016

What changes were proposed in this pull request?

Before this change, if you turn on blacklisting with spark.scheduler.executorTaskBlacklistTime but have fewer than spark.task.maxFailures executors, you can end up with a job "hung" after some task failures.

Whenever a taskset is unable to schedule anything in resourceOfferSingleTaskSet, we check whether the last pending task can be scheduled on any known executor. If not, the taskset (and any corresponding jobs) are failed (a sketch of this check follows the notes below).

  • Worst case, this is O(maxTaskFailures + numTasks). But unless many executors are bad, this should be small
  • This does not fail as fast as possible -- when a task becomes unschedulable, we keep scheduling other tasks. This is to avoid an O(numPendingTasks * numExecutors) operation
  • Also, it is conceivable this fails too quickly. You may be 1 millisecond away from unblacklisting a place for a task to run, or acquiring a new executor.
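
A minimal, self-contained sketch of that check, just to make the idea concrete (hypothetical names like `taskBlacklist` and `isCompletelyBlacklisted`, not the actual TaskSetManager code):

```scala
object BlacklistCheckSketch {
  // Hypothetical state: task index -> executors that task is currently blacklisted on.
  val taskBlacklist: Map[Int, Set[String]] =
    Map(0 -> Set("execA", "execB"), 1 -> Set("execA"))

  // The task is "completely blacklisted" if every executor we know about is on its blacklist.
  def isCompletelyBlacklisted(taskIndex: Int, knownExecutors: Iterable[String]): Boolean =
    knownExecutors.forall(exec => taskBlacklist.getOrElse(taskIndex, Set.empty).contains(exec))

  def main(args: Array[String]): Unit = {
    val executors = Seq("execA", "execB")
    // Task 0 can run nowhere we know of: abort the taskset instead of hanging.
    println(isCompletelyBlacklisted(0, executors)) // true
    // Task 1 can still run on execB, so scheduling continues.
    println(isCompletelyBlacklisted(1, executors)) // false
  }
}
```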

How was this patch tested?

Added unit test which failed before the change, ran new test 5k times manually, ran all scheduler tests manually, and the full suite via jenkins.

val index = allPendingTasks(indexOffset)
if (copiesRunning(index) == 0 && !successful(index)) {
  return Some(index)
}
Contributor Author

I'm pretty sure that we could add

else {
  // this task has already been scheduled from one of our other task queues, so remove it 
  // from this one as well, even though we're not actually scheduling anything here.
  allPendingTasks.remove(indexOffset)
}

But it shouldn't be necessary here, and I'm just nervous enough about adding it that I opted not to.

@SparkQA

SparkQA commented Jun 10, 2016

Test build #60300 has finished for PR 13603 at commit bc80e8c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Contributor

rxin commented Jun 10, 2016

q: was blacklist merged in 2.0?

@kayousterhout
Contributor

@rxin no, this is an old (and undocumented) feature that was added a while ago as a band-aid until we did the more complete solution (which @squito is planning to do soon, but not targeted for 2.0).

@kayousterhout
Contributor

@squito if it's not too painful, would you mind moving the visibility stuff to a separate PR? (I suspect that PR can be merged almost immediately!).

@kayousterhout
Contributor

Did you consider instead doing this when a task fails (on line 761 in TaskSetManager)? Instead of just checking if the number of failures is greater than maxTaskFailures, you could add a second check (if blacklisting is enabled) that checks whether the task that just failed could be scheduled anywhere, and if it can't be, fail the task set. This seems simpler to me.

The main drawback I see in that approach is that it could be the case that the task failure was caused by an executor failure, and the cluster manager is in the process of launching a new executor that the task could run on, so it's not correct to fail the task set. My sense is that it's OK-ish to fail in that case, since that seems like it will only happen for jobs that use a super small number of executors, in which case random-ish failures are less likely, so the failure is more likely to be a real issue with the job.
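
To make that suggestion concrete, here is a rough, self-contained sketch (hypothetical names, not Spark's actual failure-handling code) of doing the check at task-failure time; the reply below explains why this check alone wouldn't cover executor loss:

```scala
object FailureTimeCheckSketch {
  import scala.collection.mutable
  val maxTaskFailures = 4
  // Hypothetical blacklist state: task index -> executors the task may no longer run on.
  val blacklistedExecs = mutable.Map[Int, Set[String]]().withDefaultValue(Set.empty)

  // Returns an abort reason if the just-failed task should cause the task set to fail.
  def handleFailedTask(taskIndex: Int, failedExec: String, numFailures: Int,
                       knownExecutors: Set[String]): Option[String] = {
    blacklistedExecs(taskIndex) += failedExec
    if (numFailures >= maxTaskFailures) {
      Some(s"task $taskIndex exceeded $maxTaskFailures failures")
    } else if (knownExecutors.nonEmpty && knownExecutors.subsetOf(blacklistedExecs(taskIndex))) {
      // Extra check: the task that just failed has nowhere left to run.
      Some(s"task $taskIndex is blacklisted on every known executor")
    } else None
  }

  def main(args: Array[String]): Unit = {
    val execs = Set("execA", "execB")
    println(handleFailedTask(0, "execA", numFailures = 1, execs)) // None
    println(handleFailedTask(0, "execB", numFailures = 2, execs)) // Some(...every known executor)
  }
}
```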

@squito
Contributor Author

squito commented Jun 13, 2016

@kayousterhout sure I'll pull the visibility stuff out.

I did consider trying to do a check on task failure instead. However, I don't think that is sufficient, because you can have an executor fail. Imagine you have task 1 on executor A & task 2 on executor B. Task 1 fails, gets blacklisted from executor A -- but it can still be scheduled on executor B so you don't fail the stage. Then executor B dies. Task 2 can run on executor A, so that isn't stuck. But task 1 now can't run anywhere.

Probably unlikely, but still having the job just hang is so bad that I think we really should avoid it. Plus it becomes much more likely w/ the new blacklisting I'm working on -- in that case, executor B gets blacklisted for the bad stage because of many task failures, and now there isn't any place for the first failed tasks to run. I actually ran into that case when testing an early iteration of that change.

This is subtle enough it's probably worth codifying into a test -- I'll work on adding that.

(I agree with you that it's OK to fail the task set even if a new executor is just about to launch. Even this version doesn't really avoid something like that.)

* that is schedulable, and after scheduling all of those, we'll eventually find the unschedulable
* task.
*/
private[scheduler] def isTaskSetCompletelyBlacklisted(
Contributor

I think it would be cleaner to add this method to the TaskSetManager class (and then you don't need the pollPendingTask method) -- and then just pass in the executorsByHost map. That also makes things a little easier to change in the future, if there gets to be some easier way of checking if a particular task set is completely blacklisted.

Contributor Author

yeah I put it here b/c in the blacklisting stuff I'm still working on, I felt it made more sense outside TaskSetManager, since blacklisting extends beyond a single taskset (executor & node blacklisting). But I'll change it here, and we can revisit that discussion when looking at that change.

Contributor

Ah got it -- agree that in the long term, given the new blacklisting, it might make sense here! But let's put it in the TaskSetManager for now.

@kayousterhout
Contributor

Ohh good point, that makes sense re: lost executors. Given that, I agree that this approach seems like the right one.

@squito squito force-pushed the progress_w_few_execs_and_blacklist branch from bc80e8c to f870bde on June 20, 2016 at 15:47
@SparkQA

SparkQA commented Jun 20, 2016

Test build #60851 has finished for PR 13603 at commit f870bde.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Contributor Author

squito commented Jun 20, 2016

@kayousterhout sorry for the delays on my end, I've updated with the requested changes. The check is now inside TaskSetManager, and I added a test for the "unschedulability" case I was describing. Though the behavior is just in TaskSetManager, I put the test in TaskSchedulerImplSuite, because I'd like to push this test up to a slightly higher, more functional level, so it's easier to refactor etc. and keep the test. But if that is opening up a larger discussion, I can just push it to TaskSetManagerSuite instead.

I also found something kinda weird about TaskSchedulerImpl I'll comment on inline, mostly unrelated to this change.

}

test("Scheduler does not crash when tasks are not serializable") {
sc = new SparkContext("local", "TaskSchedulerImplSuite")
Contributor Author

Unfortunately, this case will also trigger a failure with the msg "Aborting TaskSet ... due to blacklist". I'm pretty sure this is a bug in TaskSchedulerImpl, but I am so shocked by it I'd like a quick sanity check. What's going on here is that no tasks have been accepted, so resourceOfferSingleTaskSet never actually adds the new executor to executorsByHost. But the host has already been added to executorsByHost at the beginning of resourceOffers, just not the executor.

But isn't there a bug in resourceOffers -- shouldn't that loop be updating newExecAvailable even if an executor is added to an already existing host? I expect this to actually be quite common under dynamic allocation. The end result is that locality preferences aren't properly updated, and failedEpochs aren't updated correctly.

Contributor

I agree that this looks completely wrong, and that (1) resourceOffers should mark a new executor as available in more cases, not just when a new host appears -- which means the HDFS cache locality hasn't been working when folks have multiple executors per host -- and (2) resourceOffers, and not resourceOfferSingleTaskSet, should add the executor to executorsByHost. Do you have time to file a JIRA / fix this? Seems like a quick fix, and would be nice to do before this PR, because of this weird failure (which seems like something that will be user-visible, since I'm guessing it's not uncommon for new users that their first task isn't serializable).
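
To make those two points concrete, here is a rough, self-contained sketch (stub types and assumed field names, not the actual TaskSchedulerImpl code) of what the corrected bookkeeping in resourceOffers might look like:

```scala
import scala.collection.mutable

object NewExecutorTrackingSketch {
  case class WorkerOffer(executorId: String, host: String)

  val executorIdToHost = mutable.Map[String, String]()
  val executorsByHost  = mutable.Map[String, mutable.Set[String]]()

  // Returns true if any offer came from an executor we had not seen before.
  def resourceOffers(offers: Seq[WorkerOffer]): Boolean = {
    var newExecAvail = false
    for (o <- offers) {
      if (!executorIdToHost.contains(o.executorId)) {
        // Register the executor here (not in resourceOfferSingleTaskSet), and mark
        // availability per *executor*, even when the host is already known.
        executorIdToHost(o.executorId) = o.host
        executorsByHost.getOrElseUpdate(o.host, mutable.Set()) += o.executorId
        newExecAvail = true
      }
    }
    newExecAvail
  }

  def main(args: Array[String]): Unit = {
    println(resourceOffers(Seq(WorkerOffer("exec1", "hostA")))) // true: new host and executor
    println(resourceOffers(Seq(WorkerOffer("exec2", "hostA")))) // true: known host, new executor
    println(resourceOffers(Seq(WorkerOffer("exec2", "hostA")))) // false: nothing new
  }
}
```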

Contributor Author

opened https://issues.apache.org/jira/browse/SPARK-16106. was just surprised enough that I wanted a sanity check first :)

Contributor Author

mentioned this below as well, but just to be clear -- I was mistaken, that bug doesn't affect the case where the tasks aren't serializable. That still correctly fails with an error about serialization. The error I was encountering is in a different case ("multiple CPUs per task", since there you never add the executors, just the hosts), and still needs a workaround for now, which I've added.

@SparkQA

SparkQA commented Jun 20, 2016

Test build #60856 has finished for PR 13603 at commit 3edb6fe.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 20, 2016

Test build #60855 has finished for PR 13603 at commit 64ab7fb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}
}
}
abort(s"Aborting ${taskSet} because it has a task which cannot be scheduled on any" +
Contributor

Can you include the task ID here?

@@ -35,7 +35,7 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.scheduler.TaskLocality.TaskLocality
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils}
+import org.apache.spark.util._
Contributor

undo this change? (nice to have the explicit imports as long as they're short)

@SparkQA

SparkQA commented Jun 28, 2016

Test build #61385 has finished for PR 13603 at commit 60cd959.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}
}
abort(s"Aborting ${taskSet} because Task $taskId (partition " +
s"${tasks(taskId).partitionId}) cannot be scheduled on any executor due to blacklists.")
Contributor

Maybe include the executors here (esp. since this is something users might see)?

s"Aborting ${taskSet} because task $taskId (partition ${tasks(taskId).partitionId}) has already failed on executors ${executorsByHost.values.flatten.mkString(",")}, and no other executors are available."

@kayousterhout
Contributor

This LGTM -- just a bunch of cosmetic suggestions

@SparkQA

SparkQA commented Jun 28, 2016

Test build #61399 has finished for PR 13603 at commit 9665029.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// take any task that needs to be scheduled, and see if we can find some executor it *could*
// run on
pendingTask.foreach { taskId =>
  executors.foreach { exec =>
Contributor

now that executors is an iterable, just do "if (executors.find(executorIsBlacklisted(_, taskId)).isEmpty) { .. abort ...}" here?

Contributor Author

good point, in fact I can just use executors.forall. Sorry I keep working on the new blacklist version in between and sometimes don't see some of these obvious simplifications in this version, thanks for catching them.

*/
private[scheduler] def abortIfCompletelyBlacklisted(executors: Iterable[String]): Unit = {

def pendingTask: Option[Int] = {
Contributor

gah sorry one more tiny thing: can this just be a val?

Contributor Author

you can't just change this to a val with nothing else, b/c of the return when we find the task. Though you could make it a val by changing the inner logic, with a var keepGoing in the while loop or something. I actually changed it once and couldn't really make up my mind which version was cleaner ... in the end I felt an inner function wasn't so bad, but happy to change it.

Contributor Author

I pushed a commit changing it to a val, so you can see both options. Easy enough to back out that last commit.

Contributor
@kayousterhout kayousterhout Jun 29, 2016

what about

val pendingTask: Option[Int] = allPendingTasks.lastIndexWhere { indexInTaskSet =>
copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet)
}.map(allPendingTasks(_))

(I realize we're really in the weeds here so whatever you prefer here is fine)
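
A standalone sketch of the lastIndexWhere idea with hypothetical state (not the real TaskSetManager fields); note that lastIndexWhere returns an Int (-1 when nothing matches), so the result needs a non-negative guard before indexing back into the buffer:

```scala
import scala.collection.mutable.ArrayBuffer

object PendingTaskSketch {
  // Hypothetical pending queue of task indices, plus per-task state.
  val allPendingTasks = ArrayBuffer(3, 5, 7)
  val copiesRunning   = Map(3 -> 1, 5 -> 0, 7 -> 0)
  val successful      = Map(3 -> false, 5 -> false, 7 -> true)

  val pendingTask: Option[Int] = {
    val pos = allPendingTasks.lastIndexWhere { indexInTaskSet =>
      copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet)
    }
    if (pos >= 0) Some(allPendingTasks(pos)) else None
  }

  def main(args: Array[String]): Unit = println(pendingTask) // Some(5)
}
```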

Contributor Author

oh I didn't even know about lastIndexWhere! thanks, simpler, and despite being a minor point I appreciate learning something new :)

sorry, I think I just got on the wrong track here while thinking about doing the lazy removal as well, and when I decided against it I never stepped back to simplify.

@SparkQA

SparkQA commented Jun 29, 2016

Test build #61488 has finished for PR 13603 at commit 96049cd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 29, 2016

Test build #61497 has finished for PR 13603 at commit ed71c99.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 30, 2016

Test build #61505 has finished for PR 13603 at commit ed413ce.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kayousterhout
Contributor

LGTM!

@squito
Contributor Author

squito commented Jun 30, 2016

merged to master. thanks Kay!

@asfgit asfgit closed this in fdf9f94 Jun 30, 2016
asfgit pushed a commit that referenced this pull request Aug 30, 2016
…elyBlacklisted

This patch addresses a minor scheduler performance issue that was introduced in #13603. If you run

```
sc.parallelize(1 to 100000, 100000).map(identity).count()
```

then most of the time ends up being spent in `TaskSetManager.abortIfCompletelyBlacklisted()`:

![image](https://cloud.githubusercontent.com/assets/50748/18071032/428732b0-6e07-11e6-88b2-c9423cd61f53.png)

When processing resource offers, the scheduler uses a nested loop which considers every task set at multiple locality levels:

```scala
for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
  do {
    launchedTask = resourceOfferSingleTaskSet(
      taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
  } while (launchedTask)
}
```

In order to prevent jobs with globally blacklisted tasks from hanging, #13603 added a `taskSet.abortIfCompletelyBlacklisted` call inside of `resourceOfferSingleTaskSet`; whenever a call to `resourceOfferSingleTaskSet` fails to schedule any tasks, `abortIfCompletelyBlacklisted` checks whether the tasks are completely blacklisted in order to figure out whether they will ever be schedulable. The problem with this placement is that the last call to `resourceOfferSingleTaskSet` in the `while` loop always returns `false` (that is how the loop terminates), and that final call invokes `abortIfCompletelyBlacklisted`, so almost every call to `resourceOffers` ends up running the `abortIfCompletelyBlacklisted` check for every task set.

Instead, I think that this call should be moved out of the innermost loop and should be called _at most_ once per task set in case none of the task set's tasks can be scheduled at any locality level.
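
A self-contained sketch of that restructuring (stubbed types, not the actual TaskSchedulerImpl code), with the blacklist check hoisted out of the innermost loop so it runs at most once per task set:

```scala
object ResourceOfferSketch {
  case class TaskSet(name: String, localityLevels: Seq[String], completelyBlacklisted: Boolean)

  // Stub: pretend nothing can ever be launched, so the abort path is exercised.
  def resourceOfferSingleTaskSet(ts: TaskSet, locality: String): Boolean = false

  def offerResources(sortedTaskSets: Seq[TaskSet]): Unit = {
    for (taskSet <- sortedTaskSets) {
      var launchedAnyTask = false
      for (maxLocality <- taskSet.localityLevels) {
        var launched = false
        do {
          launched = resourceOfferSingleTaskSet(taskSet, maxLocality)
          launchedAnyTask ||= launched
        } while (launched)
      }
      // The check now happens once per task set, outside the innermost do/while loop.
      if (!launchedAnyTask && taskSet.completelyBlacklisted) {
        println(s"aborting ${taskSet.name}: completely blacklisted")
      }
    }
  }

  def main(args: Array[String]): Unit =
    offerResources(Seq(TaskSet("ts0", Seq("PROCESS_LOCAL", "ANY"), completelyBlacklisted = true)))
}
```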

Before this change, the microbenchmark that I posted above took 35 seconds to run; with this patch it now takes only 15 seconds.

/cc squito and kayousterhout for review.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #14871 from JoshRosen/bail-early-if-no-cpus.