From 81012546ee5a80d2576740af0dad067b0f5962c5 Mon Sep 17 00:00:00 2001
From: tedyu <yuzhihong@gmail.com>
Date: Tue, 24 Nov 2015 12:22:33 -0800
Subject: [PATCH] [SPARK-11872] Prevent the call to SparkContext#stop() in the
 listener bus's thread

This is continuation of SPARK-11761

Andrew suggested adding this protection. See tail of https://github.com/apache/spark/pull/9741

Author: tedyu <yuzhihong@gmail.com>

Closes #9852 from tedyu/master.
---
 .../scala/org/apache/spark/SparkContext.scala |  4 +++
 .../spark/scheduler/SparkListenerSuite.scala  | 31 +++++++++++++++++++
 2 files changed, 35 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b153a7b08e..e19ba11370 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1694,6 +1694,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
   // Shut down the SparkContext.
   def stop() {
+    if (AsynchronousListenerBus.withinListenerThread.value) {
+      throw new SparkException("Cannot stop SparkContext within listener thread of" +
+        " AsynchronousListenerBus")
+    }
     // Use the stopping variable to ensure no contention for the stop scenario.
     // Still track the stopped variable for use elsewhere in the code.
     if (!stopped.compareAndSet(false, true)) {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 84e545851f..f20d5be7c0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 
 import org.scalatest.Matchers
 
+import org.apache.spark.SparkException
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.util.ResetSystemProperties
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
@@ -36,6 +37,21 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
 
   val jobCompletionTime = 1421191296660L
 
+  test("don't call sc.stop in listener") {
+    sc = new SparkContext("local", "SparkListenerSuite")
+    val listener = new SparkContextStoppingListener(sc)
+    val bus = new LiveListenerBus
+    bus.addListener(listener)
+
+    // Starting listener bus should flush all buffered events
+    bus.start(sc)
+    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+    bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+
+    bus.stop()
+    assert(listener.sparkExSeen)
+  }
+
   test("basic creation and shutdown of LiveListenerBus") {
     val counter = new BasicJobCounter
     val bus = new LiveListenerBus
@@ -443,6 +459,21 @@ private class BasicJobCounter extends SparkListener {
   override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1
 }
 
+/**
+ * A simple listener that tries to stop SparkContext.
+ */
+private class SparkContextStoppingListener(val sc: SparkContext) extends SparkListener {
+  @volatile var sparkExSeen = false
+  override def onJobEnd(job: SparkListenerJobEnd): Unit = {
+    try {
+      sc.stop()
+    } catch {
+      case se: SparkException =>
+        sparkExSeen = true
+    }
+  }
+}
+
 private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkListener {
   var count = 0
   override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1
-- 
GitLab