diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index a0f794edfdfcb73fcae7d4e2cc38690c7fc8d74c..ad3337d94c921621fc9e858d3d2066bfaf63717f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -56,10 +56,9 @@ import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
-  SparkDeploySchedulerBackend, ClusterScheduler, SimrSchedulerBackend}
+  SparkDeploySchedulerBackend, SimrSchedulerBackend}
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
-import org.apache.spark.scheduler.local.LocalScheduler
-import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType,
@@ -522,9 +521,7 @@ class SparkContext(
     }
     addedFiles(key) = System.currentTimeMillis
 
-    // Fetch the file locally in case a job is executed locally.
-    // Jobs that run through LocalScheduler will already fetch the required dependencies,
-    // but jobs run in DAGScheduler.runLocally() will not so we must fetch the files here.
+    // Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
     Utils.fetchFile(path, new File(SparkFiles.getRootDirectory))
 
     logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
@@ -1042,18 +1039,30 @@ object SparkContext {
     // Regular expression for connection to Simr cluster
     val SIMR_REGEX = """simr://(.*)""".r
 
+    // When running locally, don't try to re-execute tasks on failure.
+    val MAX_LOCAL_TASK_FAILURES = 1
+
     master match {
       case "local" =>
-        new LocalScheduler(1, 0, sc)
+        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
+        val backend = new LocalBackend(scheduler, 1)
+        scheduler.initialize(backend)
+        scheduler
 
       case LOCAL_N_REGEX(threads) =>
-        new LocalScheduler(threads.toInt, 0, sc)
+        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
+        val backend = new LocalBackend(scheduler, threads.toInt)
+        scheduler.initialize(backend)
+        scheduler
 
       case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
-        new LocalScheduler(threads.toInt, maxFailures.toInt, sc)
+        val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
+        val backend = new LocalBackend(scheduler, threads.toInt)
+        scheduler.initialize(backend)
+        scheduler
 
       case SPARK_REGEX(sparkUrl) =>
-        val scheduler = new ClusterScheduler(sc)
+        val scheduler = new TaskSchedulerImpl(sc)
         val masterUrls = sparkUrl.split(",").map("spark://" + _)
         val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls, appName)
         scheduler.initialize(backend)
@@ -1068,7 +1077,7 @@ object SparkContext {
               memoryPerSlaveInt, SparkContext.executorMemoryRequested))
         }
 
-        val scheduler = new ClusterScheduler(sc)
+        val scheduler = new TaskSchedulerImpl(sc)
         val localCluster = new LocalSparkCluster(
           numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
         val masterUrls = localCluster.start()
@@ -1083,7 +1092,7 @@ object SparkContext {
         val scheduler = try {
           val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
           val cons = clazz.getConstructor(classOf[SparkContext])
-          cons.newInstance(sc).asInstanceOf[ClusterScheduler]
+          cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
         } catch {
           // TODO: Enumerate the exact reasons why it can fail
           // But irrespective of it, it means we cannot proceed !
@@ -1099,7 +1108,7 @@ object SparkContext {
         val scheduler = try {
           val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
           val cons = clazz.getConstructor(classOf[SparkContext])
-          cons.newInstance(sc).asInstanceOf[ClusterScheduler]
+          cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
 
         } catch {
           case th: Throwable => {
@@ -1109,7 +1118,7 @@ object SparkContext {
 
         val backend = try {
           val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
-          val cons = clazz.getConstructor(classOf[ClusterScheduler], classOf[SparkContext])
+          val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
           cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
         } catch {
           case th: Throwable => {
@@ -1122,7 +1131,7 @@ object SparkContext {
 
       case mesosUrl @ MESOS_REGEX(_) =>
         MesosNativeLibrary.load()
-        val scheduler = new ClusterScheduler(sc)
+        val scheduler = new TaskSchedulerImpl(sc)
         val coarseGrained = System.getProperty("spark.mesos.coarse", "false").toBoolean
         val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
         val backend = if (coarseGrained) {
@@ -1134,7 +1143,7 @@ object SparkContext {
         scheduler
 
       case SIMR_REGEX(simrUrl) =>
-        val scheduler = new ClusterScheduler(sc)
+        val scheduler = new TaskSchedulerImpl(sc)
         val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl)
         scheduler.initialize(backend)
         scheduler
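Every local master string now funnels into the same TaskSchedulerImpl + LocalBackend pairing, differing only in thread count and maxTaskFailures. A standalone sketch of the dispatch (the regexes are assumed to mirror SparkContext's own definitions, which sit outside this hunk, and the returned strings are illustrative rather than the patch itself):

    object MasterUrlDemo {
      // Assumed to match the regexes declared earlier in SparkContext.
      val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
      val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r
      val SPARK_REGEX = """spark://(.*)""".r

      def describe(master: String): String = master match {
        case "local" =>
          "TaskSchedulerImpl(maxTaskFailures = 1) + LocalBackend(1 thread)"
        case LOCAL_N_REGEX(threads) =>
          "TaskSchedulerImpl(maxTaskFailures = 1) + LocalBackend(%s threads)".format(threads)
        case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
          "TaskSchedulerImpl(maxTaskFailures = %s) + LocalBackend(%s threads)"
            .format(maxFailures, threads)
        case SPARK_REGEX(url) =>
          "TaskSchedulerImpl + SparkDeploySchedulerBackend(spark://%s)".format(url)
        case _ =>
          "other (mesos://, simr://, yarn, local-cluster)"
      }

      def main(args: Array[String]) {
        Seq("local", "local[4]", "local[4, 2]", "spark://host:7077")
          .foreach(m => println(m + " -> " + describe(m)))
      }
    }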
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
similarity index 96%
rename from core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorLossReason.scala
rename to core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
index 5077b2b48b5749aaf0b6264ffc93767f5560a9db..2bc43a91864491ff8f06b8ef2a04ed51131734b8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorLossReason.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.scheduler.cluster
+package org.apache.spark.scheduler
 
 import org.apache.spark.executor.ExecutorExitCode
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
similarity index 90%
rename from core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala
rename to core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index 5367218faa685cdbe15c79c6f00de8b7fbf35c21..89aa098664dae46e227ba833ff5264deae083660 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -15,12 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.spark.scheduler.cluster
+package org.apache.spark.scheduler
 
 import org.apache.spark.SparkContext
 
 /**
- * A backend interface for cluster scheduling systems that allows plugging in different ones under
- * ClusterScheduler. We assume a Mesos-like model where the application gets resource offers as
+ * A backend interface for scheduling systems that allows plugging in different ones under
+ * TaskSchedulerImpl. We assume a Mesos-like model where the application gets resource offers as
  * machines become available and can launch tasks on them.
  */
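To make the offer model concrete: a toy single-slot backend written against the renamed classes in this patch. It assumes resourceOffers(Seq[WorkerOffer]): Seq[Seq[TaskDescription]] on TaskSchedulerImpl, a WorkerOffer(executorId, host, cores) constructor, and taskId/name fields on TaskDescription, which is how the real backends use them; a sketch only, not part of the change.

    private[spark] class SingleSlotBackend(scheduler: TaskSchedulerImpl) {
      // Offer the scheduler one free core on a single local "executor" and
      // report whatever tasks it chooses to launch there.
      def reviveOffers() {
        val offers = Seq(new WorkerOffer("executor-0", "localhost", 1))
        for (task <- scheduler.resourceOffers(offers).flatten) {
          println("would launch task " + task.taskId + " (" + task.name + ")")
        }
      }
    }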
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
similarity index 92%
rename from core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
rename to core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index e68c5277135f697bef29b24719f236aa904d7f04..89102720fa0fddb00e1dfdf2297ebe4a6ca1950a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -15,21 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.spark.scheduler.cluster
+package org.apache.spark.scheduler
 
 import java.nio.ByteBuffer
 import java.util.concurrent.{LinkedBlockingDeque, ThreadFactory, ThreadPoolExecutor, TimeUnit}
 
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
-import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, TaskResult}
 import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.util.Utils
 
 /**
  * Runs a thread pool that deserializes and remotely fetches (if necessary) task results.
  */
-private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler)
+private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
   extends Logging {
   private val THREADS = System.getProperty("spark.resultGetter.threads", "4").toInt
   private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool(
@@ -42,7 +41,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterSche
   }
 
   def enqueueSuccessfulTask(
-    taskSetManager: ClusterTaskSetManager, tid: Long, serializedData: ByteBuffer) {
+    taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
     getTaskResultExecutor.execute(new Runnable {
       override def run() {
         try {
@@ -78,7 +77,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterSche
     })
   }
 
-  def enqueueFailedTask(taskSetManager: ClusterTaskSetManager, tid: Long, taskState: TaskState,
+  def enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: TaskState,
     serializedData: ByteBuffer) {
     var reason: Option[TaskEndReason] = None
     getTaskResultExecutor.execute(new Runnable {
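The design point here is that result deserialization (and remote fetches for indirect results) happens on a small daemon pool, so the scheduler's own event loop never blocks. Utils.newDaemonFixedThreadPool wraps roughly the following plain java.util.concurrent pattern (an illustrative reconstruction, not the helper's actual source):

    import java.util.concurrent.{Executors, ExecutorService, ThreadFactory}

    object DaemonPoolDemo {
      // Fixed-size pool of daemon threads, sized like spark.resultGetter.threads.
      def newDaemonFixedThreadPool(nThreads: Int): ExecutorService =
        Executors.newFixedThreadPool(nThreads, new ThreadFactory {
          def newThread(r: Runnable): Thread = {
            val t = new Thread(r)
            t.setDaemon(true)  // don't keep the JVM alive just for result fetching
            t
          }
        })

      def main(args: Array[String]) {
        val pool = newDaemonFixedThreadPool(4)
        pool.execute(new Runnable {
          override def run() { println("deserializing a result off the scheduler thread") }
        })
        pool.shutdown()
      }
    }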
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 10e047810827c974e7e9230bca60693c1e67cf40..17b6d97e90e0a85f1e16090b03d7546d3510dfeb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -20,11 +20,12 @@ package org.apache.spark.scheduler
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 
 /**
- * Low-level task scheduler interface, implemented by both ClusterScheduler and LocalScheduler.
- * Each TaskScheduler schedulers task for a single SparkContext.
- * These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage,
- * and are responsible for sending the tasks to the cluster, running them, retrying if there
- * are failures, and mitigating stragglers. They return events to the DAGScheduler.
+ * Low-level task scheduler interface, currently implemented exclusively by TaskSchedulerImpl.
+ * This interface allows plugging in different task schedulers. Each TaskScheduler schedules tasks
+ * for a single SparkContext. These schedulers get sets of tasks submitted to them from the
+ * DAGScheduler for each stage, and are responsible for sending the tasks to the cluster, running
+ * them, retrying if there are failures, and mitigating stragglers. They return events to the
+ * DAGScheduler.
  */
 private[spark] trait TaskScheduler {
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
similarity index 93%
rename from core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
rename to core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 66ab8ea4cdf57a084e308554e2d870f8cb759c8c..dbac6b96ac7de8303e93666d4073563ad35f9b30 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.scheduler.cluster
+package org.apache.spark.scheduler
 
 import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicLong
@@ -28,16 +28,16 @@ import scala.concurrent.duration._
 
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
-import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 
 /**
- * The main TaskScheduler implementation, for running tasks on a cluster. Clients should first call
- * initialize() and start(), then submit task sets through the runTasks method.
- *
- * This class can work with multiple types of clusters by acting through a SchedulerBackend.
+ * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
+ * It can also work with a local setup by using a LocalBackend and setting isLocal to true.
  * It handles common logic, like determining a scheduling order across jobs, waking up to launch
  * speculative tasks, etc.
+ *
+ * Clients should first call initialize() and start(), then submit task sets through the
+ * submitTasks method.
  *
  * THREADING: SchedulerBackends and task-submitting clients can call this class from multiple
  * threads, so it needs locks in public API methods to maintain its state. In addition, some
@@ -45,19 +45,21 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
  * acquire a lock on us, so we need to make sure that we don't try to lock the backend while
  * we are holding a lock on ourselves.
  */
-private[spark] class ClusterScheduler(val sc: SparkContext)
-  extends TaskScheduler
-  with Logging
-{
+private[spark] class TaskSchedulerImpl(
+    val sc: SparkContext,
+    val maxTaskFailures: Int = System.getProperty("spark.task.maxFailures", "4").toInt,
+    isLocal: Boolean = false)
+  extends TaskScheduler with Logging {
+
   // How often to check for speculative tasks
   val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong
 
   // Threshold above which we warn user initial TaskSet may be starved
   val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "15000").toLong
 
-  // ClusterTaskSetManagers are not thread safe, so any access to one should be synchronized
+  // TaskSetManagers are not thread safe, so any access to one should be synchronized
   // on this class.
-  val activeTaskSets = new HashMap[String, ClusterTaskSetManager]
+  val activeTaskSets = new HashMap[String, TaskSetManager]
 
   val taskIdToTaskSetId = new HashMap[Long, String]
   val taskIdToExecutorId = new HashMap[Long, String]
@@ -119,7 +121,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   override def start() {
     backend.start()
 
-    if (System.getProperty("spark.speculation", "false").toBoolean) {
+    if (!isLocal && System.getProperty("spark.speculation", "false").toBoolean) {
       logInfo("Starting speculative execution thread")
       import sc.env.actorSystem.dispatcher
       sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
@@ -133,12 +135,12 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     val tasks = taskSet.tasks
     logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
     this.synchronized {
-      val manager = new ClusterTaskSetManager(this, taskSet)
+      val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
       activeTaskSets(taskSet.id) = manager
       schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
       taskSetTaskIds(taskSet.id) = new HashSet[Long]()
 
-      if (!hasReceivedTask) {
+      if (!isLocal && !hasReceivedTask) {
         starvationTimer.scheduleAtFixedRate(new TimerTask() {
           override def run() {
             if (!hasLaunchedTask) {
@@ -292,19 +294,19 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     }
   }
 
-  def handleTaskGettingResult(taskSetManager: ClusterTaskSetManager, tid: Long) {
+  def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long) {
     taskSetManager.handleTaskGettingResult(tid)
   }
 
   def handleSuccessfulTask(
-    taskSetManager: ClusterTaskSetManager,
+    taskSetManager: TaskSetManager,
     tid: Long,
     taskResult: DirectTaskResult[_]) = synchronized {
     taskSetManager.handleSuccessfulTask(tid, taskResult)
   }
 
   def handleFailedTask(
-    taskSetManager: ClusterTaskSetManager,
+    taskSetManager: TaskSetManager,
     tid: Long,
     taskState: TaskState,
     reason: Option[TaskEndReason]) = synchronized {
@@ -352,7 +354,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
 
   override def defaultParallelism() = backend.defaultParallelism()
 
-
   // Check for speculatable tasks in all our active jobs.
   def checkSpeculatableTasks() {
     var shouldRevive = false
@@ -429,7 +430,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
 }
 
 
-object ClusterScheduler {
+private[spark] object TaskSchedulerImpl {
   /**
    * Used to balance containers across hosts.
    *
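Putting the renamed pieces together, local-mode wiring now reads as below, mirroring the SparkContext hunk at the top of this patch (assumes an existing SparkContext sc; maxTaskFailures = 1 disables retries, as in local mode):

    val scheduler = new TaskSchedulerImpl(sc, maxTaskFailures = 1, isLocal = true)
    val backend = new LocalBackend(scheduler, 4)  // 4 worker threads
    scheduler.initialize(backend)                 // must precede start()
    scheduler.start()                             // starts the backend (and, on a
                                                  // cluster, the speculation thread)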
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 90f6bcefac0bfab0e29196c963f3d9ef3ac29fa2..c676e73e033d6f2fb2b65ab5602a6b043e1db634 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -17,32 +17,706 @@
 
 package org.apache.spark.scheduler
 
-import java.nio.ByteBuffer
+import java.io.NotSerializableException
+import java.util.Arrays
 
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
+import scala.math.max
+import scala.math.min
+
+import org.apache.spark.{ExceptionFailure, FetchFailed, Logging, Resubmitted, SparkEnv,
+  Success, TaskEndReason, TaskKilled, TaskResultLost, TaskState}
 import org.apache.spark.TaskState.TaskState
+import org.apache.spark.util.{Clock, SystemClock}
+
 
 /**
- * Tracks and schedules the tasks within a single TaskSet. This class keeps track of the status of
- * each task and is responsible for retries on failure and locality. The main interfaces to it
- * are resourceOffer, which asks the TaskSet whether it wants to run a task on one node, and
- * statusUpdate, which tells it that one of its tasks changed state (e.g. finished).
+ * Schedules the tasks within a single TaskSet in the TaskSchedulerImpl. This class keeps track of
+ * the status of each task, retries tasks if they fail (up to a limited number of times), and
+ * handles locality-aware scheduling for this TaskSet via delay scheduling. The main interfaces
+ * to it are resourceOffer, which asks the TaskSet whether it wants to run a task on one node,
+ * and statusUpdate, which tells it that one of its tasks changed state (e.g. finished).
+ *
+ * THREADING: This class is designed to only be called from code with a lock on the
+ * TaskScheduler (e.g. its event handlers). It should not be called from other threads.
  *
- * THREADING: This class is designed to only be called from code with a lock on the TaskScheduler
- * (e.g. its event handlers). It should not be called from other threads.
+ * @param sched           the TaskSchedulerImpl associated with the TaskSetManager
+ * @param taskSet         the TaskSet to manage scheduling for
+ * @param maxTaskFailures if any particular task fails this number of times, the entire
+ *                        task set will be aborted
  */
-private[spark] trait TaskSetManager extends Schedulable {
-  def schedulableQueue = null
-  
-  def schedulingMode = SchedulingMode.NONE
-  
-  def taskSet: TaskSet
+private[spark] class TaskSetManager(
+    sched: TaskSchedulerImpl,
+    val taskSet: TaskSet,
+    val maxTaskFailures: Int,
+    clock: Clock = SystemClock)
+  extends Schedulable with Logging
+{
+  // CPUs to request per task
+  val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toInt
+
+  // Quantile of tasks at which to start speculation
+  val SPECULATION_QUANTILE = System.getProperty("spark.speculation.quantile", "0.75").toDouble
+  val SPECULATION_MULTIPLIER = System.getProperty("spark.speculation.multiplier", "1.5").toDouble
+
+  // Serializer for closures and tasks.
+  val env = SparkEnv.get
+  val ser = env.closureSerializer.newInstance()
+
+  val tasks = taskSet.tasks
+  val numTasks = tasks.length
+  val copiesRunning = new Array[Int](numTasks)
+  val successful = new Array[Boolean](numTasks)
+  val numFailures = new Array[Int](numTasks)
+  val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
+  var tasksSuccessful = 0
+
+  var weight = 1
+  var minShare = 0
+  var priority = taskSet.priority
+  var stageId = taskSet.stageId
+  var name = "TaskSet_"+taskSet.stageId.toString
+  var parent: Pool = null
+
+  var runningTasks = 0
+  private val runningTasksSet = new HashSet[Long]
+
+  // Set of pending tasks for each executor. These collections are actually
+  // treated as stacks, in which new tasks are added to the end of the
+  // ArrayBuffer and removed from the end. This makes it faster to detect
+  // tasks that repeatedly fail because whenever a task failed, it is put
+  // back at the head of the stack. They are also only cleaned up lazily;
+  // when a task is launched, it remains in all the pending lists except
+  // the one that it was launched from, but gets removed from them later.
+  private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
+
+  // Set of pending tasks for each host. Similar to pendingTasksForExecutor,
+  // but at host level.
+  private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]
+
+  // Set of pending tasks for each rack -- similar to the above.
+  private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]
+
+  // Set containing pending tasks with no locality preferences.
+  val pendingTasksWithNoPrefs = new ArrayBuffer[Int]
+
+  // Set containing all pending tasks (also used as a stack, as above).
+  val allPendingTasks = new ArrayBuffer[Int]
+
+  // Tasks that can be speculated. Since these will be a small fraction of total
+  // tasks, we'll just hold them in a HashSet.
+  val speculatableTasks = new HashSet[Int]
+
+  // Task index, start and finish time for each task attempt (indexed by task ID)
+  val taskInfos = new HashMap[Long, TaskInfo]
+
+  // Did the TaskSet fail?
+  var failed = false
+  var causeOfFailure = ""
+
+  // How frequently to reprint duplicate exceptions in full, in milliseconds
+  val EXCEPTION_PRINT_INTERVAL =
+    System.getProperty("spark.logging.exceptionPrintInterval", "10000").toLong
+
+  // Map of recent exceptions (identified by string representation and top stack frame) to
+  // duplicate count (how many times the same exception has appeared) and time the full exception
+  // was printed. This should ideally be an LRU map that can drop old exceptions automatically.
+  val recentExceptions = HashMap[String, (Int, Long)]()
+
+  // Figure out the current map output tracker epoch and set it on all tasks
+  val epoch = sched.mapOutputTracker.getEpoch
+  logDebug("Epoch for " + taskSet + ": " + epoch)
+  for (t <- tasks) {
+    t.epoch = epoch
+  }
+
+  // Add all our tasks to the pending lists. We do this in reverse order
+  // of task index so that tasks with low indices get launched first.
+  for (i <- (0 until numTasks).reverse) {
+    addPendingTask(i)
+  }
+
+  // Figure out which locality levels we have in our TaskSet, so we can do delay scheduling
+  val myLocalityLevels = computeValidLocalityLevels()
+  val localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at each level
+
+  // Delay scheduling variables: we keep track of our current locality level and the time we
+  // last launched a task at that level, and move up a level when localityWaits[curLevel] expires.
+  // We then move down if we manage to launch a "more local" task.
+  var currentLocalityIndex = 0    // Index of our current locality level in myLocalityLevels
+  var lastLaunchTime = clock.getTime()  // Time we last launched a task at this level
+
+  override def schedulableQueue = null
+
+  override def schedulingMode = SchedulingMode.NONE
+
+  /**
+   * Add a task to all the pending-task lists that it should be on. If readding is set, we are
+   * re-adding the task so only include it in each list if it's not already there.
+   */
+  private def addPendingTask(index: Int, readding: Boolean = false) {
+    // Utility method that adds `index` to a list only if readding=false or it's not already there
+    def addTo(list: ArrayBuffer[Int]) {
+      if (!readding || !list.contains(index)) {
+        list += index
+      }
+    }
+
+    var hadAliveLocations = false
+    for (loc <- tasks(index).preferredLocations) {
+      for (execId <- loc.executorId) {
+        if (sched.isExecutorAlive(execId)) {
+          addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer))
+          hadAliveLocations = true
+        }
+      }
+      if (sched.hasExecutorsAliveOnHost(loc.host)) {
+        addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
+        for (rack <- sched.getRackForHost(loc.host)) {
+          addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
+        }
+        hadAliveLocations = true
+      }
+    }
+
+    if (!hadAliveLocations) {
+      // Even though the task might've had preferred locations, all of those hosts or executors
+      // are dead; put it in the no-prefs list so we can schedule it elsewhere right away.
+      addTo(pendingTasksWithNoPrefs)
+    }
+
+    if (!readding) {
+      allPendingTasks += index  // No point scanning this whole list to find the old task there
+    }
+  }
+
+  /**
+   * Return the pending tasks list for a given executor ID, or an empty list if
+   * there is no map entry for that host
+   */
+  private def getPendingTasksForExecutor(executorId: String): ArrayBuffer[Int] = {
+    pendingTasksForExecutor.getOrElse(executorId, ArrayBuffer())
+  }
+
+  /**
+   * Return the pending tasks list for a given host, or an empty list if
+   * there is no map entry for that host
+   */
+  private def getPendingTasksForHost(host: String): ArrayBuffer[Int] = {
+    pendingTasksForHost.getOrElse(host, ArrayBuffer())
+  }
+
+  /**
+   * Return the pending rack-local task list for a given rack, or an empty list if
+   * there is no map entry for that rack
+   */
+  private def getPendingTasksForRack(rack: String): ArrayBuffer[Int] = {
+    pendingTasksForRack.getOrElse(rack, ArrayBuffer())
+  }
+
+  /**
+   * Dequeue a pending task from the given list and return its index.
+   * Return None if the list is empty.
+   * This method also cleans up any tasks in the list that have already
+   * been launched, since we want that to happen lazily.
+   */
+  private def findTaskFromList(list: ArrayBuffer[Int]): Option[Int] = {
+    while (!list.isEmpty) {
+      val index = list.last
+      list.trimEnd(1)
+      if (copiesRunning(index) == 0 && !successful(index)) {
+        return Some(index)
+      }
+    }
+    return None
+  }
+
+  /** Check whether a task is currently running an attempt on a given host */
+  private def hasAttemptOnHost(taskIndex: Int, host: String): Boolean = {
+    taskAttempts(taskIndex).exists(_.host == host)
+  }
+
+  /**
+   * Return a speculative task for a given executor if any are available. The task should not have
+   * an attempt running on this host, in case the host is slow. In addition, the task should meet
+   * the given locality constraint.
+   */
+  private def findSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value)
+    : Option[(Int, TaskLocality.Value)] =
+  {
+    speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set
+
+    if (!speculatableTasks.isEmpty) {
+      // Check for process-local or preference-less tasks; note that tasks can be process-local
+      // on multiple nodes when we replicate cached blocks, as in Spark Streaming
+      for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
+        val prefs = tasks(index).preferredLocations
+        val executors = prefs.flatMap(_.executorId)
+        if (prefs.size == 0 || executors.contains(execId)) {
+          speculatableTasks -= index
+          return Some((index, TaskLocality.PROCESS_LOCAL))
+        }
+      }
+
+      // Check for node-local tasks
+      if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
+        for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
+          val locations = tasks(index).preferredLocations.map(_.host)
+          if (locations.contains(host)) {
+            speculatableTasks -= index
+            return Some((index, TaskLocality.NODE_LOCAL))
+          }
+        }
+      }
+
+      // Check for rack-local tasks
+      if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
+        for (rack <- sched.getRackForHost(host)) {
+          for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
+            val racks = tasks(index).preferredLocations.map(_.host).map(sched.getRackForHost)
+            if (racks.contains(rack)) {
+              speculatableTasks -= index
+              return Some((index, TaskLocality.RACK_LOCAL))
+            }
+          }
+        }
+      }
 
+      // Check for non-local tasks
+      if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
+        for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
+          speculatableTasks -= index
+          return Some((index, TaskLocality.ANY))
+        }
+      }
+    }
+
+    return None
+  }
+
+  /**
+   * Dequeue a pending task for a given node and return its index and locality level.
+   * Only search for tasks matching the given locality constraint.
+   */
+  private def findTask(execId: String, host: String, locality: TaskLocality.Value)
+    : Option[(Int, TaskLocality.Value)] =
+  {
+    for (index <- findTaskFromList(getPendingTasksForExecutor(execId))) {
+      return Some((index, TaskLocality.PROCESS_LOCAL))
+    }
+
+    if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
+      for (index <- findTaskFromList(getPendingTasksForHost(host))) {
+        return Some((index, TaskLocality.NODE_LOCAL))
+      }
+    }
+
+    if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
+      for {
+        rack <- sched.getRackForHost(host)
+        index <- findTaskFromList(getPendingTasksForRack(rack))
+      } {
+        return Some((index, TaskLocality.RACK_LOCAL))
+      }
+    }
+
+    // Look for no-pref tasks after rack-local tasks since they can run anywhere.
+    for (index <- findTaskFromList(pendingTasksWithNoPrefs)) {
+      return Some((index, TaskLocality.PROCESS_LOCAL))
+    }
+
+    if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
+      for (index <- findTaskFromList(allPendingTasks)) {
+        return Some((index, TaskLocality.ANY))
+      }
+    }
+
+    // Finally, if all else has failed, find a speculative task
+    return findSpeculativeTask(execId, host, locality)
+  }
+
+  /**
+   * Respond to an offer of a single executor from the scheduler by finding a task
+   */
   def resourceOffer(
       execId: String,
       host: String,
       availableCpus: Int,
       maxLocality: TaskLocality.TaskLocality)
-    : Option[TaskDescription]
+    : Option[TaskDescription] =
+  {
+    if (tasksSuccessful < numTasks && availableCpus >= CPUS_PER_TASK) {
+      val curTime = clock.getTime()
+
+      var allowedLocality = getAllowedLocalityLevel(curTime)
+      if (allowedLocality > maxLocality) {
+        allowedLocality = maxLocality   // We're not allowed to search for farther-away tasks
+      }
+
+      findTask(execId, host, allowedLocality) match {
+        case Some((index, taskLocality)) => {
+          // Found a task; do some bookkeeping and return a task description
+          val task = tasks(index)
+          val taskId = sched.newTaskId()
+          // Figure out whether this should count as a preferred launch
+          logInfo("Starting task %s:%d as TID %s on executor %s: %s (%s)".format(
+            taskSet.id, index, taskId, execId, host, taskLocality))
+          // Do various bookkeeping
+          copiesRunning(index) += 1
+          val info = new TaskInfo(taskId, index, curTime, execId, host, taskLocality)
+          taskInfos(taskId) = info
+          taskAttempts(index) = info :: taskAttempts(index)
+          // Update our locality level for delay scheduling
+          currentLocalityIndex = getLocalityIndex(taskLocality)
+          lastLaunchTime = curTime
+          // Serialize and return the task
+          val startTime = clock.getTime()
+          // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
+          // we assume the task can be serialized without exceptions.
+          val serializedTask = Task.serializeWithDependencies(
+            task, sched.sc.addedFiles, sched.sc.addedJars, ser)
+          val timeTaken = clock.getTime() - startTime
+          addRunningTask(taskId)
+          logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
+            taskSet.id, index, serializedTask.limit, timeTaken))
+          val taskName = "task %s:%d".format(taskSet.id, index)
+          if (taskAttempts(index).size == 1)
+            taskStarted(task, info)
+          return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask))
+        }
+        case _ =>
+      }
+    }
+    return None
+  }
+
+  /**
+   * Get the level we can launch tasks according to delay scheduling, based on current wait time.
+   */
+  private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
+    while (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex) &&
+        currentLocalityIndex < myLocalityLevels.length - 1)
+    {
+      // Jump to the next locality level, and remove our waiting time for the current one since
+      // we don't want to count it again on the next one
+      lastLaunchTime += localityWaits(currentLocalityIndex)
+      currentLocalityIndex += 1
+    }
+    myLocalityLevels(currentLocalityIndex)
+  }
+
+  /**
+   * Find the index in myLocalityLevels for a given locality. This is also designed to work with
+   * localities that are not in myLocalityLevels (in case we somehow get those) by returning the
+   * next-biggest level we have. Uses the fact that the last value in myLocalityLevels is ANY.
+   */
+  def getLocalityIndex(locality: TaskLocality.TaskLocality): Int = {
+    var index = 0
+    while (locality > myLocalityLevels(index)) {
+      index += 1
+    }
+    index
+  }
+
+  private def taskStarted(task: Task[_], info: TaskInfo) {
+    sched.dagScheduler.taskStarted(task, info)
+  }
+
+  def handleTaskGettingResult(tid: Long) = {
+    val info = taskInfos(tid)
+    info.markGettingResult()
+    sched.dagScheduler.taskGettingResult(tasks(info.index), info)
+  }
+
+  /**
+   * Marks the task as successful and notifies the DAGScheduler that a task has ended.
+   */
+  def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]) = {
+    val info = taskInfos(tid)
+    val index = info.index
+    info.markSuccessful()
+    removeRunningTask(tid)
+    if (!successful(index)) {
+      logInfo("Finished TID %s in %d ms on %s (progress: %d/%d)".format(
+        tid, info.duration, info.host, tasksSuccessful, numTasks))
+      sched.dagScheduler.taskEnded(
+        tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
+
+      // Mark successful and stop if all the tasks have succeeded.
+      tasksSuccessful += 1
+      successful(index) = true
+      if (tasksSuccessful == numTasks) {
+        sched.taskSetFinished(this)
+      }
+    } else {
+      logInfo("Ignorning task-finished event for TID " + tid + " because task " +
+        index + " has already completed successfully")
+    }
+  }
+
+  /**
+   * Marks the task as failed, re-adds it to the list of pending tasks, and notifies the
+   * DAG Scheduler.
+   */
+  def handleFailedTask(tid: Long, state: TaskState, reason: Option[TaskEndReason]) {
+    val info = taskInfos(tid)
+    if (info.failed) {
+      return
+    }
+    removeRunningTask(tid)
+    val index = info.index
+    info.markFailed()
+    var failureReason = "unknown"
+    if (!successful(index)) {
+      logWarning("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index))
+      copiesRunning(index) -= 1
+      // Check if the problem is a map output fetch failure. In that case, this
+      // task will never succeed on any node, so tell the scheduler about it.
+      reason.foreach {
+        case fetchFailed: FetchFailed =>
+          logWarning("Loss was due to fetch failure from " + fetchFailed.bmAddress)
+          sched.dagScheduler.taskEnded(tasks(index), fetchFailed, null, null, info, null)
+          successful(index) = true
+          tasksSuccessful += 1
+          sched.taskSetFinished(this)
+          removeAllRunningTasks()
+          return
+
+        case TaskKilled =>
+          logWarning("Task %d was killed.".format(tid))
+          sched.dagScheduler.taskEnded(tasks(index), reason.get, null, null, info, null)
+          return
+
+        case ef: ExceptionFailure =>
+          sched.dagScheduler.taskEnded(
+            tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
+          if (ef.className == classOf[NotSerializableException].getName()) {
+            // If the task result wasn't serializable, there's no point in trying to re-execute it.
+            logError("Task %s:%s had a not serializable result: %s; not retrying".format(
+              taskSet.id, index, ef.description))
+            abort("Task %s:%s had a not serializable result: %s".format(
+              taskSet.id, index, ef.description))
+            return
+          }
+          val key = ef.description
+          failureReason = "Exception failure: %s".format(ef.description)
+          val now = clock.getTime()
+          val (printFull, dupCount) = {
+            if (recentExceptions.contains(key)) {
+              val (dupCount, printTime) = recentExceptions(key)
+              if (now - printTime > EXCEPTION_PRINT_INTERVAL) {
+                recentExceptions(key) = (0, now)
+                (true, 0)
+              } else {
+                recentExceptions(key) = (dupCount + 1, printTime)
+                (false, dupCount + 1)
+              }
+            } else {
+              recentExceptions(key) = (0, now)
+              (true, 0)
+            }
+          }
+          if (printFull) {
+            val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
+            logWarning("Loss was due to %s\n%s\n%s".format(
+              ef.className, ef.description, locs.mkString("\n")))
+          } else {
+            logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))
+          }
+
+        case TaskResultLost =>
+          failureReason = "Lost result for TID %s on host %s".format(tid, info.host)
+          logWarning(failureReason)
+          sched.dagScheduler.taskEnded(tasks(index), TaskResultLost, null, null, info, null)
+
+        case _ => {}
+      }
+      // On non-fetch failures, re-enqueue the task as pending for a max number of retries
+      addPendingTask(index)
+      if (state != TaskState.KILLED) {
+        numFailures(index) += 1
+        if (numFailures(index) >= maxTaskFailures) {
+          logError("Task %s:%d failed %d times; aborting job".format(
+            taskSet.id, index, maxTaskFailures))
+          abort("Task %s:%d failed %d times (most recent failure: %s)".format(
+            taskSet.id, index, maxTaskFailures, failureReason))
+        }
+      }
+    } else {
+      logInfo("Ignoring task-lost event for TID " + tid +
+        " because task " + index + " is already finished")
+    }
+  }
+
+  def error(message: String) {
+    // Save the error message
+    abort("Error: " + message)
+  }
+
+  def abort(message: String) {
+    failed = true
+    causeOfFailure = message
+    // TODO: Kill running tasks if we were not terminated due to a Mesos error
+    sched.dagScheduler.taskSetFailed(taskSet, message)
+    removeAllRunningTasks()
+    sched.taskSetFinished(this)
+  }
+
+  /** If the given task ID is not in the set of running tasks, adds it.
+   *
+   * Used to keep track of the number of running tasks, for enforcing scheduling policies.
+   */
+  def addRunningTask(tid: Long) {
+    if (runningTasksSet.add(tid) && parent != null) {
+      parent.increaseRunningTasks(1)
+    }
+    runningTasks = runningTasksSet.size
+  }
+
+  /** If the given task ID is in the set of running tasks, removes it. */
+  def removeRunningTask(tid: Long) {
+    if (runningTasksSet.remove(tid) && parent != null) {
+      parent.decreaseRunningTasks(1)
+    }
+    runningTasks = runningTasksSet.size
+  }
+
+  private[scheduler] def removeAllRunningTasks() {
+    val numRunningTasks = runningTasksSet.size
+    runningTasksSet.clear()
+    if (parent != null) {
+      parent.decreaseRunningTasks(numRunningTasks)
+    }
+    runningTasks = 0
+  }
+
+  override def getSchedulableByName(name: String): Schedulable = {
+    return null
+  }
+
+  override def addSchedulable(schedulable: Schedulable) {}
+
+  override def removeSchedulable(schedulable: Schedulable) {}
+
+  override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
+    val sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]()
+    sortedTaskSetQueue += this
+    return sortedTaskSetQueue
+  }
+
+  /** Called by TaskScheduler when an executor is lost so we can re-enqueue our tasks */
+  override def executorLost(execId: String, host: String) {
+    logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)
+
+    // Re-enqueue pending tasks for this host based on the status of the cluster -- for example, a
+    // task that used to have locations on only this host might now go to the no-prefs list. Note
+    // that it's okay if we add a task to the same queue twice (if it had multiple preferred
+    // locations), because findTaskFromList will skip already-running tasks.
+    for (index <- getPendingTasksForExecutor(execId)) {
+      addPendingTask(index, readding=true)
+    }
+    for (index <- getPendingTasksForHost(host)) {
+      addPendingTask(index, readding=true)
+    }
+
+    // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage
+    if (tasks(0).isInstanceOf[ShuffleMapTask]) {
+      for ((tid, info) <- taskInfos if info.executorId == execId) {
+        val index = taskInfos(tid).index
+        if (successful(index)) {
+          successful(index) = false
+          copiesRunning(index) -= 1
+          tasksSuccessful -= 1
+          addPendingTask(index)
+          // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
+          // stage finishes when a total of tasks.size tasks finish.
+          sched.dagScheduler.taskEnded(tasks(index), Resubmitted, null, null, info, null)
+        }
+      }
+    }
+    // Also re-enqueue any tasks that were running on the node
+    for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
+      handleFailedTask(tid, TaskState.KILLED, None)
+    }
+  }
+
+  /**
+   * Check for tasks to be speculated and return true if there are any. This is called periodically
+   * by the TaskScheduler.
+   *
+   * TODO: To make this scale to large jobs, we need to maintain a list of running tasks, so that
+   * we don't scan the whole task set. It might also help to make this sorted by launch time.
+   */
+  override def checkSpeculatableTasks(): Boolean = {
+    // Can't speculate if we only have one task, or if all tasks have finished.
+    if (numTasks == 1 || tasksSuccessful == numTasks) {
+      return false
+    }
+    var foundTasks = false
+    val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
+    logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
+    if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
+      val time = clock.getTime()
+      val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray
+      Arrays.sort(durations)
+      val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.size - 1))
+      val threshold = max(SPECULATION_MULTIPLIER * medianDuration, 100)
+      // TODO: Threshold should also look at standard deviation of task durations and have a lower
+      // bound based on that.
+      logDebug("Task length threshold for speculation: " + threshold)
+      for ((tid, info) <- taskInfos) {
+        val index = info.index
+        if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
+          !speculatableTasks.contains(index)) {
+          logInfo(
+            "Marking task %s:%d (on %s) as speculatable because it ran more than %.0f ms".format(
+              taskSet.id, index, info.host, threshold))
+          speculatableTasks += index
+          foundTasks = true
+        }
+      }
+    }
+    return foundTasks
+  }
+
+  override def hasPendingTasks(): Boolean = {
+    numTasks > 0 && tasksSuccessful < numTasks
+  }
+
+  private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
+    val defaultWait = System.getProperty("spark.locality.wait", "3000")
+    level match {
+      case TaskLocality.PROCESS_LOCAL =>
+        System.getProperty("spark.locality.wait.process", defaultWait).toLong
+      case TaskLocality.NODE_LOCAL =>
+        System.getProperty("spark.locality.wait.node", defaultWait).toLong
+      case TaskLocality.RACK_LOCAL =>
+        System.getProperty("spark.locality.wait.rack", defaultWait).toLong
+      case TaskLocality.ANY =>
+        0L
+    }
+  }
 
-  def error(message: String)
+  /**
+   * Compute the locality levels used in this TaskSet. Assumes that all tasks have already been
+   * added to queues using addPendingTask.
+   */
+  private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
+    import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY}
+    val levels = new ArrayBuffer[TaskLocality.TaskLocality]
+    if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0) {
+      levels += PROCESS_LOCAL
+    }
+    if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0) {
+      levels += NODE_LOCAL
+    }
+    if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0) {
+      levels += RACK_LOCAL
+    }
+    levels += ANY
+    logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
+    levels.toArray
+  }
 }
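The core of the delay scheduling added above is the walk in getAllowedLocalityLevel: stay at the most local level until its wait expires, then fall through one level at a time, crediting each expired wait so it is not counted twice. A self-contained toy version (levels and waits are hard-coded assumptions; the real code derives them from computeValidLocalityLevels and the spark.locality.wait* properties):

    object DelaySchedulingDemo {
      val levels = Array("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")
      val waits = Array(3000L, 3000L, 3000L, 0L)  // ms to wait at each level

      def allowedLevel(lastLaunchTime: Long, curTime: Long): String = {
        var index = 0
        var last = lastLaunchTime
        // Consume each fully elapsed wait so it doesn't count against the next level.
        while (index < levels.length - 1 && curTime - last >= waits(index)) {
          last += waits(index)
          index += 1
        }
        levels(index)
      }

      def main(args: Array[String]) {
        println(allowedLevel(0L, 2000L))  // PROCESS_LOCAL: still inside the 3s wait
        println(allowedLevel(0L, 4000L))  // NODE_LOCAL: first wait expired
        println(allowedLevel(0L, 7000L))  // RACK_LOCAL: two waits expired
      }
    }

Speculation works off the same taskInfos bookkeeping: with the defaults above, once 75% of the tasks in a set have succeeded (SPECULATION_QUANTILE), any copy still running longer than max(1.5 * median successful duration, 100 ms) is marked speculatable.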
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/WorkerOffer.scala b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala
similarity index 95%
rename from core/src/main/scala/org/apache/spark/scheduler/cluster/WorkerOffer.scala
rename to core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala
index 938f62883a104df5e60b11333e939ef20293792e..ba6bab3f91a65eb0bf972c2368c2df61bac0462b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/WorkerOffer.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.scheduler.cluster
+package org.apache.spark.scheduler
 
 /**
  * Represents free resources available on an executor.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
deleted file mode 100644
index bf494aa64dc197c92daf8340e2118d520441a44a..0000000000000000000000000000000000000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ /dev/null
@@ -1,713 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import java.io.NotSerializableException
-import java.util.Arrays
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
-import scala.math.max
-import scala.math.min
-
-import org.apache.spark.{ExceptionFailure, FetchFailed, Logging, Resubmitted, SparkEnv,
-  Success, TaskEndReason, TaskKilled, TaskResultLost, TaskState}
-import org.apache.spark.TaskState.TaskState
-import org.apache.spark.scheduler._
-import org.apache.spark.util.{SystemClock, Clock}
-
-
-/**
- * Schedules the tasks within a single TaskSet in the ClusterScheduler. This class keeps track of
- * the status of each task, retries tasks if they fail (up to a limited number of times), and
- * handles locality-aware scheduling for this TaskSet via delay scheduling. The main interfaces
- * to it are resourceOffer, which asks the TaskSet whether it wants to run a task on one node,
- * and statusUpdate, which tells it that one of its tasks changed state (e.g. finished).
- *
- * THREADING: This class is designed to only be called from code with a lock on the
- * ClusterScheduler (e.g. its event handlers). It should not be called from other threads.
- */
-private[spark] class ClusterTaskSetManager(
-    sched: ClusterScheduler,
-    val taskSet: TaskSet,
-    clock: Clock = SystemClock)
-  extends TaskSetManager
-  with Logging
-{
-  // CPUs to request per task
-  val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toInt
-
-  // Maximum times a task is allowed to fail before failing the job
-  val MAX_TASK_FAILURES = System.getProperty("spark.task.maxFailures", "4").toInt
-
-  // Quantile of tasks at which to start speculation
-  val SPECULATION_QUANTILE = System.getProperty("spark.speculation.quantile", "0.75").toDouble
-  val SPECULATION_MULTIPLIER = System.getProperty("spark.speculation.multiplier", "1.5").toDouble
-
-  // Serializer for closures and tasks.
-  val env = SparkEnv.get
-  val ser = env.closureSerializer.newInstance()
-
-  val tasks = taskSet.tasks
-  val numTasks = tasks.length
-  val copiesRunning = new Array[Int](numTasks)
-  val successful = new Array[Boolean](numTasks)
-  val numFailures = new Array[Int](numTasks)
-  val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
-  var tasksSuccessful = 0
-
-  var weight = 1
-  var minShare = 0
-  var priority = taskSet.priority
-  var stageId = taskSet.stageId
-  var name = "TaskSet_"+taskSet.stageId.toString
-  var parent: Pool = null
-
-  var runningTasks = 0
-  private val runningTasksSet = new HashSet[Long]
-
-  // Set of pending tasks for each executor. These collections are actually
-  // treated as stacks, in which new tasks are added to the end of the
-  // ArrayBuffer and removed from the end. This makes it faster to detect
-  // tasks that repeatedly fail because whenever a task failed, it is put
-  // back at the head of the stack. They are also only cleaned up lazily;
-  // when a task is launched, it remains in all the pending lists except
-  // the one that it was launched from, but gets removed from them later.
-  private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
-
-  // Set of pending tasks for each host. Similar to pendingTasksForExecutor,
-  // but at host level.
-  private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]
-
-  // Set of pending tasks for each rack -- similar to the above.
-  private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]
-
-  // Set containing pending tasks with no locality preferences.
-  val pendingTasksWithNoPrefs = new ArrayBuffer[Int]
-
-  // Set containing all pending tasks (also used as a stack, as above).
-  val allPendingTasks = new ArrayBuffer[Int]
-
-  // Tasks that can be speculated. Since these will be a small fraction of total
-  // tasks, we'll just hold them in a HashSet.
-  val speculatableTasks = new HashSet[Int]
-
-  // Task index, start and finish time for each task attempt (indexed by task ID)
-  val taskInfos = new HashMap[Long, TaskInfo]
-
-  // Did the TaskSet fail?
-  var failed = false
-  var causeOfFailure = ""
-
-  // How frequently to reprint duplicate exceptions in full, in milliseconds
-  val EXCEPTION_PRINT_INTERVAL =
-    System.getProperty("spark.logging.exceptionPrintInterval", "10000").toLong
-
-  // Map of recent exceptions (identified by string representation and top stack frame) to
-  // duplicate count (how many times the same exception has appeared) and time the full exception
-  // was printed. This should ideally be an LRU map that can drop old exceptions automatically.
-  val recentExceptions = HashMap[String, (Int, Long)]()
-
-  // Figure out the current map output tracker epoch and set it on all tasks
-  val epoch = sched.mapOutputTracker.getEpoch
-  logDebug("Epoch for " + taskSet + ": " + epoch)
-  for (t <- tasks) {
-    t.epoch = epoch
-  }
-
-  // Add all our tasks to the pending lists. We do this in reverse order
-  // of task index so that tasks with low indices get launched first.
-  for (i <- (0 until numTasks).reverse) {
-    addPendingTask(i)
-  }
-
-  // Figure out which locality levels we have in our TaskSet, so we can do delay scheduling
-  val myLocalityLevels = computeValidLocalityLevels()
-  val localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at each level
-
-  // Delay scheduling variables: we keep track of our current locality level and the time we
-  // last launched a task at that level, and move up a level when localityWaits[curLevel] expires.
-  // We then move down if we manage to launch a "more local" task.
-  var currentLocalityIndex = 0    // Index of our current locality level in validLocalityLevels
-  var lastLaunchTime = clock.getTime()  // Time we last launched a task at this level
-
-  /**
-   * Add a task to all the pending-task lists that it should be on. If readding is set, we are
-   * re-adding the task so only include it in each list if it's not already there.
-   */
-  private def addPendingTask(index: Int, readding: Boolean = false) {
-    // Utility method that adds `index` to a list only if readding=false or it's not already there
-    def addTo(list: ArrayBuffer[Int]) {
-      if (!readding || !list.contains(index)) {
-        list += index
-      }
-    }
-
-    var hadAliveLocations = false
-    for (loc <- tasks(index).preferredLocations) {
-      for (execId <- loc.executorId) {
-        if (sched.isExecutorAlive(execId)) {
-          addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer))
-          hadAliveLocations = true
-        }
-      }
-      if (sched.hasExecutorsAliveOnHost(loc.host)) {
-        addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
-        for (rack <- sched.getRackForHost(loc.host)) {
-          addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
-        }
-        hadAliveLocations = true
-      }
-    }
-
-    if (!hadAliveLocations) {
-      // Even though the task might've had preferred locations, all of those hosts or executors
-      // are dead; put it in the no-prefs list so we can schedule it elsewhere right away.
-      addTo(pendingTasksWithNoPrefs)
-    }
-
-    if (!readding) {
-      allPendingTasks += index  // No point scanning this whole list to find the old task there
-    }
-  }
-
-  /**
-   * Return the pending tasks list for a given executor ID, or an empty list if
-   * there is no map entry for that host
-   */
-  private def getPendingTasksForExecutor(executorId: String): ArrayBuffer[Int] = {
-    pendingTasksForExecutor.getOrElse(executorId, ArrayBuffer())
-  }
-
-  /**
-   * Return the pending tasks list for a given host, or an empty list if
-   * there is no map entry for that host
-   */
-  private def getPendingTasksForHost(host: String): ArrayBuffer[Int] = {
-    pendingTasksForHost.getOrElse(host, ArrayBuffer())
-  }
-
-  /**
-   * Return the pending rack-local task list for a given rack, or an empty list if
-   * there is no map entry for that rack
-   */
-  private def getPendingTasksForRack(rack: String): ArrayBuffer[Int] = {
-    pendingTasksForRack.getOrElse(rack, ArrayBuffer())
-  }
-
-  /**
-   * Dequeue a pending task from the given list and return its index.
-   * Return None if the list is empty.
-   * This method also cleans up any tasks in the list that have already
-   * been launched, since we want that to happen lazily.
-   */
-  private def findTaskFromList(list: ArrayBuffer[Int]): Option[Int] = {
-    while (!list.isEmpty) {
-      val index = list.last
-      list.trimEnd(1)
-      if (copiesRunning(index) == 0 && !successful(index)) {
-        return Some(index)
-      }
-    }
-    return None
-  }
-
-  /** Check whether a task is currently running an attempt on a given host */
-  private def hasAttemptOnHost(taskIndex: Int, host: String): Boolean = {
-    taskAttempts(taskIndex).exists(_.host == host)
-  }
-
-  /**
-   * Return a speculative task for a given executor if any are available. The task should not have
-   * an attempt running on this host, in case the host is slow. In addition, the task should meet
-   * the given locality constraint.
-   */
-  private def findSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value)
-    : Option[(Int, TaskLocality.Value)] =
-  {
-    speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set
-
-    if (!speculatableTasks.isEmpty) {
-      // Check for process-local or preference-less tasks; note that tasks can be process-local
-      // on multiple nodes when we replicate cached blocks, as in Spark Streaming
-      for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
-        val prefs = tasks(index).preferredLocations
-        val executors = prefs.flatMap(_.executorId)
-        if (prefs.size == 0 || executors.contains(execId)) {
-          speculatableTasks -= index
-          return Some((index, TaskLocality.PROCESS_LOCAL))
-        }
-      }
-
-      // Check for node-local tasks
-      if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
-        for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
-          val locations = tasks(index).preferredLocations.map(_.host)
-          if (locations.contains(host)) {
-            speculatableTasks -= index
-            return Some((index, TaskLocality.NODE_LOCAL))
-          }
-        }
-      }
-
-      // Check for rack-local tasks
-      if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
-        for (rack <- sched.getRackForHost(host)) {
-          for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
-            val racks = tasks(index).preferredLocations.map(_.host).flatMap(sched.getRackForHost)
-            if (racks.contains(rack)) {
-              speculatableTasks -= index
-              return Some((index, TaskLocality.RACK_LOCAL))
-            }
-          }
-        }
-      }
-
-      // Check for non-local tasks
-      if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
-        for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
-          speculatableTasks -= index
-          return Some((index, TaskLocality.ANY))
-        }
-      }
-    }
-
-    return None
-  }
-
-  /**
-   * Dequeue a pending task for a given node and return its index and locality level.
-   * Only search for tasks matching the given locality constraint.
-   */
-  private def findTask(execId: String, host: String, locality: TaskLocality.Value)
-    : Option[(Int, TaskLocality.Value)] =
-  {
-    for (index <- findTaskFromList(getPendingTasksForExecutor(execId))) {
-      return Some((index, TaskLocality.PROCESS_LOCAL))
-    }
-
-    if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
-      for (index <- findTaskFromList(getPendingTasksForHost(host))) {
-        return Some((index, TaskLocality.NODE_LOCAL))
-      }
-    }
-
-    if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
-      for {
-        rack <- sched.getRackForHost(host)
-        index <- findTaskFromList(getPendingTasksForRack(rack))
-      } {
-        return Some((index, TaskLocality.RACK_LOCAL))
-      }
-    }
-
-    // Look for no-pref tasks after rack-local tasks since they can run anywhere.
-    for (index <- findTaskFromList(pendingTasksWithNoPrefs)) {
-      return Some((index, TaskLocality.PROCESS_LOCAL))
-    }
-
-    if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
-      for (index <- findTaskFromList(allPendingTasks)) {
-        return Some((index, TaskLocality.ANY))
-      }
-    }
-
-    // Finally, if all else has failed, find a speculative task
-    return findSpeculativeTask(execId, host, locality)
-  }
-
-  /**
-   * Respond to an offer of a single executor from the scheduler by finding a task
-   */
-  override def resourceOffer(
-      execId: String,
-      host: String,
-      availableCpus: Int,
-      maxLocality: TaskLocality.TaskLocality)
-    : Option[TaskDescription] =
-  {
-    if (tasksSuccessful < numTasks && availableCpus >= CPUS_PER_TASK) {
-      val curTime = clock.getTime()
-
-      var allowedLocality = getAllowedLocalityLevel(curTime)
-      if (allowedLocality > maxLocality) {
-        allowedLocality = maxLocality   // We're not allowed to search for farther-away tasks
-      }
-
-      findTask(execId, host, allowedLocality) match {
-        case Some((index, taskLocality)) => {
-          // Found a task; do some bookkeeping and return a task description
-          val task = tasks(index)
-          val taskId = sched.newTaskId()
-          logInfo("Starting task %s:%d as TID %s on executor %s: %s (%s)".format(
-            taskSet.id, index, taskId, execId, host, taskLocality))
-          // Bookkeeping for the new task attempt
-          copiesRunning(index) += 1
-          val info = new TaskInfo(taskId, index, curTime, execId, host, taskLocality)
-          taskInfos(taskId) = info
-          taskAttempts(index) = info :: taskAttempts(index)
-          // Update our locality level for delay scheduling
-          currentLocalityIndex = getLocalityIndex(taskLocality)
-          lastLaunchTime = curTime
-          // Serialize and return the task
-          val startTime = clock.getTime()
-          // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
-          // we assume the task can be serialized without exceptions.
-          val serializedTask = Task.serializeWithDependencies(
-            task, sched.sc.addedFiles, sched.sc.addedJars, ser)
-          val timeTaken = clock.getTime() - startTime
-          addRunningTask(taskId)
-          logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
-            taskSet.id, index, serializedTask.limit, timeTaken))
-          val taskName = "task %s:%d".format(taskSet.id, index)
-          info.serializedSize = serializedTask.limit
-          if (taskAttempts(index).size == 1)
-            taskStarted(task, info)
-          return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask))
-        }
-        case _ =>
-      }
-    }
-    return None
-  }
-
-  /**
-   * Get the level at which we can launch tasks, according to delay scheduling and the
-   * current wait time.
-   */
-  private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
-    while (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex) &&
-        currentLocalityIndex < myLocalityLevels.length - 1)
-    {
-      // Jump to the next locality level, and remove our waiting time for the current one since
-      // we don't want to count it again on the next one
-      lastLaunchTime += localityWaits(currentLocalityIndex)
-      currentLocalityIndex += 1
-    }
-    myLocalityLevels(currentLocalityIndex)
-  }
-
-  /**
-   * Find the index in myLocalityLevels for a given locality. This is also designed to work with
-   * localities that are not in myLocalityLevels (in case we somehow get those) by returning the
-   * next-biggest level we have. Uses the fact that the last value in myLocalityLevels is ANY.
-   */
-  def getLocalityIndex(locality: TaskLocality.TaskLocality): Int = {
-    var index = 0
-    while (locality > myLocalityLevels(index)) {
-      index += 1
-    }
-    index
-  }
-
-  private def taskStarted(task: Task[_], info: TaskInfo) {
-    sched.dagScheduler.taskStarted(task, info)
-  }
-
-  def handleTaskGettingResult(tid: Long) = {
-    val info = taskInfos(tid)
-    info.markGettingResult()
-    sched.dagScheduler.taskGettingResult(tasks(info.index), info)
-  }
-
-  /**
-   * Marks the task as successful and notifies the DAGScheduler that a task has ended.
-   */
-  def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]) = {
-    val info = taskInfos(tid)
-    val index = info.index
-    info.markSuccessful()
-    removeRunningTask(tid)
-    if (!successful(index)) {
-      logInfo("Finished TID %s in %d ms on %s (progress: %d/%d)".format(
-        tid, info.duration, info.host, tasksSuccessful, numTasks))
-      sched.dagScheduler.taskEnded(
-        tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
-
-      // Mark successful and stop if all the tasks have succeeded.
-      tasksSuccessful += 1
-      successful(index) = true
-      if (tasksSuccessful == numTasks) {
-        sched.taskSetFinished(this)
-      }
-    } else {
-      logInfo("Ignorning task-finished event for TID " + tid + " because task " +
-        index + " has already completed successfully")
-    }
-  }
-
-  /**
-   * Marks the task as failed, re-adds it to the list of pending tasks, and notifies the
-   * DAGScheduler.
-   */
-  def handleFailedTask(tid: Long, state: TaskState, reason: Option[TaskEndReason]) {
-    val info = taskInfos(tid)
-    if (info.failed) {
-      return
-    }
-    removeRunningTask(tid)
-    val index = info.index
-    info.markFailed()
-    if (!successful(index)) {
-      logWarning("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index))
-      copiesRunning(index) -= 1
-      // Check if the problem is a map output fetch failure. In that case, this
-      // task will never succeed on any node, so tell the scheduler about it.
-      reason.foreach {
-        case fetchFailed: FetchFailed =>
-          logWarning("Loss was due to fetch failure from " + fetchFailed.bmAddress)
-          sched.dagScheduler.taskEnded(tasks(index), fetchFailed, null, null, info, null)
-          successful(index) = true
-          tasksSuccessful += 1
-          sched.taskSetFinished(this)
-          removeAllRunningTasks()
-          return
-
-        case TaskKilled =>
-          logWarning("Task %d was killed.".format(tid))
-          sched.dagScheduler.taskEnded(tasks(index), reason.get, null, null, info, null)
-          return
-
-        case ef: ExceptionFailure =>
-          sched.dagScheduler.taskEnded(
-            tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
-          if (ef.className == classOf[NotSerializableException].getName()) {
-            // If the task result wasn't serializable, there's no point in trying to re-execute it.
-            logError("Task %s:%s had a not serializable result: %s; not retrying".format(
-              taskSet.id, index, ef.description))
-            abort("Task %s:%s had a not serializable result: %s".format(
-              taskSet.id, index, ef.description))
-            return
-          }
-          val key = ef.description
-          val now = clock.getTime()
-          val (printFull, dupCount) = {
-            if (recentExceptions.contains(key)) {
-              val (dupCount, printTime) = recentExceptions(key)
-              if (now - printTime > EXCEPTION_PRINT_INTERVAL) {
-                recentExceptions(key) = (0, now)
-                (true, 0)
-              } else {
-                recentExceptions(key) = (dupCount + 1, printTime)
-                (false, dupCount + 1)
-              }
-            } else {
-              recentExceptions(key) = (0, now)
-              (true, 0)
-            }
-          }
-          if (printFull) {
-            val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
-            logWarning("Loss was due to %s\n%s\n%s".format(
-              ef.className, ef.description, locs.mkString("\n")))
-          } else {
-            logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))
-          }
-
-        case TaskResultLost =>
-          logWarning("Lost result for TID %s on host %s".format(tid, info.host))
-          sched.dagScheduler.taskEnded(tasks(index), TaskResultLost, null, null, info, null)
-
-        case _ => {}
-      }
-      // On non-fetch failures, re-enqueue the task as pending for a max number of retries
-      addPendingTask(index)
-      if (state != TaskState.KILLED) {
-        numFailures(index) += 1
-        if (numFailures(index) >= MAX_TASK_FAILURES) {
-          logError("Task %s:%d failed %d times; aborting job".format(
-            taskSet.id, index, MAX_TASK_FAILURES))
-          abort("Task %s:%d failed %d times".format(taskSet.id, index, MAX_TASK_FAILURES))
-        }
-      }
-    } else {
-      logInfo("Ignoring task-lost event for TID " + tid +
-        " because task " + index + " is already finished")
-    }
-  }
-
-  override def error(message: String) {
-    // Save the error message
-    abort("Error: " + message)
-  }
-
-  def abort(message: String) {
-    failed = true
-    causeOfFailure = message
-    // TODO: Kill running tasks if we were not terminated due to a Mesos error
-    sched.dagScheduler.taskSetFailed(taskSet, message)
-    removeAllRunningTasks()
-    sched.taskSetFinished(this)
-  }
-
-  /** If the given task ID is not in the set of running tasks, adds it.
-   *
-   * Used to keep track of the number of running tasks, for enforcing scheduling policies.
-   */
-  def addRunningTask(tid: Long) {
-    if (runningTasksSet.add(tid) && parent != null) {
-      parent.increaseRunningTasks(1)
-    }
-    runningTasks = runningTasksSet.size
-  }
-
-  /** If the given task ID is in the set of running tasks, removes it. */
-  def removeRunningTask(tid: Long) {
-    if (runningTasksSet.remove(tid) && parent != null) {
-      parent.decreaseRunningTasks(1)
-    }
-    runningTasks = runningTasksSet.size
-  }
-
-  private[cluster] def removeAllRunningTasks() {
-    val numRunningTasks = runningTasksSet.size
-    runningTasksSet.clear()
-    if (parent != null) {
-      parent.decreaseRunningTasks(numRunningTasks)
-    }
-    runningTasks = 0
-  }
-
-  override def getSchedulableByName(name: String): Schedulable = {
-    return null
-  }
-
-  override def addSchedulable(schedulable: Schedulable) {}
-
-  override def removeSchedulable(schedulable: Schedulable) {}
-
-  override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
-    val sortedTaskSetQueue = ArrayBuffer[TaskSetManager](this)
-    return sortedTaskSetQueue
-  }
-
-  /** Called by cluster scheduler when an executor is lost so we can re-enqueue our tasks */
-  override def executorLost(execId: String, host: String) {
-    logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)
-
-    // Re-enqueue pending tasks for this host based on the status of the cluster -- for example, a
-    // task that used to have locations on only this host might now go to the no-prefs list. Note
-    // that it's okay if we add a task to the same queue twice (if it had multiple preferred
-    // locations), because findTaskFromList will skip already-running tasks.
-    for (index <- getPendingTasksForExecutor(execId)) {
-      addPendingTask(index, readding=true)
-    }
-    for (index <- getPendingTasksForHost(host)) {
-      addPendingTask(index, readding=true)
-    }
-
-    // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage
-    if (tasks(0).isInstanceOf[ShuffleMapTask]) {
-      for ((tid, info) <- taskInfos if info.executorId == execId) {
-        val index = taskInfos(tid).index
-        if (successful(index)) {
-          successful(index) = false
-          copiesRunning(index) -= 1
-          tasksSuccessful -= 1
-          addPendingTask(index)
-          // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
-          // stage finishes when a total of tasks.size tasks finish.
-          sched.dagScheduler.taskEnded(tasks(index), Resubmitted, null, null, info, null)
-        }
-      }
-    }
-    // Also re-enqueue any tasks that were running on the node
-    for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
-      handleFailedTask(tid, TaskState.KILLED, None)
-    }
-  }
-
-  /**
-   * Check for tasks to be speculated and return true if there are any. This is called periodically
-   * by the ClusterScheduler.
-   *
-   * TODO: To make this scale to large jobs, we need to maintain a list of running tasks, so that
-   * we don't scan the whole task set. It might also help to make this sorted by launch time.
-   */
-  override def checkSpeculatableTasks(): Boolean = {
-    // Can't speculate if we only have one task, or if all tasks have finished.
-    if (numTasks == 1 || tasksSuccessful == numTasks) {
-      return false
-    }
-    var foundTasks = false
-    val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
-    logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
-    if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
-      val time = clock.getTime()
-      val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray
-      Arrays.sort(durations)
-      val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.size - 1))
-      val threshold = max(SPECULATION_MULTIPLIER * medianDuration, 100)
-      // TODO: Threshold should also look at standard deviation of task durations and have a lower
-      // bound based on that.
-      logDebug("Task length threshold for speculation: " + threshold)
-      for ((tid, info) <- taskInfos) {
-        val index = info.index
-        if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
-          !speculatableTasks.contains(index)) {
-          logInfo(
-            "Marking task %s:%d (on %s) as speculatable because it ran more than %.0f ms".format(
-              taskSet.id, index, info.host, threshold))
-          speculatableTasks += index
-          foundTasks = true
-        }
-      }
-    }
-    return foundTasks
-  }
-
-  override def hasPendingTasks(): Boolean = {
-    numTasks > 0 && tasksSuccessful < numTasks
-  }
-
-  private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
-    val defaultWait = System.getProperty("spark.locality.wait", "3000")
-    level match {
-      case TaskLocality.PROCESS_LOCAL =>
-        System.getProperty("spark.locality.wait.process", defaultWait).toLong
-      case TaskLocality.NODE_LOCAL =>
-        System.getProperty("spark.locality.wait.node", defaultWait).toLong
-      case TaskLocality.RACK_LOCAL =>
-        System.getProperty("spark.locality.wait.rack", defaultWait).toLong
-      case TaskLocality.ANY =>
-        0L
-    }
-  }
-
-  /**
-   * Compute the locality levels used in this TaskSet. Assumes that all tasks have already been
-   * added to queues using addPendingTask.
-   */
-  private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
-    import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY}
-    val levels = new ArrayBuffer[TaskLocality.TaskLocality]
-    if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0) {
-      levels += PROCESS_LOCAL
-    }
-    if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0) {
-      levels += NODE_LOCAL
-    }
-    if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0) {
-      levels += RACK_LOCAL
-    }
-    levels += ANY
-    logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
-    levels.toArray
-  }
-}
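A minimal, self-contained sketch of the delay-scheduling walk deleted above (the loop in getAllowedLocalityLevel), with hypothetical fixed waits standing in for the spark.locality.wait.* properties; the real code also drops back to a more local level after each launch, which the sketch omits:

```scala
// Sketch only: level names mirror TaskLocality; the waits are made-up constants.
object DelaySchedulingSketch {
  val levels = Array("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")
  val waits  = Array(3000L, 3000L, 3000L, 0L)  // ms to wait at each level

  var currentIndex = 0
  var lastLaunchTime = System.currentTimeMillis()

  // Same shape as getAllowedLocalityLevel: consume the wait for each level we
  // skip past so it is not counted again at the next level.
  def allowedLevel(curTime: Long): String = {
    while (currentIndex < levels.length - 1 &&
        curTime - lastLaunchTime >= waits(currentIndex)) {
      lastLaunchTime += waits(currentIndex)
      currentIndex += 1
    }
    levels(currentIndex)
  }

  def main(args: Array[String]) {
    val t0 = System.currentTimeMillis()
    println(allowedLevel(t0))         // PROCESS_LOCAL: no wait has expired yet
    println(allowedLevel(t0 + 7000))  // RACK_LOCAL: two 3s waits consumed
  }
}
```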
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 7e22c843bf05e21d02386e1321c440bbf5324dfb..5c534a6f436f87de32cb4511e807e3ee6aa2ae4f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -27,8 +27,9 @@ import akka.actor._
 import akka.pattern.ask
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 
 import org.apache.spark.{Logging, SparkException, TaskState}
-import org.apache.spark.scheduler.TaskDescription
+import org.apache.spark.scheduler.{TaskSchedulerImpl, SchedulerBackend, SlaveLost, TaskDescription,
+  WorkerOffer}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.{AkkaUtils, Utils}
 
@@ -41,7 +43,7 @@ import org.apache.spark.util.{AkkaUtils, Utils}
  * (spark.deploy.*).
  */
 private[spark]
-class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
+class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem)
   extends SchedulerBackend with Logging
 {
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
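Every backend below now programs against the merged TaskSchedulerImpl through the same small surface. A sketch of that contract, with the method set inferred from LocalBackend's overrides later in this diff rather than from the trait's actual source:

```scala
// Inferred sketch, not the real org.apache.spark.scheduler.SchedulerBackend:
// names and signatures are read off the LocalBackend overrides further down.
trait SchedulerBackendSketch {
  def start(): Unit
  def stop(): Unit
  def reviveOffers(): Unit        // prompt the scheduler to hand out tasks
  def defaultParallelism(): Int   // hint used for default partition counts
  def killTask(taskId: Long, executorId: String): Unit
}
```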
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index e8fecec4a64f30cb2e78d49ced075f4a30dd28d7..ec3e68e9709378fd4ce5f45c668f90250aebff31 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -19,10 +19,12 @@ package org.apache.spark.scheduler.cluster
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, FileSystem}
+
 import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.scheduler.TaskSchedulerImpl
 
 private[spark] class SimrSchedulerBackend(
-    scheduler: ClusterScheduler,
+    scheduler: TaskSchedulerImpl,
     sc: SparkContext,
     driverFilePath: String)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 7127a72d6d1e5ac6a3ce16e86a97a5a8e63099c6..404ce7a452c60ec552e483d471b2f9d633f26995 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -17,14 +17,16 @@
 
 package org.apache.spark.scheduler.cluster
 
+import scala.collection.mutable.HashMap
+
 import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.deploy.client.{Client, ClientListener}
 import org.apache.spark.deploy.{Command, ApplicationDescription}
-import scala.collection.mutable.HashMap
+import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl}
 import org.apache.spark.util.Utils
 
 private[spark] class SparkDeploySchedulerBackend(
-    scheduler: ClusterScheduler,
+    scheduler: TaskSchedulerImpl,
     sc: SparkContext,
     masters: Array[String],
     appName: String)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 84fe3094cc7bab94b50f45bd69f22cc54ab6138f..39573fc8c9b5138c7d72154527bf3d4c0fea772f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -30,7 +30,8 @@ import org.apache.mesos._
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
 
 import org.apache.spark.{SparkException, Logging, SparkContext, TaskState}
-import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend}
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 
 /**
  * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -43,7 +44,7 @@ import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedu
  * remove this.
  */
 private[spark] class CoarseMesosSchedulerBackend(
-    scheduler: ClusterScheduler,
+    scheduler: TaskSchedulerImpl,
     sc: SparkContext,
     master: String,
     appName: String)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 50cbc2ca92e7e5435cf86673827af165dd29f24f..6aa788c4609cdfceb1fec08c412e38cd507cf0f6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -30,9 +30,8 @@ import org.apache.mesos._
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
 
 import org.apache.spark.{Logging, SparkException, SparkContext, TaskState}
-import org.apache.spark.scheduler.TaskDescription
-import org.apache.spark.scheduler.cluster.{ClusterScheduler, ExecutorExited, ExecutorLossReason}
-import org.apache.spark.scheduler.cluster.{SchedulerBackend, SlaveLost, WorkerOffer}
+import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SchedulerBackend, SlaveLost,
+  TaskDescription, TaskSchedulerImpl, WorkerOffer}
 import org.apache.spark.util.Utils
 
 /**
@@ -41,7 +40,7 @@ import org.apache.spark.util.Utils
  * from multiple apps can run on different cores) and in time (a core can switch ownership).
  */
 private[spark] class MesosSchedulerBackend(
-    scheduler: ClusterScheduler,
+    scheduler: TaskSchedulerImpl,
     sc: SparkContext,
     master: String,
     appName: String)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
new file mode 100644
index 0000000000000000000000000000000000000000..4edc6a0d3f2a06a17a0768e90b3a69bfd7d15348
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.local
+
+import java.nio.ByteBuffer
+
+import akka.actor.{Actor, ActorRef, Props}
+
+import org.apache.spark.{Logging, SparkContext, SparkEnv, TaskState}
+import org.apache.spark.TaskState.TaskState
+import org.apache.spark.executor.{Executor, ExecutorBackend}
+import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
+
+private case object ReviveOffers
+
+private case class StatusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer)
+
+private case class KillTask(taskId: Long)
+
+/**
+ * Calls to LocalBackend are all serialized through LocalActor. Using an actor makes the calls on
+ * LocalBackend asynchronous, which is necessary to prevent deadlock between LocalBackend
+ * and the TaskSchedulerImpl.
+ */
+private[spark] class LocalActor(
+    scheduler: TaskSchedulerImpl,
+    executorBackend: LocalBackend,
+    private val totalCores: Int) extends Actor with Logging {
+
+  private var freeCores = totalCores
+
+  private val localExecutorId = "localhost"
+  private val localExecutorHostname = "localhost"
+
+  val executor = new Executor(localExecutorId, localExecutorHostname, Seq.empty, isLocal = true)
+
+  def receive = {
+    case ReviveOffers =>
+      reviveOffers()
+
+    case StatusUpdate(taskId, state, serializedData) =>
+      scheduler.statusUpdate(taskId, state, serializedData)
+      if (TaskState.isFinished(state)) {
+        freeCores += 1
+        reviveOffers()
+      }
+
+    case KillTask(taskId) =>
+      executor.killTask(taskId)
+  }
+
+  def reviveOffers() {
+    val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
+    for (task <- scheduler.resourceOffers(offers).flatten) {
+      freeCores -= 1
+      executor.launchTask(executorBackend, task.taskId, task.serializedTask)
+    }
+  }
+}
+
+/**
+ * LocalBackend is used when running a local version of Spark where the executor, backend, and
+ * master all run in the same JVM. It sits behind a TaskSchedulerImpl and handles launching tasks
+ * on a single Executor (created by the LocalBackend) running locally.
+ */
+private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: Int)
+  extends SchedulerBackend with ExecutorBackend {
+
+  var localActor: ActorRef = null
+
+  override def start() {
+    localActor = SparkEnv.get.actorSystem.actorOf(
+      Props(new LocalActor(scheduler, this, totalCores)),
+      "LocalBackendActor")
+  }
+
+  override def stop() {
+  }
+
+  override def reviveOffers() {
+    localActor ! ReviveOffers
+  }
+
+  override def defaultParallelism() = totalCores
+
+  override def killTask(taskId: Long, executorId: String) {
+    localActor ! KillTask(taskId)
+  }
+
+  override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {
+    localActor ! StatusUpdate(taskId, state, serializedData)
+  }
+}
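The LocalActor comment above is the key design point: no entry point on LocalBackend touches shared state directly; every call enqueues a message. A stripped-down, runnable sketch of that pattern (assumed names, not Spark code, Akka of the same vintage):

```scala
import akka.actor.{Actor, ActorSystem, Props}

// Sketch only: a single actor owns the mutable core count, so callers can
// fire-and-forget from any thread (even while holding scheduler locks)
// without ever blocking on, or deadlocking with, the state's owner.
class CoreAccountant(totalCores: Int) extends Actor {
  private var freeCores = totalCores

  def receive = {
    case "launch" if freeCores > 0 =>
      freeCores -= 1                 // only ever runs on the actor's thread
    case "finished" =>
      freeCores += 1
      println("free cores: " + freeCores)
  }
}

object CoreAccountantDemo extends App {
  val system = ActorSystem("sketch")
  val accountant = system.actorOf(Props(new CoreAccountant(2)), "accountant")
  accountant ! "launch"    // returns immediately; the decrement happens later
  accountant ! "finished"
  Thread.sleep(500)        // crude: let the messages drain before shutdown
  system.shutdown()
}
```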
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
deleted file mode 100644
index 01e95162c0f70b2d5fd9305799c6ff774d998258..0000000000000000000000000000000000000000
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.local
-
-import java.nio.ByteBuffer
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-
-import akka.actor._
-
-import org.apache.spark._
-import org.apache.spark.TaskState.TaskState
-import org.apache.spark.executor.{Executor, ExecutorBackend}
-import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
-
-
-/**
- * A FIFO or Fair TaskScheduler implementation that runs tasks locally in a thread pool. Optionally
- * the scheduler also allows each task to fail up to maxFailures times, which is useful for
- * testing fault recovery.
- */
-
-private[local]
-case class LocalReviveOffers()
-
-private[local]
-case class LocalStatusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer)
-
-private[local]
-case class KillTask(taskId: Long)
-
-private[spark]
-class LocalActor(localScheduler: LocalScheduler, private var freeCores: Int)
-  extends Actor with Logging {
-
-  val executor = new Executor("localhost", "localhost", Seq.empty, isLocal = true)
-
-  def receive = {
-    case LocalReviveOffers =>
-      launchTask(localScheduler.resourceOffer(freeCores))
-
-    case LocalStatusUpdate(taskId, state, serializeData) =>
-      if (TaskState.isFinished(state)) {
-        freeCores += 1
-        launchTask(localScheduler.resourceOffer(freeCores))
-      }
-
-    case KillTask(taskId) =>
-      executor.killTask(taskId)
-  }
-
-  private def launchTask(tasks: Seq[TaskDescription]) {
-    for (task <- tasks) {
-      freeCores -= 1
-      executor.launchTask(localScheduler, task.taskId, task.serializedTask)
-    }
-  }
-}
-
-private[spark] class LocalScheduler(val threads: Int, val maxFailures: Int, val sc: SparkContext)
-  extends TaskScheduler
-  with ExecutorBackend
-  with Logging {
-
-  val env = SparkEnv.get
-  val attemptId = new AtomicInteger
-  var dagScheduler: DAGScheduler = null
-
-  // Application dependencies (added through SparkContext) that we've fetched so far on this node.
-  // Each map holds the master's timestamp for the version of that file or JAR we got.
-  val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
-  val currentJars: HashMap[String, Long] = new HashMap[String, Long]()
-
-  var schedulableBuilder: SchedulableBuilder = null
-  var rootPool: Pool = null
-  val schedulingMode: SchedulingMode = SchedulingMode.withName(
-    System.getProperty("spark.scheduler.mode", "FIFO"))
-  val activeTaskSets = new HashMap[String, LocalTaskSetManager]
-  val taskIdToTaskSetId = new HashMap[Long, String]
-  val taskSetTaskIds = new HashMap[String, HashSet[Long]]
-
-  var localActor: ActorRef = null
-
-  override def start() {
-    // temporarily set rootPool name to empty
-    rootPool = new Pool("", schedulingMode, 0, 0)
-    schedulableBuilder = {
-      schedulingMode match {
-        case SchedulingMode.FIFO =>
-          new FIFOSchedulableBuilder(rootPool)
-        case SchedulingMode.FAIR =>
-          new FairSchedulableBuilder(rootPool)
-      }
-    }
-    schedulableBuilder.buildPools()
-
-    localActor = env.actorSystem.actorOf(Props(new LocalActor(this, threads)), "Test")
-  }
-
-  override def setDAGScheduler(dagScheduler: DAGScheduler) {
-    this.dagScheduler = dagScheduler
-  }
-
-  override def submitTasks(taskSet: TaskSet) {
-    synchronized {
-      val manager = new LocalTaskSetManager(this, taskSet)
-      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
-      activeTaskSets(taskSet.id) = manager
-      taskSetTaskIds(taskSet.id) = new HashSet[Long]()
-      localActor ! LocalReviveOffers
-    }
-  }
-
-  override def cancelTasks(stageId: Int): Unit = synchronized {
-    logInfo("Cancelling stage " + stageId)
-    logInfo("Cancelling stage " + activeTaskSets.map(_._2.stageId))
-    activeTaskSets.find(_._2.stageId == stageId).foreach { case (_, tsm) =>
-      // There are two possible cases here:
-      // 1. The task set manager has been created and some tasks have been scheduled.
-      //    In this case, send a kill signal to the executors to kill the task and then abort
-      //    the stage.
-      // 2. The task set manager has been created but no tasks have been scheduled. In this
-      //    case, simply abort the stage.
-      //    simply abort the stage.
-      val taskIds = taskSetTaskIds(tsm.taskSet.id)
-      if (taskIds.size > 0) {
-        taskIds.foreach { tid =>
-          localActor ! KillTask(tid)
-        }
-      }
-      logInfo("Stage %d was cancelled".format(stageId))
-      taskSetFinished(tsm)
-    }
-  }
-
-  def resourceOffer(freeCores: Int): Seq[TaskDescription] = {
-    synchronized {
-      var freeCpuCores = freeCores
-      val tasks = new ArrayBuffer[TaskDescription](freeCores)
-      val sortedTaskSetQueue = rootPool.getSortedTaskSetQueue()
-      for (manager <- sortedTaskSetQueue) {
-        logDebug("parentName:%s,name:%s,runningTasks:%s".format(
-          manager.parent.name, manager.name, manager.runningTasks))
-      }
-
-      var launchTask = false
-      for (manager <- sortedTaskSetQueue) {
-        do {
-          launchTask = false
-          manager.resourceOffer(null, null, freeCpuCores, null) match {
-            case Some(task) =>
-              tasks += task
-              taskIdToTaskSetId(task.taskId) = manager.taskSet.id
-              taskSetTaskIds(manager.taskSet.id) += task.taskId
-              freeCpuCores -= 1
-              launchTask = true
-            case None => {}
-          }
-        } while(launchTask)
-      }
-      return tasks
-    }
-  }
-
-  def taskSetFinished(manager: TaskSetManager) {
-    synchronized {
-      activeTaskSets -= manager.taskSet.id
-      manager.parent.removeSchedulable(manager)
-      logInfo("Remove TaskSet %s from pool %s".format(manager.taskSet.id, manager.parent.name))
-      taskIdToTaskSetId --= taskSetTaskIds(manager.taskSet.id)
-      taskSetTaskIds -= manager.taskSet.id
-    }
-  }
-
-  override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {
-    if (TaskState.isFinished(state)) {
-      synchronized {
-        taskIdToTaskSetId.get(taskId) match {
-          case Some(taskSetId) =>
-            val taskSetManager = activeTaskSets.get(taskSetId)
-            taskSetManager.foreach { tsm =>
-              taskSetTaskIds(taskSetId) -= taskId
-
-              state match {
-                case TaskState.FINISHED =>
-                  tsm.taskEnded(taskId, state, serializedData)
-                case TaskState.FAILED =>
-                  tsm.taskFailed(taskId, state, serializedData)
-                case TaskState.KILLED =>
-                  tsm.error("Task %d was killed".format(taskId))
-                case _ => {}
-              }
-            }
-          case None =>
-            logInfo("Ignoring update from TID " + taskId + " because its task set is gone")
-        }
-      }
-      localActor ! LocalStatusUpdate(taskId, state, serializedData)
-    }
-  }
-
-  override def stop() {
-  }
-
-  override def defaultParallelism() = threads
-}
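The resourceOffer(freeCores) entry point deleted above had no counterpart in cluster mode. Its replacement is visible in the new LocalBackend: local mode now describes its capacity as an ordinary WorkerOffer, exactly as cluster backends do. A sketch of that call shape (the helper name and the nested return type are our inferences from LocalActor.reviveOffers above):

```scala
import org.apache.spark.scheduler.{TaskDescription, TaskSchedulerImpl, WorkerOffer}

// Sketch: one offer for the single local "executor". resourceOffers appears to
// return one Seq[TaskDescription] per offer, hence the flatten.
object LocalOfferSketch {
  def offerLocalCores(scheduler: TaskSchedulerImpl, freeCores: Int): Seq[TaskDescription] = {
    val offers = Seq(new WorkerOffer("localhost", "localhost", freeCores))
    scheduler.resourceOffers(offers).flatten
  }
}
```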
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
deleted file mode 100644
index 53bf78267e02668f48cf0c2239c8c2313ad6d2ca..0000000000000000000000000000000000000000
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.local
-
-import java.nio.ByteBuffer
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-
-import org.apache.spark.{ExceptionFailure, Logging, SparkEnv, SparkException, Success, TaskState}
-import org.apache.spark.TaskState.TaskState
-import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Pool, Schedulable, Task,
-  TaskDescription, TaskInfo, TaskLocality, TaskResult, TaskSet, TaskSetManager}
-
-
-private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: TaskSet)
-  extends TaskSetManager with Logging {
-
-  var parent: Pool = null
-  var weight: Int = 1
-  var minShare: Int = 0
-  var runningTasks: Int = 0
-  var priority: Int = taskSet.priority
-  var stageId: Int = taskSet.stageId
-  var name: String = "TaskSet_" + taskSet.stageId.toString
-
-  var failCount = new Array[Int](taskSet.tasks.size)
-  val taskInfos = new HashMap[Long, TaskInfo]
-  val numTasks = taskSet.tasks.size
-  var numFinished = 0
-  val env = SparkEnv.get
-  val ser = env.closureSerializer.newInstance()
-  val copiesRunning = new Array[Int](numTasks)
-  val finished = new Array[Boolean](numTasks)
-  val numFailures = new Array[Int](numTasks)
-  val MAX_TASK_FAILURES = sched.maxFailures
-
-  def increaseRunningTasks(taskNum: Int): Unit = {
-    runningTasks += taskNum
-    if (parent != null) {
-     parent.increaseRunningTasks(taskNum)
-    }
-  }
-
-  def decreaseRunningTasks(taskNum: Int): Unit = {
-    runningTasks -= taskNum
-    if (parent != null) {
-      parent.decreaseRunningTasks(taskNum)
-    }
-  }
-
-  override def addSchedulable(schedulable: Schedulable): Unit = {
-    // nothing
-  }
-
-  override def removeSchedulable(schedulable: Schedulable): Unit = {
-    // nothing
-  }
-
-  override def getSchedulableByName(name: String): Schedulable = {
-    return null
-  }
-
-  override def executorLost(executorId: String, host: String): Unit = {
-    // nothing
-  }
-
-  override def checkSpeculatableTasks() = true
-
-  override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
-    var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
-    sortedTaskSetQueue += this
-    return sortedTaskSetQueue
-  }
-
-  override def hasPendingTasks() = true
-
-  def findTask(): Option[Int] = {
-    for (i <- 0 to numTasks-1) {
-      if (copiesRunning(i) == 0 && !finished(i)) {
-        return Some(i)
-      }
-    }
-    return None
-  }
-
-  override def resourceOffer(
-      execId: String,
-      host: String,
-      availableCpus: Int,
-      maxLocality: TaskLocality.TaskLocality)
-    : Option[TaskDescription] =
-  {
-    SparkEnv.set(sched.env)
-    logDebug("availableCpus:%d, numFinished:%d, numTasks:%d".format(
-      availableCpus.toInt, numFinished, numTasks))
-    if (availableCpus > 0 && numFinished < numTasks) {
-      findTask() match {
-        case Some(index) =>
-          val taskId = sched.attemptId.getAndIncrement()
-          val task = taskSet.tasks(index)
-          val info = new TaskInfo(taskId, index, System.currentTimeMillis(), "local", "local:1",
-            TaskLocality.NODE_LOCAL)
-          taskInfos(taskId) = info
-          // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
-          // we assume the task can be serialized without exceptions.
-          val bytes = Task.serializeWithDependencies(
-            task, sched.sc.addedFiles, sched.sc.addedJars, ser)
-          logInfo("Size of task " + taskId + " is " + bytes.limit + " bytes")
-          val taskName = "task %s:%d".format(taskSet.id, index)
-          copiesRunning(index) += 1
-          increaseRunningTasks(1)
-          taskStarted(task, info)
-          return Some(new TaskDescription(taskId, null, taskName, index, bytes))
-        case None => {}
-      }
-    }
-    return None
-  }
-
-  def taskStarted(task: Task[_], info: TaskInfo) {
-    sched.dagScheduler.taskStarted(task, info)
-  }
-
-  def taskEnded(tid: Long, state: TaskState, serializedData: ByteBuffer) {
-    val info = taskInfos(tid)
-    val index = info.index
-    val task = taskSet.tasks(index)
-    info.markSuccessful()
-    val result = ser.deserialize[TaskResult[_]](serializedData, getClass.getClassLoader) match {
-      case directResult: DirectTaskResult[_] => directResult
-      case IndirectTaskResult(blockId) => {
-        throw new SparkException("Expect only DirectTaskResults when using LocalScheduler")
-      }
-    }
-    result.metrics.resultSize = serializedData.limit()
-    sched.dagScheduler.taskEnded(task, Success, result.value, result.accumUpdates, info,
-      result.metrics)
-    numFinished += 1
-    decreaseRunningTasks(1)
-    finished(index) = true
-    if (numFinished == numTasks) {
-      sched.taskSetFinished(this)
-    }
-  }
-
-  def taskFailed(tid: Long, state: TaskState, serializedData: ByteBuffer) {
-    val info = taskInfos(tid)
-    val index = info.index
-    val task = taskSet.tasks(index)
-    info.markFailed()
-    decreaseRunningTasks(1)
-    val reason: ExceptionFailure = ser.deserialize[ExceptionFailure](
-      serializedData, getClass.getClassLoader)
-    sched.dagScheduler.taskEnded(task, reason, null, null, info, reason.metrics.getOrElse(null))
-    if (!finished(index)) {
-      copiesRunning(index) -= 1
-      numFailures(index) += 1
-      val locs = reason.stackTrace.map(loc => "\tat %s".format(loc.toString))
-      logInfo("Loss was due to %s\n%s\n%s".format(
-        reason.className, reason.description, locs.mkString("\n")))
-      if (numFailures(index) > MAX_TASK_FAILURES) {
-        val errorMessage = "Task %s:%d failed more than %d times; aborting job %s".format(
-          taskSet.id, index, MAX_TASK_FAILURES, reason.description)
-        decreaseRunningTasks(runningTasks)
-        sched.dagScheduler.taskSetFailed(taskSet, errorMessage)
-        // Need to remove the failed TaskSet from the scheduling queue
-        sched.taskSetFinished(this)
-      }
-    }
-  }
-
-  override def error(message: String) {
-    sched.dagScheduler.taskSetFailed(taskSet, message)
-    sched.taskSetFinished(this)
-  }
-}
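The FailureSuite edits below follow from a semantic shift hidden in this deletion: LocalTaskSetManager aborted only when numFailures(index) exceeded MAX_TASK_FAILURES, while the merged scheduler (following the ClusterTaskSetManager logic above) aborts as soon as numFailures(index) reaches it. The second field of local[n, m] therefore now counts total attempts per task, not permitted retries:

```scala
// Under the merged scheduler, a task that fails once deterministically needs
// two attempts in total, so the master string must allow m = 2:
val sc = new SparkContext("local[1,2]", "test")  // 1 core, 2 attempts per task
```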
diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index af448fcb37a1f2676ea2a68e8b5f48f6c492e609..befdc1589f009de6c16e53280f9725bd25d1ccf2 100644
--- a/core/src/test/scala/org/apache/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -42,7 +42,7 @@ class FailureSuite extends FunSuite with LocalSparkContext {
   // Run a 3-task map job in which task 1 deterministically fails once, and check
   // whether the job completes successfully and we ran 4 tasks in total.
   test("failure in a single-stage job") {
-    sc = new SparkContext("local[1,1]", "test")
+    sc = new SparkContext("local[1,2]", "test")
     val results = sc.makeRDD(1 to 3, 3).map { x =>
       FailureSuiteState.synchronized {
         FailureSuiteState.tasksRun += 1
@@ -62,7 +62,7 @@ class FailureSuite extends FunSuite with LocalSparkContext {
 
   // Run a map-reduce job in which a reduce task deterministically fails once.
   test("failure in a two-stage job") {
-    sc = new SparkContext("local[1,1]", "test")
+    sc = new SparkContext("local[1,2]", "test")
     val results = sc.makeRDD(1 to 3).map(x => (x, x)).groupByKey(3).map {
       case (k, v) =>
         FailureSuiteState.synchronized {
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index 151af0d213c65e6144441f42be49163b2389faf4..f28d5c7b133b379ba8375dba8d9eded4b7eedf20 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -19,20 +19,21 @@ package org.apache.spark
 
 import org.scalatest.{FunSuite, PrivateMethodTester}
 
-import org.apache.spark.scheduler.TaskScheduler
-import org.apache.spark.scheduler.cluster.{ClusterScheduler, SimrSchedulerBackend, SparkDeploySchedulerBackend}
+import org.apache.spark.scheduler.{TaskSchedulerImpl, TaskScheduler}
+import org.apache.spark.scheduler.cluster.{SimrSchedulerBackend, SparkDeploySchedulerBackend}
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
-import org.apache.spark.scheduler.local.LocalScheduler
+import org.apache.spark.scheduler.local.LocalBackend
 
 class SparkContextSchedulerCreationSuite
   extends FunSuite with PrivateMethodTester with LocalSparkContext with Logging {
 
-  def createTaskScheduler(master: String): TaskScheduler = {
+  def createTaskScheduler(master: String): TaskSchedulerImpl = {
     // Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
     // real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
     sc = new SparkContext("local", "test")
     val createTaskSchedulerMethod = PrivateMethod[TaskScheduler]('createTaskScheduler)
-    SparkContext invokePrivate createTaskSchedulerMethod(sc, master, "test")
+    val sched = SparkContext invokePrivate createTaskSchedulerMethod(sc, master, "test")
+    sched.asInstanceOf[TaskSchedulerImpl]
   }
 
   test("bad-master") {
@@ -43,55 +44,49 @@ class SparkContextSchedulerCreationSuite
   }
 
   test("local") {
-    createTaskScheduler("local") match {
-      case s: LocalScheduler =>
-        assert(s.threads === 1)
-        assert(s.maxFailures === 0)
+    val sched = createTaskScheduler("local")
+    sched.backend match {
+      case s: LocalBackend => assert(s.totalCores === 1)
       case _ => fail()
     }
   }
 
   test("local-n") {
-    createTaskScheduler("local[5]") match {
-      case s: LocalScheduler =>
-        assert(s.threads === 5)
-        assert(s.maxFailures === 0)
+    val sched = createTaskScheduler("local[5]")
+    assert(sched.maxTaskFailures === 1)
+    sched.backend match {
+      case s: LocalBackend => assert(s.totalCores === 5)
       case _ => fail()
     }
   }
 
   test("local-n-failures") {
-    createTaskScheduler("local[4, 2]") match {
-      case s: LocalScheduler =>
-        assert(s.threads === 4)
-        assert(s.maxFailures === 2)
+    val sched = createTaskScheduler("local[4, 2]")
+    assert(sched.maxTaskFailures === 2)
+    sched.backend match {
+      case s: LocalBackend => assert(s.totalCores === 4)
       case _ => fail()
     }
   }
 
   test("simr") {
-    createTaskScheduler("simr://uri") match {
-      case s: ClusterScheduler =>
-        assert(s.backend.isInstanceOf[SimrSchedulerBackend])
+    createTaskScheduler("simr://uri").backend match {
+      case s: SimrSchedulerBackend => // OK
       case _ => fail()
     }
   }
 
   test("local-cluster") {
-    createTaskScheduler("local-cluster[3, 14, 512]") match {
-      case s: ClusterScheduler =>
-        assert(s.backend.isInstanceOf[SparkDeploySchedulerBackend])
+    createTaskScheduler("local-cluster[3, 14, 512]").backend match {
+      case s: SparkDeploySchedulerBackend => // OK
       case _ => fail()
     }
   }
 
   def testYarn(master: String, expectedClassName: String) {
     try {
-      createTaskScheduler(master) match {
-        case s: ClusterScheduler =>
-          assert(s.getClass === Class.forName(expectedClassName))
-        case _ => fail()
-      }
+      val sched = createTaskScheduler(master)
+      assert(sched.getClass === Class.forName(expectedClassName))
     } catch {
       case e: SparkException =>
         assert(e.getMessage.contains("YARN mode not available"))
@@ -110,11 +105,8 @@ class SparkContextSchedulerCreationSuite
 
   def testMesos(master: String, expectedClass: Class[_]) {
     try {
-      createTaskScheduler(master) match {
-        case s: ClusterScheduler =>
-          assert(s.backend.getClass === expectedClass)
-        case _ => fail()
-      }
+      val sched = createTaskScheduler(master)
+      assert(sched.backend.getClass === expectedClass)
     } catch {
       case e: UnsatisfiedLinkError =>
         assert(e.getMessage.contains("no mesos in"))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala
similarity index 95%
rename from core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala
rename to core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala
index 95d3553d918407d4f4ab8b3af7ff4b1f7a7aa07a..702edb862fea97e317d6d74a218692b5700d088b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala
@@ -15,14 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.spark.scheduler.cluster
+package org.apache.spark.scheduler
 
 import org.scalatest.FunSuite
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark._
-import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster._
 import scala.collection.mutable.ArrayBuffer
 
 import java.util.Properties
@@ -31,9 +29,9 @@ class FakeTaskSetManager(
     initPriority: Int,
     initStageId: Int,
     initNumTasks: Int,
-    clusterScheduler: ClusterScheduler,
+    clusterScheduler: TaskSchedulerImpl,
     taskSet: TaskSet)
-  extends ClusterTaskSetManager(clusterScheduler, taskSet) {
+  extends TaskSetManager(clusterScheduler, taskSet, 0) {
 
   parent = null
   weight = 1
@@ -106,7 +104,7 @@ class FakeTaskSetManager(
 
 class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
 
-  def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler, taskSet: TaskSet): FakeTaskSetManager = {
+  def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: TaskSchedulerImpl, taskSet: TaskSet): FakeTaskSetManager = {
     new FakeTaskSetManager(priority, stage, numTasks, cs, taskSet)
   }
 
@@ -133,7 +131,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
 
   test("FIFO Scheduler Test") {
     sc = new SparkContext("local", "ClusterSchedulerSuite")
-    val clusterScheduler = new ClusterScheduler(sc)
+    val clusterScheduler = new TaskSchedulerImpl(sc)
     var tasks = ArrayBuffer[Task[_]]()
     val task = new FakeTask(0)
     tasks += task
@@ -160,7 +158,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
 
   test("Fair Scheduler Test") {
     sc = new SparkContext("local", "ClusterSchedulerSuite")
-    val clusterScheduler = new ClusterScheduler(sc)
+    val clusterScheduler = new TaskSchedulerImpl(sc)
     var tasks = ArrayBuffer[Task[_]]()
     val task = new FakeTask(0)
     tasks += task
@@ -217,7 +215,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
 
   test("Nested Pool Test") {
     sc = new SparkContext("local", "ClusterSchedulerSuite")
-    val clusterScheduler = new ClusterScheduler(sc)
+    val clusterScheduler = new TaskSchedulerImpl(sc)
     var tasks = ArrayBuffer[Task[_]]()
     val task = new FakeTask(0)
     tasks += task
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
similarity index 91%
rename from core/src/test/scala/org/apache/spark/scheduler/cluster/FakeTask.scala
rename to core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
index 0f01515179f0b8683716ffcc0e2ba1b31b3dff87..0b90c4e74c8a481d0befe61a25f6c5d0301c9328 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/FakeTask.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
@@ -15,10 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.spark.scheduler.cluster
+package org.apache.spark.scheduler
 
 import org.apache.spark.TaskContext
-import org.apache.spark.scheduler.{TaskLocation, Task}
 
 class FakeTask(stageId: Int, prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId, 0) {
   override def runTask(context: TaskContext): Int = 0
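
For orientation, the scheduler suites build TaskSets out of these FakeTask instances. A rough sketch of the createTaskSet-style helper they use, assuming TaskSet's (tasks, stageId, attempt, priority, properties) constructor is untouched by this patch:

```scala
import org.apache.spark.scheduler.{Task, TaskLocation, TaskSet}

// Sketch: build a TaskSet of FakeTasks, optionally pinning each task to
// preferred locations for the locality tests.
def createTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
  require(prefLocs.isEmpty || prefLocs.size == numTasks, "Wrong number of task locations")
  val tasks = Array.tabulate[Task[_]](numTasks) { i =>
    new FakeTask(i, if (prefLocs.nonEmpty) prefLocs(i) else Nil)
  }
  new TaskSet(tasks, 0, 0, 0, null) // stageId, attempt, priority, properties
}
```
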
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 2e41438a527aac204662ec5f6c7b687a408b4da4..d4320e5e14458afef262316bdf18d4e75257ac18 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -19,23 +19,26 @@ package org.apache.spark.scheduler
 
 import scala.collection.mutable.{Buffer, HashSet}
 
-import org.scalatest.{BeforeAndAfterAll, FunSuite}
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
 import org.scalatest.matchers.ShouldMatchers
 
 import org.apache.spark.{LocalSparkContext, SparkContext}
 import org.apache.spark.SparkContext._
 
 class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
-    with BeforeAndAfterAll {
+    with BeforeAndAfter with BeforeAndAfterAll {
   /** Length of time to wait while draining listener events. */
   val WAIT_TIMEOUT_MILLIS = 10000
 
+  before {
+    sc = new SparkContext("local", "SparkListenerSuite")
+  }
+
   override def afterAll {
     System.clearProperty("spark.akka.frameSize")
   }
 
   test("basic creation of StageInfo") {
-    sc = new SparkContext("local", "DAGSchedulerSuite")
     val listener = new SaveStageInfo
     sc.addSparkListener(listener)
     val rdd1 = sc.parallelize(1 to 100, 4)
@@ -56,7 +59,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
   }
 
   test("StageInfo with fewer tasks than partitions") {
-    sc = new SparkContext("local", "DAGSchedulerSuite")
     val listener = new SaveStageInfo
     sc.addSparkListener(listener)
     val rdd1 = sc.parallelize(1 to 100, 4)
@@ -72,7 +74,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
   }
 
   test("local metrics") {
-    sc = new SparkContext("local", "DAGSchedulerSuite")
     val listener = new SaveStageInfo
     sc.addSparkListener(listener)
     sc.addSparkListener(new StatsReportListener)
@@ -135,10 +136,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
   }
 
   test("onTaskGettingResult() called when result fetched remotely") {
-    // Need to use local cluster mode here, because results are not ever returned through the
-    // block manager when using the LocalScheduler.
-    sc = new SparkContext("local-cluster[1,1,512]", "test")
-
     val listener = new SaveTaskEvents
     sc.addSparkListener(listener)
  
@@ -157,10 +154,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
   }
 
   test("onTaskGettingResult() not called when result sent directly") {
-    // Need to use local cluster mode here, because results are not ever returned through the
-    // block manager when using the LocalScheduler.
-    sc = new SparkContext("local-cluster[1,1,512]", "test")
-
     val listener = new SaveTaskEvents
     sc.addSparkListener(listener)
  
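
The SaveStageInfo and SaveTaskEvents helpers this suite relies on are defined elsewhere in the file. For reference, a sketch of the buffering-listener pattern they follow, with the StageCompleted event class and its stage field assumed from this branch's SparkListener API:

```scala
import scala.collection.mutable.Buffer
import org.apache.spark.scheduler.{SparkListener, StageCompleted, StageInfo}

// Sketch: a listener that just buffers completed StageInfos so tests can
// assert on them after draining the listener queue.
class SaveStageInfo extends SparkListener {
  val stageInfos = Buffer[StageInfo]()
  override def onStageCompleted(stage: StageCompleted) {
    stageInfos += stage.stage
  }
}
```
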
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
similarity index 87%
rename from core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
rename to core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index 27c2d5336142f26f90f9903170ecfd0bc79b58fd..ca97f7d2a5be6fa979e2f7f656834ed0bef80ac7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -15,14 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.spark.scheduler.cluster
+package org.apache.spark.scheduler
 
 import java.nio.ByteBuffer
 
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
 
 import org.apache.spark.{LocalSparkContext, SparkContext, SparkEnv}
-import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, TaskResult}
 import org.apache.spark.storage.TaskResultBlockId
 
 /**
@@ -31,12 +30,12 @@ import org.apache.spark.storage.TaskResultBlockId
  * Used to test the case where a BlockManager evicts the task result (or dies) before the
  * TaskResult is retrieved.
  */
-class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler)
+class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
   extends TaskResultGetter(sparkEnv, scheduler) {
   var removedResult = false
 
   override def enqueueSuccessfulTask(
-    taskSetManager: ClusterTaskSetManager, tid: Long, serializedData: ByteBuffer) {
+    taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
     if (!removedResult) {
       // Only remove the result once, since we'd like to test the case where the task eventually
       // succeeds.
@@ -65,22 +64,18 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA
     System.setProperty("spark.akka.frameSize", "1")
   }
 
-  before {
-    // Use local-cluster mode because results are returned differently when running with the
-    // LocalScheduler.
-    sc = new SparkContext("local-cluster[1,1,512]", "test")
-  }
-
   override def afterAll {
     System.clearProperty("spark.akka.frameSize")
   }
 
   test("handling results smaller than Akka frame size") {
+    sc = new SparkContext("local", "test")
     val result = sc.parallelize(Seq(1), 1).map(x => 2 * x).reduce((x, y) => x)
     assert(result === 2)
   }
 
-  test("handling results larger than Akka frame size") { 
+  test("handling results larger than Akka frame size") {
+    sc = new SparkContext("local", "test")
     val akkaFrameSize =
       sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
     val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
@@ -92,10 +87,13 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA
   }
 
   test("task retried if result missing from block manager") {
+    // Set the maximum number of task failures to > 1, so that the task set isn't aborted
+    // the first time a task's result goes missing.
+    sc = new SparkContext("local[1,2]", "test")
     // If this test hangs, it's probably because no resource offers were made after the task
     // failed.
-    val scheduler: ClusterScheduler = sc.taskScheduler match {
-      case clusterScheduler: ClusterScheduler =>
+    val scheduler: TaskSchedulerImpl = sc.taskScheduler match {
+      case clusterScheduler: TaskSchedulerImpl =>
         clusterScheduler
       case _ =>
         assert(false, "Expect local cluster to use ClusterScheduler")
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
similarity index 93%
rename from core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
rename to core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index bb28a31a990bc78874892dd543f9557b255bbb1f..3dcb01ae5e44630d0daef407e883a3992430b63c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.scheduler.cluster
+package org.apache.spark.scheduler
 
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable
@@ -23,7 +23,6 @@ import scala.collection.mutable
 import org.scalatest.FunSuite
 
 import org.apache.spark._
-import org.apache.spark.scheduler._
 import org.apache.spark.executor.TaskMetrics
 import java.nio.ByteBuffer
 import org.apache.spark.util.{Utils, FakeClock}
@@ -56,10 +55,10 @@ class FakeDAGScheduler(taskScheduler: FakeClusterScheduler) extends DAGScheduler
  * A mock ClusterScheduler implementation that just remembers information about tasks started and
  * feedback received from the TaskSetManagers. Note that it's important to initialize this with
  * a list of "live" executors and their hostnames for isExecutorAlive and hasExecutorsAliveOnHost
- * to work, and these are required for locality in ClusterTaskSetManager.
+ * to work, and these are required for locality in TaskSetManager.
  */
 class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */)
-  extends ClusterScheduler(sc)
+  extends TaskSchedulerImpl(sc)
 {
   val startedTasks = new ArrayBuffer[Long]
   val endedTasks = new mutable.HashMap[Long, TaskEndReason]
@@ -79,16 +78,17 @@ class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /*
   override def hasExecutorsAliveOnHost(host: String): Boolean = executors.values.exists(_ == host)
 }
 
-class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
+class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
   import TaskLocality.{ANY, PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL}
 
   val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
+  val MAX_TASK_FAILURES = 4
 
   test("TaskSet with no preferences") {
     sc = new SparkContext("local", "test")
     val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
     val taskSet = createTaskSet(1)
-    val manager = new ClusterTaskSetManager(sched, taskSet)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
 
     // Offer a host with no CPUs
     assert(manager.resourceOffer("exec1", "host1", 0, ANY) === None)
@@ -114,7 +114,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
     sc = new SparkContext("local", "test")
     val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
     val taskSet = createTaskSet(3)
-    val manager = new ClusterTaskSetManager(sched, taskSet)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
 
     // First three offers should all find tasks
     for (i <- 0 until 3) {
@@ -151,7 +151,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
       Seq()   // Last task has no locality prefs
     )
     val clock = new FakeClock
-    val manager = new ClusterTaskSetManager(sched, taskSet, clock)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
 
     // First offer host1, exec1: first task should be chosen
     assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
@@ -197,7 +197,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
       Seq(TaskLocation("host2"))
     )
     val clock = new FakeClock
-    val manager = new ClusterTaskSetManager(sched, taskSet, clock)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
 
     // First offer host1: first task should be chosen
     assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
@@ -234,7 +234,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
       Seq(TaskLocation("host3"))
     )
     val clock = new FakeClock
-    val manager = new ClusterTaskSetManager(sched, taskSet, clock)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
 
     // First offer host1: first task should be chosen
     assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
@@ -262,7 +262,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
     val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
     val taskSet = createTaskSet(1)
     val clock = new FakeClock
-    val manager = new ClusterTaskSetManager(sched, taskSet, clock)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
 
     assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
 
@@ -279,17 +279,17 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
     val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
     val taskSet = createTaskSet(1)
     val clock = new FakeClock
-    val manager = new ClusterTaskSetManager(sched, taskSet, clock)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
 
     // Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
     // after the last failure.
-    (1 to manager.MAX_TASK_FAILURES).foreach { index =>
+    (1 to manager.maxTaskFailures).foreach { index =>
       val offerResult = manager.resourceOffer("exec1", "host1", 1, ANY)
       assert(offerResult != None,
         "Expect resource offer on iteration %s to return a task".format(index))
       assert(offerResult.get.index === 0)
       manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, Some(TaskResultLost))
-      if (index < manager.MAX_TASK_FAILURES) {
+      if (index < MAX_TASK_FAILURES) {
         assert(!sched.taskSetsFailed.contains(taskSet.id))
       } else {
         assert(sched.taskSetsFailed.contains(taskSet.id))
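
One device worth calling out in these hunks is the FakeClock passed as the new trailing constructor argument: it lets the suite advance time explicitly rather than sleeping through spark.locality.wait. A sketch, assuming FakeClock's advance(ms) method from the test utilities; sched, taskSet, and MAX_TASK_FAILURES are the suite's own fixtures:

```scala
import org.apache.spark.util.FakeClock

// Sketch: deterministic locality-wait testing.
val clock = new FakeClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)

// With no node-local offers accepted, advancing past the wait makes the
// manager fall back to a less local level on the next resourceOffer call.
clock.advance(LOCALITY_WAIT)
```
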
diff --git a/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala
deleted file mode 100644
index 1e676c1719337179e02940472632baeb7966b63a..0000000000000000000000000000000000000000
--- a/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.local
-
-import java.util.concurrent.Semaphore
-import java.util.concurrent.CountDownLatch
-
-import scala.collection.mutable.HashMap
-
-import org.scalatest.{BeforeAndAfterEach, FunSuite}
-
-import org.apache.spark._
-
-
-class Lock() {
-  var finished = false
-  def jobWait() = {
-    synchronized {
-      while(!finished) {
-        this.wait()
-      }
-    }
-  }
-
-  def jobFinished() = {
-    synchronized {
-      finished = true
-      this.notifyAll()
-    }
-  }
-}
-
-object TaskThreadInfo {
-  val threadToLock = HashMap[Int, Lock]()
-  val threadToRunning = HashMap[Int, Boolean]()
-  val threadToStarted = HashMap[Int, CountDownLatch]()
-}
-
-/*
- * 1. each thread contains one job.
- * 2. each job contains one stage.
- * 3. each stage only contains one task.
- * 4. each task(launched) must be lanched orderly(using threadToStarted) to make sure
- *    it will get cpu core resource, and will wait to finished after user manually
- *    release "Lock" and then cluster will contain another free cpu cores.
- * 5. each task(pending) must use "sleep" to  make sure it has been added to taskSetManager queue,
- *    thus it will be scheduled later when cluster has free cpu cores.
- */
-class LocalSchedulerSuite extends FunSuite with LocalSparkContext with BeforeAndAfterEach {
-
-  override def afterEach() {
-    super.afterEach()
-    System.clearProperty("spark.scheduler.mode")
-  }
-
-  def createThread(threadIndex: Int, poolName: String, sc: SparkContext, sem: Semaphore) {
-
-    TaskThreadInfo.threadToRunning(threadIndex) = false
-    val nums = sc.parallelize(threadIndex to threadIndex, 1)
-    TaskThreadInfo.threadToLock(threadIndex) = new Lock()
-    TaskThreadInfo.threadToStarted(threadIndex) = new CountDownLatch(1)
-    new Thread {
-      if (poolName != null) {
-        sc.setLocalProperty("spark.scheduler.pool", poolName)
-      }
-      override def run() {
-        val ans = nums.map(number => {
-          TaskThreadInfo.threadToRunning(number) = true
-          TaskThreadInfo.threadToStarted(number).countDown()
-          TaskThreadInfo.threadToLock(number).jobWait()
-          TaskThreadInfo.threadToRunning(number) = false
-          number
-        }).collect()
-        assert(ans.toList === List(threadIndex))
-        sem.release()
-      }
-    }.start()
-  }
-
-  test("Local FIFO scheduler end-to-end test") {
-    System.setProperty("spark.scheduler.mode", "FIFO")
-    sc = new SparkContext("local[4]", "test")
-    val sem = new Semaphore(0)
-
-    createThread(1,null,sc,sem)
-    TaskThreadInfo.threadToStarted(1).await()
-    createThread(2,null,sc,sem)
-    TaskThreadInfo.threadToStarted(2).await()
-    createThread(3,null,sc,sem)
-    TaskThreadInfo.threadToStarted(3).await()
-    createThread(4,null,sc,sem)
-    TaskThreadInfo.threadToStarted(4).await()
-    // thread 5 and 6 (stage pending)must meet following two points
-    // 1. stages (taskSetManager) of jobs in thread 5 and 6 should be add to taskSetManager
-    //    queue before executing TaskThreadInfo.threadToLock(1).jobFinished()
-    // 2. priority of stage in thread 5 should be prior to priority of stage in thread 6
-    // So I just use "sleep" 1s here for each thread.
-    // TODO: any better solution?
-    createThread(5,null,sc,sem)
-    Thread.sleep(1000)
-    createThread(6,null,sc,sem)
-    Thread.sleep(1000)
-
-    assert(TaskThreadInfo.threadToRunning(1) === true)
-    assert(TaskThreadInfo.threadToRunning(2) === true)
-    assert(TaskThreadInfo.threadToRunning(3) === true)
-    assert(TaskThreadInfo.threadToRunning(4) === true)
-    assert(TaskThreadInfo.threadToRunning(5) === false)
-    assert(TaskThreadInfo.threadToRunning(6) === false)
-
-    TaskThreadInfo.threadToLock(1).jobFinished()
-    TaskThreadInfo.threadToStarted(5).await()
-
-    assert(TaskThreadInfo.threadToRunning(1) === false)
-    assert(TaskThreadInfo.threadToRunning(2) === true)
-    assert(TaskThreadInfo.threadToRunning(3) === true)
-    assert(TaskThreadInfo.threadToRunning(4) === true)
-    assert(TaskThreadInfo.threadToRunning(5) === true)
-    assert(TaskThreadInfo.threadToRunning(6) === false)
-
-    TaskThreadInfo.threadToLock(3).jobFinished()
-    TaskThreadInfo.threadToStarted(6).await()
-
-    assert(TaskThreadInfo.threadToRunning(1) === false)
-    assert(TaskThreadInfo.threadToRunning(2) === true)
-    assert(TaskThreadInfo.threadToRunning(3) === false)
-    assert(TaskThreadInfo.threadToRunning(4) === true)
-    assert(TaskThreadInfo.threadToRunning(5) === true)
-    assert(TaskThreadInfo.threadToRunning(6) === true)
-
-    TaskThreadInfo.threadToLock(2).jobFinished()
-    TaskThreadInfo.threadToLock(4).jobFinished()
-    TaskThreadInfo.threadToLock(5).jobFinished()
-    TaskThreadInfo.threadToLock(6).jobFinished()
-    sem.acquire(6)
-  }
-
-  test("Local fair scheduler end-to-end test") {
-    System.setProperty("spark.scheduler.mode", "FAIR")
-    val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
-    System.setProperty("spark.scheduler.allocation.file", xmlPath)
-
-    sc = new SparkContext("local[8]", "LocalSchedulerSuite")
-    val sem = new Semaphore(0)
-
-    createThread(10,"1",sc,sem)
-    TaskThreadInfo.threadToStarted(10).await()
-    createThread(20,"2",sc,sem)
-    TaskThreadInfo.threadToStarted(20).await()
-    createThread(30,"3",sc,sem)
-    TaskThreadInfo.threadToStarted(30).await()
-
-    assert(TaskThreadInfo.threadToRunning(10) === true)
-    assert(TaskThreadInfo.threadToRunning(20) === true)
-    assert(TaskThreadInfo.threadToRunning(30) === true)
-
-    createThread(11,"1",sc,sem)
-    TaskThreadInfo.threadToStarted(11).await()
-    createThread(21,"2",sc,sem)
-    TaskThreadInfo.threadToStarted(21).await()
-    createThread(31,"3",sc,sem)
-    TaskThreadInfo.threadToStarted(31).await()
-
-    assert(TaskThreadInfo.threadToRunning(11) === true)
-    assert(TaskThreadInfo.threadToRunning(21) === true)
-    assert(TaskThreadInfo.threadToRunning(31) === true)
-
-    createThread(12,"1",sc,sem)
-    TaskThreadInfo.threadToStarted(12).await()
-    createThread(22,"2",sc,sem)
-    TaskThreadInfo.threadToStarted(22).await()
-    createThread(32,"3",sc,sem)
-
-    assert(TaskThreadInfo.threadToRunning(12) === true)
-    assert(TaskThreadInfo.threadToRunning(22) === true)
-    assert(TaskThreadInfo.threadToRunning(32) === false)
-
-    TaskThreadInfo.threadToLock(10).jobFinished()
-    TaskThreadInfo.threadToStarted(32).await()
-
-    assert(TaskThreadInfo.threadToRunning(32) === true)
-
-    //1. Similar with above scenario, sleep 1s for stage of 23 and 33 to be added to taskSetManager
-    //   queue so that cluster will assign free cpu core to stage 23 after stage 11 finished.
-    //2. priority of 23 and 33 will be meaningless as using fair scheduler here.
-    createThread(23,"2",sc,sem)
-    createThread(33,"3",sc,sem)
-    Thread.sleep(1000)
-
-    TaskThreadInfo.threadToLock(11).jobFinished()
-    TaskThreadInfo.threadToStarted(23).await()
-
-    assert(TaskThreadInfo.threadToRunning(23) === true)
-    assert(TaskThreadInfo.threadToRunning(33) === false)
-
-    TaskThreadInfo.threadToLock(12).jobFinished()
-    TaskThreadInfo.threadToStarted(33).await()
-
-    assert(TaskThreadInfo.threadToRunning(33) === true)
-
-    TaskThreadInfo.threadToLock(20).jobFinished()
-    TaskThreadInfo.threadToLock(21).jobFinished()
-    TaskThreadInfo.threadToLock(22).jobFinished()
-    TaskThreadInfo.threadToLock(23).jobFinished()
-    TaskThreadInfo.threadToLock(30).jobFinished()
-    TaskThreadInfo.threadToLock(31).jobFinished()
-    TaskThreadInfo.threadToLock(32).jobFinished()
-    TaskThreadInfo.threadToLock(33).jobFinished()
-
-    sem.acquire(11)
-  }
-}
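
Nothing replaces this suite one-for-one: with LocalScheduler gone, local mode exercises the same TaskSchedulerImpl and pool code paths covered by ClusterSchedulerSuite above. The FIFO/FAIR configuration the deleted tests exercised still applies to local mode under the unified scheduler; a sketch, with the allocation-file path as a placeholder:

```scala
import org.apache.spark.SparkContext

// Sketch: fair scheduling in local mode through the unified scheduler.
// Pool names mirror the deleted suite's fairscheduler.xml ("1", "2", "3").
System.setProperty("spark.scheduler.mode", "FAIR")
System.setProperty("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")
val sc = new SparkContext("local[8]", "test")
sc.setLocalProperty("spark.scheduler.pool", "1") // submit this thread's jobs to pool "1"
```
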
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index 29b3f22e13697b38bc501e2f914d8fc0a202d722..4e988b8017600e09b3300fe1a3ec36e82025a73d 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -17,16 +17,20 @@
 
 package org.apache.spark.scheduler.cluster
 
+import org.apache.hadoop.conf.Configuration
+
 import org.apache.spark._
 import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnAllocationHandler}
+import org.apache.spark.scheduler.ClusterScheduler
 import org.apache.spark.util.Utils
-import org.apache.hadoop.conf.Configuration
 
 /**
  *
- * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of ApplicationMaster, etc is done
+ * This is a simple extension to ClusterScheduler that ensures appropriate initialization
+ * of the ApplicationMaster, etc.
  */
-private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) extends ClusterScheduler(sc) {
+private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
+  extends ClusterScheduler(sc) {
 
   logInfo("Created YarnClusterScheduler")