Skip to content
Snippets Groups Projects
Commit 2180c871 authored by Reynold Xin's avatar Reynold Xin
Browse files

Stop SparkListenerBus daemon thread when DAGScheduler is stopped.

parent ee6e7f9b
No related branches found
No related tags found
No related merge requests found
......@@ -133,7 +133,8 @@ class DAGScheduler(
private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo]
private[spark] val listenerBus = new SparkListenerBus()
// An async scheduler event bus. The bus should be stopped when DAGSCheduler is stopped.
private[spark] val listenerBus = new SparkListenerBus
// Contains the locations that each RDD's partitions are cached on
private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
......@@ -1121,5 +1122,6 @@ class DAGScheduler(
}
metadataCleaner.cancel()
taskSched.stop()
listenerBus.stop()
}
}
......@@ -43,6 +43,9 @@ case class SparkListenerJobStart(job: ActiveJob, stageIds: Array[Int], propertie
case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
extends SparkListenerEvents
/** An event used in the listener to shutdown the listener daemon thread. */
private[scheduler] case object SparkListenerShutdown extends SparkListenerEvents
trait SparkListener {
/**
* Called when a stage is completed, with information on the completed stage
......
......@@ -24,15 +24,17 @@ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import org.apache.spark.Logging
/** Asynchronously passes SparkListenerEvents to registered SparkListeners. */
private[spark] class SparkListenerBus() extends Logging {
private val sparkListeners = new ArrayBuffer[SparkListener]() with SynchronizedBuffer[SparkListener]
private[spark] class SparkListenerBus extends Logging {
private val sparkListeners = new ArrayBuffer[SparkListener] with SynchronizedBuffer[SparkListener]
/* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
* an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
private val EVENT_QUEUE_CAPACITY = 10000
private val EVENT_QUEUE_CAPACITY = 10000
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents](EVENT_QUEUE_CAPACITY)
private var queueFullErrorMessageLogged = false
// Create a new daemon thread to listen for events. This thread is stopped when it receives
// a SparkListenerShutdown event, using the stop method.
new Thread("SparkListenerBus") {
setDaemon(true)
override def run() {
......@@ -53,6 +55,9 @@ private[spark] class SparkListenerBus() extends Logging {
sparkListeners.foreach(_.onTaskGettingResult(taskGettingResult))
case taskEnd: SparkListenerTaskEnd =>
sparkListeners.foreach(_.onTaskEnd(taskEnd))
case SparkListenerShutdown =>
// Get out of the while loop and shutdown the daemon thread
return
case _ =>
}
}
......@@ -80,7 +85,7 @@ private[spark] class SparkListenerBus() extends Logging {
*/
def waitUntilEmpty(timeoutMillis: Int): Boolean = {
val finishTime = System.currentTimeMillis + timeoutMillis
while (!eventQueue.isEmpty()) {
while (!eventQueue.isEmpty) {
if (System.currentTimeMillis > finishTime) {
return false
}
......@@ -90,4 +95,6 @@ private[spark] class SparkListenerBus() extends Logging {
}
return true
}
def stop(): Unit = post(SparkListenerShutdown)
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment