[SPARK-31791][CORE][TEST] Improve cache block migration test reliability
### What changes were proposed in this pull request?

Increase the timeout and register the listener earlier to avoid a race condition in which the job starts before the listener is registered.
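
In outline, the pattern looks like the sketch below (a minimal sketch rather than the patch itself, assuming the suite's `SparkContext` `sc` with two executors as in the diff further down; `TestUtils` is Spark's internal test helper):

```scala
import java.util.concurrent.Semaphore

import org.apache.spark.TestUtils
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}

// Register the listener *before* the job is submitted so onTaskStart
// cannot fire before anyone is listening.
val sem = new Semaphore(0)
sc.addSparkListener(new SparkListener {
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = sem.release()
})

// Wait for the executors explicitly instead of relying on a throwaway count().
TestUtils.waitUntilExecutorsUp(sc = sc, numExecutors = 2, timeout = 10000) // 10s

val asyncCount = sc.parallelize(1 to 10).countAsync() // the job starts here
sem.acquire(1) // returns only once a task has actually started
```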

### Why are the changes needed?

The test is currently semi-flaky.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?
I'm currently running the following bash script on my dev machine to verify that the flakiness decreases. It has reached 356 iterations without any test failures, so I believe the issue is fixed.

```bash
set -ex
./build/sbt clean compile package
((failures=0))
for (( i=0;i<1000;++i )); do
  echo "Run $i"
  ((failed=0))
  ./build/sbt "core/testOnly org.apache.spark.scheduler.WorkerDecommissionSuite" || ((failed=1))
  echo "Resulted in $failed"
  ((failures=failures+failed))
  echo "Current status is failures: $failures out of $i runs"
done
```

Closes #28614 from holdenk/SPARK-31791-improve-cache-block-migration-test-reliability.

Authored-by: Holden Karau <hkarau@apple.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
holdenk committed May 23, 2020
1 parent 7ca73f0 commit 721cba5
Showing 1 changed file with 13 additions and 9 deletions.
core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
```diff
@@ -22,7 +22,8 @@ import java.util.concurrent.Semaphore
 import scala.concurrent.TimeoutException
 import scala.concurrent.duration._
 
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite,
+  TestUtils}
 import org.apache.spark.internal.config
 import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
 import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils}
@@ -48,35 +49,38 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
 
   test("verify a task with all workers decommissioned succeeds") {
     val input = sc.parallelize(1 to 10)
-    // Do a count to wait for the executors to be registered.
-    input.count()
-    val sleepyRdd = input.mapPartitions{ x =>
-      Thread.sleep(50)
-      x
-    }
     // Listen for the job
     val sem = new Semaphore(0)
     sc.addSparkListener(new SparkListener {
       override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
         sem.release()
       }
     })
+    TestUtils.waitUntilExecutorsUp(sc = sc,
+      numExecutors = 2,
+      timeout = 10000) // 10s
+    val sleepyRdd = input.mapPartitions{ x =>
+      Thread.sleep(5000) // 5s
+      x
+    }
     // Start the task.
     val asyncCount = sleepyRdd.countAsync()
     // Wait for the job to have started
     sem.acquire(1)
+    // Give it time to make it to the worker otherwise we'll block
+    Thread.sleep(2000) // 2s
     // Decommission all the executors, this should not halt the current task.
     // decom.sh message passing is tested manually.
     val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
     val execs = sched.getExecutorIds()
     execs.foreach(execId => sched.decommissionExecutor(execId))
-    val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 10.seconds)
+    val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds)
     assert(asyncCountResult === 10)
     // Try and launch task after decommissioning, this should fail
     val postDecommissioned = input.map(x => x)
     val postDecomAsyncCount = postDecommissioned.countAsync()
     val thrown = intercept[java.util.concurrent.TimeoutException]{
-      val result = ThreadUtils.awaitResult(postDecomAsyncCount, 10.seconds)
+      val result = ThreadUtils.awaitResult(postDecomAsyncCount, 20.seconds)
     }
     assert(postDecomAsyncCount.isCompleted === false,
       "After exec decommission new task could not launch")
```
