diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 1a2443f7ee78d961ada044b0a1ba990d0f8555b6..b2a26c51d4de11057cdf1ac0e97975594deaee4e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -195,6 +195,7 @@ class SparkContext(config: SparkConf) extends Logging {
   private var _conf: SparkConf = _
   private var _eventLogDir: Option[URI] = None
   private var _eventLogCodec: Option[String] = None
+  private var _listenerBus: LiveListenerBus = _
   private var _env: SparkEnv = _
   private var _jobProgressListener: JobProgressListener = _
   private var _statusTracker: SparkStatusTracker = _
@@ -247,7 +248,7 @@ class SparkContext(config: SparkConf) extends Logging {
   def isStopped: Boolean = stopped.get()
 
   // An asynchronous listener bus for Spark events
-  private[spark] val listenerBus = new LiveListenerBus(this)
+  private[spark] def listenerBus: LiveListenerBus = _listenerBus
 
   // This function allows components created by SparkEnv to be mocked in unit tests:
   private[spark] def createSparkEnv(
@@ -423,6 +424,8 @@ class SparkContext(config: SparkConf) extends Logging {
 
     if (master == "yarn" && deployMode == "client") System.setProperty("SPARK_YARN_MODE", "true")
 
+    _listenerBus = new LiveListenerBus(_conf)
+
     // "_jobProgressListener" should be set up before creating SparkEnv because when creating
     // "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them.
     _jobProgressListener = new JobProgressListener(_conf)
@@ -2388,7 +2391,7 @@ class SparkContext(config: SparkConf) extends Logging {
         }
     }
 
-    listenerBus.start()
+    listenerBus.start(this, _env.metricsSystem)
     _listenerBusStarted = true
   }
 
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 4ad04b04c312d36e9f2e30ff09bb694e4b5df57e..7827e6760f355bbbe733d2586f7a49c79bbf0aec 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -158,6 +158,12 @@ package object config {
       .checkValue(_ > 0, "The capacity of listener bus event queue must not be negative")
       .createWithDefault(10000)
 
+  private[spark] val LISTENER_BUS_METRICS_MAX_LISTENER_CLASSES_TIMED =
+    ConfigBuilder("spark.scheduler.listenerbus.metrics.maxListenerClassesTimed")
+      .internal()
+      .intConf
+      .createWithDefault(128)
+
   // This property sets the root namespace for metrics reporting
   private[spark] val METRICS_NAMESPACE = ConfigBuilder("spark.metrics.namespace")
     .stringConf
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 801dfaa62306ac695f1c0cc10dede690d330ff23..f0887e090b9562225b262a4032f4e69057ef9ce0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -20,10 +20,16 @@ package org.apache.spark.scheduler
 import java.util.concurrent._
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 
+import scala.collection.mutable
 import scala.util.DynamicVariable
 
-import org.apache.spark.SparkContext
+import com.codahale.metrics.{Counter, Gauge, MetricRegistry, Timer}
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.metrics.source.Source
 import org.apache.spark.util.Utils
 
 /**
@@ -33,15 +39,20 @@ import org.apache.spark.util.Utils
  * has started will events be actually propagated to all attached listeners. This listener bus
  * is stopped when `stop()` is called, and it will drop further events after stopping.
  */
-private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {
+private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
+
   self =>
 
   import LiveListenerBus._
 
+  private var sparkContext: SparkContext = _
+
   // Cap the capacity of the event queue so we get an explicit error (rather than
   // an OOM exception) if it's perpetually being added to more quickly than it's being drained.
-  private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
-    sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
+  private val eventQueue =
+    new LinkedBlockingQueue[SparkListenerEvent](conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
+
+  private[spark] val metrics = new LiveListenerBusMetrics(conf, eventQueue)
 
   // Indicate if `start()` is called
   private val started = new AtomicBoolean(false)
@@ -67,6 +78,7 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
     setDaemon(true)
     override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
       LiveListenerBus.withinListenerThread.withValue(true) {
+        val timer = metrics.eventProcessingTime
         while (true) {
           eventLock.acquire()
           self.synchronized {
@@ -82,7 +94,12 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
               }
               return
             }
-            postToAll(event)
+            val timerContext = timer.time()
+            try {
+              postToAll(event)
+            } finally {
+              timerContext.stop()
+            }
           } finally {
             self.synchronized {
               processingEvent = false
@@ -93,6 +110,10 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
     }
   }
 
+  override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = {
+    metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface]))
+  }
+
   /**
    * Start sending events to attached listeners.
    *
@@ -100,9 +121,12 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
    * listens for any additional events asynchronously while the listener bus is still running.
    * This should only be called once.
    *
+   * @param sc Used to stop the SparkContext in case the listener thread dies.
    */
-  def start(): Unit = {
+  def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = {
     if (started.compareAndSet(false, true)) {
+      sparkContext = sc
+      metricsSystem.registerSource(metrics)
       listenerThread.start()
     } else {
       throw new IllegalStateException(s"$name already started!")
@@ -115,12 +139,12 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
       logError(s"$name has already stopped! Dropping event $event")
       return
     }
+    metrics.numEventsPosted.inc()
     val eventAdded = eventQueue.offer(event)
     if (eventAdded) {
       eventLock.release()
     } else {
       onDropEvent(event)
-      droppedEventsCounter.incrementAndGet()
     }
 
     val droppedEvents = droppedEventsCounter.get
@@ -200,6 +224,8 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
    * Note: `onDropEvent` can be called in any thread.
    */
   def onDropEvent(event: SparkListenerEvent): Unit = {
+    metrics.numDroppedEvents.inc()
+    droppedEventsCounter.incrementAndGet()
     if (logDroppedEvent.compareAndSet(false, true)) {
       // Only log the following message once to avoid duplicated annoying logs.
       logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
@@ -217,3 +243,64 @@ private[spark] object LiveListenerBus {
   val name = "SparkListenerBus"
 }
 
+private[spark] class LiveListenerBusMetrics(
+    conf: SparkConf,
+    queue: LinkedBlockingQueue[_])
+  extends Source with Logging {
+
+  override val sourceName: String = "LiveListenerBus"
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  /**
+   * The total number of events posted to the LiveListenerBus. This is a count of the total number
+   * of events which have been produced by the application and sent to the listener bus, NOT a
+   * count of the number of events which have been processed and delivered to listeners (or dropped
+   * without being delivered).
+   */
+  val numEventsPosted: Counter = metricRegistry.counter(MetricRegistry.name("numEventsPosted"))
+
+  /**
+   * The total number of events that were dropped without being delivered to listeners.
+   */
+  val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped"))
+
+  /**
+   * The amount of time taken to post a single event to all listeners.
+   */
+  val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))
+
+  /**
+   * The number of messages waiting in the queue.
+   */
+  val queueSize: Gauge[Int] = {
+    metricRegistry.register(MetricRegistry.name("queueSize"), new Gauge[Int]{
+      override def getValue: Int = queue.size()
+    })
+  }
+
+  // Guarded by synchronization.
+  private val perListenerClassTimers = mutable.Map[String, Timer]()
+
+  /**
+   * Returns a timer tracking the processing time of the given listener class.
+   * events processed by that listener. This method is thread-safe.
+   */
+  def getTimerForListenerClass(cls: Class[_ <: SparkListenerInterface]): Option[Timer] = {
+    synchronized {
+      val className = cls.getName
+      val maxTimed = conf.get(LISTENER_BUS_METRICS_MAX_LISTENER_CLASSES_TIMED)
+      perListenerClassTimers.get(className).orElse {
+        if (perListenerClassTimers.size == maxTimed) {
+          logError(s"Not measuring processing time for listener class $className because a " +
+            s"maximum of $maxTimed listener classes are already timed.")
+          None
+        } else {
+          perListenerClassTimers(className) =
+            metricRegistry.timer(MetricRegistry.name("listenerProcessingTime", className))
+          perListenerClassTimers.get(className)
+        }
+      }
+    }
+  }
+}
+
diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
index fa5ad4e8d81e1db9e338da6dc4bff626c6ac2021..76a56298aaebc7e011a3732f769e2b53194e967b 100644
--- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
@@ -23,6 +23,8 @@ import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
+import com.codahale.metrics.Timer
+
 import org.apache.spark.internal.Logging
 
 /**
@@ -30,14 +32,22 @@ import org.apache.spark.internal.Logging
  */
 private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
 
+  private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Option[Timer])]
+
   // Marked `private[spark]` for access in tests.
-  private[spark] val listeners = new CopyOnWriteArrayList[L]
+  private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava
+
+  /**
+   * Returns a CodaHale metrics Timer for measuring the listener's event processing time.
+   * This method is intended to be overridden by subclasses.
+   */
+  protected def getTimer(listener: L): Option[Timer] = None
 
   /**
    * Add a listener to listen events. This method is thread-safe and can be called in any thread.
    */
   final def addListener(listener: L): Unit = {
-    listeners.add(listener)
+    listenersPlusTimers.add((listener, getTimer(listener)))
   }
 
   /**
@@ -45,7 +55,9 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
    * in any thread.
    */
   final def removeListener(listener: L): Unit = {
-    listeners.remove(listener)
+    listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer =>
+      listenersPlusTimers.remove(listenerAndTimer)
+    }
   }
 
   /**
@@ -56,14 +68,25 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
     // JavaConverters can create a JIterableWrapper if we use asScala.
     // However, this method will be called frequently. To avoid the wrapper cost, here we use
     // Java Iterator directly.
-    val iter = listeners.iterator
+    val iter = listenersPlusTimers.iterator
     while (iter.hasNext) {
-      val listener = iter.next()
+      val listenerAndMaybeTimer = iter.next()
+      val listener = listenerAndMaybeTimer._1
+      val maybeTimer = listenerAndMaybeTimer._2
+      val maybeTimerContext = if (maybeTimer.isDefined) {
+        maybeTimer.get.time()
+      } else {
+        null
+      }
       try {
         doPostEvent(listener, event)
       } catch {
         case NonFatal(e) =>
           logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
+      } finally {
+        if (maybeTimerContext != null) {
+          maybeTimerContext.stop()
+        }
       }
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 4c3d0b102152c8847063a724708d130d2698ead8..4cae6c61118a862db7b8c594bcc2d0f6b0afeb54 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -25,12 +25,14 @@ import scala.io.Source
 
 import org.apache.hadoop.fs.Path
 import org.json4s.jackson.JsonMethods._
+import org.mockito.Mockito
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.io._
+import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.util.{JsonProtocol, Utils}
 
 /**
@@ -155,17 +157,18 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     extraConf.foreach { case (k, v) => conf.set(k, v) }
     val logName = compressionCodec.map("test-" + _).getOrElse("test")
     val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
-    val listenerBus = new LiveListenerBus(sc)
+    val listenerBus = new LiveListenerBus(conf)
     val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
       125L, "Mickey", None)
     val applicationEnd = SparkListenerApplicationEnd(1000L)
 
     // A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
     eventLogger.start()
-    listenerBus.start()
+    listenerBus.start(Mockito.mock(classOf[SparkContext]), Mockito.mock(classOf[MetricsSystem]))
     listenerBus.addListener(eventLogger)
     listenerBus.postToAll(applicationStart)
     listenerBus.postToAll(applicationEnd)
+    listenerBus.stop()
     eventLogger.stop()
 
     // Verify file contains exactly the two events logged
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 80c7e0bfee6ef94bd852208defbadfc9f9dc2b4d..f3d0bc19675fc78b65c5c1f7fa813a3611804373 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -22,10 +22,13 @@ import java.util.concurrent.Semaphore
 import scala.collection.mutable
 import scala.collection.JavaConverters._
 
+import org.mockito.Mockito
 import org.scalatest.Matchers
 
 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.internal.config.LISTENER_BUS_EVENT_QUEUE_CAPACITY
+import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.util.{ResetSystemProperties, RpcUtils}
 
 class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers
@@ -36,14 +39,17 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
 
   val jobCompletionTime = 1421191296660L
 
+  private val mockSparkContext: SparkContext = Mockito.mock(classOf[SparkContext])
+  private val mockMetricsSystem: MetricsSystem = Mockito.mock(classOf[MetricsSystem])
+
   test("don't call sc.stop in listener") {
     sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
     val listener = new SparkContextStoppingListener(sc)
-    val bus = new LiveListenerBus(sc)
+    val bus = new LiveListenerBus(sc.conf)
     bus.addListener(listener)
 
     // Starting listener bus should flush all buffered events
-    bus.start()
+    bus.start(sc, sc.env.metricsSystem)
     bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
     bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
 
@@ -52,35 +58,54 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
   }
 
   test("basic creation and shutdown of LiveListenerBus") {
-    sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
+    val conf = new SparkConf()
     val counter = new BasicJobCounter
-    val bus = new LiveListenerBus(sc)
+    val bus = new LiveListenerBus(conf)
     bus.addListener(counter)
 
-    // Listener bus hasn't started yet, so posting events should not increment counter
+    // Metrics are initially empty.
+    assert(bus.metrics.numEventsPosted.getCount === 0)
+    assert(bus.metrics.numDroppedEvents.getCount === 0)
+    assert(bus.metrics.queueSize.getValue === 0)
+    assert(bus.metrics.eventProcessingTime.getCount === 0)
+
+    // Post five events:
     (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
+
+    // Five messages should be marked as received and queued, but no messages should be posted to
+    // listeners yet because the the listener bus hasn't been started.
+    assert(bus.metrics.numEventsPosted.getCount === 5)
+    assert(bus.metrics.queueSize.getValue === 5)
     assert(counter.count === 0)
 
     // Starting listener bus should flush all buffered events
-    bus.start()
+    bus.start(mockSparkContext, mockMetricsSystem)
+    Mockito.verify(mockMetricsSystem).registerSource(bus.metrics)
     bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(counter.count === 5)
+    assert(bus.metrics.queueSize.getValue === 0)
+    assert(bus.metrics.eventProcessingTime.getCount === 5)
 
     // After listener bus has stopped, posting events should not increment counter
     bus.stop()
     (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
     assert(counter.count === 5)
+    assert(bus.metrics.numEventsPosted.getCount === 5)
+
+    // Make sure per-listener-class timers were created:
+    assert(bus.metrics.getTimerForListenerClass(
+      classOf[BasicJobCounter].asSubclass(classOf[SparkListenerInterface])).get.getCount == 5)
 
     // Listener bus must not be started twice
     intercept[IllegalStateException] {
-      val bus = new LiveListenerBus(sc)
-      bus.start()
-      bus.start()
+      val bus = new LiveListenerBus(conf)
+      bus.start(mockSparkContext, mockMetricsSystem)
+      bus.start(mockSparkContext, mockMetricsSystem)
     }
 
     // ... or stopped before starting
     intercept[IllegalStateException] {
-      val bus = new LiveListenerBus(sc)
+      val bus = new LiveListenerBus(conf)
       bus.stop()
     }
   }
@@ -107,12 +132,11 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
         drained = true
       }
     }
-    sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
-    val bus = new LiveListenerBus(sc)
+    val bus = new LiveListenerBus(new SparkConf())
     val blockingListener = new BlockingListener
 
     bus.addListener(blockingListener)
-    bus.start()
+    bus.start(mockSparkContext, mockMetricsSystem)
     bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
 
     listenerStarted.acquire()
@@ -138,6 +162,44 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     assert(drained)
   }
 
+  test("metrics for dropped listener events") {
+    val bus = new LiveListenerBus(new SparkConf().set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 1))
+
+    val listenerStarted = new Semaphore(0)
+    val listenerWait = new Semaphore(0)
+
+    bus.addListener(new SparkListener {
+      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+        listenerStarted.release()
+        listenerWait.acquire()
+      }
+    })
+
+    bus.start(mockSparkContext, mockMetricsSystem)
+
+    // Post a message to the listener bus and wait for processing to begin:
+    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+    listenerStarted.acquire()
+    assert(bus.metrics.queueSize.getValue === 0)
+    assert(bus.metrics.numDroppedEvents.getCount === 0)
+
+    // If we post an additional message then it should remain in the queue because the listener is
+    // busy processing the first event:
+    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+    assert(bus.metrics.queueSize.getValue === 1)
+    assert(bus.metrics.numDroppedEvents.getCount === 0)
+
+    // The queue is now full, so any additional events posted to the listener will be dropped:
+    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+    assert(bus.metrics.queueSize.getValue === 1)
+    assert(bus.metrics.numDroppedEvents.getCount === 1)
+
+
+    // Allow the the remaining events to be processed so we can stop the listener bus:
+    listenerWait.release(2)
+    bus.stop()
+  }
+
   test("basic creation of StageInfo") {
     sc = new SparkContext("local", "SparkListenerSuite")
     val listener = new SaveStageAndTaskInfo
@@ -354,14 +416,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     val badListener = new BadListener
     val jobCounter1 = new BasicJobCounter
     val jobCounter2 = new BasicJobCounter
-    sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
-    val bus = new LiveListenerBus(sc)
+    val bus = new LiveListenerBus(new SparkConf())
 
     // Propagate events to bad listener first
     bus.addListener(badListener)
     bus.addListener(jobCounter1)
     bus.addListener(jobCounter2)
-    bus.start()
+    bus.start(mockSparkContext, mockMetricsSystem)
 
     // Post events to all listeners, and wait until the queue is drained
     (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index c100803279eafc52d39242e6d04f492842a2c8e0..dd61dcd11bcda9ff01ac2178f84980e042d556f3 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -100,7 +100,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
     sc = new SparkContext("local", "test", conf)
     master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf,
-        new LiveListenerBus(sc))), conf, true)
+        new LiveListenerBus(conf))), conf, true)
     allStores.clear()
   }
 
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 0d2912ba8c5fbcc694b29fda5022948bcd868d69..9d52b488b223e1ae58bc7a5ff5edb1264442bb10 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -125,7 +125,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     when(sc.conf).thenReturn(conf)
     master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf,
-        new LiveListenerBus(sc))), conf, true)
+        new LiveListenerBus(conf))), conf, true)
 
     val initialize = PrivateMethod[Unit]('initialize)
     SizeEstimator invokePrivate initialize()
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index f6c8418ba3ac4193b0fc693567f70d98fff474e0..66dda382eb653aad464914c46aaf2b7bd9bbf413 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.storage._
 /**
  * Test various functionality in the StorageListener that supports the StorageTab.
  */
-class StorageTabSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfter {
+class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
   private var bus: LiveListenerBus = _
   private var storageStatusListener: StorageStatusListener = _
   private var storageListener: StorageListener = _
@@ -43,8 +43,7 @@ class StorageTabSuite extends SparkFunSuite with LocalSparkContext with BeforeAn
 
   before {
     val conf = new SparkConf()
-    sc = new SparkContext("local", "test", conf)
-    bus = new LiveListenerBus(sc)
+    bus = new LiveListenerBus(conf)
     storageStatusListener = new StorageStatusListener(conf)
     storageListener = new StorageListener(storageStatusListener)
     bus.addListener(storageStatusListener)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 3c4a2716caf9012e5e96b01028164ba1f14a9e6e..fe65353b9d502bd5870cf05df67aa094cec5bea0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -50,7 +50,6 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean)
   extends SparkFunSuite
   with BeforeAndAfter
   with Matchers
-  with LocalSparkContext
   with Logging {
 
   import WriteAheadLogBasedBlockHandler._
@@ -89,10 +88,9 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean)
     rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
     conf.set("spark.driver.port", rpcEnv.address.port.toString)
 
-    sc = new SparkContext("local", "test", conf)
     blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf,
-        new LiveListenerBus(sc))), conf, true)
+        new LiveListenerBus(conf))), conf, true)
 
     storageLevel = StorageLevel.MEMORY_ONLY_SER
     blockManager = createBlockManager(blockManagerSize, conf)