diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index b1ad0d42ec8a6c39b33b1dc62b8c38528c7c78aa..bbdb4e8af036c9336f5ea5274eebd92d9c084081 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -563,13 +563,17 @@ class StreamingContext private[streaming] ( /** * Stop the execution of the streams immediately (does not wait for all received data - * to be processed). + * to be processed). By default, if `stopSparkContext` is not specified, the underlying + * SparkContext will also be stopped. This implicit behavior can be configured using the + * SparkConf configuration spark.streaming.stopSparkContextByDefault. * - * @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext + * @param stopSparkContext If true, stops the associated SparkContext. The underlying SparkContext * will be stopped regardless of whether this StreamingContext has been * started. */ - def stop(stopSparkContext: Boolean = true): Unit = synchronized { + def stop( + stopSparkContext: Boolean = conf.getBoolean("spark.streaming.stopSparkContextByDefault", true) + ): Unit = synchronized { stop(stopSparkContext, false) } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 5207b7109e69b3d7b11371bd1a70b10cf7468d80..a589deb1fa5795679f4017c4aa0fbfc912f12109 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -118,6 +118,11 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w assert(ssc.state === ssc.StreamingContextState.Started) ssc.stop() assert(ssc.state === ssc.StreamingContextState.Stopped) + + // Make sure that the SparkContext is also stopped by default + intercept[Exception] { + ssc.sparkContext.makeRDD(1 to 10) + } } test("start multiple times") { @@ -154,16 +159,26 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w } test("stop only streaming context") { - ssc = new StreamingContext(master, appName, batchDuration) + val conf = new SparkConf().setMaster(master).setAppName(appName) + + // Explicitly do not stop SparkContext + ssc = new StreamingContext(conf, batchDuration) sc = ssc.sparkContext addInputStream(ssc).register() ssc.start() ssc.stop(stopSparkContext = false) assert(sc.makeRDD(1 to 100).collect().size === 100) - ssc = new StreamingContext(sc, batchDuration) + sc.stop() + + // Implicitly do not stop SparkContext + conf.set("spark.streaming.stopSparkContextByDefault", "false") + ssc = new StreamingContext(conf, batchDuration) + sc = ssc.sparkContext addInputStream(ssc).register() ssc.start() ssc.stop() + assert(sc.makeRDD(1 to 100).collect().size === 100) + sc.stop() } test("stop(stopSparkContext=true) after stop(stopSparkContext=false)") {