Skip to content
Snippets Groups Projects
Commit 4cf4cba0 authored by zsxwing's avatar zsxwing Committed by Tathagata Das
Browse files

[SPARK-5379][Streaming] Add awaitTerminationOrTimeout

Added `awaitTerminationOrTimeout` to return if the waiting time elapsed:
* `true` if it's stopped.
* `false` if the waiting time elapsed before returning from the method.
* throw the reported error if it's thrown during the execution.

Also deprecated `awaitTermination(timeout: Long)`.

Author: zsxwing <zsxwing@gmail.com>

Closes #4171 from zsxwing/SPARK-5379 and squashes the following commits:

c9e660b [zsxwing] Add a unit test for awaitTerminationOrTimeout
8a89f92 [zsxwing] Add awaitTerminationOrTimeout to python
cdc820b [zsxwing] Add awaitTerminationOrTimeout
parent 6aed719e
No related branches found
No related tags found
No related merge requests found
......@@ -191,6 +191,15 @@ class StreamingContext(object):
else:
self._jssc.awaitTermination(int(timeout * 1000))
def awaitTerminationOrTimeout(self, timeout):
"""
Wait for the execution to stop. Return `true` if it's stopped; or
throw the reported error during the execution; or `false` if the
waiting time elapsed before returning from the method.
@param timeout: time to wait in seconds
"""
self._jssc.awaitTerminationOrTimeout(int(timeout * 1000))
def stop(self, stopSparkContext=True, stopGraceFully=False):
"""
Stop the execution of the streams, with option of ensuring all
......
......@@ -526,10 +526,23 @@ class StreamingContext private[streaming] (
* will be thrown in this thread.
* @param timeout time to wait in milliseconds
*/
@deprecated("Use awaitTerminationOrTimeout(Long) instead", "1.3.0")
def awaitTermination(timeout: Long) {
waiter.waitForStopOrError(timeout)
}
/**
* Wait for the execution to stop. Any exceptions that occurs during the execution
* will be thrown in this thread.
*
* @param timeout time to wait in milliseconds
* @return `true` if it's stopped; or throw the reported error during the execution; or `false`
* if the waiting time elapsed before returning from the method.
*/
def awaitTerminationOrTimeout(timeout: Long): Boolean = {
waiter.waitForStopOrError(timeout)
}
/**
* Stop the execution of the streams immediately (does not wait for all received data
* to be processed).
......
......@@ -597,10 +597,23 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
* will be thrown in this thread.
* @param timeout time to wait in milliseconds
*/
@deprecated("Use awaitTerminationOrTimeout(Long) instead", "1.3.0")
def awaitTermination(timeout: Long): Unit = {
ssc.awaitTermination(timeout)
}
/**
* Wait for the execution to stop. Any exceptions that occurs during the execution
* will be thrown in this thread.
*
* @param timeout time to wait in milliseconds
* @return `true` if it's stopped; or throw the reported error during the execution; or `false`
* if the waiting time elapsed before returning from the method.
*/
def awaitTerminationOrTimeout(timeout: Long): Boolean = {
ssc.awaitTerminationOrTimeout(timeout)
}
/**
* Stop the execution of the streams. Will stop the associated JavaSparkContext as well.
*/
......
......@@ -304,6 +304,30 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
assert(exception.getMessage.contains("transform"), "Expected exception not thrown")
}
test("awaitTerminationOrTimeout") {
ssc = new StreamingContext(master, appName, batchDuration)
val inputStream = addInputStream(ssc)
inputStream.map(x => x).register()
ssc.start()
// test whether awaitTerminationOrTimeout() return false after give amount of time
failAfter(1000 millis) {
assert(ssc.awaitTerminationOrTimeout(500) === false)
}
// test whether awaitTerminationOrTimeout() return true if context is stopped
failAfter(10000 millis) { // 10 seconds because spark takes a long time to shutdown
new Thread() {
override def run() {
Thread.sleep(500)
ssc.stop()
}
}.start()
assert(ssc.awaitTerminationOrTimeout(10000) === true)
}
}
test("DStream and generated RDD creation sites") {
testPackage.test()
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment