[SPARK-15783][CORE] Fix Flakiness in BlacklistIntegrationSuite #13565

Closed
wants to merge 10 commits
@@ -1471,8 +1471,10 @@ class DAGScheduler(
}

if (ableToCancelStages) {
job.listener.jobFailed(error)
// SPARK-15783 Important to clean up state first, just for tests where we have some asserts
// against the state. Otherwise we have a *little* bit of flakiness in the tests.
cleanupStateForJobAndIndependentStages(job)
job.listener.jobFailed(error)
listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
}
}
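For context, the reordering matters because a test thread can wake up as soon as the job future fails and immediately assert on scheduler state. A minimal sketch of that race, using hypothetical stand-in names rather than the real DAGScheduler internals:

import scala.collection.mutable
import scala.concurrent.Promise

// Hypothetical stand-in for the scheduler state a test asserts on.
class TinyScheduler {
  val activeJobs = mutable.Set[Int]()
  private val done = Promise[Unit]()
  def jobFuture = done.future

  def failJob(jobId: Int, error: Exception): Unit = {
    // Clean up state *before* completing the future. If the order were reversed,
    // a test awaiting jobFuture could still observe activeJobs as non-empty --
    // the same flakiness SPARK-15783 fixes by cleaning up before jobFailed(error).
    activeJobs -= jobId
    done.failure(error)
  }
}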
@@ -24,6 +24,7 @@ import org.apache.spark._
class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorMockBackend]{

val badHost = "host-0"
val duration = Duration(10, SECONDS)
Contributor Author comment: Pretty sure that such a long duration isn't really necessary, but I don't think it hurts to make it longer just in case.
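A quick sketch (illustration only, not from the PR) of why a generous timeout is harmless for passing tests: Await.ready returns as soon as the future completes, so the 10-second bound is only ever hit when something is already wrong.

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

val p = Promise[Int]()
p.success(42)
// Returns immediately because the future is already complete; the duration is
// only an upper bound for the failure case.
Await.ready(p.future, Duration(10, SECONDS))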


/**
* This backend just always fails if the task is executed on a bad host, but otherwise succeeds
@@ -41,20 +42,19 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM

// Test demonstrating the issue -- without a config change, the scheduler keeps scheduling
// according to locality preferences, and so the job fails
ignore("If preferred node is bad, without blacklist job will fail") {
testScheduler("If preferred node is bad, without blacklist job will fail") {
val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
withBackend(badHostBackend _) {
val jobFuture = submit(rdd, (0 until 10).toArray)
val duration = Duration(1, SECONDS)
Await.ready(jobFuture, duration)
awaitJobTermination(jobFuture, duration)
}
assertDataStructuresEmpty(noFailure = false)
}

// even with the blacklist turned on, if maxTaskFailures is not more than the number
// of executors on the bad node, then locality preferences will lead to us cycling through
// the executors on the bad node, and still failing the job
ignoreScheduler(
testScheduler(
"With blacklist on, job will still fail if there are too many bad executors on bad host",
extraConfs = Seq(
// just set this to something much longer than the test duration
@@ -64,15 +64,14 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
withBackend(badHostBackend _) {
val jobFuture = submit(rdd, (0 until 10).toArray)
val duration = Duration(3, SECONDS)
Await.ready(jobFuture, duration)
awaitJobTermination(jobFuture, duration)
}
assertDataStructuresEmpty(noFailure = false)
}

// Here we run with the blacklist on, and maxTaskFailures high enough that we'll eventually
// schedule on a good node and succeed the job
ignoreScheduler(
testScheduler(
"Bad node with multiple executors, job will still succeed with the right confs",
extraConfs = Seq(
// just set this to something much longer than the test duration
@@ -86,8 +85,7 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
withBackend(badHostBackend _) {
val jobFuture = submit(rdd, (0 until 10).toArray)
val duration = Duration(1, SECONDS)
Await.ready(jobFuture, duration)
awaitJobTermination(jobFuture, duration)
}
assert(results === (0 until 10).map { _ -> 42 }.toMap)
assertDataStructuresEmpty(noFailure = true)
@@ -214,7 +214,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
results.clear()
securityMgr = new SecurityManager(conf)
broadcastManager = new BroadcastManager(true, conf, securityMgr)
mapOutputTracker = new MapOutputTrackerMaster(conf, broadcastManager, true)
mapOutputTracker = new MapOutputTrackerMaster(conf, broadcastManager, true) {
override def sendTracker(message: Any): Unit = {
// no-op, just so we can stop this to avoid leaking threads
}
}
scheduler = new DAGScheduler(
sc,
taskScheduler,
@@ -228,6 +232,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
override def afterEach(): Unit = {
try {
scheduler.stop()
dagEventProcessLoopTester.stop()
mapOutputTracker.stop()
broadcastManager.stop()
Contributor Author comment: broadcastManager actually isn't leaking any threads given the current implementation of TorrentBroadcastFactory, but still we should probably stop it. Also worth noting that netty does not clean up the threads for the shuffle server and client immediately, so those still linger on between tests for a little while, but they do get cleaned up eventually.
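As a side note on checking the lingering-thread claim, one way to see what is still alive between tests is to dump live thread names. A minimal sketch (not part of this PR; the name prefix is just an assumption for illustration):

import scala.collection.JavaConverters._

// List live threads whose names suggest they belong to the shuffle server/client.
def lingeringThreads(prefix: String = "shuffle-"): Seq[String] = {
  Thread.getAllStackTraces.keySet.asScala.toSeq
    .filter(t => t.isAlive && t.getName.startsWith(prefix))
    .map(_.getName)
}

// e.g. call lingeringThreads() from afterEach while debugging thread leaks.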

} finally {
super.afterEach()
}
@@ -17,7 +17,8 @@
package org.apache.spark.scheduler

import java.util.Properties
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{TimeoutException, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.{Await, Future}
@@ -31,7 +32,7 @@ import org.apache.spark._
import org.apache.spark.TaskState._
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.util.{CallSite, Utils}
import org.apache.spark.util.{CallSite, ThreadUtils, Utils}

/**
* Tests for the entire scheduler code -- DAGScheduler, TaskSchedulerImpl, TaskSets,
@@ -54,6 +55,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
}
results.clear()
failure = null
backendException.set(null)
super.beforeEach()
}

@@ -89,11 +91,6 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
}
}

// still a few races to work out in the blacklist tests, so ignore some tests
def ignoreScheduler(name: String, extraConfs: Seq[(String, String)])(testBody: => Unit): Unit = {
ignore(name)(testBody)
}

/**
* A map from partition -> results for all tasks of a job when you call this test framework's
* [[submit]] method. Two important considerations:
@@ -166,6 +163,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
assert(failure != null)
}
assert(scheduler.activeJobs.isEmpty)
assert(backendException.get() == null)
}

/**
@@ -203,6 +201,8 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
new MockRDD(sc, nParts, shuffleDeps)
}

val backendException = new AtomicReference[Exception](null)

/**
* Helper which makes it a little easier to set up a test, which starts a mock backend in another
* thread, responding to tasks with your custom function. You also supply the "body" of your
@@ -217,7 +217,17 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
override def run(): Unit = {
while (backendContinue.get()) {
if (backend.hasTasksWaitingToRun) {
backendFunc()
try {
backendFunc()
} catch {
case ex: Exception =>
// Try to do a little error handling around exceptions that might occur here --
// otherwise it can just look like a TimeoutException in the test itself.
logError("Exception in mock backend:", ex)
backendException.set(ex)
backendContinue.set(false)
throw ex
}
} else {
Thread.sleep(10)
}
@@ -233,6 +243,25 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
}
}

/**
* Helper to do a little extra error checking while waiting for the job to terminate. Primarily
* just does a little extra error handling if there is an exception from the backend.
*/
def awaitJobTermination(jobFuture: Future[_], duration: Duration): Unit = {
try {
Await.ready(jobFuture, duration)
} catch {
case te: TimeoutException if backendException.get() != null =>
val msg = raw"""
| ----- Begin Backend Failure Msg -----
| ${Utils.exceptionString(backendException.get())}
| ----- End Backend Failure Msg ----
""".
stripMargin

fail(s"Future timed out after ${duration}, likely because of failure in backend: $msg")
}
}
}

/**
@@ -244,6 +273,17 @@ private[spark] abstract class MockBackend(
conf: SparkConf,
val taskScheduler: TaskSchedulerImpl) extends SchedulerBackend with Logging {

// Periodically revive offers to allow delay scheduling to work
private val reviveThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
private val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "10ms")

reviveThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
reviveOffers()
}
}, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)

/**
* Test backends should call this to get a task that has been assigned to them by the scheduler.
* Each task should be responded to with either [[taskSuccess]] or [[taskFailed]].
@@ -309,7 +349,9 @@

override def start(): Unit = {}

override def stop(): Unit = {}
override def stop(): Unit = {
reviveThread.shutdown()
}

val env = SparkEnv.get

@@ -333,8 +375,9 @@
}

/**
* This is called by the scheduler whenever it has tasks it would like to schedule. It gets
* called in the scheduling thread, not the backend thread.
* This is called by the scheduler whenever it has tasks it would like to schedule, when a task
* completes (which will be in a result-getter thread), and by the reviveOffers thread for delay
* scheduling.
*/
override def reviveOffers(): Unit = {
val offers: Seq[WorkerOffer] = generateOffers()
@@ -483,7 +526,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
withBackend(runBackend _) {
val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
val duration = Duration(1, SECONDS)
Await.ready(jobFuture, duration)
awaitJobTermination(jobFuture, duration)
}
assert(results === (0 until 10).map { _ -> 42 }.toMap)
assertDataStructuresEmpty()
@@ -517,10 +560,11 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor

// make sure the required map output is available
task.stageId match {
case 1 => assertMapOutputAvailable(b)
case 3 => assertMapOutputAvailable(c)
case 4 => assertMapOutputAvailable(d)
case _ => // no shuffle map input, nothing to check
case _ =>
// we can't check for the output for the two intermediate stages, unfortunately,
// b/c the stage numbering is non-deterministic, so stage number alone doesn't tell
// us what to check
}

(task.stageId, task.stageAttemptId, task.partitionId) match {
@@ -534,7 +578,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
withBackend(runBackend _) {
val jobFuture = submit(d, (0 until 30).toArray)
val duration = Duration(1, SECONDS)
Await.ready(jobFuture, duration)
awaitJobTermination(jobFuture, duration)
}
assert(results === (0 until 30).map { idx => idx -> (4321 + idx) }.toMap)
assertDataStructuresEmpty()
@@ -556,11 +600,9 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
val (taskDescription, task) = backend.beginTask()
stageToAttempts.getOrElseUpdate(task.stageId, new HashSet()) += task.stageAttemptId

// make sure the required map output is available
task.stageId match {
case 1 => assertMapOutputAvailable(shuffledRdd)
case _ => // no shuffle map input, nothing to check
}
// We cannot check if shuffle output is available, because the failed fetch will clear the
// shuffle output. Then we'd have a race, between the already-started task from the first
// attempt, and when the failure clears out the map output status.

(task.stageId, task.stageAttemptId, task.partitionId) match {
case (0, _, _) =>
@@ -576,7 +618,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
withBackend(runBackend _) {
val jobFuture = submit(shuffledRdd, (0 until 10).toArray)
val duration = Duration(1, SECONDS)
Await.ready(jobFuture, duration)
awaitJobTermination(jobFuture, duration)
}
assert(results === (0 until 10).map { idx => idx -> (42 + idx) }.toMap)
assert(stageToAttempts === Map(0 -> Set(0, 1), 1 -> Set(0, 1)))
@@ -591,7 +633,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
withBackend(runBackend _) {
val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
val duration = Duration(1, SECONDS)
Await.ready(jobFuture, duration)
awaitJobTermination(jobFuture, duration)
failure.getMessage.contains("test task failure")
}
assertDataStructuresEmpty(noFailure = false)