Skip to content

Commit

Permalink
Fixing a race condition in event listener unit test
Browse files Browse the repository at this point in the history
Author: Kan Zhang <kzhang@apache.org>

Closes apache#401 from kanzhang/fix-1475 and squashes the following commits:

c6058bd [Kan Zhang] Fixing a race condition in event listener unit test
  • Loading branch information
kanzhang authored and pdeyhim committed Jun 25, 2014
1 parent d45ce3f commit 3cf7dae
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
}
}

// Exposed for testing
@volatile private[spark] var stopCalled = false

/**
* Start sending events to attached listeners.
*
Expand Down Expand Up @@ -97,7 +94,6 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
}

def stop() {
stopCalled = true
if (!started) {
throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,21 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
test("bus.stop() waits for the event queue to completely drain") {
@volatile var drained = false

// When Listener has started
val listenerStarted = new Semaphore(0)

// Tells the listener to stop blocking
val listenerWait = new Semaphore(1)
val listenerWait = new Semaphore(0)

// When stopper has started
val stopperStarted = new Semaphore(0)

// When stop has returned
val stopReturned = new Semaphore(1)
// When stopper has returned
val stopperReturned = new Semaphore(0)

class BlockingListener extends SparkListener {
override def onJobEnd(jobEnd: SparkListenerJobEnd) = {
listenerStarted.release()
listenerWait.acquire()
drained = true
}
Expand All @@ -97,23 +104,26 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
bus.start()
bus.post(SparkListenerJobEnd(0, JobSucceeded))

// the queue should not drain immediately
listenerStarted.acquire()
// Listener should be blocked after start
assert(!drained)

new Thread("ListenerBusStopper") {
override def run() {
stopperStarted.release()
// stop() will block until notify() is called below
bus.stop()
stopReturned.release(1)
stopperReturned.release()
}
}.start()

while (!bus.stopCalled) {
Thread.sleep(10)
}
stopperStarted.acquire()
// Listener should remain blocked after stopper started
assert(!drained)

// unblock Listener to let queue drain
listenerWait.release()
stopReturned.acquire()
stopperReturned.acquire()
assert(drained)
}

Expand Down

0 comments on commit 3cf7dae

Please sign in to comment.