[SPARK-22123][CORE] Add latest failure reason for task set blacklist
## What changes were proposed in this pull request?
This patch adds the latest failure reason to the task set blacklist, so it can be shown in the Spark UI and users can see the failure reason directly.
Until now, every job aborted because its task set was completely blacklisted only produced a log message like the one below, which carries no information about the underlying failure:
`Aborting $taskSet because task $indexInTaskSet (partition $partition) cannot run anywhere due to node and executor blacklist. Blacklisting behavior can be configured via spark.blacklist.*.`
**After this change:**
```
Aborting TaskSet 0.0 because task 0 (partition 0)
cannot run anywhere due to node and executor blacklist.
Most recent failure:
Some(Lost task 0.1 in stage 0.0 (TID 3,xxx, executor 1): java.lang.Exception: Fake error!
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:73)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:305)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
).

Blacklisting behavior can be configured via spark.blacklist.*.

```
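To make the mechanism easy to follow before reading the diff, here is a minimal, self-contained sketch: `TaskSetBlacklist` records the reason of the most recent failure, and `TaskSetManager` interpolates it into the abort message. Class and method names mirror the real ones in `org.apache.spark.scheduler`, but the signatures and failure bookkeeping are simplified for illustration:

```scala
// Simplified sketch of the patch's mechanism, not the real implementation.
class TaskSetBlacklistSketch {
  private var latestFailureReason: String = _

  // The new failureReason parameter records why the task most recently failed.
  def updateBlacklistForFailedTask(
      host: String, exec: String, index: Int, failureReason: String): Unit = {
    latestFailureReason = failureReason
    // ... per-executor / per-node failure bookkeeping elided ...
  }

  def getLatestFailureReason: String = latestFailureReason
}

object AbortMessageSketch extends App {
  val blacklist = new TaskSetBlacklistSketch
  blacklist.updateBlacklistForFailedTask(
    "hostA", exec = "1", index = 0,
    failureReason = "java.lang.Exception: Fake error!")

  // TaskSetManager interpolates the recorded reason into the abort message.
  println(s"""
    |Aborting TaskSet 0.0 because task 0 (partition 0)
    |cannot run anywhere due to node and executor blacklist.
    |Most recent failure:
    |${blacklist.getLatestFailureReason}
    |
    |Blacklisting behavior can be configured via spark.blacklist.*.
    |""".stripMargin)
}
```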

## How was this patch tested?

Unit tests and manual testing.
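Beyond the assertions visible in the diff below, a minimal direct check could look like the following. This is a hypothetical sketch, not part of the commit; it reuses `BlacklistTrackerSuite`'s existing `createTaskSetBlacklist` helper and assumes it lives in the `org.apache.spark.scheduler` test package, where the `private[scheduler]` class is visible:

```scala
// Hypothetical test sketch, in the style of BlacklistTrackerSuite.
test("task set blacklist records the latest failure reason") {
  val taskSetBlacklist = createTaskSetBlacklist(stageId = 0)
  taskSetBlacklist.updateBlacklistForFailedTask(
    "hostA", exec = "1", index = 0, failureReason = "testing")
  assert(taskSetBlacklist.getLatestFailureReason === "testing")
}
```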

Author: zhoukang <zhoukang199191@gmail.com>

Closes #19338 from caneGuy/zhoukang/improve-blacklist.
caneGuy authored and jerryshao committed Sep 28, 2017
1 parent 7bf4da8 commit 3b117d6
Showing 7 changed files with 104 additions and 48 deletions.
core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala
@@ -61,6 +61,16 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int,
private val blacklistedExecs = new HashSet[String]()
private val blacklistedNodes = new HashSet[String]()

private var latestFailureReason: String = null

/**
* Get the most recent failure reason of this TaskSet.
* @return
*/
def getLatestFailureReason: String = {
latestFailureReason
}

/**
* Return true if this executor is blacklisted for the given task. This does *not*
* need to return true if the executor is blacklisted for the entire stage, or blacklisted
@@ -94,7 +104,9 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int,
private[scheduler] def updateBlacklistForFailedTask(
host: String,
exec: String,
index: Int): Unit = {
index: Int,
failureReason: String): Unit = {
latestFailureReason = failureReason
val execFailures = execToFailures.getOrElseUpdate(exec, new ExecutorFailuresInTaskSet(host))
execFailures.updateWithFailure(index, clock.getTimeMillis())

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -670,9 +670,14 @@ private[spark] class TaskSetManager(
}
if (blacklistedEverywhere) {
val partition = tasks(indexInTaskSet).partitionId
abort(s"Aborting $taskSet because task $indexInTaskSet (partition $partition) " +
s"cannot run anywhere due to node and executor blacklist. Blacklisting behavior " +
s"can be configured via spark.blacklist.*.")
abort(s"""
|Aborting $taskSet because task $indexInTaskSet (partition $partition)
|cannot run anywhere due to node and executor blacklist.
|Most recent failure:
|${taskSetBlacklist.getLatestFailureReason}
|
|Blacklisting behavior can be configured via spark.blacklist.*.
|""".stripMargin)
}
}
}
@@ -837,9 +842,9 @@ private[spark] class TaskSetManager(
sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)

if (!isZombie && reason.countTowardsTaskFailures) {
taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
info.host, info.executorId, index))
assert (null != failureReason)
taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
info.host, info.executorId, index, failureReason))
numFailures(index) += 1
if (numFailures(index) >= maxTaskFailures) {
logError("Task %d in stage %s failed %d times; aborting job".format(
core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
@@ -115,8 +115,9 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
withBackend(runBackend _) {
val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
awaitJobTermination(jobFuture, duration)
val pattern = ("Aborting TaskSet 0.0 because task .* " +
"cannot run anywhere due to node and executor blacklist").r
val pattern = (
s"""|Aborting TaskSet 0.0 because task .*
|cannot run anywhere due to node and executor blacklist""".stripMargin).r
assert(pattern.findFirstIn(failure.getMessage).isDefined,
s"Couldn't find $pattern in ${failure.getMessage()}")
}
core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
@@ -110,7 +110,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
val taskSetBlacklist = createTaskSetBlacklist(stageId)
if (stageId % 2 == 0) {
// fail one task in every other taskset
taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
taskSetBlacklist.updateBlacklistForFailedTask(
"hostA", exec = "1", index = 0, failureReason = "testing")
failuresSoFar += 1
}
blacklist.updateBlacklistForSuccessfulTaskSet(stageId, 0, taskSetBlacklist.execToFailures)
@@ -132,7 +133,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// for many different stages, executor 1 fails a task, and then the taskSet fails.
(0 until failuresUntilBlacklisted * 10).foreach { stage =>
val taskSetBlacklist = createTaskSetBlacklist(stage)
taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
taskSetBlacklist.updateBlacklistForFailedTask(
"hostA", exec = "1", index = 0, failureReason = "testing")
}
assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
}
@@ -147,7 +149,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
val numFailures = math.max(conf.get(config.MAX_FAILURES_PER_EXEC),
conf.get(config.MAX_FAILURES_PER_EXEC_STAGE))
(0 until numFailures).foreach { index =>
taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = index)
taskSetBlacklist.updateBlacklistForFailedTask(
"hostA", exec = "1", index = index, failureReason = "testing")
}
assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
@@ -170,7 +173,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// Fail 4 tasks in one task set on executor 1, so that executor gets blacklisted for the whole
// application.
(0 until 4).foreach { partition =>
taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = partition)
taskSetBlacklist0.updateBlacklistForFailedTask(
"hostA", exec = "1", index = partition, failureReason = "testing")
}
blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist0.execToFailures)
assert(blacklist.nodeBlacklist() === Set())
@@ -183,7 +187,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// application. Since that's the second executor that is blacklisted on the same node, we also
// blacklist that node.
(0 until 4).foreach { partition =>
taskSetBlacklist1.updateBlacklistForFailedTask("hostA", exec = "2", index = partition)
taskSetBlacklist1.updateBlacklistForFailedTask(
"hostA", exec = "2", index = partition, failureReason = "testing")
}
blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist1.execToFailures)
assert(blacklist.nodeBlacklist() === Set("hostA"))
@@ -207,7 +212,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// Fail one more task, but executor isn't put back into blacklist since the count of failures
// on that executor should have been reset to 0.
val taskSetBlacklist2 = createTaskSetBlacklist(stageId = 2)
taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
taskSetBlacklist2.updateBlacklistForFailedTask(
"hostA", exec = "1", index = 0, failureReason = "testing")
blacklist.updateBlacklistForSuccessfulTaskSet(2, 0, taskSetBlacklist2.execToFailures)
assert(blacklist.nodeBlacklist() === Set())
assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())
@@ -221,7 +227,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// Lets say that executor 1 dies completely. We get some task failures, but
// the taskset then finishes successfully (elsewhere).
(0 until 4).foreach { partition =>
taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = partition)
taskSetBlacklist0.updateBlacklistForFailedTask(
"hostA", exec = "1", index = partition, failureReason = "testing")
}
blacklist.handleRemovedExecutor("1")
blacklist.updateBlacklistForSuccessfulTaskSet(
@@ -236,7 +243,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// Now another executor gets spun up on that host, but it also dies.
val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
(0 until 4).foreach { partition =>
taskSetBlacklist1.updateBlacklistForFailedTask("hostA", exec = "2", index = partition)
taskSetBlacklist1.updateBlacklistForFailedTask(
"hostA", exec = "2", index = partition, failureReason = "testing")
}
blacklist.handleRemovedExecutor("2")
blacklist.updateBlacklistForSuccessfulTaskSet(
@@ -279,7 +287,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M

def failOneTaskInTaskSet(exec: String): Unit = {
val taskSetBlacklist = createTaskSetBlacklist(stageId = stageId)
taskSetBlacklist.updateBlacklistForFailedTask("host-" + exec, exec, 0)
taskSetBlacklist.updateBlacklistForFailedTask("host-" + exec, exec, 0, "testing")
blacklist.updateBlacklistForSuccessfulTaskSet(stageId, 0, taskSetBlacklist.execToFailures)
stageId += 1
}
@@ -354,12 +362,12 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
val taskSetBlacklist2 = createTaskSetBlacklist(stageId = 2)
// Taskset1 has one failure immediately
taskSetBlacklist1.updateBlacklistForFailedTask("host-1", "1", 0)
taskSetBlacklist1.updateBlacklistForFailedTask("host-1", "1", 0, "testing")
// Then we have a *long* delay, much longer than the timeout, before any other failures or
// taskset completion
clock.advance(blacklist.BLACKLIST_TIMEOUT_MILLIS * 5)
// After the long delay, we have one failure on taskset 2, on the same executor
taskSetBlacklist2.updateBlacklistForFailedTask("host-1", "1", 0)
taskSetBlacklist2.updateBlacklistForFailedTask("host-1", "1", 0, "testing")
// Finally, we complete both tasksets. Its important here to complete taskset2 *first*. We
// want to make sure that when taskset 1 finishes, even though we've now got two task failures,
// we realize that the task failure we just added was well before the timeout.
@@ -377,16 +385,20 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// we blacklist executors on two different hosts -- make sure that doesn't lead to any
// node blacklisting
val taskSetBlacklist0 = createTaskSetBlacklist(stageId = 0)
taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = 1)
taskSetBlacklist0.updateBlacklistForFailedTask(
"hostA", exec = "1", index = 0, failureReason = "testing")
taskSetBlacklist0.updateBlacklistForFailedTask(
"hostA", exec = "1", index = 1, failureReason = "testing")
blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist0.execToFailures)
assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1"))
verify(listenerBusMock).post(SparkListenerExecutorBlacklisted(0, "1", 2))
assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())

val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
taskSetBlacklist1.updateBlacklistForFailedTask("hostB", exec = "2", index = 0)
taskSetBlacklist1.updateBlacklistForFailedTask("hostB", exec = "2", index = 1)
taskSetBlacklist1.updateBlacklistForFailedTask(
"hostB", exec = "2", index = 0, failureReason = "testing")
taskSetBlacklist1.updateBlacklistForFailedTask(
"hostB", exec = "2", index = 1, failureReason = "testing")
blacklist.updateBlacklistForSuccessfulTaskSet(1, 0, taskSetBlacklist1.execToFailures)
assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1", "2"))
verify(listenerBusMock).post(SparkListenerExecutorBlacklisted(0, "2", 2))
@@ -395,8 +407,10 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// Finally, blacklist another executor on the same node as the original blacklisted executor,
// and make sure this time we *do* blacklist the node.
val taskSetBlacklist2 = createTaskSetBlacklist(stageId = 0)
taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "3", index = 0)
taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "3", index = 1)
taskSetBlacklist2.updateBlacklistForFailedTask(
"hostA", exec = "3", index = 0, failureReason = "testing")
taskSetBlacklist2.updateBlacklistForFailedTask(
"hostA", exec = "3", index = 1, failureReason = "testing")
blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist2.execToFailures)
assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1", "2", "3"))
verify(listenerBusMock).post(SparkListenerExecutorBlacklisted(0, "3", 2))
@@ -486,7 +500,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// Fail 4 tasks in one task set on executor 1, so that executor gets blacklisted for the whole
// application.
(0 until 4).foreach { partition =>
taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = partition)
taskSetBlacklist0.updateBlacklistForFailedTask(
"hostA", exec = "1", index = partition, failureReason = "testing")
}
blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist0.execToFailures)

@@ -497,7 +512,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// application. Since that's the second executor that is blacklisted on the same node, we also
// blacklist that node.
(0 until 4).foreach { partition =>
taskSetBlacklist1.updateBlacklistForFailedTask("hostA", exec = "2", index = partition)
taskSetBlacklist1.updateBlacklistForFailedTask(
"hostA", exec = "2", index = partition, failureReason = "testing")
}
blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist1.execToFailures)

@@ -512,7 +528,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// Fail 4 tasks in one task set on executor 1, so that executor gets blacklisted for the whole
// application.
(0 until 4).foreach { partition =>
taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "1", index = partition)
taskSetBlacklist2.updateBlacklistForFailedTask(
"hostA", exec = "1", index = partition, failureReason = "testing")
}
blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist2.execToFailures)

@@ -523,7 +540,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// application. Since that's the second executor that is blacklisted on the same node, we also
// blacklist that node.
(0 until 4).foreach { partition =>
taskSetBlacklist3.updateBlacklistForFailedTask("hostA", exec = "2", index = partition)
taskSetBlacklist3.updateBlacklistForFailedTask(
"hostA", exec = "2", index = partition, failureReason = "testing")
}
blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist3.execToFailures)

core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -660,9 +660,14 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
assert(tsm.isZombie)
assert(failedTaskSet)
val idx = failedTask.index
assert(failedTaskSetReason === s"Aborting TaskSet 0.0 because task $idx (partition $idx) " +
s"cannot run anywhere due to node and executor blacklist. Blacklisting behavior can be " +
s"configured via spark.blacklist.*.")
assert(failedTaskSetReason === s"""
|Aborting $taskSet because task $idx (partition $idx)
|cannot run anywhere due to node and executor blacklist.
|Most recent failure:
|${tsm.taskSetBlacklistHelperOpt.get.getLatestFailureReason}
|
|Blacklisting behavior can be configured via spark.blacklist.*.
|""".stripMargin)
}

test("don't abort if there is an executor available, though it hasn't had scheduled tasks yet") {
(The remaining two changed files are not shown here.)
