
[SPARK-17675] [CORE] Expand Blacklist for TaskSets #15249

Closed
wants to merge 65 commits into apache:master from squito:taskset_blacklist_only

Conversation

squito
Contributor

@squito squito commented Sep 27, 2016

What changes were proposed in this pull request?

This is a step along the way to SPARK-8425.

To enable incremental review, the first step proposed here is to expand the blacklisting within tasksets. In particular, this will enable blacklisting for

  • (task, executor) pairs (this already exists via an undocumented config)
  • (task, node)
  • (taskset, executor)
  • (taskset, node)

Adding (task, node) is critical to making Spark fault-tolerant to one bad disk in a cluster, without requiring careful tuning of "spark.task.maxFailures". The other additions are also important to avoid many misleading task failures and long scheduling delays when there is one bad node on a large cluster.
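
As a hedged illustration (not part of this patch), enabling the new taskset-level blacklisting from an application might look like the sketch below; the `spark.blacklist.enabled` key is assumed from the `BLACKLIST_ENABLED` constant that shows up in the review diffs, so check the merged docs for the authoritative names and defaults.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hedged sketch: the blacklist config key is an assumption based on the
// BLACKLIST_ENABLED constant discussed in this PR, not a documented guarantee.
val conf = new SparkConf()
  .setAppName("blacklist-example")
  .setMaster("local[4]")
  // turn the new taskset-level blacklisting on (it defaults to off)
  .set("spark.blacklist.enabled", "true")
  // blacklisting decides *where* a failed task may be retried; the overall
  // retry budget is still governed by spark.task.maxFailures
  .set("spark.task.maxFailures", "4")
val sc = new SparkContext(conf)
```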

Note that some of the code changes here aren't really required for just this -- they put pieces in place for SPARK-8425 even though they are not used yet (e.g. the BlacklistTracker helper is a little out of place, TaskSetBlacklist holds onto a little more info than it needs to for just this change, and ExecutorFailuresInTaskSet is more complex than it needs to be).

How was this patch tested?

Added unit tests; ran tests via Jenkins.

wei-mao-intel and others added 30 commits July 6, 2016 16:52

1. Create a new BlacklistTracker and BlacklistStrategy interface to support complex use cases for the blacklist mechanism.
2. Make the Yarn allocator aware of node blacklist information.
3. Implement three strategies for convenience; users can also define their own:
   - SingleTaskStrategy: retains the default behavior from before this change.
   - AdvanceSingleTaskStrategy: enhances SingleTaskStrategy by supporting stage-level node blacklists.
   - ExecutorAndNodeStrategy: different taskSets can share blacklist information.
@squito
Contributor Author

squito commented Oct 6, 2016

@tgravescs yeah there is an interaction w/ locality, but I think it can wait for a follow-up. This was in the design doc in the follow-up section, though I didn't file a JIRA for it.

Delay-Scheduling takes blacklisting into account. If all executors at the preferred level are blacklisted, don’t wait and immediately move on to the next level.
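
A minimal sketch of that check, assuming a TaskSetBlacklist-like interface (the trait and method names below are illustrative, not taken from this PR):

```scala
object LocalityBlacklistSketch {
  // Stand-in for the PR's TaskSetBlacklist; the method name is an assumption.
  trait TaskSetBlacklistLike {
    def isExecutorBlacklistedForTaskSet(executorId: String): Boolean
  }

  // If every executor at the preferred locality level is already blacklisted for
  // this taskset, there is no point waiting out the locality delay -- skip
  // straight to the next level.
  def shouldSkipLocalityWait(
      blacklist: TaskSetBlacklistLike,
      executorsAtPreferredLevel: Seq[String]): Boolean = {
    executorsAtPreferredLevel.nonEmpty &&
      executorsAtPreferredLevel.forall(blacklist.isExecutorBlacklistedForTaskSet)
  }
}
```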

@mridulm
Contributor

mridulm commented Oct 6, 2016

@kayousterhout

Agree with (1) - permanent blacklist will effectively work the same way for executor shutdown.

Re (2): A task failure is not necessarily only due to resource restriction or (1): it could also be a byzantine failure, an interaction (not necessarily contention) with other tasks running on the executor/node, issues up/down the stack (particularly MT-safety), external library issues, etc.

If it is recoverable, then a timeout + retry will alleviate it without needing computation on a different executor/node.
If it is not recoverable (within a reasonable time), then the current logic of permanent blacklisting works.

Unfortunately, determining which case you are in is the problem. As @tgravescs mentioned, resource contention can be a long-lived issue as well at times.

Ideally, if the blacklist timeout is < the scheduler delay, then a retry can help - if not, it depends on job characteristics (how many partitions, etc.).

@mridulm
Contributor

mridulm commented Oct 6, 2016

@tgravescs re (1): It was typically observed when YARN is killing the executor, usually when it ran over the memory limits (not sure if it was also happening during preemption).

@tgravescs
Contributor

For preemption, Spark is not counting those as task failures anymore.

So I'm not sure if we decided what to do. Are we leaving the old functionality as is, adding a new config for the time between attempt retries, or something else?

@kayousterhout
Contributor

@tgravescs no decision here yet.

@mridulm the main question for (2), though, is: are the consequences a deal-breaker? It doesn't seem disastrous if a task needs to run on a non-local machine instead of getting re-tried on a machine where it already failed but might succeed later on. Also, it seems likely that the task has a higher probability of completing sooner if it runs on another machine compared to re-running (after a delay) on a machine where it already failed. What are the situations you're most concerned about with the new approach?

If we leave the existing mechanism in, one concern (besides the additional complexity) is the interaction between the new host-level blacklisting and the old executor-level blacklisting. There could be a scenario where the executor-level timeout keeps tasks from getting re-tried on the same executor for some period of time, so they run on other executors on the same host, which causes the host to be permanently blacklisted, so the fact that the executor blacklist would eventually re-allow the task is irrelevant. I think we'd need to change the old executor blacklist timeout to be a host blacklist timeout for this to work.

@squito
Contributor Author

squito commented Oct 6, 2016

To be clear, when I proposed leaving the old feature in place, my intent was not to make them interact nicely at all; you wouldn't even be able to use the two features together. The idea was just to not break old use-cases, if we decided it was really important to still support them. Users that really needed the old feature would still have it, but they couldn't take advantage of the new blacklisting until they moved off of it.

Definitely not ideal, but I think it would be OK just b/c the old feature wasn't documented at all. We'd obviously need to follow up with the right fix for that use-case so that they were compatible. But we wouldn't need to rush the "right" fix.

(I also feel like it's OK to just drop the old feature completely, and deal w/ the small performance regression in these cases.)

@mridulm
Contributor

mridulm commented Oct 6, 2016

@squito I am hoping we can actually remove the old code/functionality (it is clunky and very specific to the single-executor resource contention/shutdown use case - unfortunately common enough to warrant its introduction), and subsume it with a better design/impl - perhaps as part of your work (in this and other PRs).

@kayousterhout I believe my concern with (2) is that the blacklist is (currently) permanent for a task/taskset on an executor/node. For jobs running on a larger number of executors, this will perhaps not be too much of an issue (other than a degradation in performance); but as the executor/node count decreases, we increase the probability of job failures even if the transient failures are recoverable.

@SparkQA

SparkQA commented Oct 6, 2016

Test build #66462 has finished for PR 15249 at commit 34eff27.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kayousterhout
Contributor

@mridulm re: job failures, can you elaborate on the job failure scenario you're concerned about?

Jobs can only fail when some tasks are unschedulable, which can happen if a task is permanently blacklisted on all available nodes. This can only happen when the number of nodes is smaller than the maximum number of failures for a particular task attempt, and it also seems very similar to the existing behavior: currently, if a task is blacklisted (even though the blacklist is temporary) on all nodes, the job will be failed (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L595).
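
For concreteness, a minimal sketch of that "blacklisted on every available executor/node" condition (all names here are illustrative assumptions; the real check lives in TaskSetManager at the link above):

```scala
object CompletelyBlacklistedSketch {
  final case class ExecutorInfo(execId: String, host: String)

  // The taskset has to be aborted only if *every* live executor is ruled out,
  // either because its node is blacklisted for the taskset or because this
  // particular (task, executor) pair is blacklisted.
  def isTaskCompletelyBlacklisted(
      taskIndex: Int,
      liveExecutors: Seq[ExecutorInfo],
      isNodeBlacklistedForTaskSet: String => Boolean,
      isExecutorBlacklistedForTask: (String, Int) => Boolean): Boolean = {
    liveExecutors.nonEmpty && liveExecutors.forall { e =>
      isNodeBlacklistedForTaskSet(e.host) ||
        isExecutorBlacklistedForTask(e.execId, taskIndex)
    }
  }
}
```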

@mridulm
Contributor

mridulm commented Oct 7, 2016

If I understood the change correctly, a node can get blacklisted for a taskset if sufficiently many (even different) tasks fail on executors on it, which can potentially cause all nodes to be blacklisted.

Or do you think this is a contrived scenario that can't occur in practice? I don't have sufficient context on the motivating use cases/scenarios for this change.

@mridulm
Contributor

mridulm commented Oct 7, 2016

Thinking more, and based on what @squito mentioned, I was considering the following :

Since we are primarily dealing with executors or nodes which are 'bad', as opposed to recoverable failures due to resource contention, prevention of degenerate corner cases (which the existing blacklist is for), etc.:

Can we assume a successful task execution on a node implies a healthy node?
What about at the executor level?

The proposal is to keep the PR as is for the most part, but:

  • Clear nodeToExecsWithFailures when a task on a node succeeds. Same for nodeToBlacklistedTaskIndexes (see the sketch below).
  • Not sure if we want to reset execToFailures for an executor (not clearing it would imply we are handling the resource starvation case implicitly, imo).
  • If possible, allow speculative tasks to be scheduled on blacklisted nodes/executors if countTowardsTaskFailures can be overridden to false in those cases (if not, ignore this, since it will add towards the number of failures per app).

The rationale behind this is that successful tasks indicate past failures were not indicative of bad nodes/executors, but rather transient failures. And speculative tasks also sort of work as probe tasks to determine if the node/executor has recovered and is healthy.
With this, the timeout handling might become unnecessary.
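
A minimal sketch of the first bullet above, assuming the maps are keyed by host (the two map names come from the comment; the surrounding class and method are illustrative only):

```scala
import scala.collection.mutable

// Hedged sketch of the "reset on success" idea, not code from this PR.
class ResetOnSuccessSketch {
  val nodeToExecsWithFailures = mutable.Map.empty[String, mutable.Set[String]]
  val nodeToBlacklistedTaskIndexes = mutable.Map.empty[String, mutable.Set[Int]]

  // Once a task succeeds on a host, treat the earlier failures there as transient
  // and forget them, so the host is no longer a candidate for blacklisting.
  def onTaskSuccess(host: String): Unit = {
    nodeToExecsWithFailures.remove(host)
    nodeToBlacklistedTaskIndexes.remove(host)
    // per-executor counts (execToFailures) are deliberately left untouched,
    // matching the open question above about the resource-starvation case
  }
}
```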

I hope I am not missing anything - any thoughts @squito, @kayousterhout, @tgravescs ?

@squito
Contributor Author

squito commented Oct 7, 2016

@mridulm we had considered that approach earlier on as well -- I don't think it works because you can also have resources which are not totally broken, but are flaky for a long period of time. The simplest example is one bad disk out of many; some tasks may succeed though a bunch will fail. I've seen users hit this. But it could be even more nuanced, e.g. a bad sector, a flaky network connection, etc.

Because of those cases, it's intentional that in this implementation, one success does not un-blacklist anything.

@tgravescs
Contributor

The question to me comes down to how many temporary resource issues you expect, and how often. At some point, if it's just from that much skew, you should probably fix your configs, and it would be by luck whether you finish or not (based on other tasks finishing in the executor) before you hit the max task failures. If it's a transient, temporary network issue, a retry could work; if it's a long-lived network issue, I expect a lot of things to fail.

Unfortunately I don't have enough data on how often this happens, or what exactly happens on Spark, to know what would definitely help. I know on MR that allowing multiple attempts on the same node sometimes works for at least some of these temporary conditions. I would think the same would apply to Spark, although as mentioned the difference might be in the container re-launch time vs. just sending a task retry. Perhaps we should just change the defaults to allow more than one task attempt per executor and accordingly increase maxTaskAttemptsPerNode, then run with it and see what happens so we can get some more data and enhance from there. If we want to be paranoid, leave the existing blacklisting functionality in as a fallback in case the new one doesn't work.

Also, if we are worried about resource limits, i.e. we blacklist too many executors/nodes for too long, then perhaps we should add a fail-safe like a max % blacklisted: once you hit that percentage, you don't blacklist anymore.
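
Such a fail-safe could be a simple fraction check before adding any new blacklist entry; a rough sketch with a made-up threshold (nothing beyond the idea itself is concretely proposed in this thread):

```scala
object BlacklistFailSafeSketch {
  // hypothetical "max % blacklisted" knob; the 50% value is made up
  val maxBlacklistedFraction = 0.5

  // Returns true only while blacklisting another executor would keep the
  // blacklisted share of the cluster under the threshold.
  def canBlacklistMore(totalExecutors: Int, blacklistedExecutors: Int): Boolean = {
    totalExecutors > 0 &&
      blacklistedExecutors.toDouble / totalExecutors < maxBlacklistedFraction
  }
}
```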

Contributor

@kayousterhout kayousterhout left a comment

LGTM

This looks awesome! One last request, in addition to the tiny comments: can you run a quick benchmark like the one Josh ran in #14871 to make sure that the changes to abortIfCompletelyBlacklisted aren't causing a performance regression (if you haven't already)?

@@ -108,7 +109,9 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContext
   }
 
   test("repeatedly failing task") {
-    sc = new SparkContext(clusterUrl, "test")
+    val conf = new SparkConf().setAppName("test").setMaster(clusterUrl)
+      .set(BLACKLIST_ENABLED, false)
Contributor

I don't think the changes in this file are needed given the false default

  override def testScheduler(name: String)(body: => Unit): Unit = {
    // in these simple tests, we only have one executor, so it doesn't make sense to turn on the
    // blacklist.
    testScheduler(name, extraConfs = Seq(BLACKLIST_ENABLED.key -> "false"))(body)
Contributor

don't think you need this with the new false default?

</td>
</tr>
<tr>
<td><code>spark.blacklist.timeout</code></td>
Contributor

Can you eliminate the timeout description here, since it's not used yet?

@kayousterhout
Contributor

kayousterhout commented Oct 7, 2016

@tgravescs @mridulm To avoid being stuck in analysis paralysis for this feature, I'd propose the following:

(1) We merge this PR. I think we're mostly in agreement that the behavior here is, for the most part, a big step in the right direction. There are some cases Mridul brought up where there is a concern about being too eager in permanently blacklisting hosts / executors, but it seems like we don't know if (/ don't think!) the scenarios where this is problematic are very common.

(2) Someone (Imran?) email the dev list describing the new functionality (and change from the old behavior) and asking folks who are running large clusters (and see failures more often) to try it out. I think having folks try this out will be more helpful than us guessing when it will and won't be useful. We can use the feedback to refine the behavior and decide which of the many proposed improvements are most important.

We've been discussing pros and cons of this approach for quite a while, and I don't think we're ever going to get to a point where we think the approach is perfect. I think we should expect to iterate on this more in the future as folks use it.

Thoughts?

@mridulm
Contributor

mridulm commented Oct 7, 2016

As I mentioned before, this is definitely a huge step in the right direction !

Having said that, I want to ensure we don't aggressively blacklist executors and nodes - at scale, I have seen enough tasks fail which are completely recoverable on retry. I don't have access to those jobs or infra anymore - so I can't run a validation run, unfortunately.

If the consensus is that we can risk the change and get broader testing to iron out the kinks, I am fine with that.

@SparkQA

SparkQA commented Oct 10, 2016

Test build #66663 has finished for PR 15249 at commit c805a0b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Contributor Author

squito commented Oct 11, 2016

@kayousterhout good idea about running performance tests, I hadn't run them on a recent rev. I confirmed that the issue in #14871 was no longer present (just to be sure, I also ran a test where I re-introduced the issue, and did see a big drop in performance).

I agree with your assessment of the current situation, and appreciate you driving this forward. I'll send an email to the dev list shortly -- please add anything I may have overlooked.

thanks!

@SparkQA

SparkQA commented Oct 12, 2016

Test build #66820 has finished for PR 15249 at commit 445cc97.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 12, 2016

Test build #66822 has finished for PR 15249 at commit 4501e6c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit asfgit closed this in 9ce7d3e Oct 12, 2016
@squito
Contributor Author

squito commented Oct 12, 2016

merged to master, thanks everyone

@kayousterhout
Contributor

Awesome nice work!! Exciting to see this in! Let me know when the other component, which blacklists across different stages, is ready for review.

uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017

Author: Imran Rashid <irashid@cloudera.com>
Author: mwws <wei.mao@intel.com>

Closes apache#15249 from squito/taskset_blacklist_only.