From 3b117d631e1ff387b70ed8efba229594f4594db5 Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Thu, 28 Sep 2017 09:25:21 +0800
Subject: [PATCH] [SPARK-22123][CORE] Add latest failure reason for task set
 blacklist

## What changes were proposed in this pull request?

This patch adds the latest failure reason to the task set blacklist, so that it
can be shown on the Spark UI and users can see the failure reason directly.
Until now, every job aborted because it was blacklisted everywhere only showed
a log message like the one below, which carries no further information:

`Aborting $taskSet because task $indexInTaskSet (partition $partition) cannot
run anywhere due to node and executor blacklist. Blacklisting behavior can be
configured via spark.blacklist.*.`

**After this change:**

```
Aborting TaskSet 0.0 because task 0 (partition 0)
cannot run anywhere due to node and executor blacklist.
Most recent failure:
Some(Lost task 0.1 in stage 0.0 (TID 3,xxx, executor 1): java.lang.Exception: Fake error!
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:73)
 at org.apache.spark.scheduler.Task.run(Task.scala:99)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:305)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
).

Blacklisting behavior can be configured via spark.blacklist.*.
```

(A self-contained Scala sketch of this mechanism, for illustration only,
follows the patch below.)

## How was this patch tested?

Unit tests and manual testing.

Author: zhoukang

Closes #19338 from caneGuy/zhoukang/improve-blacklist.
---
 .../spark/scheduler/TaskSetBlacklist.scala    | 14 ++++-
 .../spark/scheduler/TaskSetManager.scala      | 15 +++--
 .../scheduler/BlacklistIntegrationSuite.scala |  5 +-
 .../scheduler/BlacklistTrackerSuite.scala     | 60 ++++++++++++-------
 .../scheduler/TaskSchedulerImplSuite.scala    | 11 +++-
 .../scheduler/TaskSetBlacklistSuite.scala     | 45 +++++++++-----
 .../spark/scheduler/TaskSetManagerSuite.scala |  2 +-
 7 files changed, 104 insertions(+), 48 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala
index e815b7e0cf6c9..233781f3d9719 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala
@@ -61,6 +61,16 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int,
   private val blacklistedExecs = new HashSet[String]()
   private val blacklistedNodes = new HashSet[String]()
 
+  private var latestFailureReason: String = null
+
+  /**
+   * Get the most recent failure reason of this TaskSet.
+   * @return the latest failure reason, or null if no task has failed yet
+   */
+  def getLatestFailureReason: String = {
+    latestFailureReason
+  }
+
   /**
    * Return true if this executor is blacklisted for the given task. This does *not*
    * need to return true if the executor is blacklisted for the entire stage, or blacklisted
@@ -94,7 +104,9 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int,
   private[scheduler] def updateBlacklistForFailedTask(
       host: String,
       exec: String,
-      index: Int): Unit = {
+      index: Int,
+      failureReason: String): Unit = {
+    latestFailureReason = failureReason
     val execFailures = execToFailures.getOrElseUpdate(exec, new ExecutorFailuresInTaskSet(host))
     execFailures.updateWithFailure(index, clock.getTimeMillis())
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 3804ea863b4f9..bb867416a4fac 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -670,9 +670,14 @@ private[spark] class TaskSetManager(
         }
         if (blacklistedEverywhere) {
           val partition = tasks(indexInTaskSet).partitionId
-          abort(s"Aborting $taskSet because task $indexInTaskSet (partition $partition) " +
-            s"cannot run anywhere due to node and executor blacklist. Blacklisting behavior " +
-            s"can be configured via spark.blacklist.*.")
+          abort(s"""
+            |Aborting $taskSet because task $indexInTaskSet (partition $partition)
+            |cannot run anywhere due to node and executor blacklist.
+            |Most recent failure:
+            |${taskSetBlacklist.getLatestFailureReason}
+            |
+            |Blacklisting behavior can be configured via spark.blacklist.*.
+            |""".stripMargin)
         }
       }
     }
@@ -837,9 +842,9 @@ private[spark] class TaskSetManager(
     sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)
 
     if (!isZombie && reason.countTowardsTaskFailures) {
-      taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
-        info.host, info.executorId, index))
       assert (null != failureReason)
+      taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
+        info.host, info.executorId, index, failureReason))
       numFailures(index) += 1
       if (numFailures(index) >= maxTaskFailures) {
         logError("Task %d in stage %s failed %d times; aborting job".format(
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
index f6015cd51c2bd..d3bbfd11d406d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
@@ -115,8 +115,9 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
     withBackend(runBackend _) {
       val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
       awaitJobTermination(jobFuture, duration)
-      val pattern = ("Aborting TaskSet 0.0 because task .* " +
-        "cannot run anywhere due to node and executor blacklist").r
+      val pattern = (
+        s"""|Aborting TaskSet 0.0 because task .*
+            |cannot run anywhere due to node and executor blacklist""".stripMargin).r
       assert(pattern.findFirstIn(failure.getMessage).isDefined,
         s"Couldn't find $pattern in ${failure.getMessage()}")
     }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
index a136d69b36d6c..cd1b7a9e5ab18 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
@@ -110,7 +110,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
       val taskSetBlacklist = createTaskSetBlacklist(stageId)
       if (stageId % 2 == 0) {
         // fail one task in every other taskset
-        taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
+        taskSetBlacklist.updateBlacklistForFailedTask(
+          "hostA", exec = "1", index = 0, failureReason = "testing")
         failuresSoFar += 1
       }
       blacklist.updateBlacklistForSuccessfulTaskSet(stageId, 0, taskSetBlacklist.execToFailures)
@@ -132,7 +133,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     // for many different stages, executor 1 fails a task, and then the taskSet fails.
     (0 until failuresUntilBlacklisted * 10).foreach { stage =>
       val taskSetBlacklist = createTaskSetBlacklist(stage)
-      taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
+      taskSetBlacklist.updateBlacklistForFailedTask(
+        "hostA", exec = "1", index = 0, failureReason = "testing")
     }
     assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
   }
@@ -147,7 +149,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     val numFailures = math.max(conf.get(config.MAX_FAILURES_PER_EXEC),
       conf.get(config.MAX_FAILURES_PER_EXEC_STAGE))
     (0 until numFailures).foreach { index =>
-      taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = index)
+      taskSetBlacklist.updateBlacklistForFailedTask(
+        "hostA", exec = "1", index = index, failureReason = "testing")
     }
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
     assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
@@ -170,7 +173,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     // Fail 4 tasks in one task set on executor 1, so that executor gets blacklisted for the whole
     // application.
     (0 until 4).foreach { partition =>
-      taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = partition)
+      taskSetBlacklist0.updateBlacklistForFailedTask(
+        "hostA", exec = "1", index = partition, failureReason = "testing")
     }
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist0.execToFailures)
     assert(blacklist.nodeBlacklist() === Set())
@@ -183,7 +187,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     // application. Since that's the second executor that is blacklisted on the same node, we also
     // blacklist that node.
     (0 until 4).foreach { partition =>
-      taskSetBlacklist1.updateBlacklistForFailedTask("hostA", exec = "2", index = partition)
+      taskSetBlacklist1.updateBlacklistForFailedTask(
+        "hostA", exec = "2", index = partition, failureReason = "testing")
     }
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist1.execToFailures)
     assert(blacklist.nodeBlacklist() === Set("hostA"))
@@ -207,7 +212,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     // Fail one more task, but executor isn't put back into blacklist since the count of failures
     // on that executor should have been reset to 0.
     val taskSetBlacklist2 = createTaskSetBlacklist(stageId = 2)
-    taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
+    taskSetBlacklist2.updateBlacklistForFailedTask(
+      "hostA", exec = "1", index = 0, failureReason = "testing")
     blacklist.updateBlacklistForSuccessfulTaskSet(2, 0, taskSetBlacklist2.execToFailures)
     assert(blacklist.nodeBlacklist() === Set())
     assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())
@@ -221,7 +227,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
    // Lets say that executor 1 dies completely. We get some task failures, but
    // the taskset then finishes successfully (elsewhere).
    (0 until 4).foreach { partition =>
-      taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = partition)
+      taskSetBlacklist0.updateBlacklistForFailedTask(
+        "hostA", exec = "1", index = partition, failureReason = "testing")
    }
    blacklist.handleRemovedExecutor("1")
    blacklist.updateBlacklistForSuccessfulTaskSet(
@@ -236,7 +243,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     // Now another executor gets spun up on that host, but it also dies.
     val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
     (0 until 4).foreach { partition =>
-      taskSetBlacklist1.updateBlacklistForFailedTask("hostA", exec = "2", index = partition)
+      taskSetBlacklist1.updateBlacklistForFailedTask(
+        "hostA", exec = "2", index = partition, failureReason = "testing")
     }
     blacklist.handleRemovedExecutor("2")
     blacklist.updateBlacklistForSuccessfulTaskSet(
@@ -279,7 +287,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
 
     def failOneTaskInTaskSet(exec: String): Unit = {
       val taskSetBlacklist = createTaskSetBlacklist(stageId = stageId)
-      taskSetBlacklist.updateBlacklistForFailedTask("host-" + exec, exec, 0)
+      taskSetBlacklist.updateBlacklistForFailedTask("host-" + exec, exec, 0, "testing")
       blacklist.updateBlacklistForSuccessfulTaskSet(stageId, 0, taskSetBlacklist.execToFailures)
       stageId += 1
     }
@@ -354,12 +362,12 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
     val taskSetBlacklist2 = createTaskSetBlacklist(stageId = 2)
     // Taskset1 has one failure immediately
-    taskSetBlacklist1.updateBlacklistForFailedTask("host-1", "1", 0)
+    taskSetBlacklist1.updateBlacklistForFailedTask("host-1", "1", 0, "testing")
     // Then we have a *long* delay, much longer than the timeout, before any other failures or
     // taskset completion
     clock.advance(blacklist.BLACKLIST_TIMEOUT_MILLIS * 5)
     // After the long delay, we have one failure on taskset 2, on the same executor
-    taskSetBlacklist2.updateBlacklistForFailedTask("host-1", "1", 0)
+    taskSetBlacklist2.updateBlacklistForFailedTask("host-1", "1", 0, "testing")
     // Finally, we complete both tasksets. Its important here to complete taskset2 *first*. We
     // want to make sure that when taskset 1 finishes, even though we've now got two task failures,
     // we realize that the task failure we just added was well before the timeout.
@@ -377,16 +385,20 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     // we blacklist executors on two different hosts -- make sure that doesn't lead to any
     // node blacklisting
     val taskSetBlacklist0 = createTaskSetBlacklist(stageId = 0)
-    taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
-    taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = 1)
+    taskSetBlacklist0.updateBlacklistForFailedTask(
+      "hostA", exec = "1", index = 0, failureReason = "testing")
+    taskSetBlacklist0.updateBlacklistForFailedTask(
+      "hostA", exec = "1", index = 1, failureReason = "testing")
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist0.execToFailures)
     assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1"))
     verify(listenerBusMock).post(SparkListenerExecutorBlacklisted(0, "1", 2))
     assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())
 
     val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
-    taskSetBlacklist1.updateBlacklistForFailedTask("hostB", exec = "2", index = 0)
-    taskSetBlacklist1.updateBlacklistForFailedTask("hostB", exec = "2", index = 1)
+    taskSetBlacklist1.updateBlacklistForFailedTask(
+      "hostB", exec = "2", index = 0, failureReason = "testing")
+    taskSetBlacklist1.updateBlacklistForFailedTask(
+      "hostB", exec = "2", index = 1, failureReason = "testing")
     blacklist.updateBlacklistForSuccessfulTaskSet(1, 0, taskSetBlacklist1.execToFailures)
     assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1", "2"))
     verify(listenerBusMock).post(SparkListenerExecutorBlacklisted(0, "2", 2))
@@ -395,8 +407,10 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     // Finally, blacklist another executor on the same node as the original blacklisted executor,
     // and make sure this time we *do* blacklist the node.
     val taskSetBlacklist2 = createTaskSetBlacklist(stageId = 0)
-    taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "3", index = 0)
-    taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "3", index = 1)
+    taskSetBlacklist2.updateBlacklistForFailedTask(
+      "hostA", exec = "3", index = 0, failureReason = "testing")
+    taskSetBlacklist2.updateBlacklistForFailedTask(
+      "hostA", exec = "3", index = 1, failureReason = "testing")
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist2.execToFailures)
     assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1", "2", "3"))
     verify(listenerBusMock).post(SparkListenerExecutorBlacklisted(0, "3", 2))
@@ -486,7 +500,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     // Fail 4 tasks in one task set on executor 1, so that executor gets blacklisted for the whole
     // application.
     (0 until 4).foreach { partition =>
-      taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = partition)
+      taskSetBlacklist0.updateBlacklistForFailedTask(
+        "hostA", exec = "1", index = partition, failureReason = "testing")
     }
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist0.execToFailures)
 
@@ -497,7 +512,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     // application. Since that's the second executor that is blacklisted on the same node, we also
     // blacklist that node.
     (0 until 4).foreach { partition =>
-      taskSetBlacklist1.updateBlacklistForFailedTask("hostA", exec = "2", index = partition)
+      taskSetBlacklist1.updateBlacklistForFailedTask(
+        "hostA", exec = "2", index = partition, failureReason = "testing")
     }
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist1.execToFailures)
 
@@ -512,7 +528,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     // Fail 4 tasks in one task set on executor 1, so that executor gets blacklisted for the whole
     // application.
     (0 until 4).foreach { partition =>
-      taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "1", index = partition)
+      taskSetBlacklist2.updateBlacklistForFailedTask(
+        "hostA", exec = "1", index = partition, failureReason = "testing")
     }
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist2.execToFailures)
 
@@ -523,7 +540,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     // application. Since that's the second executor that is blacklisted on the same node, we also
     // blacklist that node.
     (0 until 4).foreach { partition =>
-      taskSetBlacklist3.updateBlacklistForFailedTask("hostA", exec = "2", index = partition)
+      taskSetBlacklist3.updateBlacklistForFailedTask(
+        "hostA", exec = "2", index = partition, failureReason = "testing")
     }
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist3.execToFailures)
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index b8626bf777598..6003899bb7bef 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -660,9 +660,14 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(tsm.isZombie)
     assert(failedTaskSet)
     val idx = failedTask.index
-    assert(failedTaskSetReason === s"Aborting TaskSet 0.0 because task $idx (partition $idx) " +
-      s"cannot run anywhere due to node and executor blacklist. Blacklisting behavior can be " +
-      s"configured via spark.blacklist.*.")
+    assert(failedTaskSetReason === s"""
+      |Aborting $taskSet because task $idx (partition $idx)
+      |cannot run anywhere due to node and executor blacklist.
+      |Most recent failure:
+      |${tsm.taskSetBlacklistHelperOpt.get.getLatestFailureReason}
+      |
+      |Blacklisting behavior can be configured via spark.blacklist.*.
+      |""".stripMargin)
   }
 
   test("don't abort if there is an executor available, though it hasn't had scheduled tasks yet") {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala
index f1392e9db6bfd..18981d5be2f94 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala
@@ -37,7 +37,8 @@ class TaskSetBlacklistSuite extends SparkFunSuite {
 
     // First, mark task 0 as failed on exec1.
     // task 0 should be blacklisted on exec1, and nowhere else
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "exec1", index = 0)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "exec1", index = 0, failureReason = "testing")
     for {
       executor <- (1 to 4).map(_.toString)
       index <- 0 until 10
@@ -49,17 +50,20 @@ class TaskSetBlacklistSuite extends SparkFunSuite {
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
 
     // Mark task 1 failed on exec1 -- this pushes the executor into the blacklist
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "exec1", index = 1)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "exec1", index = 1, failureReason = "testing")
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
     // Mark one task as failed on exec2 -- not enough for any further blacklisting yet.
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "exec2", index = 0)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "exec2", index = 0, failureReason = "testing")
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
     assert(!taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec2"))
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
     // Mark another task as failed on exec2 -- now we blacklist exec2, which also leads to
     // blacklisting the entire node.
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "exec2", index = 1)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "exec2", index = 1, failureReason = "testing")
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec2"))
     assert(taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
@@ -108,34 +112,41 @@ class TaskSetBlacklistSuite extends SparkFunSuite {
       .set(config.MAX_FAILED_EXEC_PER_NODE_STAGE, 3)
     val taskSetBlacklist = new TaskSetBlacklist(conf, stageId = 0, new SystemClock())
     // Fail a task twice on hostA, exec:1
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "1", index = 0, failureReason = "testing")
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "1", index = 0, failureReason = "testing")
     assert(taskSetBlacklist.isExecutorBlacklistedForTask("1", 0))
     assert(!taskSetBlacklist.isNodeBlacklistedForTask("hostA", 0))
     assert(!taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
 
     // Fail the same task once more on hostA, exec:2
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "2", index = 0)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "2", index = 0, failureReason = "testing")
     assert(taskSetBlacklist.isNodeBlacklistedForTask("hostA", 0))
     assert(!taskSetBlacklist.isExecutorBlacklistedForTaskSet("2"))
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
 
     // Fail another task on hostA, exec:1. Now that executor has failures on two different tasks,
     // so its blacklisted
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 1)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "1", index = 1, failureReason = "testing")
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
 
     // Fail a third task on hostA, exec:2, so that exec is blacklisted for the whole task set
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "2", index = 2)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "2", index = 2, failureReason = "testing")
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("2"))
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
 
     // Fail a fourth & fifth task on hostA, exec:3. Now we've got three executors that are
     // blacklisted for the taskset, so blacklist the whole node.
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "3", index = 3)
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "3", index = 4)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "3", index = 3, failureReason = "testing")
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "3", index = 4, failureReason = "testing")
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("3"))
     assert(taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
   }
@@ -147,13 +158,17 @@ class TaskSetBlacklistSuite extends SparkFunSuite {
     val conf = new SparkConf().setAppName("test").setMaster("local")
       .set(config.BLACKLIST_ENABLED.key, "true")
     val taskSetBlacklist = new TaskSetBlacklist(conf, stageId = 0, new SystemClock())
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 1)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "1", index = 0, failureReason = "testing")
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "1", index = 1, failureReason = "testing")
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
 
-    taskSetBlacklist.updateBlacklistForFailedTask("hostB", exec = "2", index = 0)
-    taskSetBlacklist.updateBlacklistForFailedTask("hostB", exec = "2", index = 1)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostB", exec = "2", index = 0, failureReason = "testing")
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostB", exec = "2", index = 1, failureReason = "testing")
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("2"))
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index ae43f4cadc037..5c712bd6a545b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1146,7 +1146,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     // Make sure that the blacklist ignored all of the task failures above, since they aren't
     // the fault of the executor where the task was running.
     verify(blacklist, never())
-      .updateBlacklistForFailedTask(anyString(), anyString(), anyInt())
+      .updateBlacklistForFailedTask(anyString(), anyString(), anyInt(), anyString())
   }
 
   test("update application blacklist for shuffle-fetch") {
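
Below is a minimal, self-contained Scala sketch of the mechanism this patch adds, for illustration only. `TaskSetBlacklistStub` and its `main` driver are hypothetical stand-ins, not the real Spark classes: only the `updateBlacklistForFailedTask` parameter list, `getLatestFailureReason`, and the shape of the abort message are taken from the patch above; everything else (per-executor failure tracking, clocks, scheduling) is omitted.

```scala
// Illustrative sketch only -- NOT the real Spark classes. It shows how the
// new `failureReason` parameter threads the most recent task failure into
// the abort message that previously carried no detail.
object LatestFailureReasonSketch {

  // Stand-in for TaskSetBlacklist: remembers only the most recent reason.
  final class TaskSetBlacklistStub {
    private var latestFailureReason: String = null

    // Returns the most recent failure reason, or null if nothing failed yet.
    def getLatestFailureReason: String = latestFailureReason

    // Mirrors the patched signature; the real method also records the
    // failure against the executor and node for blacklisting decisions.
    def updateBlacklistForFailedTask(
        host: String,
        exec: String,
        index: Int,
        failureReason: String): Unit = {
      latestFailureReason = failureReason
    }
  }

  def main(args: Array[String]): Unit = {
    val blacklist = new TaskSetBlacklistStub
    blacklist.updateBlacklistForFailedTask(
      "hostA", exec = "1", index = 0,
      failureReason = "java.lang.Exception: Fake error!")

    // Mirrors the multi-line abort message now built in TaskSetManager.
    val message =
      s"""
         |Aborting TaskSet 0.0 because task 0 (partition 0)
         |cannot run anywhere due to node and executor blacklist.
         |Most recent failure:
         |${blacklist.getLatestFailureReason}
         |
         |Blacklisting behavior can be configured via spark.blacklist.*.
         |""".stripMargin
    println(message)
  }
}
```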