Skip to content
Snippets Groups Projects
Commit 6a897829 authored by zsxwing's avatar zsxwing Committed by Tathagata Das
Browse files

[SPARK-4813][Streaming] Fix the issue that ContextWaiter didn't handle 'spurious wakeup'

Used `Condition` to rewrite `ContextWaiter` because it provides a convenient API `awaitNanos` for timeout.

Author: zsxwing <zsxwing@gmail.com>

Closes #3661 from zsxwing/SPARK-4813 and squashes the following commits:

52247f5 [zsxwing] Add explicit unit type
be42bcf [zsxwing] Update as per review suggestion
e06bd4f [zsxwing] Fix the issue that ContextWaiter didn't handle 'spurious wakeup'
parent 0f31992c
No related branches found
No related tags found
No related merge requests found
......@@ -17,30 +17,63 @@
package org.apache.spark.streaming
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
private[streaming] class ContextWaiter {
private val lock = new ReentrantLock()
private val condition = lock.newCondition()
// Guarded by "lock"
private var error: Throwable = null
private var stopped: Boolean = false
def notifyError(e: Throwable) = synchronized {
error = e
notifyAll()
}
// Guarded by "lock"
private var stopped: Boolean = false
def notifyStop() = synchronized {
stopped = true
notifyAll()
def notifyError(e: Throwable): Unit = {
lock.lock()
try {
error = e
condition.signalAll()
} finally {
lock.unlock()
}
}
def waitForStopOrError(timeout: Long = -1) = synchronized {
// If already had error, then throw it
if (error != null) {
throw error
def notifyStop(): Unit = {
lock.lock()
try {
stopped = true
condition.signalAll()
} finally {
lock.unlock()
}
}
// If not already stopped, then wait
if (!stopped) {
if (timeout < 0) wait() else wait(timeout)
/**
* Return `true` if it's stopped; or throw the reported error if `notifyError` has been called; or
* `false` if the waiting time detectably elapsed before return from the method.
*/
def waitForStopOrError(timeout: Long = -1): Boolean = {
lock.lock()
try {
if (timeout < 0) {
while (!stopped && error == null) {
condition.await()
}
} else {
var nanos = TimeUnit.MILLISECONDS.toNanos(timeout)
while (!stopped && error == null && nanos > 0) {
nanos = condition.awaitNanos(nanos)
}
}
// If already had error, then throw it
if (error != null) throw error
// already stopped or timeout
stopped
} finally {
lock.unlock()
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment