diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 36a6e6338faa6bea6a34ffdb6828336d171f13f0..be23056e7d42301e2d017de4a5a353725990a777 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -17,10 +17,9 @@
 
 package org.apache.spark.scheduler
 
-import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
+import java.util.concurrent.atomic.AtomicBoolean
 
-import org.apache.spark.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.util.AsynchronousListenerBus
 
 /**
  * Asynchronously passes SparkListenerEvents to registered SparkListeners.
@@ -29,113 +28,19 @@ import org.apache.spark.util.Utils
  * has started will events be actually propagated to all attached listeners. This listener bus
  * is stopped when it receives a SparkListenerShutdown event, which is posted using stop().
  */
-private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
-
-  /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
-   * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
-  private val EVENT_QUEUE_CAPACITY = 10000
-  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
-  private var queueFullErrorMessageLogged = false
-  private var started = false
-
-  // A counter that represents the number of events produced and consumed in the queue
-  private val eventLock = new Semaphore(0)
-
-  private val listenerThread = new Thread("SparkListenerBus") {
-    setDaemon(true)
-    override def run(): Unit = Utils.logUncaughtExceptions {
-      while (true) {
-        eventLock.acquire()
-        // Atomically remove and process this event
-        LiveListenerBus.this.synchronized {
-          val event = eventQueue.poll
-          if (event == SparkListenerShutdown) {
-            // Get out of the while loop and shutdown the daemon thread
-            return
-          }
-          Option(event).foreach(postToAll)
-        }
-      }
-    }
-  }
-
-  /**
-   * Start sending events to attached listeners.
-   *
-   * This first sends out all buffered events posted before this listener bus has started, then
-   * listens for any additional events asynchronously while the listener bus is still running.
-   * This should only be called once.
-   */
-  def start() {
-    if (started) {
-      throw new IllegalStateException("Listener bus already started!")
+private[spark] class LiveListenerBus
+  extends AsynchronousListenerBus[SparkListener, SparkListenerEvent]("SparkListenerBus")
+  with SparkListenerBus {
+
+  private val logDroppedEvent = new AtomicBoolean(false)
+
+  override def onDropEvent(event: SparkListenerEvent): Unit = {
+    if (logDroppedEvent.compareAndSet(false, true)) {
+      // Only log the following message once to avoid duplicated annoying logs.
+      logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
+        "This likely means one of the SparkListeners is too slow and cannot keep up with " +
+        "the rate at which tasks are being started by the scheduler.")
     }
-    listenerThread.start()
-    started = true
   }
 
-  def post(event: SparkListenerEvent) {
-    val eventAdded = eventQueue.offer(event)
-    if (eventAdded) {
-      eventLock.release()
-    } else {
-      logQueueFullErrorMessage()
-    }
-  }
-
-  /**
-   * For testing only. Wait until there are no more events in the queue, or until the specified
-   * time has elapsed. Return true if the queue has emptied and false is the specified time
-   * elapsed before the queue emptied.
-   */
-  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
-    val finishTime = System.currentTimeMillis + timeoutMillis
-    while (!queueIsEmpty) {
-      if (System.currentTimeMillis > finishTime) {
-        return false
-      }
-      /* Sleep rather than using wait/notify, because this is used only for testing and
-       * wait/notify add overhead in the general case. */
-      Thread.sleep(10)
-    }
-    true
-  }
-
-  /**
-   * For testing only. Return whether the listener daemon thread is still alive.
-   */
-  def listenerThreadIsAlive: Boolean = synchronized { listenerThread.isAlive }
-
-  /**
-   * Return whether the event queue is empty.
-   *
-   * The use of synchronized here guarantees that all events that once belonged to this queue
-   * have already been processed by all attached listeners, if this returns true.
-   */
-  def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty }
-
-  /**
-   * Log an error message to indicate that the event queue is full. Do this only once.
-   */
-  private def logQueueFullErrorMessage(): Unit = {
-    if (!queueFullErrorMessageLogged) {
-      if (listenerThread.isAlive) {
-        logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
-          "This likely means one of the SparkListeners is too slow and cannot keep up with" +
-          "the rate at which tasks are being started by the scheduler.")
-      } else {
-        logError("SparkListenerBus thread is dead! This means SparkListenerEvents have not" +
-          "been (and will no longer be) propagated to listeners for some time.")
-      }
-      queueFullErrorMessageLogged = true
-    }
-  }
-
-  def stop() {
-    if (!started) {
-      throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
-    }
-    post(SparkListenerShutdown)
-    listenerThread.join()
-  }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 8f5ceaa5de515e182d0806c349084584c389f2b5..dd28ddb31de1f8676559bcb5d11b9d1fb21dcc24 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -116,9 +116,6 @@ case class SparkListenerApplicationStart(appName: String, appId: Option[String],
 @DeveloperApi
 case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
 
-/** An event used in the listener to shutdown the listener daemon thread. */
-private[spark] case object SparkListenerShutdown extends SparkListenerEvent
-
 
 /**
  * :: DeveloperApi ::
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index e700c6af542f4a0f47c8508846fe515e8d7d1642..fe8a19a2c0cb94115f99a9a44ac110daebf55700 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -17,78 +17,47 @@
 
 package org.apache.spark.scheduler
 
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.util.ListenerBus
 
 /**
- * A SparkListenerEvent bus that relays events to its listeners
+ * A [[SparkListenerEvent]] bus that relays [[SparkListenerEvent]]s to its listeners
  */
-private[spark] trait SparkListenerBus extends Logging {
-
-  // SparkListeners attached to this event bus
-  protected val sparkListeners = new ArrayBuffer[SparkListener]
-    with mutable.SynchronizedBuffer[SparkListener]
-
-  def addListener(listener: SparkListener) {
-    sparkListeners += listener
-  }
+private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkListenerEvent] {
 
-  /**
-   * Post an event to all attached listeners.
-   * This does nothing if the event is SparkListenerShutdown.
-   */
-  def postToAll(event: SparkListenerEvent) {
+  override def onPostEvent(listener: SparkListener, event: SparkListenerEvent): Unit = {
     event match {
       case stageSubmitted: SparkListenerStageSubmitted =>
-        foreachListener(_.onStageSubmitted(stageSubmitted))
+        listener.onStageSubmitted(stageSubmitted)
       case stageCompleted: SparkListenerStageCompleted =>
-        foreachListener(_.onStageCompleted(stageCompleted))
+        listener.onStageCompleted(stageCompleted)
       case jobStart: SparkListenerJobStart =>
-        foreachListener(_.onJobStart(jobStart))
+        listener.onJobStart(jobStart)
       case jobEnd: SparkListenerJobEnd =>
-        foreachListener(_.onJobEnd(jobEnd))
+        listener.onJobEnd(jobEnd)
       case taskStart: SparkListenerTaskStart =>
-        foreachListener(_.onTaskStart(taskStart))
+        listener.onTaskStart(taskStart)
       case taskGettingResult: SparkListenerTaskGettingResult =>
-        foreachListener(_.onTaskGettingResult(taskGettingResult))
+        listener.onTaskGettingResult(taskGettingResult)
       case taskEnd: SparkListenerTaskEnd =>
-        foreachListener(_.onTaskEnd(taskEnd))
+        listener.onTaskEnd(taskEnd)
       case environmentUpdate: SparkListenerEnvironmentUpdate =>
-        foreachListener(_.onEnvironmentUpdate(environmentUpdate))
+        listener.onEnvironmentUpdate(environmentUpdate)
       case blockManagerAdded: SparkListenerBlockManagerAdded =>
-        foreachListener(_.onBlockManagerAdded(blockManagerAdded))
+        listener.onBlockManagerAdded(blockManagerAdded)
       case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
-        foreachListener(_.onBlockManagerRemoved(blockManagerRemoved))
+        listener.onBlockManagerRemoved(blockManagerRemoved)
       case unpersistRDD: SparkListenerUnpersistRDD =>
-        foreachListener(_.onUnpersistRDD(unpersistRDD))
+        listener.onUnpersistRDD(unpersistRDD)
       case applicationStart: SparkListenerApplicationStart =>
-        foreachListener(_.onApplicationStart(applicationStart))
+        listener.onApplicationStart(applicationStart)
       case applicationEnd: SparkListenerApplicationEnd =>
-        foreachListener(_.onApplicationEnd(applicationEnd))
+        listener.onApplicationEnd(applicationEnd)
       case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
-        foreachListener(_.onExecutorMetricsUpdate(metricsUpdate))
+        listener.onExecutorMetricsUpdate(metricsUpdate)
       case executorAdded: SparkListenerExecutorAdded =>
-        foreachListener(_.onExecutorAdded(executorAdded))
+        listener.onExecutorAdded(executorAdded)
       case executorRemoved: SparkListenerExecutorRemoved =>
-        foreachListener(_.onExecutorRemoved(executorRemoved))
-      case SparkListenerShutdown =>
-    }
-  }
-
-  /**
-   * Apply the given function to all attached listeners, catching and logging any exception.
-   */
-  private def foreachListener(f: SparkListener => Unit): Unit = {
-    sparkListeners.foreach { listener =>
-      try {
-        f(listener)
-      } catch {
-        case e: Exception =>
-          logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
-      }
+        listener.onExecutorRemoved(executorRemoved)
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
new file mode 100644
index 0000000000000000000000000000000000000000..18c627e8c7a151dc9f572a9abbd853c55f0acdaa
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicBoolean
+
+import com.google.common.annotations.VisibleForTesting
+
+/**
+ * Asynchronously passes events to registered listeners.
+ *
+ * Until `start()` is called, all posted events are only buffered. Only after this listener bus
+ * has started will events be actually propagated to all attached listeners. This listener bus
+ * is stopped when `stop()` is called, and it will drop further events after stopping.
+ *
+ * @param name name of the listener bus, will be the name of the listener thread.
+ * @tparam L type of listener
+ * @tparam E type of event
+ */
+private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: String)
+  extends ListenerBus[L, E] {
+
+  self =>
+
+  /* Cap the capacity of the event queue so we get an explicit error (rather than
+   * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
+  private val EVENT_QUEUE_CAPACITY = 10000
+  private val eventQueue = new LinkedBlockingQueue[E](EVENT_QUEUE_CAPACITY)
+
+  // Indicate if `start()` is called
+  private val started = new AtomicBoolean(false)
+  // Indicate if `stop()` is called
+  private val stopped = new AtomicBoolean(false)
+
+  // Indicate if we are processing some event
+  // Guarded by `self`
+  private var processingEvent = false
+
+  // A counter that represents the number of events produced and consumed in the queue
+  private val eventLock = new Semaphore(0)
+
+  private val listenerThread = new Thread(name) {
+    setDaemon(true)
+    override def run(): Unit = Utils.logUncaughtExceptions {
+      while (true) {
+        eventLock.acquire()
+        self.synchronized {
+          processingEvent = true
+        }
+        try {
+          val event = eventQueue.poll
+          if (event == null) {
+            // Get out of the while loop and shutdown the daemon thread
+            if (!stopped.get) {
+              throw new IllegalStateException("Polling `null` from eventQueue means" +
+                " the listener bus has been stopped. So `stopped` must be true")
+            }
+            return
+          }
+          postToAll(event)
+        } finally {
+          self.synchronized {
+            processingEvent = false
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Start sending events to attached listeners.
+   *
+   * This first sends out all buffered events posted before this listener bus has started, then
+   * listens for any additional events asynchronously while the listener bus is still running.
+   * This should only be called once.
+   */
+  def start() {
+    if (started.compareAndSet(false, true)) {
+      listenerThread.start()
+    } else {
+      throw new IllegalStateException(s"$name already started!")
+    }
+  }
+
+  def post(event: E) {
+    if (stopped.get) {
+      // Drop further events to make `listenerThread` exit ASAP
+      logError(s"$name has already stopped! Dropping event $event")
+      return
+    }
+    val eventAdded = eventQueue.offer(event)
+    if (eventAdded) {
+      eventLock.release()
+    } else {
+      onDropEvent(event)
+    }
+  }
+
+  /**
+   * For testing only. Wait until there are no more events in the queue, or until the specified
+   * time has elapsed. Return true if the queue has emptied and false is the specified time
+   * elapsed before the queue emptied.
+   */
+  @VisibleForTesting
+  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
+    val finishTime = System.currentTimeMillis + timeoutMillis
+    while (!queueIsEmpty) {
+      if (System.currentTimeMillis > finishTime) {
+        return false
+      }
+      /* Sleep rather than using wait/notify, because this is used only for testing and
+       * wait/notify add overhead in the general case. */
+      Thread.sleep(10)
+    }
+    true
+  }
+
+  /**
+   * For testing only. Return whether the listener daemon thread is still alive.
+   */
+  @VisibleForTesting
+  def listenerThreadIsAlive: Boolean = listenerThread.isAlive
+
+  /**
+   * Return whether the event queue is empty.
+   *
+   * The use of synchronized here guarantees that all events that once belonged to this queue
+   * have already been processed by all attached listeners, if this returns true.
+   */
+  private def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty && !processingEvent }
+
+  /**
+   * Stop the listener bus. It will wait until the queued events have been processed, but drop the
+   * new events after stopping.
+   */
+  def stop() {
+    if (!started.get()) {
+      throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
+    }
+    if (stopped.compareAndSet(false, true)) {
+      // Call eventLock.release() so that listenerThread will poll `null` from `eventQueue` and know
+      // `stop` is called.
+      eventLock.release()
+      listenerThread.join()
+    } else {
+      // Keep quiet
+    }
+  }
+
+  /**
+   * If the event queue exceeds its capacity, the new events will be dropped. The subclasses will be
+   * notified with the dropped events.
+   *
+   * Note: `onDropEvent` can be called in any thread.
+   */
+  def onDropEvent(event: E): Unit
+}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index b5f736dc41c6c893bcee940e728a1fa23c08a1ff..414bc49a57f8a88b563667c6a980b304131839a8 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -91,7 +91,6 @@ private[spark] object JsonProtocol {
       case executorRemoved: SparkListenerExecutorRemoved =>
         executorRemovedToJson(executorRemoved)
       // These aren't used, but keeps compiler happy
-      case SparkListenerShutdown => JNothing
       case SparkListenerExecutorMetricsUpdate(_, _) => JNothing
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
new file mode 100644
index 0000000000000000000000000000000000000000..bd0aa4dc4650f7938feb793e2c4cf308e0f1ba74
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util.concurrent.CopyOnWriteArrayList
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.Logging
+
+/**
+ * An event bus which posts events to its listeners.
+ */
+private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
+
+  private val listeners = new CopyOnWriteArrayList[L]
+
+  /**
+   * Add a listener to listen events. This method is thread-safe and can be called in any thread.
+   */
+  final def addListener(listener: L) {
+    listeners.add(listener)
+  }
+
+  /**
+   * Post the event to all registered listeners. The `postToAll` caller should guarantee calling
+   * `postToAll` in the same thread for all events.
+   */
+  final def postToAll(event: E): Unit = {
+    // JavaConversions will create a JIterableWrapper if we use some Scala collection functions.
+    // However, this method will be called frequently. To avoid the wrapper cost, here ewe use
+    // Java Iterator directly.
+    val iter = listeners.iterator
+    while (iter.hasNext) {
+      val listener = iter.next()
+      try {
+        onPostEvent(listener, event)
+      } catch {
+        case NonFatal(e) =>
+          logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
+      }
+    }
+  }
+
+  /**
+   * Post an event to the specified listener. `onPostEvent` is guaranteed to be called in the same
+   * thread.
+   */
+  def onPostEvent(listener: L, event: E): Unit
+
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
index ed1aa114e19d9f195df8fb00aca82f75e840e9fa..74dbba453f0264dd5e14e9d55704f7d2f4224c01 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
@@ -50,9 +50,6 @@ case class StreamingListenerReceiverError(receiverInfo: ReceiverInfo)
 case class StreamingListenerReceiverStopped(receiverInfo: ReceiverInfo)
   extends StreamingListenerEvent
 
-/** An event used in the listener to shutdown the listener daemon thread. */
-private[scheduler] case object StreamingListenerShutdown extends StreamingListenerEvent
-
 /**
  * :: DeveloperApi ::
  * A listener interface for receiving information about an ongoing streaming
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index 398724d9e8130bfa796e8a393d39b63fbd711c92..b07d6cf347ca743a080197e36436e877580f805f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -17,83 +17,42 @@
 
 package org.apache.spark.streaming.scheduler
 
+import java.util.concurrent.atomic.AtomicBoolean
+
 import org.apache.spark.Logging
-import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
-import java.util.concurrent.LinkedBlockingQueue
+import org.apache.spark.util.AsynchronousListenerBus
 
 /** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
-private[spark] class StreamingListenerBus() extends Logging {
-  private val listeners = new ArrayBuffer[StreamingListener]()
-    with SynchronizedBuffer[StreamingListener]
-
-  /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
-   * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
-  private val EVENT_QUEUE_CAPACITY = 10000
-  private val eventQueue = new LinkedBlockingQueue[StreamingListenerEvent](EVENT_QUEUE_CAPACITY)
-  private var queueFullErrorMessageLogged = false
-
-  val listenerThread = new Thread("StreamingListenerBus") {
-    setDaemon(true)
-    override def run() {
-      while (true) {
-        val event = eventQueue.take
-        event match {
-          case receiverStarted: StreamingListenerReceiverStarted =>
-            listeners.foreach(_.onReceiverStarted(receiverStarted))
-          case receiverError: StreamingListenerReceiverError =>
-            listeners.foreach(_.onReceiverError(receiverError))
-          case receiverStopped: StreamingListenerReceiverStopped =>
-            listeners.foreach(_.onReceiverStopped(receiverStopped))
-          case batchSubmitted: StreamingListenerBatchSubmitted =>
-            listeners.foreach(_.onBatchSubmitted(batchSubmitted))
-          case batchStarted: StreamingListenerBatchStarted =>
-            listeners.foreach(_.onBatchStarted(batchStarted))
-          case batchCompleted: StreamingListenerBatchCompleted =>
-            listeners.foreach(_.onBatchCompleted(batchCompleted))
-          case StreamingListenerShutdown =>
-            // Get out of the while loop and shutdown the daemon thread
-            return
-          case _ =>
-        }
-      }
+private[spark] class StreamingListenerBus
+  extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus")
+  with Logging {
+
+  private val logDroppedEvent = new AtomicBoolean(false)
+
+  override def onPostEvent(listener: StreamingListener, event: StreamingListenerEvent): Unit = {
+    event match {
+      case receiverStarted: StreamingListenerReceiverStarted =>
+        listener.onReceiverStarted(receiverStarted)
+      case receiverError: StreamingListenerReceiverError =>
+        listener.onReceiverError(receiverError)
+      case receiverStopped: StreamingListenerReceiverStopped =>
+        listener.onReceiverStopped(receiverStopped)
+      case batchSubmitted: StreamingListenerBatchSubmitted =>
+        listener.onBatchSubmitted(batchSubmitted)
+      case batchStarted: StreamingListenerBatchStarted =>
+        listener.onBatchStarted(batchStarted)
+      case batchCompleted: StreamingListenerBatchCompleted =>
+        listener.onBatchCompleted(batchCompleted)
+      case _ =>
     }
   }
 
-  def start() {
-    listenerThread.start()
-  }
-
-  def addListener(listener: StreamingListener) {
-    listeners += listener
-  }
-
-  def post(event: StreamingListenerEvent) {
-    val eventAdded = eventQueue.offer(event)
-    if (!eventAdded && !queueFullErrorMessageLogged) {
+  override def onDropEvent(event: StreamingListenerEvent): Unit = {
+    if (logDroppedEvent.compareAndSet(false, true)) {
+      // Only log the following message once to avoid duplicated annoying logs.
       logError("Dropping StreamingListenerEvent because no remaining room in event queue. " +
         "This likely means one of the StreamingListeners is too slow and cannot keep up with the " +
         "rate at which events are being started by the scheduler.")
-      queueFullErrorMessageLogged = true
     }
   }
-
-  /**
-   * Waits until there are no more events in the queue, or until the specified time has elapsed.
-   * Used for testing only. Returns true if the queue has emptied and false is the specified time
-   * elapsed before the queue emptied.
-   */
-  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
-    val finishTime = System.currentTimeMillis + timeoutMillis
-    while (!eventQueue.isEmpty) {
-      if (System.currentTimeMillis > finishTime) {
-        return false
-      }
-      /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify
-       * add overhead in the general case. */
-      Thread.sleep(10)
-    }
-    true
-  }
-
-  def stop(): Unit = post(StreamingListenerShutdown)
 }