
[SPARK-22123][CORE] Add latest failure reason for task set blacklist #19338

Closed · wants to merge 8 commits

Conversation

caneGuy (Contributor) commented Sep 25, 2017

What changes were proposed in this pull request?

This patch adds the latest failure reason for the task set blacklist, which can be shown on the Spark UI so users can see the failure reason directly.
Until now, every job aborted by a complete blacklist only showed a log line like the one below, with no further information:

Aborting $taskSet because task $indexInTaskSet (partition $partition) cannot run anywhere due to node and executor blacklist. Blacklisting behavior can be configured via spark.blacklist.*.

After this change:

Aborting TaskSet 0.0 because task 0 (partition 0)
cannot run anywhere due to node and executor blacklist.
Most recent failure:
Some(Lost task 0.1 in stage 0.0 (TID 3,xxx, executor 1): java.lang.Exception: Fake error!
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:73)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:305)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
). 

Blacklisting behavior can be configured via spark.blacklist.*.

How was this patch tested?

Unit tests and manual testing.
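
(For illustration, a minimal sketch of the kind of assertion such a test makes; the message and pattern below are simplified stand-ins, not the actual suite code, which appears in a diff later in this thread.)

```scala
// Simplified stand-in for the real scheduler test: build the expected
// multi-line abort message and check it against a regex pattern.
val abortMessage =
  """
    |Aborting TaskSet 0.0 because task 0 (partition 0)
    |cannot run anywhere due to node and executor blacklist.""".stripMargin

val pattern = (
  """
    |Aborting TaskSet 0.0 because task .*
    |cannot run anywhere due to node and executor blacklist""".stripMargin).r

assert(pattern.findFirstIn(abortMessage).isDefined)
```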

caneGuy (Author) commented Sep 25, 2017

@squito Could you help review this?

squito (Contributor) commented Sep 25, 2017

@caneGuy thanks for working on this; it looks very reasonable to me. I am going to take a closer look at a couple of details, but can you make a couple of updates in the meantime:

  1. Can you open a new JIRA for this and put that in the commit summary? SPARK-21539 refers to something else entirely.
  2. Can you reformat the new exception to look a bit more like the formatting used when there are too many failures of a specific task? Maybe like this:
User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Aborting TaskSet 0.0 because task 0 (partition 0) cannot run anywhere due to node and executor blacklist. Most recent failure:
Lost task 0.1 in stage 0.0 (TID 3,xxx, executor 1): java.lang.Exception: Fake error!
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:73)
 at org.apache.spark.scheduler.Task.run(Task.scala:99)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:305)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745) 

Blacklisting behavior can be configured via spark.blacklist.*.

Driver Stacktrace:
 at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1458)
...

squito (Contributor) commented Sep 25, 2017

Jenkins, ok to test

squito (Contributor) left a review

Thanks @caneGuy, overall this looks good; just some minor stuff to fix.

@@ -94,7 +96,9 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int,
   private[scheduler] def updateBlacklistForFailedTask(
       host: String,
       exec: String,
-      index: Int): Unit = {
+      index: Int,
+      failureReason: Option[String] = None): Unit = {
squito:

failureReason should always be present in this call, so it shouldn't be an Option as an arg to this method.

(I realize this is a bit of a pain as you have to modify all the call sites in tests, sorry about that).
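
(A sketch of the requested signature, with the surrounding class abbreviated; the latestFailureReason field matches the private var discussed below, and the bookkeeping body is elided as an assumption.)

```scala
package org.apache.spark.scheduler

// Sketch only: the real class carries conf, stageId, clock, and the
// blacklist bookkeeping state.
private[scheduler] class TaskSetBlacklist {
  private var latestFailureReason: String = null

  private[scheduler] def updateBlacklistForFailedTask(
      host: String,
      exec: String,
      index: Int,
      failureReason: String): Unit = { // plain String, not Option[String]
    // Record the most recent reason; callers must pass a non-null value.
    latestFailureReason = failureReason
    // ... existing per-executor / per-node blacklist updates elided ...
  }
}
```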

caneGuy (Author):

Actually, you are right. For completeness I should modify this.

@@ -838,7 +840,7 @@ private[spark] class TaskSetManager(

     if (!isZombie && reason.countTowardsTaskFailures) {
       taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
-        info.host, info.executorId, index))
+        info.host, info.executorId, index, Some(failureReason)))
+      assert (null != failureReason)
squito:

Move the assert(null != failureReason) first and, to go along with the other change, drop the Some wrapper around failureReason.
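
(A sketch of the call site with both suggestions applied, assuming the String signature from the earlier sketch:)

```scala
// Assert first, so a missing reason fails fast before it is recorded.
assert(failureReason != null)
if (!isZombie && reason.countTowardsTaskFailures) {
  taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
    info.host, info.executorId, index, failureReason)) // no Some(...) wrapper
}
```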

SparkQA commented Sep 25, 2017

Test build #82152 has finished for PR 19338 at commit 8134142.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

caneGuy (Author) commented Sep 26, 2017

Thanks for your time @squito. I will open another JIRA for this PR and update the code as soon as possible.

@caneGuy caneGuy changed the title [SPARK-21539][CORE] Add latest failure reason for task set blacklist [SPARK-22123][CORE] Add latest failure reason for task set blacklist Sep 26, 2017
SparkQA commented Sep 26, 2017

Test build #82169 has finished for PR 19338 at commit 57190ef.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -671,8 +671,9 @@ private[spark] class TaskSetManager(
     if (blacklistedEverywhere) {
       val partition = tasks(indexInTaskSet).partitionId
       abort(s"Aborting $taskSet because task $indexInTaskSet (partition $partition) " +
-        s"cannot run anywhere due to node and executor blacklist. Blacklisting behavior " +
-        s"can be configured via spark.blacklist.*.")
+        s"cannot run anywhere due to node and executor blacklist." +

Can you please prettify the output message a bit? From what I saw in the PR description, it is a bit messy.

@@ -61,6 +61,8 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int,
   private val blacklistedExecs = new HashSet[String]()
   private val blacklistedNodes = new HashSet[String]()

+  var taskSetLatestFailureReason: String = null
jerryshao:

Can we please avoid public variables here? Also, why not make it less verbose by changing it to latestFailureReason?

caneGuy (Author):

I have thought about the visibility problem, but we need to read this value from TaskSetManager. Would it make sense if I add a def getLatestFailureReason? @jerryshao
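
(A minimal sketch of that accessor, matching the getLatestFailureReason name quoted later in the review:)

```scala
// Inside TaskSetBlacklist: keep the mutable field private and expose a
// read-only accessor so TaskSetManager can embed the reason in its
// abort message.
private var latestFailureReason: String = null

/**
 * Get the most recent failure reason of this TaskSet.
 */
def getLatestFailureReason: String = latestFailureReason
```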

SparkQA commented Sep 26, 2017

Test build #82190 has finished for PR 19338 at commit 2147450.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

squito (Contributor) left a review

@caneGuy can you update the PR description to match the new formatting of the error msg?

@jerryshao I think this is fine, do you have more concerns?

  /**
   * Get the most recent failure reason of this TaskSet.
   * @return
   */
  def getLatestFailureReason: String = {
squito:

@jerryshao the whole class is private[scheduler], so I think it is OK.

jerryshao:

@squito yes, from a scoping level it is fine. My thought is that this exposes the class member to other classes unnecessarily. Yeah, it is not a big deal, just my personal preference.

caneGuy (Author):

Thanks @jerryshao @squito. Could you help trigger another Jenkins test, since the last one had a PySpark failure?

caneGuy (Author) commented Sep 27, 2017

Thanks @squito, I have updated the description.

@@ -671,8 +671,10 @@ private[spark] class TaskSetManager(
     if (blacklistedEverywhere) {
       val partition = tasks(indexInTaskSet).partitionId
       abort(s"Aborting $taskSet because task $indexInTaskSet (partition $partition) " +
-        s"cannot run anywhere due to node and executor blacklist. Blacklisting behavior " +
-        s"can be configured via spark.blacklist.*.")
+        s"cannot run anywhere due to node and executor blacklist.\n" +
jerryshao:

Can we change this to Scala's triple-quoted string interpolation?
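
(For illustration, a self-contained example of the triple-quoted style with stripMargin; the message text is simplified, not the PR's exact wording:)

```scala
val taskSet = "TaskSet 0.0"
val indexInTaskSet = 0
val partition = 0

// The leading | marks the margin; stripMargin removes everything up to
// and including it, so source indentation does not leak into the output.
val msg = s"""
  |Aborting $taskSet because task $indexInTaskSet (partition $partition)
  |cannot run anywhere due to node and executor blacklist.
  |""".stripMargin

println(msg)
```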

caneGuy (Author):

Nice, @jerryshao, I will update as soon as possible.

|Aborting $taskSet because task $indexInTaskSet (partition $partition)
|cannot run anywhere due to node and executor blacklist.
|Most recent failure:
|${taskSetBlacklist.getLatestFailureReason}\n
jerryshao commented Sep 27, 2017:

You don't have to add "\n", and this "\n" will not be escaped in the triple-quoted format. Please try to understand the basics of this API before modifying it.

Also, verify locally before pushing a new commit, and update the PR description to reflect your new format accordingly.

caneGuy (Author):

Actually, I tested locally with something like the below:

scala> val s = s"""
     | sss\n
     | sss"""
scala> print(s)

sss

sss

And I found that there is no need to add \n, since triple quoting is designed to avoid such escapes. Sorry for that; I had already updated before your newest comment. Thanks @jerryshao

jerryshao:

Are you sure?

scala> val s = """fdsafdsa\n\nfdsafdsa"""
s: String = fdsafdsa\n\nfdsafdsa

jerryshao:

NVM, it looks like the result differs depending on whether string interpolation is used.

caneGuy (Author):

It is my fault; the Scala version affected it. My default Scala is 2.10.4, and below is the result I tested with Scala 2.11.2:

scala> val s="""ss\nss
     | sss\n"""
s: String =
ss\nss
sss\n

scala> print(s)
ss\nss
sss\n
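
(For reference, the difference between the two REPL sessions above tracks the s interpolator rather than the Scala version alone: the first session used an s"""...""" interpolated string, while the 2.11 session used a plain triple-quoted literal. The s interpolator processes standard escape sequences even inside triple quotes, while plain triple quotes do not:)

```scala
// Plain triple quotes: no escape processing; \n stays as two characters.
val plain = """a\nb"""
println(plain)   // prints: a\nb

// The s interpolator processes escapes, even inside triple quotes.
val interp = s"""a\nb"""
println(interp)  // prints 'a', a real newline, then 'b'
```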

SparkQA commented Sep 27, 2017

Test build #82215 has finished for PR 19338 at commit 4d906b7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Sep 27, 2017

Test build #82217 has finished for PR 19338 at commit 05adc2a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

jerryshao (Contributor):

There's one related test failure; can you please check?

SparkQA commented Sep 27, 2017

Test build #82220 has finished for PR 19338 at commit 9a2d7ba.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

jerryshao (Contributor):

Jenkins, retest this please.

SparkQA commented Sep 27, 2017

Test build #82226 has finished for PR 19338 at commit 9a2d7ba.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val pattern = ("Aborting TaskSet 0.0 because task .* " +
"cannot run anywhere due to node and executor blacklist").r
val pattern = (s"""
|Aborting TaskSet 0.0 because task .*

I think this should be a two-space indent, also for the line below.

SparkQA commented Sep 27, 2017

Test build #82241 has finished for PR 19338 at commit d01e112.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

jerryshao (Contributor):

LGTM, merging to master.

@asfgit asfgit closed this in 3b117d6 Sep 28, 2017
@caneGuy caneGuy deleted the zhoukang/improve-blacklist branch September 28, 2017 01:58