diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
index 8c6f86a6c0e88..148d20ee659a2 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
@@ -22,7 +22,8 @@ import java.util.concurrent.Semaphore
 import scala.concurrent.TimeoutException
 import scala.concurrent.duration._
 
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite,
+  TestUtils}
 import org.apache.spark.internal.config
 import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
 import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils}
@@ -48,12 +49,6 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
 
   test("verify a task with all workers decommissioned succeeds") {
     val input = sc.parallelize(1 to 10)
-    // Do a count to wait for the executors to be registered.
-    input.count()
-    val sleepyRdd = input.mapPartitions{ x =>
-      Thread.sleep(50)
-      x
-    }
     // Listen for the job
     val sem = new Semaphore(0)
     sc.addSparkListener(new SparkListener {
@@ -61,22 +56,31 @@
         sem.release()
       }
     })
+    TestUtils.waitUntilExecutorsUp(sc = sc,
+      numExecutors = 2,
+      timeout = 10000) // 10s
+    val sleepyRdd = input.mapPartitions{ x =>
+      Thread.sleep(5000) // 5s
+      x
+    }
     // Start the task.
     val asyncCount = sleepyRdd.countAsync()
     // Wait for the job to have started
     sem.acquire(1)
+    // Give it time to make it to the worker otherwise we'll block
+    Thread.sleep(2000) // 2s
     // Decommission all the executors, this should not halt the current task.
     // decom.sh message passing is tested manually.
     val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
     val execs = sched.getExecutorIds()
     execs.foreach(execId => sched.decommissionExecutor(execId))
-    val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 10.seconds)
+    val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds)
     assert(asyncCountResult === 10)
     // Try and launch task after decommissioning, this should fail
     val postDecommissioned = input.map(x => x)
     val postDecomAsyncCount = postDecommissioned.countAsync()
     val thrown = intercept[java.util.concurrent.TimeoutException]{
-      val result = ThreadUtils.awaitResult(postDecomAsyncCount, 10.seconds)
+      val result = ThreadUtils.awaitResult(postDecomAsyncCount, 20.seconds)
     }
     assert(postDecomAsyncCount.isCompleted === false,
       "After exec decommission new task could not launch")
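
The core of this patch is replacing the old "run a throwaway `input.count()` and hope the executors are registered" trick with an explicit wait. Below is a minimal standalone sketch (not part of the patch) of that pattern; the `local-cluster` master string, object name, and worker sizes are illustrative assumptions, and `TestUtils.waitUntilExecutorsUp` is the test-scope helper from `org.apache.spark.TestUtils` that the diff itself imports.

    import org.apache.spark.{SparkConf, SparkContext, TestUtils}

    object ExecutorsUpSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("local-cluster[2, 1, 1024]") // 2 workers, 1 core, 1024 MiB each (assumed sizes)
          .setAppName("executors-up-sketch")
        val sc = new SparkContext(conf)
        try {
          // Blocks until 2 executors have registered, or throws a
          // TimeoutException after 10s. Unlike a warm-up count(), this
          // guarantees *both* executors are up before the test proceeds.
          TestUtils.waitUntilExecutorsUp(sc, numExecutors = 2, timeout = 10000)
          assert(sc.parallelize(1 to 10).count() == 10)
        } finally {
          sc.stop()
        }
      }
    }

A warm-up count() only proves that enough executors existed to finish one job, so a task could still land on a late-registering executor afterward; waiting on the executor count directly, then sleeping long enough (5s per partition here) for tasks to reach the workers before decommissioning, is what makes the test deterministic under the longer 20s await timeouts.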