diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3f1a7dd99d6358f9eec4612a08ba0e1046820afa..e106c5c4bef60054edcb0e980138fba7d234405c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -31,6 +31,7 @@ import scala.collection.JavaConversions._
 import scala.collection.generic.Growable
 import scala.collection.mutable.HashMap
 import scala.reflect.{ClassTag, classTag}
+import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -50,9 +51,10 @@ import org.apache.spark.executor.{ExecutorEndpoint, TriggerThreadDump}
 import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat,
   FixedLengthBinaryInputFormat}
 import org.apache.spark.io.CompressionCodec
+import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
-import org.apache.spark.rpc.RpcAddress
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
   SparkDeploySchedulerBackend, SimrSchedulerBackend}
@@ -192,8 +194,42 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   // log out Spark Version in Spark driver log
   logInfo(s"Running Spark version $SPARK_VERSION")
 
-  private[spark] val conf = config.clone()
-  conf.validateSettings()
+  /* ------------------------------------------------------------------------------------- *
+   | Private variables. These variables keep the internal state of the context, and are    |
+   | not accessible by the outside world. They're mutable since we want to initialize all  |
+   | of them to some neutral value ahead of time, so that calling "stop()" while the       |
+   | constructor is still running is safe.                                                 |
+   * ------------------------------------------------------------------------------------- */
+
+  private var _conf: SparkConf = _
+  private var _eventLogDir: Option[URI] = None
+  private var _eventLogCodec: Option[String] = None
+  private var _env: SparkEnv = _
+  private var _metadataCleaner: MetadataCleaner = _
+  private var _jobProgressListener: JobProgressListener = _
+  private var _statusTracker: SparkStatusTracker = _
+  private var _progressBar: Option[ConsoleProgressBar] = None
+  private var _ui: Option[SparkUI] = None
+  private var _hadoopConfiguration: Configuration = _
+  private var _executorMemory: Int = _
+  private var _schedulerBackend: SchedulerBackend = _
+  private var _taskScheduler: TaskScheduler = _
+  private var _heartbeatReceiver: RpcEndpointRef = _
+  @volatile private var _dagScheduler: DAGScheduler = _
+  private var _applicationId: String = _
+  private var _eventLogger: Option[EventLoggingListener] = None
+  private var _executorAllocationManager: Option[ExecutorAllocationManager] = None
+  private var _cleaner: Option[ContextCleaner] = None
+  private var _listenerBusStarted: Boolean = false
+  private var _jars: Seq[String] = _
+  private var _files: Seq[String] = _
+
+  /* ------------------------------------------------------------------------------------- *
+   | Accessors and public fields. These provide access to the internal state of the        |
+   | context.                                                                              |
+   * ------------------------------------------------------------------------------------- */
+
+  private[spark] def conf: SparkConf = _conf
 
   /**
    * Return a copy of this SparkContext's configuration. The configuration ''cannot'' be
@@ -201,65 +237,24 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    */
   def getConf: SparkConf = conf.clone()
 
-  if (!conf.contains("spark.master")) {
-    throw new SparkException("A master URL must be set in your configuration")
-  }
-  if (!conf.contains("spark.app.name")) {
-    throw new SparkException("An application name must be set in your configuration")
-  }
-
-  if (conf.getBoolean("spark.logConf", false)) {
-    logInfo("Spark configuration:\n" + conf.toDebugString)
-  }
-
-  // Set Spark driver host and port system properties
-  conf.setIfMissing("spark.driver.host", Utils.localHostName())
-  conf.setIfMissing("spark.driver.port", "0")
-
-  val jars: Seq[String] =
-    conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
-
-  val files: Seq[String] =
-    conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
-
-  val master = conf.get("spark.master")
-  val appName = conf.get("spark.app.name")
+  def jars: Seq[String] = _jars
+  def files: Seq[String] = _files
+  def master: String = _conf.get("spark.master")
+  def appName: String = _conf.get("spark.app.name")
 
-  private[spark] val isEventLogEnabled = conf.getBoolean("spark.eventLog.enabled", false)
-  private[spark] val eventLogDir: Option[URI] = {
-    if (isEventLogEnabled) {
-      val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
-        .stripSuffix("/")
-      Some(Utils.resolveURI(unresolvedDir))
-    } else {
-      None
-    }
-  }
-  private[spark] val eventLogCodec: Option[String] = {
-    val compress = conf.getBoolean("spark.eventLog.compress", false)
-    if (compress && isEventLogEnabled) {
-      Some(CompressionCodec.getCodecName(conf)).map(CompressionCodec.getShortName)
-    } else {
-      None
-    }
-  }
+  private[spark] def isEventLogEnabled: Boolean = _conf.getBoolean("spark.eventLog.enabled", false)
+  private[spark] def eventLogDir: Option[URI] = _eventLogDir
+  private[spark] def eventLogCodec: Option[String] = _eventLogCodec
 
   // Generate the random name for a temp folder in Tachyon
   // Add a timestamp as the suffix here to make it more safe
   val tachyonFolderName = "spark-" + randomUUID.toString()
-  conf.set("spark.tachyonStore.folderName", tachyonFolderName)
 
-  val isLocal = (master == "local" || master.startsWith("local["))
-
-  if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
+  def isLocal: Boolean = (master == "local" || master.startsWith("local["))
 
   // An asynchronous listener bus for Spark events
   private[spark] val listenerBus = new LiveListenerBus
 
-  conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
-
-  // Create the Spark execution environment (cache, map output tracker, etc)
-
   // This function allows components created by SparkEnv to be mocked in unit tests:
   private[spark] def createSparkEnv(
       conf: SparkConf,
@@ -268,8 +263,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
   }
 
-  private[spark] val env = createSparkEnv(conf, isLocal, listenerBus)
-  SparkEnv.set(env)
+  private[spark] def env: SparkEnv = _env
 
   // Used to store a URL for each static file/jar together with the file's local timestamp
   private[spark] val addedFiles = HashMap[String, Long]()
@@ -277,35 +271,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
   // Keeps track of all persisted RDDs
   private[spark] val persistentRdds = new TimeStampedWeakValueHashMap[Int, RDD[_]]
-  private[spark] val metadataCleaner =
-    new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)
-
+  private[spark] def metadataCleaner: MetadataCleaner = _metadataCleaner
+  private[spark] def jobProgressListener: JobProgressListener = _jobProgressListener
 
-  private[spark] val jobProgressListener = new JobProgressListener(conf)
-  listenerBus.addListener(jobProgressListener)
+  def statusTracker: SparkStatusTracker = _statusTracker
 
-  val statusTracker = new SparkStatusTracker(this)
+  private[spark] def progressBar: Option[ConsoleProgressBar] = _progressBar
 
-  private[spark] val progressBar: Option[ConsoleProgressBar] =
-    if (conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) {
-      Some(new ConsoleProgressBar(this))
-    } else {
-      None
-    }
-
-  // Initialize the Spark UI
-  private[spark] val ui: Option[SparkUI] =
-    if (conf.getBoolean("spark.ui.enabled", true)) {
-      Some(SparkUI.createLiveUI(this, conf, listenerBus, jobProgressListener,
-        env.securityManager,appName))
-    } else {
-      // For tests, do not enable the UI
-      None
-    }
-
-  // Bind the UI before starting the task scheduler to communicate
-  // the bound port to the cluster manager properly
-  ui.foreach(_.bind())
+  private[spark] def ui: Option[SparkUI] = _ui
 
   /**
    * A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse.
@@ -313,134 +286,248 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * '''Note:''' As it will be reused in all Hadoop RDDs, it's better not to modify it unless you
    * plan to set some global configurations for all Hadoop RDDs.
    */
-  val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)
+  def hadoopConfiguration: Configuration = _hadoopConfiguration
+
+  private[spark] def executorMemory: Int = _executorMemory
+
+  // Environment variables to pass to our executors.
+  private[spark] val executorEnvs = HashMap[String, String]()
+
+  // Set SPARK_USER for user who is running SparkContext.
+  val sparkUser = Utils.getCurrentUserName()
 
-  // Add each JAR given through the constructor
-  if (jars != null) {
-    jars.foreach(addJar)
+  private[spark] def schedulerBackend: SchedulerBackend = _schedulerBackend
+  private[spark] def schedulerBackend_=(sb: SchedulerBackend): Unit = {
+    _schedulerBackend = sb
   }
 
-  if (files != null) {
-    files.foreach(addFile)
+  private[spark] def taskScheduler: TaskScheduler = _taskScheduler
+  private[spark] def taskScheduler_=(ts: TaskScheduler): Unit = {
+    _taskScheduler = ts
   }
 
+  private[spark] def dagScheduler: DAGScheduler = _dagScheduler
+  private[spark] def dagScheduler_=(ds: DAGScheduler): Unit = {
+    _dagScheduler = ds
+  }
+
+  def applicationId: String = _applicationId
+
+  def metricsSystem: MetricsSystem = if (_env != null) _env.metricsSystem else null
+
+  private[spark] def eventLogger: Option[EventLoggingListener] = _eventLogger
+
+  private[spark] def executorAllocationManager: Option[ExecutorAllocationManager] =
+    _executorAllocationManager
+
+  private[spark] def cleaner: Option[ContextCleaner] = _cleaner
+
+  private[spark] var checkpointDir: Option[String] = None
+
+  // Thread Local variable that can be used by users to pass information down the stack
+  private val localProperties = new InheritableThreadLocal[Properties] {
+    override protected def childValue(parent: Properties): Properties = new Properties(parent)
+    override protected def initialValue(): Properties = new Properties()
+  }
+
+  /* ------------------------------------------------------------------------------------- *
+   | Initialization. This code initializes the context in a manner that is exception-safe. |
+   | All internal fields holding state are initialized here, and any error prompts the     |
+   | stop() method to be called.                                                           |
+   * ------------------------------------------------------------------------------------- */
+
   private def warnSparkMem(value: String): String = {
     logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " +
       "deprecated, please use spark.executor.memory instead.")
     value
   }
 
-  private[spark] val executorMemory = conf.getOption("spark.executor.memory")
-    .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
-    .orElse(Option(System.getenv("SPARK_MEM")).map(warnSparkMem))
-    .map(Utils.memoryStringToMb)
-    .getOrElse(512)
+  try {
+    _conf = config.clone()
+    _conf.validateSettings()
 
-  // Environment variables to pass to our executors.
-  private[spark] val executorEnvs = HashMap[String, String]()
+    if (!_conf.contains("spark.master")) {
+      throw new SparkException("A master URL must be set in your configuration")
+    }
+    if (!_conf.contains("spark.app.name")) {
+      throw new SparkException("An application name must be set in your configuration")
+    }
 
-  // Convert java options to env vars as a work around
-  // since we can't set env vars directly in sbt.
-  for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
-    value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
-    executorEnvs(envKey) = value
-  }
-  Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
-    executorEnvs("SPARK_PREPEND_CLASSES") = v
-  }
-  // The Mesos scheduler backend relies on this environment variable to set executor memory.
-  // TODO: Set this only in the Mesos scheduler.
-  executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
-  executorEnvs ++= conf.getExecutorEnv
+    if (_conf.getBoolean("spark.logConf", false)) {
+      logInfo("Spark configuration:\n" + _conf.toDebugString)
+    }
 
-  // Set SPARK_USER for user who is running SparkContext.
-  val sparkUser = Utils.getCurrentUserName()
-  executorEnvs("SPARK_USER") = sparkUser
+    // Set Spark driver host and port system properties
+    _conf.setIfMissing("spark.driver.host", Utils.localHostName())
+    _conf.setIfMissing("spark.driver.port", "0")
 
-  // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
-  // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
-  private val heartbeatReceiver = env.rpcEnv.setupEndpoint(
-    HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
+    _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
 
-  // Create and start the scheduler
-  private[spark] var (schedulerBackend, taskScheduler) =
-    SparkContext.createTaskScheduler(this, master)
+    _jars = _conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0))
+      .toSeq.flatten
+    _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0))
+      .toSeq.flatten
 
-  heartbeatReceiver.send(TaskSchedulerIsSet)
+    _eventLogDir =
+      if (isEventLogEnabled) {
+        val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
+          .stripSuffix("/")
+        Some(Utils.resolveURI(unresolvedDir))
+      } else {
+        None
+      }
 
-  @volatile private[spark] var dagScheduler: DAGScheduler = _
-  try {
-    dagScheduler = new DAGScheduler(this)
-  } catch {
-    case e: Exception => {
-      try {
-        stop()
-      } finally {
-        throw new SparkException("Error while constructing DAGScheduler", e)
+    _eventLogCodec = {
+      val compress = _conf.getBoolean("spark.eventLog.compress", false)
+      if (compress && isEventLogEnabled) {
+        Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
+      } else {
+        None
       }
     }
-  }
 
-  // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
-  // constructor
-  taskScheduler.start()
+    _conf.set("spark.tachyonStore.folderName", tachyonFolderName)
 
-  val applicationId: String = taskScheduler.applicationId()
-  conf.set("spark.app.id", applicationId)
+    if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
 
-  env.blockManager.initialize(applicationId)
+    // Create the Spark execution environment (cache, map output tracker, etc)
+    _env = createSparkEnv(_conf, isLocal, listenerBus)
+    SparkEnv.set(_env)
 
-  val metricsSystem = env.metricsSystem
+    _metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, _conf)
 
-  // The metrics system for Driver need to be set spark.app.id to app ID.
-  // So it should start after we get app ID from the task scheduler and set spark.app.id.
-  metricsSystem.start()
-  // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
-  metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
+    _jobProgressListener = new JobProgressListener(_conf)
+    listenerBus.addListener(jobProgressListener)
 
-  // Optionally log Spark events
-  private[spark] val eventLogger: Option[EventLoggingListener] = {
-    if (isEventLogEnabled) {
-      val logger =
-        new EventLoggingListener(applicationId, eventLogDir.get, conf, hadoopConfiguration)
-      logger.start()
-      listenerBus.addListener(logger)
-      Some(logger)
-    } else None
-  }
+    _statusTracker = new SparkStatusTracker(this)
 
-  // Optionally scale number of executors dynamically based on workload. Exposed for testing.
-  private val dynamicAllocationEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false)
-  private val dynamicAllocationTesting = conf.getBoolean("spark.dynamicAllocation.testing", false)
-  private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =
-    if (dynamicAllocationEnabled) {
-      assert(supportDynamicAllocation,
-        "Dynamic allocation of executors is currently only supported in YARN mode")
-      Some(new ExecutorAllocationManager(this, listenerBus, conf))
-    } else {
-      None
+    _progressBar =
+      if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) {
+        Some(new ConsoleProgressBar(this))
+      } else {
+        None
+      }
+
+    _ui =
+      if (conf.getBoolean("spark.ui.enabled", true)) {
+        Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
+          _env.securityManager, appName))
+      } else {
+        // For tests, do not enable the UI
+        None
+      }
+    // Bind the UI before starting the task scheduler to communicate
+    // the bound port to the cluster manager properly
+    _ui.foreach(_.bind())
+
+    _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
+
+    // Add each JAR given through the constructor
+    if (jars != null) {
+      jars.foreach(addJar)
     }
-  executorAllocationManager.foreach(_.start())
 
-  private[spark] val cleaner: Option[ContextCleaner] = {
-    if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
-      Some(new ContextCleaner(this))
-    } else {
-      None
+    if (files != null) {
+      files.foreach(addFile)
     }
-  }
-  cleaner.foreach(_.start())
 
-  setupAndStartListenerBus()
-  postEnvironmentUpdate()
-  postApplicationStart()
+    _executorMemory = _conf.getOption("spark.executor.memory")
+      .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
+      .orElse(Option(System.getenv("SPARK_MEM")).map(warnSparkMem))
+      .map(Utils.memoryStringToMb)
+      .getOrElse(512)
+
+    // Convert Java options to env vars as a workaround
+    // since we can't set env vars directly in sbt.
+    for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
+      value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
+      executorEnvs(envKey) = value
+    }
+    Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
+      executorEnvs("SPARK_PREPEND_CLASSES") = v
+    }
+    // The Mesos scheduler backend relies on this environment variable to set executor memory.
+    // TODO: Set this only in the Mesos scheduler.
+    executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
+    executorEnvs ++= _conf.getExecutorEnv
+    executorEnvs("SPARK_USER") = sparkUser
+
+    // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
+    // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
+    _heartbeatReceiver = env.rpcEnv.setupEndpoint(
+      HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
+
+    // Create and start the scheduler
+    val (sched, ts) = SparkContext.createTaskScheduler(this, master)
+    _schedulerBackend = sched
+    _taskScheduler = ts
+    _dagScheduler = new DAGScheduler(this)
+    _heartbeatReceiver.send(TaskSchedulerIsSet)
+
+    // Start the TaskScheduler only after the DAGScheduler has registered its reference with it,
+    // which happens in the DAGScheduler constructor.
+    _taskScheduler.start()
+
+    _applicationId = _taskScheduler.applicationId()
+    _conf.set("spark.app.id", _applicationId)
+    _env.blockManager.initialize(_applicationId)
+
+    // The driver's metrics system needs spark.app.id set to the application ID, so start it
+    // only after we get the app ID from the task scheduler and set spark.app.id on the conf.
+    metricsSystem.start()
+    // Attach the driver metrics servlet handler to the web UI after the metrics system is started.
+    metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
+
+    _eventLogger =
+      if (isEventLogEnabled) {
+        val logger =
+          new EventLoggingListener(_applicationId, _eventLogDir.get, _conf, _hadoopConfiguration)
+        logger.start()
+        listenerBus.addListener(logger)
+        Some(logger)
+      } else {
+        None
+      }
 
-  private[spark] var checkpointDir: Option[String] = None
+    // Optionally scale number of executors dynamically based on workload. Exposed for testing.
+    val dynamicAllocationEnabled = _conf.getBoolean("spark.dynamicAllocation.enabled", false)
+    _executorAllocationManager =
+      if (dynamicAllocationEnabled) {
+        assert(supportDynamicAllocation,
+          "Dynamic allocation of executors is currently only supported in YARN mode")
+        Some(new ExecutorAllocationManager(this, listenerBus, _conf))
+      } else {
+        None
+      }
+    _executorAllocationManager.foreach(_.start())
 
-  // Thread Local variable that can be used by users to pass information down the stack
-  private val localProperties = new InheritableThreadLocal[Properties] {
-    override protected def childValue(parent: Properties): Properties = new Properties(parent)
-    override protected def initialValue(): Properties = new Properties()
+    _cleaner =
+      if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
+        Some(new ContextCleaner(this))
+      } else {
+        None
+      }
+    _cleaner.foreach(_.start())
+
+    setupAndStartListenerBus()
+    postEnvironmentUpdate()
+    postApplicationStart()
+
+    // Post init
+    _taskScheduler.postStartHook()
+    _env.metricsSystem.registerSource(new DAGSchedulerSource(dagScheduler))
+    _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
+  } catch {
+    case NonFatal(e) =>
+      logError("Error initializing SparkContext.", e)
+      try {
+        stop()
+      } catch {
+        case NonFatal(inner) =>
+          logError("Error stopping SparkContext after init error.", inner)
+      } finally {
+        throw e
+      }
   }
 
   /**
@@ -544,19 +631,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, null)
   }
 
-  // Post init
-  taskScheduler.postStartHook()
-
-  private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
-  private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)
-
-  private def initDriverMetrics() {
-    SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
-    SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
-  }
-
-  initDriverMetrics()
-
   // Methods for creating RDDs
 
   /** Distribute a local Scala collection to form an RDD.
@@ -1146,7 +1220,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * this application is supported. This is currently only available for YARN.
    */
   private[spark] def supportDynamicAllocation =
-    master.contains("yarn") || dynamicAllocationTesting
+    master.contains("yarn") || _conf.getBoolean("spark.dynamicAllocation.testing", false)
 
   /**
    * :: DeveloperApi ::
@@ -1163,7 +1237,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * This is currently only supported in YARN mode. Return whether the request is received.
    */
   private[spark] override def requestTotalExecutors(numExecutors: Int): Boolean = {
-    assert(master.contains("yarn") || dynamicAllocationTesting,
+    assert(supportDynamicAllocation,
       "Requesting executors is currently only supported in YARN mode")
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
@@ -1403,28 +1477,40 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   def stop() {
     // Use the stopping variable to ensure no contention for the stop scenario.
     // Still track the stopped variable for use elsewhere in the code.
-    
     if (!stopped.compareAndSet(false, true)) {
       logInfo("SparkContext already stopped.")
       return
     }
-    
+
     postApplicationEnd()
-    ui.foreach(_.stop())
-    env.metricsSystem.report()
-    metadataCleaner.cancel()
-    cleaner.foreach(_.stop()) 
-    executorAllocationManager.foreach(_.stop())
-    dagScheduler.stop()
-    dagScheduler = null
-    listenerBus.stop()
-    eventLogger.foreach(_.stop())
-    env.rpcEnv.stop(heartbeatReceiver)
-    progressBar.foreach(_.stop())
-    taskScheduler = null
+    _ui.foreach(_.stop())
+    if (env != null) {
+      env.metricsSystem.report()
+    }
+    if (metadataCleaner != null) {
+      metadataCleaner.cancel()
+    }
+    _cleaner.foreach(_.stop())
+    _executorAllocationManager.foreach(_.stop())
+    if (_dagScheduler != null) {
+      _dagScheduler.stop()
+      _dagScheduler = null
+    }
+    if (_listenerBusStarted) {
+      listenerBus.stop()
+      _listenerBusStarted = false
+    }
+    _eventLogger.foreach(_.stop())
+    if (env != null && _heartbeatReceiver != null) {
+      env.rpcEnv.stop(_heartbeatReceiver)
+    }
+    _progressBar.foreach(_.stop())
+    _taskScheduler = null
     // TODO: Cache.stop()?
-    env.stop()
-    SparkEnv.set(null)
+    if (_env != null) {
+      _env.stop()
+      SparkEnv.set(null)
+    }
     SparkContext.clearActiveContext()
     logInfo("Successfully stopped SparkContext")
   }
@@ -1749,6 +1835,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     }
 
     listenerBus.start(this)
+    _listenerBusStarted = true
   }
 
   /** Post the application start event */
@@ -2152,7 +2239,7 @@ object SparkContext extends Logging {
     master match {
       case "local" =>
         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
-        val backend = new LocalBackend(scheduler, 1)
+        val backend = new LocalBackend(sc.getConf, scheduler, 1)
         scheduler.initialize(backend)
         (backend, scheduler)
 
@@ -2164,7 +2251,7 @@ object SparkContext extends Logging {
           throw new SparkException(s"Asked to run locally with $threadCount threads")
         }
         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
-        val backend = new LocalBackend(scheduler, threadCount)
+        val backend = new LocalBackend(sc.getConf, scheduler, threadCount)
         scheduler.initialize(backend)
         (backend, scheduler)
 
@@ -2174,7 +2261,7 @@ object SparkContext extends Logging {
         // local[N, M] means exactly N threads with M failures
         val threadCount = if (threads == "*") localCpuCount else threads.toInt
         val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
-        val backend = new LocalBackend(scheduler, threadCount)
+        val backend = new LocalBackend(sc.getConf, scheduler, threadCount)
         scheduler.initialize(backend)
         (backend, scheduler)
 
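Note on the SparkContext changes above: they boil down to one pattern — keep every piece of driver state in a nullable private var, run the whole constructor body inside a single try, and make stop() null-check each field so it is safe to call at any point during (or after) initialization. A minimal standalone sketch of that pattern, with hypothetical EventLog/Server resources standing in for Spark's components (not Spark APIs):

    import scala.util.control.NonFatal

    // Hypothetical resources, used only to illustrate the pattern.
    class EventLog { def close(): Unit = () }
    class Server { def shutdown(): Unit = () }

    class Service {
      // Mutable state, initialized to neutral values so stop() is safe to call at any time.
      private var _log: EventLog = _
      private var _server: Server = _

      def stop(): Unit = {
        if (_server != null) { _server.shutdown(); _server = null }
        if (_log != null) { _log.close(); _log = null }
      }

      try {
        _log = new EventLog
        _server = new Server // if this throws, the catch below still tears down _log
      } catch {
        case NonFatal(e) =>
          try stop() catch { case NonFatal(inner) => e.addSuppressed(inner) }
          throw e
      }
    }
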
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 516f619529c48b63b3ca72a7ab306033d0fc1255..1b5fdeba28ee22015c13349e8e6f9544f37579eb 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -21,7 +21,7 @@ import java.io.File
 import java.lang.management.ManagementFactory
 import java.net.URL
 import java.nio.ByteBuffer
-import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -60,8 +60,6 @@ private[spark] class Executor(
 
   private val conf = env.conf
 
-  @volatile private var isStopped = false
-
   // No ip or host:port - just hostname
   Utils.checkHost(executorHostname, "Expected executed slave to be a hostname")
   // must not have port specified.
@@ -114,6 +112,10 @@ private[spark] class Executor(
   // Maintains the list of running tasks.
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
 
+  // Executor for the heartbeat task.
+  private val heartbeater = Executors.newSingleThreadScheduledExecutor(
+    Utils.namedThreadFactory("driver-heartbeater"))
+
   startDriverHeartbeater()
 
   def launchTask(
@@ -138,7 +140,8 @@ private[spark] class Executor(
   def stop(): Unit = {
     env.metricsSystem.report()
     env.rpcEnv.stop(executorEndpoint)
-    isStopped = true
+    heartbeater.shutdown()
+    heartbeater.awaitTermination(10, TimeUnit.SECONDS)
     threadPool.shutdown()
     if (!isLocal) {
       env.stop()
@@ -432,23 +435,17 @@ private[spark] class Executor(
   }
 
   /**
-   * Starts a thread to report heartbeat and partial metrics for active tasks to driver.
-   * This thread stops running when the executor is stopped.
+   * Schedules a task to report heartbeat and partial metrics for active tasks to the driver.
    */
   private def startDriverHeartbeater(): Unit = {
     val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
-    val thread = new Thread() {
-      override def run() {
-        // Sleep a random interval so the heartbeats don't end up in sync
-        Thread.sleep(intervalMs + (math.random * intervalMs).asInstanceOf[Int])
-        while (!isStopped) {
-          reportHeartBeat()
-          Thread.sleep(intervalMs)
-        }
-      }
+
+    // Wait a random interval so the heartbeats don't end up in sync
+    val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
+
+    val heartbeatTask = new Runnable() {
+      override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
     }
-    thread.setDaemon(true)
-    thread.setName("driver-heartbeater")
-    thread.start()
+    heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
   }
 }
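Note on the Executor change: it swaps a hand-rolled daemon thread (sleep loop plus a volatile isStopped flag) for a single-threaded ScheduledExecutorService, so shutdown becomes shutdown() plus awaitTermination() and an in-flight heartbeat can finish. A rough sketch of the same pattern; daemonFactory below stands in for Utils.namedThreadFactory and is not a Spark API:

    import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

    object HeartbeatSketch {
      // Simple named daemon-thread factory (stand-in for Utils.namedThreadFactory).
      private def daemonFactory(name: String): ThreadFactory = new ThreadFactory {
        override def newThread(r: Runnable): Thread = {
          val t = new Thread(r, name)
          t.setDaemon(true)
          t
        }
      }

      private val heartbeater =
        Executors.newSingleThreadScheduledExecutor(daemonFactory("driver-heartbeater"))

      def start(intervalMs: Long)(report: => Unit): Unit = {
        // Random initial delay so multiple executors do not heartbeat in lockstep.
        val initialDelay = intervalMs + (math.random * intervalMs).toLong
        val task = new Runnable { override def run(): Unit = report }
        heartbeater.scheduleAtFixedRate(task, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
      }

      def stop(): Unit = {
        heartbeater.shutdown()
        heartbeater.awaitTermination(10, TimeUnit.SECONDS) // let an in-flight heartbeat finish
      }
    }
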
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
index 8e3c30fc3d781f6134d94e5d45d2de72b85e3c0d..5a74c13b38bf7caaee0df44ef602682bccc72f26 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -86,11 +86,11 @@ private[nio] class ConnectionManager(
       conf.get("spark.network.timeout", "120s"))
 
   // Get the thread counts from the Spark Configuration.
-  // 
+  //
   // Even though the ThreadPoolExecutor constructor takes both a minimum and maximum value,
   // we only query for the minimum value because we are using LinkedBlockingDeque.
-  // 
-  // The JavaDoc for ThreadPoolExecutor points out that when using a LinkedBlockingDeque (which is 
+  //
+  // The JavaDoc for ThreadPoolExecutor points out that when using a LinkedBlockingDeque (which is
   // an unbounded queue) no more than corePoolSize threads will ever be created, so only the "min"
   // parameter is necessary.
   private val handlerThreadCount = conf.getInt("spark.core.connection.handler.threads.min", 20)
@@ -989,6 +989,7 @@ private[nio] class ConnectionManager(
 
   def stop() {
     ackTimeoutMonitor.stop()
+    selector.wakeup()
     selectorThread.interrupt()
     selectorThread.join()
     selector.close()
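Note on the one-line ConnectionManager change: calling selector.wakeup() before interrupting and joining the selector thread makes a currently blocked select() return promptly, regardless of how the loop treats interrupts. A hedged, standalone sketch of that shutdown ordering (not Spark code):

    import java.nio.channels.Selector

    class SelectorLoop {
      private val selector = Selector.open()
      @volatile private var running = true

      private val selectorThread = new Thread("selector-loop") {
        override def run(): Unit = {
          while (running) {
            selector.select() // blocks until events arrive or wakeup() is called
            // ... process selected keys ...
          }
        }
      }
      selectorThread.start()

      def stop(): Unit = {
        running = false
        selector.wakeup()     // unblock select() so the loop can observe the stop
        selectorThread.interrupt()
        selectorThread.join()
        selector.close()
      }
    }
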
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index ecc8bf189986dcfa836a7c5b17da68ef6ce92534..13a52d836f32fb87a16d4bb220166c20c9e6bf11 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -142,11 +142,10 @@ private[spark] class TaskSchedulerImpl(
 
     if (!isLocal && conf.getBoolean("spark.speculation", false)) {
       logInfo("Starting speculative execution thread")
-      import sc.env.actorSystem.dispatcher
       sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL_MS milliseconds,
             SPECULATION_INTERVAL_MS milliseconds) {
         Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() }
-      }
+      }(sc.env.actorSystem.dispatcher)
     }
   }
 
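Note on the TaskSchedulerImpl change: instead of importing the actor system's dispatcher into the method, the ExecutionContext is passed explicitly to schedule, which keeps the implicit from leaking into surrounding code. The same idea in plain Scala, sketched with the standard library rather than Akka:

    import java.util.concurrent.Executors
    import scala.concurrent.{ExecutionContext, Future}

    object ExplicitEcSketch {
      private val pool = Executors.newFixedThreadPool(2)
      private val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

      def checkSpeculatableTasks(): Unit = println("checking for speculatable tasks")

      def main(args: Array[String]): Unit = {
        // Rather than importing an implicit ExecutionContext into scope and letting every
        // Future in the method pick it up, supply the execution context explicitly where needed.
        Future { checkSpeculatableTasks() }(ec)
        pool.shutdown()
      }
    }
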
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 70a477a6895cc74d3f854ad945d3c7c34bfdbe94..50ba0b9d5a6122f40614d2588987a6e82b7e885c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -20,12 +20,12 @@ package org.apache.spark.scheduler.local
 import java.nio.ByteBuffer
 import java.util.concurrent.{Executors, TimeUnit}
 
-import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcEndpointRef, RpcEnv}
-import org.apache.spark.util.Utils
-import org.apache.spark.{Logging, SparkContext, SparkEnv, TaskState}
+import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.{Executor, ExecutorBackend}
+import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
+import org.apache.spark.util.Utils
 
 private case class ReviveOffers()
 
@@ -71,11 +71,15 @@ private[spark] class LocalEndpoint(
 
     case KillTask(taskId, interruptThread) =>
       executor.killTask(taskId, interruptThread)
+  }
 
+  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
     case StopExecutor =>
       executor.stop()
+      context.reply(true)
   }
 
   def reviveOffers() {
     val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
     val tasks = scheduler.resourceOffers(offers).flatten
@@ -104,8 +108,11 @@ private[spark] class LocalEndpoint(
  * master all run in the same JVM. It sits behind a TaskSchedulerImpl and handles launching tasks
  * on a single Executor (created by the LocalBackend) running locally.
  */
-private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: Int)
-  extends SchedulerBackend with ExecutorBackend {
+private[spark] class LocalBackend(
+    conf: SparkConf,
+    scheduler: TaskSchedulerImpl,
+    val totalCores: Int)
+  extends SchedulerBackend with ExecutorBackend with Logging {
 
   private val appId = "local-" + System.currentTimeMillis
   var localEndpoint: RpcEndpointRef = null
@@ -116,7 +123,7 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores:
   }
 
   override def stop() {
-    localEndpoint.send(StopExecutor)
+    localEndpoint.sendWithReply(StopExecutor)
   }
 
   override def reviveOffers() {
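Note on the LocalBackend change: moving StopExecutor from receive to receiveAndReply turns executor shutdown into a request/response exchange — the backend sends the stop request and can wait for an acknowledgement instead of firing a one-way message. A minimal sketch of that ask-style pattern using a plain Promise, not the Spark RpcEnv API:

    import scala.concurrent.{Await, Promise}
    import scala.concurrent.duration._

    object AskSketch {
      final case class StopExecutor(reply: Promise[Boolean])

      // Pretend endpoint: handles the request and completes the caller's promise.
      def handle(msg: StopExecutor): Unit = {
        // ... stop the executor here ...
        msg.reply.success(true)
      }

      def main(args: Array[String]): Unit = {
        val reply = Promise[Boolean]()
        handle(StopExecutor(reply))                           // "send" the request
        val stopped = Await.result(reply.future, 10.seconds)  // wait for the acknowledgement
        println(s"executor stopped: $stopped")
      }
    }
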
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 6b3049b28cd5e5c9feddee515ce655e18a9bfead..22acc270b983e15dc3e3ce3f6123aacb4d3d9355 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -56,19 +56,13 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
     // Min < 0
     val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "-1")
     intercept[SparkException] { contexts += new SparkContext(conf1) }
-    SparkEnv.get.stop()
-    SparkContext.clearActiveContext()
 
     // Max < 0
     val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "-1")
     intercept[SparkException] { contexts += new SparkContext(conf2) }
-    SparkEnv.get.stop()
-    SparkContext.clearActiveContext()
 
     // Both min and max, but min > max
     intercept[SparkException] { createSparkContext(2, 1) }
-    SparkEnv.get.stop()
-    SparkContext.clearActiveContext()
 
     // Both min and max, and min == max
     val sc1 = createSparkContext(1, 1)
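
Note on the test simplification: the manual SparkEnv.get.stop() / SparkContext.clearActiveContext() calls can be dropped because the constructor now cleans up after itself on failure. A hedged sketch of how that could be asserted directly in a FunSuite, assuming a fresh suite with no previously created context:

    test("failed SparkContext construction leaves no active context (sketch)") {
      // Missing spark.master makes the constructor throw after only partial initialization.
      val badConf = new SparkConf().setAppName("cleanup-test")
      intercept[SparkException] { new SparkContext(badConf) }
      // The constructor's catch block already called stop(), so there is nothing left to clean up.
      assert(SparkEnv.get == null)
    }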