
[SPARK-22123][CORE] Add latest failure reason for task set blacklist #19338

Closed
wants to merge 8 commits
Changes from 3 commits
@@ -61,6 +61,8 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int,
private val blacklistedExecs = new HashSet[String]()
private val blacklistedNodes = new HashSet[String]()

var taskSetLatestFailureReason: String = null
Contributor:
Can we please avoid public variables here? Also, why not make it less verbose and change it to latestFailureReason?

Contributor (author):
I have thought about the public-visibility problem, but we need to get this value from TaskSetManager. Would it make sense if I added a def getLatestFailureReason? @jerryshao
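
A minimal sketch of the shape being discussed in this thread, assuming a private field plus a getter; the class and method names below are illustrative only and not the PR's actual code:

// Sketch only: keep the mutable field private and expose a read-only accessor,
// along the lines suggested in this thread.
class FailureReasonHolder {
  // Most recent failure reason recorded; null until the first failure is reported.
  private var latestFailureReason: String = null

  // Read-only view for callers such as TaskSetManager.
  def getLatestFailureReason: String = latestFailureReason

  // Invoked when a task failure is recorded, e.g. from updateBlacklistForFailedTask.
  def recordFailure(failureReason: String): Unit = {
    latestFailureReason = failureReason
  }
}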


/**
* Return true if this executor is blacklisted for the given task. This does *not*
* need to return true if the executor is blacklisted for the entire stage, or blacklisted
@@ -94,7 +96,9 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int,
private[scheduler] def updateBlacklistForFailedTask(
host: String,
exec: String,
index: Int): Unit = {
index: Int,
failureReason: String): Unit = {
taskSetLatestFailureReason = failureReason
val execFailures = execToFailures.getOrElseUpdate(exec, new ExecutorFailuresInTaskSet(host))
execFailures.updateWithFailure(index, clock.getTimeMillis())

@@ -671,8 +671,9 @@ private[spark] class TaskSetManager(
if (blacklistedEverywhere) {
val partition = tasks(indexInTaskSet).partitionId
abort(s"Aborting $taskSet because task $indexInTaskSet (partition $partition) " +
s"cannot run anywhere due to node and executor blacklist. Blacklisting behavior " +
s"can be configured via spark.blacklist.*.")
s"cannot run anywhere due to node and executor blacklist." +
Contributor:
Can you please prettify the output message a bit? From what I saw in the PR description, it is a bit messy there. (A sketch of one possible tidier formatting follows this hunk.)

s" Most recent failure:\n${taskSetBlacklist.taskSetLatestFailureReason}\n\n" +
s" Blacklisting behavior can be configured via spark.blacklist.*.\n\n")
}
}
}
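
On the prettify request above, a rough sketch of one way the abort message could be laid out with stripMargin. The parameter names are stand-ins for values available in TaskSetManager; this is an illustration, not the PR's final wording:

// Sketch only: tidier multi-line layout for the abort message; parameters are illustrative.
def blacklistAbortMessage(
    taskSet: String,
    indexInTaskSet: Int,
    partition: Int,
    latestFailureReason: String): String = {
  s"""|Aborting $taskSet because task $indexInTaskSet (partition $partition)
      |cannot run anywhere due to node and executor blacklist.
      |Most recent failure:
      |$latestFailureReason
      |
      |Blacklisting behavior can be configured via spark.blacklist.*.""".stripMargin
}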
@@ -837,9 +838,9 @@ private[spark] class TaskSetManager(
sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)

if (!isZombie && reason.countTowardsTaskFailures) {
taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
info.host, info.executorId, index))
assert (null != failureReason)
Contributor:
Move the assert (null != failureReason) first, and, to go along with the other change, drop the Some wrapper around failureReason. (A brief sketch of that ordering follows this hunk.)

taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
info.host, info.executorId, index, failureReason))
numFailures(index) += 1
if (numFailures(index) >= maxTaskFailures) {
logError("Task %d in stage %s failed %d times; aborting job".format(
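
On the assert comment above, a simplified sketch of the suggested ordering (stand-in types, not Spark's real signatures): assert on failureReason before it is used, then pass it through as a plain String with no Some(...) wrapper.

// Sketch only, with stand-in types: assert first, then pass failureReason
// straight through rather than wrapping it in Some(...).
class FailureRecorder(record: (String, String, Int, String) => Unit) {
  def onTaskFailure(host: String, executorId: String, index: Int, failureReason: String): Unit = {
    assert(failureReason != null)                   // check before any use
    record(host, executorId, index, failureReason)  // plain String, not Some(failureReason)
  }
}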
@@ -110,7 +110,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
val taskSetBlacklist = createTaskSetBlacklist(stageId)
if (stageId % 2 == 0) {
// fail one task in every other taskset
taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
taskSetBlacklist.updateBlacklistForFailedTask(
"hostA", exec = "1", index = 0, failureReason = "testing")
failuresSoFar += 1
}
blacklist.updateBlacklistForSuccessfulTaskSet(stageId, 0, taskSetBlacklist.execToFailures)
Expand All @@ -132,7 +133,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// for many different stages, executor 1 fails a task, and then the taskSet fails.
(0 until failuresUntilBlacklisted * 10).foreach { stage =>
val taskSetBlacklist = createTaskSetBlacklist(stage)
taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
taskSetBlacklist.updateBlacklistForFailedTask(
"hostA", exec = "1", index = 0, failureReason = "testing")
}
assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
}
Expand All @@ -147,7 +149,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
val numFailures = math.max(conf.get(config.MAX_FAILURES_PER_EXEC),
conf.get(config.MAX_FAILURES_PER_EXEC_STAGE))
(0 until numFailures).foreach { index =>
taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = index)
taskSetBlacklist.updateBlacklistForFailedTask(
"hostA", exec = "1", index = index, failureReason = "testing")
}
assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
Expand All @@ -170,7 +173,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// Fail 4 tasks in one task set on executor 1, so that executor gets blacklisted for the whole
// application.
(0 until 4).foreach { partition =>
taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = partition)
taskSetBlacklist0.updateBlacklistForFailedTask(
"hostA", exec = "1", index = partition, failureReason = "testing")
}
blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist0.execToFailures)
assert(blacklist.nodeBlacklist() === Set())
Expand All @@ -183,7 +187,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// application. Since that's the second executor that is blacklisted on the same node, we also
// blacklist that node.
(0 until 4).foreach { partition =>
taskSetBlacklist1.updateBlacklistForFailedTask("hostA", exec = "2", index = partition)
taskSetBlacklist1.updateBlacklistForFailedTask(
"hostA", exec = "2", index = partition, failureReason = "testing")
}
blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist1.execToFailures)
assert(blacklist.nodeBlacklist() === Set("hostA"))
Expand All @@ -207,7 +212,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// Fail one more task, but executor isn't put back into blacklist since the count of failures
// on that executor should have been reset to 0.
val taskSetBlacklist2 = createTaskSetBlacklist(stageId = 2)
taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
taskSetBlacklist2.updateBlacklistForFailedTask(
"hostA", exec = "1", index = 0, failureReason = "testing")
blacklist.updateBlacklistForSuccessfulTaskSet(2, 0, taskSetBlacklist2.execToFailures)
assert(blacklist.nodeBlacklist() === Set())
assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())
Expand All @@ -221,7 +227,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// Lets say that executor 1 dies completely. We get some task failures, but
// the taskset then finishes successfully (elsewhere).
(0 until 4).foreach { partition =>
taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = partition)
taskSetBlacklist0.updateBlacklistForFailedTask(
"hostA", exec = "1", index = partition, failureReason = "testing")
}
blacklist.handleRemovedExecutor("1")
blacklist.updateBlacklistForSuccessfulTaskSet(
Expand All @@ -236,7 +243,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// Now another executor gets spun up on that host, but it also dies.
val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
(0 until 4).foreach { partition =>
taskSetBlacklist1.updateBlacklistForFailedTask("hostA", exec = "2", index = partition)
taskSetBlacklist1.updateBlacklistForFailedTask(
"hostA", exec = "2", index = partition, failureReason = "testing")
}
blacklist.handleRemovedExecutor("2")
blacklist.updateBlacklistForSuccessfulTaskSet(
@@ -279,7 +287,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M

def failOneTaskInTaskSet(exec: String): Unit = {
val taskSetBlacklist = createTaskSetBlacklist(stageId = stageId)
taskSetBlacklist.updateBlacklistForFailedTask("host-" + exec, exec, 0)
taskSetBlacklist.updateBlacklistForFailedTask("host-" + exec, exec, 0, "testing")
blacklist.updateBlacklistForSuccessfulTaskSet(stageId, 0, taskSetBlacklist.execToFailures)
stageId += 1
}
@@ -354,12 +362,12 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
val taskSetBlacklist2 = createTaskSetBlacklist(stageId = 2)
// Taskset1 has one failure immediately
taskSetBlacklist1.updateBlacklistForFailedTask("host-1", "1", 0)
taskSetBlacklist1.updateBlacklistForFailedTask("host-1", "1", 0, "testing")
// Then we have a *long* delay, much longer than the timeout, before any other failures or
// taskset completion
clock.advance(blacklist.BLACKLIST_TIMEOUT_MILLIS * 5)
// After the long delay, we have one failure on taskset 2, on the same executor
taskSetBlacklist2.updateBlacklistForFailedTask("host-1", "1", 0)
taskSetBlacklist2.updateBlacklistForFailedTask("host-1", "1", 0, "testing")
// Finally, we complete both tasksets. Its important here to complete taskset2 *first*. We
// want to make sure that when taskset 1 finishes, even though we've now got two task failures,
// we realize that the task failure we just added was well before the timeout.
Expand All @@ -377,16 +385,20 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// we blacklist executors on two different hosts -- make sure that doesn't lead to any
// node blacklisting
val taskSetBlacklist0 = createTaskSetBlacklist(stageId = 0)
taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = 1)
taskSetBlacklist0.updateBlacklistForFailedTask(
"hostA", exec = "1", index = 0, failureReason = "testing")
taskSetBlacklist0.updateBlacklistForFailedTask(
"hostA", exec = "1", index = 1, failureReason = "testing")
blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist0.execToFailures)
assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1"))
verify(listenerBusMock).post(SparkListenerExecutorBlacklisted(0, "1", 2))
assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())

val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
taskSetBlacklist1.updateBlacklistForFailedTask("hostB", exec = "2", index = 0)
taskSetBlacklist1.updateBlacklistForFailedTask("hostB", exec = "2", index = 1)
taskSetBlacklist1.updateBlacklistForFailedTask(
"hostB", exec = "2", index = 0, failureReason = "testing")
taskSetBlacklist1.updateBlacklistForFailedTask(
"hostB", exec = "2", index = 1, failureReason = "testing")
blacklist.updateBlacklistForSuccessfulTaskSet(1, 0, taskSetBlacklist1.execToFailures)
assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1", "2"))
verify(listenerBusMock).post(SparkListenerExecutorBlacklisted(0, "2", 2))
Expand All @@ -395,8 +407,10 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// Finally, blacklist another executor on the same node as the original blacklisted executor,
// and make sure this time we *do* blacklist the node.
val taskSetBlacklist2 = createTaskSetBlacklist(stageId = 0)
taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "3", index = 0)
taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "3", index = 1)
taskSetBlacklist2.updateBlacklistForFailedTask(
"hostA", exec = "3", index = 0, failureReason = "testing")
taskSetBlacklist2.updateBlacklistForFailedTask(
"hostA", exec = "3", index = 1, failureReason = "testing")
blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist2.execToFailures)
assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1", "2", "3"))
verify(listenerBusMock).post(SparkListenerExecutorBlacklisted(0, "3", 2))
@@ -486,7 +500,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// Fail 4 tasks in one task set on executor 1, so that executor gets blacklisted for the whole
// application.
(0 until 4).foreach { partition =>
taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = partition)
taskSetBlacklist0.updateBlacklistForFailedTask(
"hostA", exec = "1", index = partition, failureReason = "testing")
}
blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist0.execToFailures)

Expand All @@ -497,7 +512,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// application. Since that's the second executor that is blacklisted on the same node, we also
// blacklist that node.
(0 until 4).foreach { partition =>
taskSetBlacklist1.updateBlacklistForFailedTask("hostA", exec = "2", index = partition)
taskSetBlacklist1.updateBlacklistForFailedTask(
"hostA", exec = "2", index = partition, failureReason = "testing")
}
blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist1.execToFailures)

Expand All @@ -512,7 +528,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// Fail 4 tasks in one task set on executor 1, so that executor gets blacklisted for the whole
// application.
(0 until 4).foreach { partition =>
taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "1", index = partition)
taskSetBlacklist2.updateBlacklistForFailedTask(
"hostA", exec = "1", index = partition, failureReason = "testing")
}
blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist2.execToFailures)

Expand All @@ -523,7 +540,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// application. Since that's the second executor that is blacklisted on the same node, we also
// blacklist that node.
(0 until 4).foreach { partition =>
taskSetBlacklist3.updateBlacklistForFailedTask("hostA", exec = "2", index = partition)
taskSetBlacklist3.updateBlacklistForFailedTask(
"hostA", exec = "2", index = partition, failureReason = "testing")
}
blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist3.execToFailures)

@@ -661,8 +661,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
assert(failedTaskSet)
val idx = failedTask.index
assert(failedTaskSetReason === s"Aborting TaskSet 0.0 because task $idx (partition $idx) " +
s"cannot run anywhere due to node and executor blacklist. Blacklisting behavior can be " +
s"configured via spark.blacklist.*.")
s"cannot run anywhere due to node and executor blacklist." +
s" Most recent failure:\n" +
s"${tsm.taskSetBlacklistHelperOpt.get.taskSetLatestFailureReason}\n\n" +
s" Blacklisting behavior can be configured via spark.blacklist.*.\n\n")
}

test("don't abort if there is an executor available, though it hasn't had scheduled tasks yet") {