Skip to content

Commit

Permalink
[SPARK-4813][Streaming] Fix the issue that ContextWaiter didn't handl…
Browse files Browse the repository at this point in the history
…e 'spurious wakeup'

Used `Condition` to rewrite `ContextWaiter` because it provides a convenient API `awaitNanos` for timeout.

Author: zsxwing <zsxwing@gmail.com>

Closes apache#3661 from zsxwing/SPARK-4813 and squashes the following commits:

52247f5 [zsxwing] Add explicit unit type
be42bcf [zsxwing] Update as per review suggestion
e06bd4f [zsxwing] Fix the issue that ContextWaiter didn't handle 'spurious wakeup'
  • Loading branch information
zsxwing authored and tdas committed Dec 30, 2014
1 parent 0f31992 commit 6a89782
Showing 1 changed file with 48 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,63 @@

package org.apache.spark.streaming

import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

private[streaming] class ContextWaiter {

private val lock = new ReentrantLock()
private val condition = lock.newCondition()

// Guarded by "lock"
private var error: Throwable = null
private var stopped: Boolean = false

def notifyError(e: Throwable) = synchronized {
error = e
notifyAll()
}
// Guarded by "lock"
private var stopped: Boolean = false

def notifyStop() = synchronized {
stopped = true
notifyAll()
def notifyError(e: Throwable): Unit = {
lock.lock()
try {
error = e
condition.signalAll()
} finally {
lock.unlock()
}
}

def waitForStopOrError(timeout: Long = -1) = synchronized {
// If already had error, then throw it
if (error != null) {
throw error
def notifyStop(): Unit = {
lock.lock()
try {
stopped = true
condition.signalAll()
} finally {
lock.unlock()
}
}

// If not already stopped, then wait
if (!stopped) {
if (timeout < 0) wait() else wait(timeout)
/**
* Return `true` if it's stopped; or throw the reported error if `notifyError` has been called; or
* `false` if the waiting time detectably elapsed before return from the method.
*/
def waitForStopOrError(timeout: Long = -1): Boolean = {
lock.lock()
try {
if (timeout < 0) {
while (!stopped && error == null) {
condition.await()
}
} else {
var nanos = TimeUnit.MILLISECONDS.toNanos(timeout)
while (!stopped && error == null && nanos > 0) {
nanos = condition.awaitNanos(nanos)
}
}
// If already had error, then throw it
if (error != null) throw error
// already stopped or timeout
stopped
} finally {
lock.unlock()
}
}
}

0 comments on commit 6a89782

Please sign in to comment.