diff --git a/bin/slaves.sh b/bin/slaves.sh
index 752565b759f77384b169f917bf212a474abd59ae..c367c2fd8ed9ae152607e54eb51b162788521613 100755
--- a/bin/slaves.sh
+++ b/bin/slaves.sh
@@ -28,7 +28,7 @@
 #   SPARK_SSH_OPTS Options passed to ssh when running remote commands.
 ##
 
-usage="Usage: slaves.sh [--config confdir] command..."
+usage="Usage: slaves.sh [--config <conf-dir>] command..."
 
 # if no args specified, show usage
 if [ $# -le 0 ]; then
@@ -46,6 +46,23 @@ bin=`cd "$bin"; pwd`
 # spark-env.sh. Save it here.
 HOSTLIST=$SPARK_SLAVES
 
+# Check if --config is passed as an argument. It is an optional parameter.
+# Exit if the argument is not a directory.
+if [ "$1" == "--config" ]
+then
+  shift
+  conf_dir=$1
+  if [ ! -d "$conf_dir" ]
+  then
+    echo "ERROR : $conf_dir is not a directory"
+    echo $usage
+    exit 1
+  else
+    export SPARK_CONF_DIR=$conf_dir
+  fi
+  shift
+fi
+
 if [ -f "${SPARK_CONF_DIR}/spark-env.sh" ]; then
   . "${SPARK_CONF_DIR}/spark-env.sh"
 fi
diff --git a/bin/spark-daemon.sh b/bin/spark-daemon.sh
index 5bfe967fbfaeb76729ae3a30b81287cfb6021b55..a0c0d44b58d9991f2d94ed16c41269baabe15900 100755
--- a/bin/spark-daemon.sh
+++ b/bin/spark-daemon.sh
@@ -29,7 +29,7 @@
 #   SPARK_NICENESS The scheduling priority for daemons. Defaults to 0.
 ##
 
-usage="Usage: spark-daemon.sh [--config <conf-dir>] [--hosts hostlistfile] (start|stop) <spark-command> <spark-instance-number> <args...>"
+usage="Usage: spark-daemon.sh [--config <conf-dir>] (start|stop) <spark-command> <spark-instance-number> <args...>"
 
 # if no args specified, show usage
 if [ $# -le 1 ]; then
@@ -43,6 +43,25 @@ bin=`cd "$bin"; pwd`
 . "$bin/spark-config.sh"
 
 # get arguments
+
+# Check if --config is passed as an argument. It is an optional parameter.
+# Exit if the argument is not a directory.
+
+if [ "$1" == "--config" ]
+then
+  shift
+  conf_dir=$1
+  if [ ! -d "$conf_dir" ]
+  then
+    echo "ERROR : $conf_dir is not a directory"
+    echo $usage
+    exit 1
+  else
+    export SPARK_CONF_DIR=$conf_dir
+  fi
+  shift
+fi
+
 startStop=$1
 shift
 command=$1
diff --git a/bin/spark-daemons.sh b/bin/spark-daemons.sh
index 354eb905a1c069dd428111157098b2ff86aa8e64..64286cb2da4f5acee66db6dd19ed442190d6466c 100755
--- a/bin/spark-daemons.sh
+++ b/bin/spark-daemons.sh
@@ -19,7 +19,7 @@
 
 # Run a Spark command on all slave hosts.
 
-usage="Usage: spark-daemons.sh [--config confdir] [--hosts hostlistfile] [start|stop] command instance-number args..."
+usage="Usage: spark-daemons.sh [--config <conf-dir>] [start|stop] command instance-number args..."
 
 # if no args specified, show usage
 if [ $# -le 1 ]; then
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 7db6b6b8bce18da8fea0b1457a50b725bdd72eef..b3a2cb39fcdd13ed26ba2c30698ae8a546dd6a61 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -56,7 +56,7 @@ import org.apache.spark.deploy.LocalSparkCluster
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend,
   ClusterScheduler}
 import org.apache.spark.scheduler.local.LocalScheduler
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
@@ -213,7 +213,7 @@ class SparkContext(
             throw new SparkException("YARN mode not available ?", th)
           }
         }
-        val backend = new StandaloneSchedulerBackend(scheduler, this.env.actorSystem)
+        val backend = new CoarseGrainedSchedulerBackend(scheduler, this.env.actorSystem)
         scheduler.initialize(backend)
         scheduler
 
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index 2bab9d6e3d9280c2173fa2b6f2498c32d03bb6cc..afa76a4a76c412fbd2fe20b8800e6882a9ec56e7 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -36,7 +36,10 @@ import org.apache.spark.SerializableWritable
  * Saves the RDD using a JobConf, which should contain an output key class, an output value class,
  * a filename to write to, etc, exactly like in a Hadoop MapReduce job.
  */
-class SparkHadoopWriter(@transient jobConf: JobConf) extends Logging with SparkHadoopMapRedUtil with Serializable {
+class SparkHadoopWriter(@transient jobConf: JobConf)
+  extends Logging
+  with SparkHadoopMapRedUtil
+  with Serializable {
 
   private val now = new Date()
   private val conf = new SerializableWritable(jobConf)
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 51584d686d1d4d5cb6206465d7345a946c8864c9..cae983ed4c652cc936f6df4f704a04cd9b1cd2b6 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.executor.TaskMetrics
 
 class TaskContext(
-  private[spark] val stageId: Int,
+  val stageId: Int,
   val partitionId: Int,
   val attemptId: Long,
   val runningLocally: Boolean = false,
diff --git a/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala
index b6c484bfe1967dbe9d99cacf87a00c796110d402..5332510e87cafc638ea2b0576021d9eeaef12ef7 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala
@@ -326,7 +326,8 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
     private var blocksInRequestBitVector = new BitSet(totalBlocks)
 
     override def run() {
-      var threadPool = Utils.newDaemonFixedThreadPool(MultiTracker.MaxChatSlots)
+      var threadPool = Utils.newDaemonFixedThreadPool(
+        MultiTracker.MaxChatSlots, "Bit Torrent Chatter")
 
       while (hasBlocks.get < totalBlocks) {
         var numThreadsToCreate = 0
@@ -736,7 +737,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
     private var setOfCompletedSources = Set[SourceInfo]()
 
     override def run() {
-      var threadPool = Utils.newDaemonCachedThreadPool()
+      var threadPool = Utils.newDaemonCachedThreadPool("Bit torrent guide multiple requests")
       var serverSocket: ServerSocket = null
 
       serverSocket = new ServerSocket(0)
@@ -927,7 +928,8 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
   class ServeMultipleRequests
   extends Thread with Logging {
-    // Server at most MultiTracker.MaxChatSlots peers
+    // Serve at most MultiTracker.MaxChatSlots peers
-    var threadPool = Utils.newDaemonFixedThreadPool(MultiTracker.MaxChatSlots)
+    var threadPool = Utils.newDaemonFixedThreadPool(
+      MultiTracker.MaxChatSlots, "Bit torrent serve multiple requests")
 
     override def run() {
       var serverSocket = new ServerSocket(0)
diff --git a/core/src/main/scala/org/apache/spark/broadcast/MultiTracker.scala b/core/src/main/scala/org/apache/spark/broadcast/MultiTracker.scala
index 21ec94659e6011e63b8164b2ec53b91d6effb83c..82ed64f190b4f1605d97380b8346ebaad35e5b46 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/MultiTracker.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/MultiTracker.scala
@@ -137,7 +137,7 @@ extends Logging {
   class TrackMultipleValues
   extends Thread with Logging {
     override def run() {
-      var threadPool = Utils.newDaemonCachedThreadPool()
+      var threadPool = Utils.newDaemonCachedThreadPool("Track multiple values")
       var serverSocket: ServerSocket = null
 
       serverSocket = new ServerSocket(DriverTrackerPort)
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala
index e6674d49a7226bda26c7e1d44f8290a98956a877..51af80a35ebec01e876524136fdc48f645919407 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala
@@ -291,7 +291,7 @@ extends Broadcast[T](id) with Logging with Serializable {
     private var setOfCompletedSources = Set[SourceInfo]()
 
     override def run() {
-      var threadPool = Utils.newDaemonCachedThreadPool()
+      var threadPool = Utils.newDaemonCachedThreadPool("Tree broadcast guide multiple requests")
       var serverSocket: ServerSocket = null
 
       serverSocket = new ServerSocket(0)
@@ -493,7 +493,7 @@ extends Broadcast[T](id) with Logging with Serializable {
   class ServeMultipleRequests
   extends Thread with Logging {
     
-    var threadPool = Utils.newDaemonCachedThreadPool()
+    var threadPool = Utils.newDaemonCachedThreadPool("Tree broadcast serve multiple requests")
     
     override def run() {      
       var serverSocket = new ServerSocket(0)
diff --git a/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
similarity index 90%
rename from core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
rename to core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index db0bea0472a52eb5a400bb435d0088ecabc0930c..52b1c492b237b3b16fb76c647f43e50df332d2e8 100644
--- a/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -24,11 +24,11 @@ import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClie
 
 import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.TaskState.TaskState
-import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.{Utils, AkkaUtils}
 
 
-private[spark] class StandaloneExecutorBackend(
+private[spark] class CoarseGrainedExecutorBackend(
     driverUrl: String,
     executorId: String,
     hostPort: String,
@@ -87,7 +87,7 @@ private[spark] class StandaloneExecutorBackend(
   }
 }
 
-private[spark] object StandaloneExecutorBackend {
+private[spark] object CoarseGrainedExecutorBackend {
   def run(driverUrl: String, executorId: String, hostname: String, cores: Int) {
     // Debug code
     Utils.checkHost(hostname)
@@ -99,7 +99,7 @@ private[spark] object StandaloneExecutorBackend {
     val sparkHostPort = hostname + ":" + boundPort
     System.setProperty("spark.hostPort", sparkHostPort)
     val actor = actorSystem.actorOf(
-      Props(new StandaloneExecutorBackend(driverUrl, executorId, sparkHostPort, cores)),
+      Props(new CoarseGrainedExecutorBackend(driverUrl, executorId, sparkHostPort, cores)),
       name = "Executor")
     actorSystem.awaitTermination()
   }
@@ -107,7 +107,9 @@ private[spark] object StandaloneExecutorBackend {
   def main(args: Array[String]) {
     if (args.length < 4) {
       //the reason we allow the last frameworkId argument is to make it easy to kill rogue executors
-      System.err.println("Usage: StandaloneExecutorBackend <driverUrl> <executorId> <hostname> <cores> [<appid>]")
+      System.err.println(
+        "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> <cores> " +
+        "[<appid>]")
       System.exit(1)
     }
     run(args(0), args(1), args(2), args(3).toInt)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 20323ea038fc2e6bbf62836eb7052e83c34e5bb1..032eb04f43f0afc4d554e7a7c531d366a0fd91ff 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -121,8 +121,7 @@ private[spark] class Executor(
   }
 
   // Start worker thread pool
-  val threadPool = new ThreadPoolExecutor(
-    1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable], Utils.daemonThreadFactory)
+  val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
 
   // Maintains the list of running tasks.
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index e15a839c4e7a79ca14214fc66ca951726ad6253b..9c2fee4023be613ca8fa04c7cfdfcc5bfa64916e 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -79,7 +79,8 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
   private val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)]
   private val registerRequests = new SynchronizedQueue[SendingConnection]
 
-  implicit val futureExecContext = ExecutionContext.fromExecutor(Utils.newDaemonCachedThreadPool())
+  implicit val futureExecContext = ExecutionContext.fromExecutor(
+    Utils.newDaemonCachedThreadPool("Connection manager future execution context"))
 
   private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message]= null
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7fb614402b7b7fc5f268dc8c1d823833cfc9b075..15a04e6558685936f8e9f1576a6dadee9281cfef 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -55,20 +55,20 @@ class DAGScheduler(
     mapOutputTracker: MapOutputTracker,
     blockManagerMaster: BlockManagerMaster,
     env: SparkEnv)
-  extends TaskSchedulerListener with Logging {
+  extends Logging {
 
   def this(taskSched: TaskScheduler) {
     this(taskSched, SparkEnv.get.mapOutputTracker, SparkEnv.get.blockManager.master, SparkEnv.get)
   }
-  taskSched.setListener(this)
+  taskSched.setDAGScheduler(this)
 
   // Called by TaskScheduler to report task's starting.
-  override def taskStarted(task: Task[_], taskInfo: TaskInfo) {
+  def taskStarted(task: Task[_], taskInfo: TaskInfo) {
     eventQueue.put(BeginEvent(task, taskInfo))
   }
 
   // Called by TaskScheduler to report task completions or failures.
-  override def taskEnded(
+  def taskEnded(
       task: Task[_],
       reason: TaskEndReason,
       result: Any,
@@ -79,18 +79,18 @@ class DAGScheduler(
   }
 
   // Called by TaskScheduler when an executor fails.
-  override def executorLost(execId: String) {
+  def executorLost(execId: String) {
     eventQueue.put(ExecutorLost(execId))
   }
 
   // Called by TaskScheduler when a host is added
-  override def executorGained(execId: String, host: String) {
+  def executorGained(execId: String, host: String) {
     eventQueue.put(ExecutorGained(execId, host))
   }
 
   // Called by TaskScheduler to cancel an entire TaskSet due to either repeated failures or
   // cancellation of the job itself.
-  override def taskSetFailed(taskSet: TaskSet, reason: String) {
+  def taskSetFailed(taskSet: TaskSet, reason: String) {
     eventQueue.put(TaskSetFailed(taskSet, reason))
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 6a51efe8d6c05f9828c62413a0dc6710b35b381e..10e047810827c974e7e9230bca60693c1e67cf40 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -24,8 +24,7 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
- * Each TaskScheduler schedulers task for a single SparkContext.
+ * Each TaskScheduler schedules tasks for a single SparkContext.
  * These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage,
  * and are responsible for sending the tasks to the cluster, running them, retrying if there
- * are failures, and mitigating stragglers. They return events to the DAGScheduler through
- * the TaskSchedulerListener interface.
+ * are failures, and mitigating stragglers. They return events to the DAGScheduler.
  */
 private[spark] trait TaskScheduler {
 
@@ -48,8 +47,8 @@ private[spark] trait TaskScheduler {
   // Cancel a stage.
   def cancelTasks(stageId: Int)
 
-  // Set a listener for upcalls. This is guaranteed to be set before submitTasks is called.
-  def setListener(listener: TaskSchedulerListener): Unit
+  // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
+  def setDAGScheduler(dagScheduler: DAGScheduler): Unit
 
   // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
   def defaultParallelism(): Int
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala
deleted file mode 100644
index 593fa9fb93a55624573750e611c265c5de18d8a0..0000000000000000000000000000000000000000
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler
-
-import scala.collection.mutable.Map
-
-import org.apache.spark.TaskEndReason
-import org.apache.spark.executor.TaskMetrics
-
-/**
- * Interface for getting events back from the TaskScheduler.
- */
-private[spark] trait TaskSchedulerListener {
-  // A task has started.
-  def taskStarted(task: Task[_], taskInfo: TaskInfo)
-
-  // A task has finished or failed.
-  def taskEnded(task: Task[_], reason: TaskEndReason, result: Any, accumUpdates: Map[Long, Any],
-                taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit
-
-  // A node was added to the cluster.
-  def executorGained(execId: String, host: String): Unit
-
-  // A node was lost from the cluster.
-  def executorLost(execId: String): Unit
-
-  // The TaskScheduler wants to abort an entire task set.
-  def taskSetFailed(taskSet: TaskSet, reason: String): Unit
-}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 7a72ff047442a27e06c249dd3d19ee1214f21160..4ea8bf88534cf1d90b691b5265101a9e235eaf5d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -79,7 +79,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   private val executorIdToHost = new HashMap[String, String]
 
-  // Listener object to pass upcalls into
+  // DAGScheduler to pass upcalls into
-  var listener: TaskSchedulerListener = null
+  var dagScheduler: DAGScheduler = null
 
   var backend: SchedulerBackend = null
 
@@ -94,8 +94,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   // This is a var so that we can reset it for testing purposes.
   private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
 
-  override def setListener(listener: TaskSchedulerListener) {
-    this.listener = listener
+  override def setDAGScheduler(dagScheduler: DAGScheduler) {
+    this.dagScheduler = dagScheduler
   }
 
   def initialize(context: SchedulerBackend) {
@@ -297,7 +297,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     }
     // Update the DAGScheduler without holding a lock on this, since that can deadlock
     if (failedExecutor != None) {
-      listener.executorLost(failedExecutor.get)
+      dagScheduler.executorLost(failedExecutor.get)
       backend.reviveOffers()
     }
     if (taskFailed) {
@@ -397,9 +397,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
          logError("Lost an executor " + executorId + " (already removed): " + reason)
       }
     }
-    // Call listener.executorLost without holding the lock on this to prevent deadlock
+    // Call dagScheduler.executorLost without holding the lock on this to prevent deadlock
     if (failedExecutor != None) {
-      listener.executorLost(failedExecutor.get)
+      dagScheduler.executorLost(failedExecutor.get)
       backend.reviveOffers()
     }
   }
@@ -418,7 +418,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   }
 
   def executorGained(execId: String, host: String) {
-    listener.executorGained(execId, host)
+    dagScheduler.executorGained(execId, host)
   }
 
   def getExecutorsAliveOnHost(host: String): Option[Set[String]] = synchronized {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
index 7bd3499300e98ac5f15791b39496cb7742b36fa3..29093e3b4f511b1fe9d1e4fcadc9d20ac6e3d1bc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -415,11 +415,11 @@ private[spark] class ClusterTaskSetManager(
   }
 
   private def taskStarted(task: Task[_], info: TaskInfo) {
-    sched.listener.taskStarted(task, info)
+    sched.dagScheduler.taskStarted(task, info)
   }
 
   /**
-   * Marks the task as successful and notifies the listener that a task has ended.
+   * Marks the task as successful and notifies the DAGScheduler that a task has ended.
    */
   def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]) = {
     val info = taskInfos(tid)
@@ -429,7 +429,7 @@ private[spark] class ClusterTaskSetManager(
     if (!successful(index)) {
       logInfo("Finished TID %s in %d ms on %s (progress: %d/%d)".format(
         tid, info.duration, info.host, tasksSuccessful, numTasks))
-      sched.listener.taskEnded(
+      sched.dagScheduler.taskEnded(
         tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
 
       // Mark successful and stop if all the tasks have succeeded.
@@ -445,7 +445,8 @@ private[spark] class ClusterTaskSetManager(
   }
 
   /**
-   * Marks the task as failed, re-adds it to the list of pending tasks, and notifies the listener.
+   * Marks the task as failed, re-adds it to the list of pending tasks, and notifies the
+   * DAGScheduler.
    */
   def handleFailedTask(tid: Long, state: TaskState, reason: Option[TaskEndReason]) {
     val info = taskInfos(tid)
@@ -463,7 +464,7 @@ private[spark] class ClusterTaskSetManager(
       reason.foreach {
         case fetchFailed: FetchFailed =>
           logWarning("Loss was due to fetch failure from " + fetchFailed.bmAddress)
-          sched.listener.taskEnded(tasks(index), fetchFailed, null, null, info, null)
+          sched.dagScheduler.taskEnded(tasks(index), fetchFailed, null, null, info, null)
           successful(index) = true
           tasksSuccessful += 1
           sched.taskSetFinished(this)
@@ -472,11 +473,11 @@ private[spark] class ClusterTaskSetManager(
 
         case TaskKilled =>
           logWarning("Task %d was killed.".format(tid))
-          sched.listener.taskEnded(tasks(index), reason.get, null, null, info, null)
+          sched.dagScheduler.taskEnded(tasks(index), reason.get, null, null, info, null)
           return
 
         case ef: ExceptionFailure =>
-          sched.listener.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
+          sched.dagScheduler.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
           val key = ef.description
           val now = clock.getTime()
           val (printFull, dupCount) = {
@@ -504,7 +505,7 @@ private[spark] class ClusterTaskSetManager(
 
         case TaskResultLost =>
           logWarning("Lost result for TID %s on host %s".format(tid, info.host))
-          sched.listener.taskEnded(tasks(index), TaskResultLost, null, null, info, null)
+          sched.dagScheduler.taskEnded(tasks(index), TaskResultLost, null, null, info, null)
 
         case _ => {}
       }
@@ -533,7 +534,7 @@ private[spark] class ClusterTaskSetManager(
     failed = true
     causeOfFailure = message
     // TODO: Kill running tasks if we were not terminated due to a Mesos error
-    sched.listener.taskSetFailed(taskSet, message)
+    sched.dagScheduler.taskSetFailed(taskSet, message)
     removeAllRunningTasks()
     sched.taskSetFinished(this)
   }
@@ -606,7 +607,7 @@ private[spark] class ClusterTaskSetManager(
           addPendingTask(index)
           // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
           // stage finishes when a total of tasks.size tasks finish.
-          sched.listener.taskEnded(tasks(index), Resubmitted, null, null, info, null)
+          sched.dagScheduler.taskEnded(tasks(index), Resubmitted, null, null, info, null)
         }
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
similarity index 71%
rename from core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala
rename to core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 12b2fd01c0e02622fb2728e6c1dab1ce01ae50f7..a8230ec6bc7c6716f5f0eaf3d757ff3f6ada094f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -24,28 +24,28 @@ import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.util.{Utils, SerializableBuffer}
 
 
-private[spark] sealed trait StandaloneClusterMessage extends Serializable
+private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
 
-private[spark] object StandaloneClusterMessages {
+private[spark] object CoarseGrainedClusterMessages {
 
   // Driver to executors
-  case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
+  case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage
 
-  case class KillTask(taskId: Long, executor: String) extends StandaloneClusterMessage
+  case class KillTask(taskId: Long, executor: String) extends CoarseGrainedClusterMessage
 
   case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
-    extends StandaloneClusterMessage
+    extends CoarseGrainedClusterMessage
 
-  case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage
+  case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
 
   // Executors to driver
   case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
-    extends StandaloneClusterMessage {
+    extends CoarseGrainedClusterMessage {
     Utils.checkHostPort(hostPort, "Expected host port")
   }
 
   case class StatusUpdate(executorId: String, taskId: Long, state: TaskState,
-    data: SerializableBuffer) extends StandaloneClusterMessage
+    data: SerializableBuffer) extends CoarseGrainedClusterMessage
 
   object StatusUpdate {
     /** Alternate factory method that takes a ByteBuffer directly for the data field */
@@ -56,10 +56,10 @@ private[spark] object StandaloneClusterMessages {
   }
 
   // Internal messages in driver
-  case object ReviveOffers extends StandaloneClusterMessage
+  case object ReviveOffers extends CoarseGrainedClusterMessage
 
-  case object StopDriver extends StandaloneClusterMessage
+  case object StopDriver extends CoarseGrainedClusterMessage
 
-  case class RemoveExecutor(executorId: String, reason: String) extends StandaloneClusterMessage
+  case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage
 
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
similarity index 89%
rename from core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
rename to core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 08ee2182a2b91c080f0e1065534b0daefea22bc9..c0f1c6dbada264840e6edbae8c3f67fa591742b7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -30,16 +30,19 @@ import akka.util.duration._
 
 import org.apache.spark.{SparkException, Logging, TaskState}
 import org.apache.spark.scheduler.TaskDescription
-import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.Utils
 
 /**
- * A standalone scheduler backend, which waits for standalone executors to connect to it through
- * Akka. These may be executed in a variety of ways, such as Mesos tasks for the coarse-grained
- * Mesos mode or standalone processes for Spark's standalone deploy mode (spark.deploy.*).
+ * A scheduler backend that waits for coarse-grained executors to connect to it through Akka.
+ * This backend holds onto each executor for the duration of the Spark job rather than
+ * relinquishing executors whenever a task is done and asking the scheduler to launch a new
+ * executor for each new task. Executors may be launched in a variety of ways, such as Mesos
+ * tasks for the coarse-grained Mesos mode or standalone processes for Spark's standalone
+ * deploy mode (spark.deploy.*).
  */
 private[spark]
-class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
+class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
   extends SchedulerBackend with Logging
 {
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
@@ -162,7 +165,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
       }
     }
     driverActor = actorSystem.actorOf(
-      Props(new DriverActor(properties)), name = StandaloneSchedulerBackend.ACTOR_NAME)
+      Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
   }
 
   private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
@@ -202,6 +205,6 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
   }
 }
 
-private[spark] object StandaloneSchedulerBackend {
-  val ACTOR_NAME = "StandaloneScheduler"
+private[spark] object CoarseGrainedSchedulerBackend {
+  val ACTOR_NAME = "CoarseGrainedScheduler"
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index cb88159b8d3910bc688af411c8f4acb9b3bf97fa..cefa970bb92f97f60be78dab81ff0ac1ba8276e1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -28,7 +28,7 @@ private[spark] class SparkDeploySchedulerBackend(
     sc: SparkContext,
     masters: Array[String],
     appName: String)
-  extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
   with ClientListener
   with Logging {
 
@@ -44,10 +44,10 @@ private[spark] class SparkDeploySchedulerBackend(
     // The endpoint for executors to talk to us
     val driverUrl = "akka://spark@%s:%s/user/%s".format(
       System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
-      StandaloneSchedulerBackend.ACTOR_NAME)
+      CoarseGrainedSchedulerBackend.ACTOR_NAME)
     val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
     val command = Command(
-      "org.apache.spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
+      "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
     val sparkHome = sc.getSparkHome().getOrElse(null)
     val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome,
         "http://" + sc.ui.appUIAddress)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
index feec8ecfe499b21808fbd02760cd7f5db675fac0..4312c46cc190c1279318942f0c150e743c36fe14 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
@@ -24,33 +24,16 @@ import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, TaskResult}
 import org.apache.spark.serializer.SerializerInstance
+import org.apache.spark.util.Utils
 
 /**
  * Runs a thread pool that deserializes and remotely fetches (if necessary) task results.
  */
 private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler)
   extends Logging {
-  private val MIN_THREADS = System.getProperty("spark.resultGetter.minThreads", "4").toInt
-  private val MAX_THREADS = System.getProperty("spark.resultGetter.maxThreads", "4").toInt
-  private val getTaskResultExecutor = new ThreadPoolExecutor(
-    MIN_THREADS,
-    MAX_THREADS,
-    0L,
-    TimeUnit.SECONDS,
-    new LinkedBlockingDeque[Runnable],
-    new ResultResolverThreadFactory)
-
-  class ResultResolverThreadFactory extends ThreadFactory {
-    private var counter = 0
-    private var PREFIX = "Result resolver thread"
-
-    override def newThread(r: Runnable): Thread = {
-      val thread = new Thread(r, "%s-%s".format(PREFIX, counter))
-      counter += 1
-      thread.setDaemon(true)
-      return thread
-    }
-  }
+  private val THREADS = System.getProperty("spark.resultGetter.threads", "4").toInt
+  private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool(
+    THREADS, "Result resolver thread")
 
   protected val serializer = new ThreadLocal[SerializerInstance] {
     override def initialValue(): SerializerInstance = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 8f2eef9a535fc062c68ab664721750317dad9ddc..300fe693f1c5556bf5fbb509f34f133d75639bdb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -30,13 +30,14 @@ import org.apache.mesos._
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
 
 import org.apache.spark.{SparkException, Logging, SparkContext, TaskState}
-import org.apache.spark.scheduler.cluster.{ClusterScheduler, StandaloneSchedulerBackend}
+import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend}
 
 /**
  * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
  * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever
  * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the
- * StandaloneBackend mechanism. This class is useful for lower and more predictable latency.
+ * CoarseGrainedSchedulerBackend mechanism. This class is useful for lower and more predictable
+ * latency.
  *
  * Unfortunately this has a bit of duplication from MesosSchedulerBackend, but it seems hard to
  * remove this.
@@ -46,7 +47,7 @@ private[spark] class CoarseMesosSchedulerBackend(
     sc: SparkContext,
     master: String,
     appName: String)
-  extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
   with MScheduler
   with Logging {
 
@@ -122,20 +123,20 @@ private[spark] class CoarseMesosSchedulerBackend(
     val driverUrl = "akka://spark@%s:%s/user/%s".format(
       System.getProperty("spark.driver.host"),
       System.getProperty("spark.driver.port"),
-      StandaloneSchedulerBackend.ACTOR_NAME)
+      CoarseGrainedSchedulerBackend.ACTOR_NAME)
     val uri = System.getProperty("spark.executor.uri")
     if (uri == null) {
       val runScript = new File(sparkHome, "spark-class").getCanonicalPath
       command.setValue(
-        "\"%s\" org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
+        "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d".format(
           runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
     } else {
       // Grab everything to the first '.'. We'll use that and '*' to
       // glob the directory "correctly".
       val basename = uri.split('/').last.split('.').head
       command.setValue(
-        "cd %s*; ./spark-class org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
-          basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+        "cd %s*; ./spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d"
+          .format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
       command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
     }
     return command.build()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
index b445260d1b3f121ee359cf07e017735ba5b9a874..2699f0b33e8d302fa2acd2237245a6559fcdb3a7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
@@ -81,7 +81,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
 
   val env = SparkEnv.get
   val attemptId = new AtomicInteger
-  var listener: TaskSchedulerListener = null
+  var dagScheduler: DAGScheduler = null
 
   // Application dependencies (added through SparkContext) that we've fetched so far on this node.
   // Each map holds the master's timestamp for the version of that file or JAR we got.
@@ -114,8 +114,8 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
     localActor = env.actorSystem.actorOf(Props(new LocalActor(this, threads)), "Test")
   }
 
-  override def setListener(listener: TaskSchedulerListener) {
-    this.listener = listener
+  override def setDAGScheduler(dagScheduler: DAGScheduler) {
+    this.dagScheduler = dagScheduler
   }
 
   override def submitTasks(taskSet: TaskSet) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
index f72e77d40fc122890d643eb988fc6ae46d4e38a8..55f8313e8719ea203175d05433269d53f17dfe72 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
@@ -133,7 +133,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
   }
 
   def taskStarted(task: Task[_], info: TaskInfo) {
-    sched.listener.taskStarted(task, info)
+    sched.dagScheduler.taskStarted(task, info)
   }
 
   def taskEnded(tid: Long, state: TaskState, serializedData: ByteBuffer) {
@@ -148,7 +148,8 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
       }
     }
     result.metrics.resultSize = serializedData.limit()
-    sched.listener.taskEnded(task, Success, result.value, result.accumUpdates, info, result.metrics)
+    sched.dagScheduler.taskEnded(task, Success, result.value, result.accumUpdates, info,
+      result.metrics)
     numFinished += 1
     decreaseRunningTasks(1)
     finished(index) = true
@@ -165,7 +166,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
     decreaseRunningTasks(1)
     val reason: ExceptionFailure = ser.deserialize[ExceptionFailure](
       serializedData, getClass.getClassLoader)
-    sched.listener.taskEnded(task, reason, null, null, info, reason.metrics.getOrElse(null))
+    sched.dagScheduler.taskEnded(task, reason, null, null, info, reason.metrics.getOrElse(null))
     if (!finished(index)) {
       copiesRunning(index) -= 1
       numFailures(index) += 1
@@ -176,7 +177,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
         val errorMessage = "Task %s:%d failed more than %d times; aborting job %s".format(
           taskSet.id, index, 4, reason.description)
         decreaseRunningTasks(runningTasks)
-        sched.listener.taskSetFailed(taskSet, errorMessage)
+        sched.dagScheduler.taskSetFailed(taskSet, errorMessage)
         // need to delete failed Taskset from schedule queue
         sched.taskSetFinished(this)
       }
@@ -184,7 +185,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
   }
 
   override def error(message: String) {
-    sched.listener.taskSetFailed(taskSet, message)
+    sched.dagScheduler.taskSetFailed(taskSet, message)
     sched.taskSetFinished(this)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index f384875cc9af80889312ee61be325bc57ab9f7a3..a3b3968c5e451ce09c4794e916d6c3dc396c95da 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -447,14 +447,17 @@ private[spark] object Utils extends Logging {
     hostPortParseResults.get(hostPort)
   }
 
-  private[spark] val daemonThreadFactory: ThreadFactory =
-    new ThreadFactoryBuilder().setDaemon(true).build()
+  private val daemonThreadFactoryBuilder: ThreadFactoryBuilder =
+    new ThreadFactoryBuilder().setDaemon(true)
 
   /**
-   * Wrapper over newCachedThreadPool.
+   * Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
+   * unique, sequentially assigned integer.
    */
-  def newDaemonCachedThreadPool(): ThreadPoolExecutor =
-    Executors.newCachedThreadPool(daemonThreadFactory).asInstanceOf[ThreadPoolExecutor]
+  def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
+    val threadFactory = daemonThreadFactoryBuilder.setNameFormat(prefix + "-%d").build()
+    Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
+  }
 
   /**
    * Return the string to tell how long has passed in seconds. The passing parameter should be in
@@ -465,10 +468,13 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Wrapper over newFixedThreadPool.
+   * Wrapper over newFixedThreadPool. Thread names are formatted as prefix-ID, where ID is a
+   * unique, sequentially assigned integer.
    */
-  def newDaemonFixedThreadPool(nThreads: Int): ThreadPoolExecutor =
-    Executors.newFixedThreadPool(nThreads, daemonThreadFactory).asInstanceOf[ThreadPoolExecutor]
+  def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
+    val threadFactory = daemonThreadFactoryBuilder.setNameFormat(prefix + "-%d").build()
+    Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor]
+  }
 
   private def listFilesSafely(file: File): Seq[File] = {
     val files = file.listFiles()
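
Review note (not part of the patch): every pool created through these helpers now takes a caller-supplied prefix, and the Guava "%d" name format appends a counter that starts at 0 for each built factory. A minimal usage sketch under two assumptions: the wrapper object and main method are hypothetical, and the caller lives inside org.apache.spark since Utils is private[spark]. The prefix reuses the "Result resolver thread" name introduced in TaskResultGetter above.

    package org.apache.spark.sketch   // hypothetical package, so private[spark] members resolve

    import org.apache.spark.util.Utils

    object ThreadPoolNamingSketch {
      def main(args: Array[String]) {
        // Threads come out named "Result resolver thread-0", "Result resolver thread-1", ...
        val pool = Utils.newDaemonFixedThreadPool(4, "Result resolver thread")
        pool.execute(new Runnable {
          override def run() { println(Thread.currentThread().getName) }
        })
        pool.shutdown()
      }
    }
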
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 838179c6b582ef56642e7e59a0eb9ab636a35643..2a2f828be69676986a188a2a120305ba6a57a611 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -60,7 +60,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
       taskSets += taskSet
     }
     override def cancelTasks(stageId: Int) {}
-    override def setListener(listener: TaskSchedulerListener) = {}
+    override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
     override def defaultParallelism() = 2
   }
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
index 80d0c5a5e929ac2c62c2eef41e70f75b9789d841..b97f2b19b581c3aa1f82899905bf564b5177afad 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
@@ -28,6 +28,30 @@ import org.apache.spark.executor.TaskMetrics
 import java.nio.ByteBuffer
 import org.apache.spark.util.{Utils, FakeClock}
 
+class FakeDAGScheduler(taskScheduler: FakeClusterScheduler) extends DAGScheduler(taskScheduler) {
+  override def taskStarted(task: Task[_], taskInfo: TaskInfo) {
+    taskScheduler.startedTasks += taskInfo.index
+  }
+
+  override def taskEnded(
+      task: Task[_],
+      reason: TaskEndReason,
+      result: Any,
+      accumUpdates: mutable.Map[Long, Any],
+      taskInfo: TaskInfo,
+      taskMetrics: TaskMetrics) {
+    taskScheduler.endedTasks(taskInfo.index) = reason
+  }
+
+  override def executorGained(execId: String, host: String) {}
+
+  override def executorLost(execId: String) {}
+
+  override def taskSetFailed(taskSet: TaskSet, reason: String) {
+    taskScheduler.taskSetsFailed += taskSet.id
+  }
+}
+
 /**
  * A mock ClusterScheduler implementation that just remembers information about tasks started and
  * feedback received from the TaskSetManagers. Note that it's important to initialize this with
@@ -44,30 +68,7 @@ class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /*
 
   val executors = new mutable.HashMap[String, String] ++ liveExecutors
 
-  listener = new TaskSchedulerListener {
-    def taskStarted(task: Task[_], taskInfo: TaskInfo) {
-      startedTasks += taskInfo.index
-    }
-
-    def taskEnded(
-        task: Task[_],
-        reason: TaskEndReason,
-        result: Any,
-        accumUpdates: mutable.Map[Long, Any],
-        taskInfo: TaskInfo,
-        taskMetrics: TaskMetrics)
-    {
-      endedTasks(taskInfo.index) = reason
-    }
-
-    def executorGained(execId: String, host: String) {}
-
-    def executorLost(execId: String) {}
-
-    def taskSetFailed(taskSet: TaskSet, reason: String) {
-      taskSetsFailed += taskSet.id
-    }
-  }
+  dagScheduler = new FakeDAGScheduler(this)
 
   def removeExecutor(execId: String): Unit = executors -= execId
 
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
index 884d6d6f3414c7c18d511b45e1c8ea6618fee8be..de70c50473558a5cf07df0eceacbef6f55fb785b 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
@@ -17,17 +17,19 @@
 
 package org.apache.spark.streaming.examples.clickstream
 
-import java.net.{InetAddress,ServerSocket,Socket,SocketException}
-import java.io.{InputStreamReader, BufferedReader, PrintWriter}
+import java.net.ServerSocket
+import java.io.PrintWriter
 import util.Random
 
 /** Represents a page view on a website with associated dimension data.*/
-class PageView(val url : String, val status : Int, val zipCode : Int, val userID : Int) {
+class PageView(val url : String, val status : Int, val zipCode : Int, val userID : Int)
+    extends Serializable {
   override def toString() : String = {
     "%s\t%s\t%s\t%s\n".format(url, status, zipCode, userID)
   }
 }
-object PageView {
+
+object PageView extends Serializable {
   def fromString(in : String) : PageView = {
     val parts = in.split("\t")
     new PageView(parts(0), parts(1).toInt, parts(2).toInt, parts(3).toInt)
@@ -39,6 +41,9 @@ object PageView {
   * This should be used in tandem with PageViewStream.scala. Example:
   * $ ./run-example spark.streaming.examples.clickstream.PageViewGenerator 44444 10
   * $ ./run-example spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
+  *
+  * When running this, you may want to set the root logging level to ERROR in
+  * conf/log4j.properties to reduce the verbosity of the output.
   * */
 object PageViewGenerator {
   val pages = Map("http://foo.com/"        -> .7,
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index fee9d1c6b98782bbdf5db5e595e22d9825a39489..079e698ea0f44b50c4013ff7ae2eabad0d39b764 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -317,6 +317,7 @@ object SparkBuild extends Build {
     mergeStrategy in assembly := {
       case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
       case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
+      case "log4j.properties" => MergeStrategy.discard
       case "META-INF/services/org.apache.hadoop.fs.FileSystem" => MergeStrategy.concat
       case "reference.conf" => MergeStrategy.concat
       case _ => MergeStrategy.first
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 2d8f07262426f00373a7afc4ea0064887b60ff2f..bb9febad38d03abaea536b413a5f1405c19052af 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.Logging
 import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.MetadataCleaner
 
 
 private[streaming]
@@ -40,6 +41,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   val checkpointDir = ssc.checkpointDir
   val checkpointDuration = ssc.checkpointDuration
   val pendingTimes = ssc.scheduler.jobManager.getPendingTimes()
+  val delaySeconds = MetadataCleaner.getDelaySeconds
 
   def validate() {
     assert(master != null, "Checkpoint.master is null")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 878725c705db71807c567c909b1b6ab0f48f7f94..098081d245605d2113bc027057a053dbbed4a365 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -100,6 +100,10 @@ class StreamingContext private (
       "both SparkContext and checkpoint as null")
   }
 
+  if (cp_ != null && cp_.delaySeconds >= 0 && MetadataCleaner.getDelaySeconds < 0) {
+    MetadataCleaner.setDelaySeconds(cp_.delaySeconds)
+  }
+
   if (MetadataCleaner.getDelaySeconds < 0) {
     throw new SparkException("Spark Streaming cannot be used without setting spark.cleaner.ttl; "
       + "set this property before creating a SparkContext (use SPARK_JAVA_OPTS for the shell)")
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 6d6ef149cc2d8b6351013bce769c0bf421cd9eab..25da9aa917d9510fbead2d7f34a42989144a741e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -22,7 +22,7 @@ import org.apache.spark.util.Utils
 import org.apache.spark.scheduler.SplitInfo
 import scala.collection
 import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId, ContainerId, Priority, Resource, ResourceRequest, ContainerStatus, Container}
-import org.apache.spark.scheduler.cluster.{ClusterScheduler, StandaloneSchedulerBackend}
+import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend}
 import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse}
 import org.apache.hadoop.yarn.util.{RackResolver, Records}
 import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
@@ -211,7 +211,7 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
           val workerId = workerIdCounter.incrementAndGet().toString
           val driverUrl = "akka://spark@%s:%s/user/%s".format(
             System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
-            StandaloneSchedulerBackend.ACTOR_NAME)
+            CoarseGrainedSchedulerBackend.ACTOR_NAME)
 
           logInfo("launching container on " + containerId + " host " + workerHostname)
           // just to be safe, simply remove it from pendingReleaseContainers. Should not be there, but ..