diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 38b536023b4504a60eb8ba5f57a95f6d38f569ba..7046c06d2057d68e6e16f4d6bc2e15e5cbda42db 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -133,7 +133,8 @@ class DAGScheduler( private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo] - private[spark] val listenerBus = new SparkListenerBus() + // An async scheduler event bus. The bus should be stopped when DAGSCheduler is stopped. + private[spark] val listenerBus = new SparkListenerBus // Contains the locations that each RDD's partitions are cached on private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]] @@ -1121,5 +1122,6 @@ class DAGScheduler( } metadataCleaner.cancel() taskSched.stop() + listenerBus.stop() } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index 627995c826e2b844cdc6a9656ee2d45c51a204df..55a40a92c96521ebdeece352ebf65324cbeee59f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -43,6 +43,9 @@ case class SparkListenerJobStart(job: ActiveJob, stageIds: Array[Int], propertie case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult) extends SparkListenerEvents +/** An event used in the listener to shutdown the listener daemon thread. */ +private[scheduler] case object SparkListenerShutdown extends SparkListenerEvents + trait SparkListener { /** * Called when a stage is completed, with information on the completed stage diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index e7defd768b2c3857e03b8747b6026dfcb9428dd6..fc637884e2833e0d88320409a8e3898532be53b2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -24,15 +24,17 @@ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} import org.apache.spark.Logging /** Asynchronously passes SparkListenerEvents to registered SparkListeners. */ -private[spark] class SparkListenerBus() extends Logging { - private val sparkListeners = new ArrayBuffer[SparkListener]() with SynchronizedBuffer[SparkListener] +private[spark] class SparkListenerBus extends Logging { + private val sparkListeners = new ArrayBuffer[SparkListener] with SynchronizedBuffer[SparkListener] /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */ - private val EVENT_QUEUE_CAPACITY = 10000 + private val EVENT_QUEUE_CAPACITY = 10000 private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents](EVENT_QUEUE_CAPACITY) private var queueFullErrorMessageLogged = false + // Create a new daemon thread to listen for events. This thread is stopped when it receives + // a SparkListenerShutdown event, using the stop method. new Thread("SparkListenerBus") { setDaemon(true) override def run() { @@ -53,6 +55,9 @@ private[spark] class SparkListenerBus() extends Logging { sparkListeners.foreach(_.onTaskGettingResult(taskGettingResult)) case taskEnd: SparkListenerTaskEnd => sparkListeners.foreach(_.onTaskEnd(taskEnd)) + case SparkListenerShutdown => + // Get out of the while loop and shutdown the daemon thread + return case _ => } } @@ -80,7 +85,7 @@ private[spark] class SparkListenerBus() extends Logging { */ def waitUntilEmpty(timeoutMillis: Int): Boolean = { val finishTime = System.currentTimeMillis + timeoutMillis - while (!eventQueue.isEmpty()) { + while (!eventQueue.isEmpty) { if (System.currentTimeMillis > finishTime) { return false } @@ -90,4 +95,6 @@ private[spark] class SparkListenerBus() extends Logging { } return true } + + def stop(): Unit = post(SparkListenerShutdown) }