Commit 01187f59 authored by Tathagata Das


[SPARK-7217] [STREAMING] Add configuration to control the default behavior of StreamingContext.stop() implicitly calling SparkContext.stop()

In environments like notebooks, the SparkContext is managed by the underlying infrastructure and is expected to stay alive. However, StreamingContext.stop() calls SparkContext.stop() as a non-intuitive side effect. This PR adds a SparkConf configuration that controls the default stop behavior of StreamingContext.stop(); its default value is chosen so that existing users see no change in behavior.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #5929 from tdas/SPARK-7217 and squashes the following commits:

869a763 [Tathagata Das] Changed implementation.
685fe00 [Tathagata Das] Added configuration
parent cfdadcbd
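
For context, a rough sketch of how a deployment with a shared SparkContext (for example, a notebook) could opt out of the implicit SparkContext.stop() using the new configuration. This is not part of the patch itself; the master URL, application name, and object name below are illustrative only.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SharedSparkContextSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")                  // illustrative master / app name
      .setAppName("shared-context-sketch")
      // Opt out of the implicit SparkContext.stop() when ssc.stop() is
      // called without arguments (new in this patch; defaults to "true").
      .set("spark.streaming.stopSparkContextByDefault", "false")

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(1))

    // ... define input streams and output operations, then ssc.start() ...

    ssc.stop()                                 // stops only the streaming side
    assert(sc.makeRDD(1 to 10).count() == 10)  // SparkContext is still usable

    // Passing the flag explicitly always wins over the configuration:
    // ssc.stop(stopSparkContext = true)

    sc.stop()                                  // stop it explicitly when done
  }
}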
StreamingContext.scala
@@ -563,13 +563,17 @@ class StreamingContext private[streaming] (
   /**
    * Stop the execution of the streams immediately (does not wait for all received data
-   * to be processed).
+   * to be processed). By default, if `stopSparkContext` is not specified, the underlying
+   * SparkContext will also be stopped. This implicit behavior can be configured using the
+   * SparkConf configuration spark.streaming.stopSparkContextByDefault.
    *
-   * @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext
+   * @param stopSparkContext If true, stops the associated SparkContext. The underlying SparkContext
    *                         will be stopped regardless of whether this StreamingContext has been
    *                         started.
    */
-  def stop(stopSparkContext: Boolean = true): Unit = synchronized {
+  def stop(
+      stopSparkContext: Boolean = conf.getBoolean("spark.streaming.stopSparkContextByDefault", true)
+     ): Unit = synchronized {
     stop(stopSparkContext, false)
   }
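
An aside on the mechanism, since it is easy to miss: a Scala default argument is re-evaluated on every call, so the conf.getBoolean(...) default above is read each time stop() is invoked rather than once when the StreamingContext is constructed. A minimal standalone sketch of that behavior (not Spark code; all names here are made up for illustration):

object DefaultFromConfigSketch {
  // Stand-in for a mutable configuration store.
  private val settings = scala.collection.mutable.Map("stopByDefault" -> "true")

  // The default expression is evaluated at every call site.
  def stop(stopUnderlying: Boolean = settings("stopByDefault").toBoolean): Unit = {
    println(s"stopUnderlying = $stopUnderlying")
  }

  def main(args: Array[String]): Unit = {
    stop()                          // true: taken from the config default
    settings("stopByDefault") = "false"
    stop()                          // false: default re-evaluated on this call
    stop(stopUnderlying = true)     // an explicit argument always wins
  }
}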
StreamingContextSuite.scala
@@ -118,6 +118,11 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     assert(ssc.state === ssc.StreamingContextState.Started)
     ssc.stop()
     assert(ssc.state === ssc.StreamingContextState.Stopped)
+
+    // Make sure that the SparkContext is also stopped by default
+    intercept[Exception] {
+      ssc.sparkContext.makeRDD(1 to 10)
+    }
   }

   test("start multiple times") {
@@ -154,16 +159,26 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
   }

   test("stop only streaming context") {
-    ssc = new StreamingContext(master, appName, batchDuration)
+    val conf = new SparkConf().setMaster(master).setAppName(appName)
+
+    // Explicitly do not stop SparkContext
+    ssc = new StreamingContext(conf, batchDuration)
     sc = ssc.sparkContext
     addInputStream(ssc).register()
     ssc.start()
     ssc.stop(stopSparkContext = false)
     assert(sc.makeRDD(1 to 100).collect().size === 100)
-    ssc = new StreamingContext(sc, batchDuration)
+    sc.stop()
+
+    // Implicitly do not stop SparkContext
+    conf.set("spark.streaming.stopSparkContextByDefault", "false")
+    ssc = new StreamingContext(conf, batchDuration)
+    sc = ssc.sparkContext
     addInputStream(ssc).register()
     ssc.start()
     ssc.stop()
+    assert(sc.makeRDD(1 to 100).collect().size === 100)
+    sc.stop()
   }

   test("stop(stopSparkContext=true) after stop(stopSparkContext=false)") {