diff --git a/core/src/hadoop2-yarn/scala/spark/scheduler/cluster/YarnClusterScheduler.scala b/core/src/hadoop2-yarn/scala/spark/scheduler/cluster/YarnClusterScheduler.scala
index 307d96111c8cd9ce269b55f5c3426deef7f0f59d..bb58353e0cc68e657b20c5e06478d12545fb08ee 100644
--- a/core/src/hadoop2-yarn/scala/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/core/src/hadoop2-yarn/scala/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -41,13 +41,6 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
     if (retval != null) Some(retval) else None
   }
 
-  // By default, if rack is unknown, return nothing
-  override def getCachedHostsForRack(rack: String): Option[Set[String]] = {
-    if (rack == None || rack == null) return None
-
-    YarnAllocationHandler.fetchCachedHostsForRack(rack)
-  }
-
   override def postStartHook() {
     val sparkContextInitialized = ApplicationMaster.sparkContextInitialized(sc)
     if (sparkContextInitialized){
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index 2c417e31dba6328a60ab34fb7d9918626b632cb7..0cd0341a7242e18006e5dbfa0e0235472222a12a 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -64,11 +64,11 @@ private[spark] class MapOutputTracker extends Logging {
 
   // Incremented every time a fetch fails so that client nodes know to clear
   // their cache of map output locations if this happens.
-  private var generation: Long = 0
-  private val generationLock = new java.lang.Object
+  private var epoch: Long = 0
+  private val epochLock = new java.lang.Object
 
   // Cache a serialized version of the output statuses for each shuffle to send them out faster
-  var cacheGeneration = generation
+  var cacheEpoch = epoch
   private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]
 
   val metadataCleaner = new MetadataCleaner("MapOutputTracker", this.cleanup)
@@ -108,10 +108,10 @@ private[spark] class MapOutputTracker extends Logging {
   def registerMapOutputs(
       shuffleId: Int,
       statuses: Array[MapStatus],
-      changeGeneration: Boolean = false) {
+      changeEpoch: Boolean = false) {
     mapStatuses.put(shuffleId, Array[MapStatus]() ++ statuses)
-    if (changeGeneration) {
-      incrementGeneration()
+    if (changeEpoch) {
+      incrementEpoch()
     }
   }
 
@@ -124,7 +124,7 @@ private[spark] class MapOutputTracker extends Logging {
           array(mapId) = null
         }
       }
-      incrementGeneration()
+      incrementEpoch()
     } else {
       throw new SparkException("unregisterMapOutput called for nonexistent shuffle ID")
     }
@@ -206,58 +206,58 @@ private[spark] class MapOutputTracker extends Logging {
     trackerActor = null
   }
 
-  // Called on master to increment the generation number
-  def incrementGeneration() {
-    generationLock.synchronized {
-      generation += 1
-      logDebug("Increasing generation to " + generation)
+  // Called on master to increment the epoch number
+  def incrementEpoch() {
+    epochLock.synchronized {
+      epoch += 1
+      logDebug("Increasing epoch to " + epoch)
     }
   }
 
-  // Called on master or workers to get current generation number
-  def getGeneration: Long = {
-    generationLock.synchronized {
-      return generation
+  // Called on master or workers to get current epoch number
+  def getEpoch: Long = {
+    epochLock.synchronized {
+      return epoch
     }
   }
 
-  // Called on workers to update the generation number, potentially clearing old outputs
-  // because of a fetch failure. (Each Mesos task calls this with the latest generation
+  // Called on workers to update the epoch number, potentially clearing old outputs
+  // because of a fetch failure. (Each worker task calls this with the latest epoch
   // number on the master at the time it was created.)
-  def updateGeneration(newGen: Long) {
-    generationLock.synchronized {
-      if (newGen > generation) {
-        logInfo("Updating generation to " + newGen + " and clearing cache")
+  def updateEpoch(newEpoch: Long) {
+    epochLock.synchronized {
+      if (newEpoch > epoch) {
+        logInfo("Updating epoch to " + newEpoch + " and clearing cache")
         // mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]
         mapStatuses.clear()
-        generation = newGen
+        epoch = newEpoch
       }
     }
   }
 
   def getSerializedLocations(shuffleId: Int): Array[Byte] = {
     var statuses: Array[MapStatus] = null
-    var generationGotten: Long = -1
-    generationLock.synchronized {
-      if (generation > cacheGeneration) {
+    var epochGotten: Long = -1
+    epochLock.synchronized {
+      if (epoch > cacheEpoch) {
         cachedSerializedStatuses.clear()
-        cacheGeneration = generation
+        cacheEpoch = epoch
       }
       cachedSerializedStatuses.get(shuffleId) match {
         case Some(bytes) =>
           return bytes
         case None =>
           statuses = mapStatuses(shuffleId)
-          generationGotten = generation
+          epochGotten = epoch
       }
     }
     // If we got here, we failed to find the serialized locations in the cache, so we pulled
     // out a snapshot of the locations as "locs"; let's serialize and return that
     val bytes = serializeStatuses(statuses)
     logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
-    // Add them into the table only if the generation hasn't changed while we were working
-    generationLock.synchronized {
-      if (generation == generationGotten) {
+    // Add them into the table only if the epoch hasn't changed while we were working
+    epochLock.synchronized {
+      if (epoch == epochGotten) {
         cachedSerializedStatuses(shuffleId) = bytes
       }
     }
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index c9a044afabb91445385d711572094ea64d16f586..25a6951732b055d87123be8b6044ca0eb548bd98 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -220,8 +220,8 @@ abstract class RDD[T: ClassManifest](
   }
 
   /**
-   * Get the preferred location of a split, taking into account whether the
-   * RDD is checkpointed or not.
+   * Get the preferred locations of a partition (as hostnames), taking into account whether the
+   * RDD is checkpointed.
    */
   final def preferredLocations(split: Partition): Seq[String] = {
     checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 0adbf1d96e9d38f2ba502e85287297902d3b4da8..bca90886a32692add76c479b182bd479d8673166 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -54,11 +54,7 @@ class SparkEnv (
     val connectionManager: ConnectionManager,
     val httpFileServer: HttpFileServer,
     val sparkFilesDir: String,
-    val metricsSystem: MetricsSystem,
-    // To be set only as part of initialization of SparkContext.
-    // (executorId, defaultHostPort) => executorHostPort
-    // If executorId is NOT found, return defaultHostPort
-    var executorIdToHostPort: Option[(String, String) => String]) {
+    val metricsSystem: MetricsSystem) {
 
   private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
 
@@ -83,16 +79,6 @@ class SparkEnv (
       pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create()
     }
   }
-
-  def resolveExecutorIdToHostPort(executorId: String, defaultHostPort: String): String = {
-    val env = SparkEnv.get
-    if (env.executorIdToHostPort.isEmpty) {
-      // default to using host, not host port. Relevant to non cluster modes.
-      return defaultHostPort
-    }
-
-    env.executorIdToHostPort.get(executorId, defaultHostPort)
-  }
 }
 
 object SparkEnv extends Logging {
@@ -236,7 +222,6 @@ object SparkEnv extends Logging {
       connectionManager,
       httpFileServer,
       sparkFilesDir,
-      metricsSystem,
-      None)
+      metricsSystem)
   }
 }
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 885a7391d63d9bbd81d79e8c6d68bad24eb82748..a05dcdcd97da3b7790ada73b0cd415de979cd524 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -393,41 +393,14 @@ private object Utils extends Logging {
     retval
   }
 
-/*
-  // Used by DEBUG code : remove when all testing done
-  private val ipPattern = Pattern.compile("^[0-9]+(\\.[0-9]+)*$")
   def checkHost(host: String, message: String = "") {
-    // Currently catches only ipv4 pattern, this is just a debugging tool - not rigourous !
-    // if (host.matches("^[0-9]+(\\.[0-9]+)*$")) {
-    if (ipPattern.matcher(host).matches()) {
-      Utils.logErrorWithStack("Unexpected to have host " + host + " which matches IP pattern. Message " + message)
-    }
-    if (Utils.parseHostPort(host)._2 != 0){
-      Utils.logErrorWithStack("Unexpected to have host " + host + " which has port in it. Message " + message)
-    }
+    assert(host.indexOf(':') == -1, message)
   }
 
-  // Used by DEBUG code : remove when all testing done
   def checkHostPort(hostPort: String, message: String = "") {
-    val (host, port) = Utils.parseHostPort(hostPort)
-    checkHost(host)
-    if (port <= 0){
-      Utils.logErrorWithStack("Unexpected to have port " + port + " which is not valid in " + hostPort + ". Message " + message)
-    }
+    assert(hostPort.indexOf(':') != -1, message)
   }
 
-  // Used by DEBUG code : remove when all testing done
-  def logErrorWithStack(msg: String) {
-    try { throw new Exception } catch { case ex: Exception => { logError(msg, ex) } }
-    // temp code for debug
-    System.exit(-1)
-  }
-*/
-
-  // Once testing is complete in various modes, replace with this ?
-  def checkHost(host: String, message: String = "") {}
-  def checkHostPort(hostPort: String, message: String = "") {}
-
   // Used by DEBUG code : remove when all testing done
   def logErrorWithStack(msg: String) {
     try { throw new Exception } catch { case ex: Exception => { logError(msg, ex) } }
diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
index 31861f3ac2d985b5615eed7bb08adca40a6e02af..0db13ffc98723ff59183e02212a1e13a5575a5b9 100644
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -80,6 +80,7 @@ private[deploy] object DeployMessages {
 
   case class RegisteredApplication(appId: String) extends DeployMessage
 
+  // TODO(matei): replace hostPort with host
   case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) {
     Utils.checkHostPort(hostPort, "Required hostport")
   }
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index f661accd2ff2f37d64c8c59e085546b7410cd990..5e53d95ac2f364dd01e601200e4d361b4a0596f2 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -40,13 +40,11 @@ private[spark] class ExecutorRunner(
     val memory: Int,
     val worker: ActorRef,
     val workerId: String,
-    val hostPort: String,
+    val host: String,
     val sparkHome: File,
     val workDir: File)
   extends Logging {
 
-  Utils.checkHostPort(hostPort, "Expected hostport")
-
   val fullId = appId + "/" + execId
   var workerThread: Thread = null
   var process: Process = null
@@ -92,7 +90,7 @@ private[spark] class ExecutorRunner(
   /** Replace variables such as {{EXECUTOR_ID}} and {{CORES}} in a command argument passed to us */
   def substituteVariables(argument: String): String = argument match {
     case "{{EXECUTOR_ID}}" => execId.toString
-    case "{{HOSTNAME}}" => Utils.parseHostPort(hostPort)._1
+    case "{{HOSTNAME}}" => host
     case "{{CORES}}" => cores.toString
     case other => other
   }
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index d4b58fc34e88deacb1670d2ef7b9c9c5b88005d5..053ac5522607b0d9bd57df2d4501163adae0806c 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -132,7 +132,7 @@ private[spark] class Worker(
     case LaunchExecutor(appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
       logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
       val manager = new ExecutorRunner(
-        appId, execId, appDesc, cores_, memory_, self, workerId, host + ":" + port, new File(execSparkHome_), workDir)
+        appId, execId, appDesc, cores_, memory_, self, workerId, host, new File(execSparkHome_), workDir)
       executors(appId + "/" + execId) = manager
       manager.start()
       coresUsed += cores_
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 05a960d7c506d0df274b7ae7424c90b1d4e3e4e1..036c7191adcffd1383c05c184105ff50b15f662f 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -32,8 +32,12 @@ import spark._
 /**
  * The Mesos executor for Spark.
  */
-private[spark] class Executor(executorId: String, slaveHostname: String, properties: Seq[(String, String)]) extends Logging {
-
+private[spark] class Executor(
+    executorId: String,
+    slaveHostname: String,
+    properties: Seq[(String, String)])
+  extends Logging
+{
   // Application dependencies (added through SparkContext) that we've fetched so far on this node.
   // Each map holds the master's timestamp for the version of that file or JAR we got.
   private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
@@ -125,8 +129,8 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
         updateDependencies(taskFiles, taskJars)
         val task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
         attemptedTask = Some(task)
-        logInfo("Its generation is " + task.generation)
-        env.mapOutputTracker.updateGeneration(task.generation)
+        logInfo("Its epoch is " + task.epoch)
+        env.mapOutputTracker.updateEpoch(task.epoch)
         taskStart = System.currentTimeMillis()
         val value = task.run(taskId.toInt)
         val taskFinish = System.currentTimeMillis()
diff --git a/core/src/main/scala/spark/rdd/BlockRDD.scala b/core/src/main/scala/spark/rdd/BlockRDD.scala
index 0ebb722d73618dd47ae1a80c3db67e1112a50094..03800584ae0a012edea90042904d86380fd6f020 100644
--- a/core/src/main/scala/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/spark/rdd/BlockRDD.scala
@@ -28,13 +28,12 @@ private[spark]
 class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[String])
   extends RDD[T](sc, Nil) {
 
-  @transient lazy val locations_ = BlockManager.blockIdsToExecutorLocations(blockIds, SparkEnv.get)
+  @transient lazy val locations_ = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
 
   override def getPartitions: Array[Partition] = (0 until blockIds.size).map(i => {
     new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
   }).toArray
 
-
   override def compute(split: Partition, context: TaskContext): Iterator[T] = {
     val blockManager = SparkEnv.get.blockManager
     val blockId = split.asInstanceOf[BlockRDDPartition].blockId
@@ -45,8 +44,8 @@ class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[St
     }
   }
 
-  override def getPreferredLocations(split: Partition): Seq[String] =
+  override def getPreferredLocations(split: Partition): Seq[String] = {
     locations_(split.asInstanceOf[BlockRDDPartition].blockId)
-
+  }
 }
 
diff --git a/core/src/main/scala/spark/rdd/CartesianRDD.scala b/core/src/main/scala/spark/rdd/CartesianRDD.scala
index 150e5bca29d4ffc4ebcea95c5f98082808153837..91b3e69d6fb3e759916c917be1d4e8b004466b53 100644
--- a/core/src/main/scala/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/spark/rdd/CartesianRDD.scala
@@ -64,7 +64,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest](
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
     val currSplit = split.asInstanceOf[CartesianPartition]
-    rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2)
+    (rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2)).distinct
   }
 
   override def compute(split: Partition, context: TaskContext) = {
diff --git a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
index 6a4fa13ad61b8db3a30d07ba8d3ce274ac83b711..9a0831bd899e2ff48e7db6601e8fdc72ac279208 100644
--- a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
@@ -55,27 +55,15 @@ abstract class ZippedPartitionsBaseRDD[V: ClassManifest](
   }
 
   override def getPreferredLocations(s: Partition): Seq[String] = {
-    // Note that as number of rdd's increase and/or number of slaves in cluster increase, the computed preferredLocations below
-    // become diminishingly small : so we might need to look at alternate strategies to alleviate this.
-    // If there are no (or very small number of preferred locations), we will end up transferred the blocks to 'any' node in the
-    // cluster - paying with n/w and cache cost.
-    // Maybe pick a node which figures max amount of time ?
-    // Choose node which is hosting 'larger' of some subset of blocks ?
-    // Look at rack locality to ensure chosen host is atleast rack local to both hosting node ?, etc (would be good to defer this if possible)
-    val splits = s.asInstanceOf[ZippedPartitionsPartition].partitions
-    val rddSplitZip = rdds.zip(splits)
-
-    // exact match.
-    val exactMatchPreferredLocations = rddSplitZip.map(x => x._1.preferredLocations(x._2))
-    val exactMatchLocations = exactMatchPreferredLocations.reduce((x, y) => x.intersect(y))
-
-    // Remove exact match and then do host local match.
-    val exactMatchHosts = exactMatchLocations.map(Utils.parseHostPort(_)._1)
-    val matchPreferredHosts = exactMatchPreferredLocations.map(locs => locs.map(Utils.parseHostPort(_)._1))
-      .reduce((x, y) => x.intersect(y))
-    val otherNodeLocalLocations = matchPreferredHosts.filter { s => !exactMatchHosts.contains(s) }
-
-    otherNodeLocalLocations ++ exactMatchLocations
+    val parts = s.asInstanceOf[ZippedPartitionsPartition].partitions
+    val prefs = rdds.zip(parts).map { case (rdd, p) => rdd.preferredLocations(p) }
+    // Check whether there are any hosts that match all RDDs; otherwise return the union
+    val exactMatchLocations = prefs.reduce((x, y) => x.intersect(y))
+    if (!exactMatchLocations.isEmpty) {
+      exactMatchLocations
+    } else {
+      prefs.flatten.distinct
+    }
   }
 
   override def clearDependencies() {
diff --git a/core/src/main/scala/spark/rdd/ZippedRDD.scala b/core/src/main/scala/spark/rdd/ZippedRDD.scala
index b1c43b3195ab63b0be8a5ae9ed423d984e198319..4074e50e44040b96b756efeb69c238b7165ff142 100644
--- a/core/src/main/scala/spark/rdd/ZippedRDD.scala
+++ b/core/src/main/scala/spark/rdd/ZippedRDD.scala
@@ -65,27 +65,16 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest](
   }
 
   override def getPreferredLocations(s: Partition): Seq[String] = {
-    // Note that as number of slaves in cluster increase, the computed preferredLocations can become small : so we might need
-    // to look at alternate strategies to alleviate this. (If there are no (or very small number of preferred locations), we
-    // will end up transferred the blocks to 'any' node in the cluster - paying with n/w and cache cost.
-    // Maybe pick one or the other ? (so that atleast one block is local ?).
-    // Choose node which is hosting 'larger' of the blocks ?
-    // Look at rack locality to ensure chosen host is atleast rack local to both hosting node ?, etc (would be good to defer this if possible)
     val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions
     val pref1 = rdd1.preferredLocations(partition1)
     val pref2 = rdd2.preferredLocations(partition2)
-
-    // exact match - instance local and host local.
+    // Check whether there are any hosts that match both RDDs; otherwise return the union
     val exactMatchLocations = pref1.intersect(pref2)
-
-    // remove locations which are already handled via exactMatchLocations, and intersect where both partitions are node local.
-    val otherNodeLocalPref1 = pref1.filter(loc => ! exactMatchLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1)
-    val otherNodeLocalPref2 = pref2.filter(loc => ! exactMatchLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1)
-    val otherNodeLocalLocations = otherNodeLocalPref1.intersect(otherNodeLocalPref2)
-
-
-    // Can have mix of instance local (hostPort) and node local (host) locations as preference !
-    exactMatchLocations ++ otherNodeLocalLocations
+    if (!exactMatchLocations.isEmpty) {
+      exactMatchLocations
+    } else {
+      (pref1 ++ pref2).distinct
+    }
   }
 
   override def clearDependencies() {
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index fbf3f4c80715f7589a88a387edb25031d0a0236b..35b31f45a7cb32d40dacf4e0bfa4db54c9bf8f35 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -32,10 +32,22 @@ import spark.storage.{BlockManager, BlockManagerMaster}
 import spark.util.{MetadataCleaner, TimeStampedHashMap}
 
 /**
- * A Scheduler subclass that implements stage-oriented scheduling. It computes a DAG of stages for
- * each job, keeps track of which RDDs and stage outputs are materialized, and computes a minimal
- * schedule to run the job. Subclasses only need to implement the code to send a task to the cluster
- * and to report fetch failures (the submitTasks method, and code to add CompletionEvents).
+ * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
+ * stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a
+ * minimal schedule to run the job. It then submits stages as TaskSets to an underlying
+ * TaskScheduler implementation that runs them on the cluster.
+ *
+ * In addition to coming up with a DAG of stages, this class also determines the preferred
+ * locations to run each task on, based on the current cache status, and passes these to the
+ * low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being
+ * lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are
+ * not caused by shuffie file loss are handled by the TaskScheduler, which will retry each task
+ * a small number of times before cancelling the whole stage.
+ *
+ * THREADING: This class runs all its logic in a single thread executing the run() method, to which
+ * events are submitted using a synchonized queue (eventQueue). The public API methods, such as
+ * runJob, taskEnded and executorLost, post events asynchronously to this queue. All other methods
+ * should be private.
  */
 private[spark]
 class DAGScheduler(
@@ -72,8 +84,8 @@ class DAGScheduler(
   }
 
   // Called by TaskScheduler when a host is added
-  override def executorGained(execId: String, hostPort: String) {
-    eventQueue.put(ExecutorGained(execId, hostPort))
+  override def executorGained(execId: String, host: String) {
+    eventQueue.put(ExecutorGained(execId, host))
   }
 
   // Called by TaskScheduler to cancel an entire TaskSet due to repeated failures.
@@ -104,15 +116,16 @@ class DAGScheduler(
 
   private val listenerBus = new SparkListenerBus()
 
-  var cacheLocs = new HashMap[Int, Array[List[String]]]
+  // Contains the locations that each RDD's partitions are cached on
+  private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
 
-  // For tracking failed nodes, we use the MapOutputTracker's generation number, which is
-  // sent with every task. When we detect a node failing, we note the current generation number
-  // and failed executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask
-  // results.
-  // TODO: Garbage collect information about failure generations when we know there are no more
+  // For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with
+  // every task. When we detect a node failing, we note the current epoch number and failed
+  // executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask results.
+  //
+  // TODO: Garbage collect information about failure epochs when we know there are no more
   //       stray messages to detect.
-  val failedGeneration = new HashMap[String, Long]
+  val failedEpoch = new HashMap[String, Long]
 
   val idToActiveJob = new HashMap[Int, ActiveJob]
 
@@ -141,11 +154,13 @@ class DAGScheduler(
     listenerBus.addListener(listener)
   }
 
-  private def getCacheLocs(rdd: RDD[_]): Array[List[String]] = {
+  private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = {
     if (!cacheLocs.contains(rdd.id)) {
       val blockIds = rdd.partitions.indices.map(index=> "rdd_%d_%d".format(rdd.id, index)).toArray
-      val locs = BlockManager.blockIdsToExecutorLocations(blockIds, env, blockManagerMaster)
-      cacheLocs(rdd.id) = blockIds.map(locs.getOrElse(_, Nil))
+      val locs = BlockManager.blockIdsToBlockManagers(blockIds, env, blockManagerMaster)
+      cacheLocs(rdd.id) = blockIds.map { id =>
+        locs.getOrElse(id, Nil).map(bm => TaskLocation(bm.host, bm.executorId))
+      }
     }
     cacheLocs(rdd.id)
   }
@@ -345,8 +360,8 @@ class DAGScheduler(
           submitStage(finalStage)
         }
 
-      case ExecutorGained(execId, hostPort) =>
-        handleExecutorGained(execId, hostPort)
+      case ExecutorGained(execId, host) =>
+        handleExecutorGained(execId, host)
 
       case ExecutorLost(execId) =>
         handleExecutorLost(execId)
@@ -508,7 +523,7 @@ class DAGScheduler(
     } else {
       // This is a final stage; figure out its job's missing partitions
       val job = resultStageToJob(stage)
-      for (id <- 0 until job.numPartitions if (!job.finished(id))) {
+      for (id <- 0 until job.numPartitions if !job.finished(id)) {
         val partition = job.partitions(id)
         val locs = getPreferredLocs(stage.rdd, partition)
         tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
@@ -518,7 +533,7 @@ class DAGScheduler(
     // should be "StageSubmitted" first and then "JobEnded"
     val properties = idToActiveJob(stage.priority).properties
     listenerBus.post(SparkListenerStageSubmitted(stage, tasks.size, properties))
-    
+
     if (tasks.size > 0) {
       // Preemptively serialize a task to make sure it can be serialized. We are catching this
       // exception here because it would be fairly hard to catch the non-serializable exception
@@ -599,7 +614,7 @@ class DAGScheduler(
             val status = event.result.asInstanceOf[MapStatus]
             val execId = status.location.executorId
             logDebug("ShuffleMapTask finished on " + execId)
-            if (failedGeneration.contains(execId) && smt.generation <= failedGeneration(execId)) {
+            if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
               logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
             } else {
               stage.addOutputLoc(smt.partition, status)
@@ -611,11 +626,11 @@ class DAGScheduler(
               logInfo("waiting: " + waiting)
               logInfo("failed: " + failed)
               if (stage.shuffleDep != None) {
-                // We supply true to increment the generation number here in case this is a
+                // We supply true to increment the epoch number here in case this is a
                 // recomputation of the map outputs. In that case, some nodes may have cached
                 // locations with holes (from when we detected the error) and will need the
-                // generation incremented to refetch them.
-                // TODO: Only increment the generation number if this is not the first time
+                // epoch incremented to refetch them.
+                // TODO: Only increment the epoch number if this is not the first time
                 //       we registered these map outputs.
                 mapOutputTracker.registerMapOutputs(
                   stage.shuffleDep.get.shuffleId,
@@ -674,7 +689,7 @@ class DAGScheduler(
         lastFetchFailureTime = System.currentTimeMillis() // TODO: Use pluggable clock
         // TODO: mark the executor as failed only if there were lots of fetch failures on it
         if (bmAddress != null) {
-          handleExecutorLost(bmAddress.executorId, Some(task.generation))
+          handleExecutorLost(bmAddress.executorId, Some(task.epoch))
         }
 
       case ExceptionFailure(className, description, stackTrace, metrics) =>
@@ -690,14 +705,14 @@ class DAGScheduler(
    * Responds to an executor being lost. This is called inside the event loop, so it assumes it can
    * modify the scheduler's internal state. Use executorLost() to post a loss event from outside.
    *
-   * Optionally the generation during which the failure was caught can be passed to avoid allowing
+   * Optionally the epoch during which the failure was caught can be passed to avoid allowing
    * stray fetch failures from possibly retriggering the detection of a node as lost.
    */
-  private def handleExecutorLost(execId: String, maybeGeneration: Option[Long] = None) {
-    val currentGeneration = maybeGeneration.getOrElse(mapOutputTracker.getGeneration)
-    if (!failedGeneration.contains(execId) || failedGeneration(execId) < currentGeneration) {
-      failedGeneration(execId) = currentGeneration
-      logInfo("Executor lost: %s (generation %d)".format(execId, currentGeneration))
+  private def handleExecutorLost(execId: String, maybeEpoch: Option[Long] = None) {
+    val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
+    if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
+      failedEpoch(execId) = currentEpoch
+      logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
       blockManagerMaster.removeExecutor(execId)
       // TODO: This will be really slow if we keep accumulating shuffle map stages
       for ((shuffleId, stage) <- shuffleToMapStage) {
@@ -706,20 +721,20 @@ class DAGScheduler(
         mapOutputTracker.registerMapOutputs(shuffleId, locs, true)
       }
       if (shuffleToMapStage.isEmpty) {
-        mapOutputTracker.incrementGeneration()
+        mapOutputTracker.incrementEpoch()
       }
       clearCacheLocs()
     } else {
       logDebug("Additional executor lost message for " + execId +
-               "(generation " + currentGeneration + ")")
+               "(epoch " + currentEpoch + ")")
     }
   }
 
-  private def handleExecutorGained(execId: String, hostPort: String) {
-    // remove from failedGeneration(execId) ?
-    if (failedGeneration.contains(execId)) {
-      logInfo("Host gained which was in lost list earlier: " + hostPort)
-      failedGeneration -= execId
+  private def handleExecutorGained(execId: String, host: String) {
+    // remove from failedEpoch(execId) ?
+    if (failedEpoch.contains(execId)) {
+      logInfo("Host gained which was in lost list earlier: " + host)
+      failedEpoch -= execId
     }
   }
 
@@ -774,16 +789,16 @@ class DAGScheduler(
     visitedRdds.contains(target.rdd)
   }
 
-  private def getPreferredLocs(rdd: RDD[_], partition: Int): List[String] = {
+  private def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
     // If the partition is cached, return the cache locations
     val cached = getCacheLocs(rdd)(partition)
-    if (cached != Nil) {
+    if (!cached.isEmpty) {
       return cached
     }
     // If the RDD has some placement preferences (as is the case for input RDDs), get those
     val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
-    if (rddPrefs != Nil) {
-      return rddPrefs
+    if (!rddPrefs.isEmpty) {
+      return rddPrefs.map(host => TaskLocation(host))
     }
     // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
     // that has any placement preferences. Ideally we would choose based on transfer sizes,
diff --git a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
index 3b4ee6287aa20cffa7cd7277a7b4309087da9f87..b8ba0e92394c257aa5432253f1e9f6030ba141f1 100644
--- a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
@@ -54,9 +54,7 @@ private[spark] case class CompletionEvent(
     taskMetrics: TaskMetrics)
   extends DAGSchedulerEvent
 
-private[spark] case class ExecutorGained(execId: String, hostPort: String) extends DAGSchedulerEvent {
-  Utils.checkHostPort(hostPort, "Required hostport")
-}
+private[spark] case class ExecutorGained(execId: String, host: String) extends DAGSchedulerEvent
 
 private[spark] case class ExecutorLost(execId: String) extends DAGSchedulerEvent
 
diff --git a/core/src/main/scala/spark/scheduler/ResultTask.scala b/core/src/main/scala/spark/scheduler/ResultTask.scala
index 832ca18b8ca3fd04a6f246107660c1ffd986d758..d066df5dc1dddfc288efc1deed9610bda6d30d9b 100644
--- a/core/src/main/scala/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/spark/scheduler/ResultTask.scala
@@ -73,7 +73,7 @@ private[spark] class ResultTask[T, U](
     var rdd: RDD[T],
     var func: (TaskContext, Iterator[T]) => U,
     var partition: Int,
-    @transient locs: Seq[String],
+    @transient locs: Seq[TaskLocation],
     val outputId: Int)
   extends Task[U](stageId) with Externalizable {
 
@@ -85,11 +85,8 @@ private[spark] class ResultTask[T, U](
     rdd.partitions(partition)
   }
 
-  private val preferredLocs: Seq[String] = if (locs == null) Nil else locs.toSet.toSeq
-
-  {
-    // DEBUG code
-    preferredLocs.foreach (hostPort => Utils.checkHost(Utils.parseHostPort(hostPort)._1, "preferredLocs : " + preferredLocs))
+  @transient private val preferredLocs: Seq[TaskLocation] = {
+    if (locs == null) Nil else locs.toSet.toSeq
   }
 
   override def run(attemptId: Long): U = {
@@ -102,7 +99,7 @@ private[spark] class ResultTask[T, U](
     }
   }
 
-  override def preferredLocations: Seq[String] = preferredLocs
+  override def preferredLocations: Seq[TaskLocation] = preferredLocs
 
   override def toString = "ResultTask(" + stageId + ", " + partition + ")"
 
@@ -116,7 +113,7 @@ private[spark] class ResultTask[T, U](
       out.write(bytes)
       out.writeInt(partition)
       out.writeInt(outputId)
-      out.writeLong(generation)
+      out.writeLong(epoch)
       out.writeObject(split)
     }
   }
@@ -131,7 +128,7 @@ private[spark] class ResultTask[T, U](
     func = func_.asInstanceOf[(TaskContext, Iterator[T]) => U]
     partition = in.readInt()
     val outputId = in.readInt()
-    generation = in.readLong()
+    epoch = in.readLong()
     split = in.readObject().asInstanceOf[Partition]
   }
 }
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index 121ff31121d3ab6a1191fbe89f1b3ecac47d177d..f2a038576b722cce8c8d2737d7856eeef8091c32 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -88,18 +88,15 @@ private[spark] class ShuffleMapTask(
     var rdd: RDD[_],
     var dep: ShuffleDependency[_,_],
     var partition: Int,
-    @transient private var locs: Seq[String])
+    @transient private var locs: Seq[TaskLocation])
   extends Task[MapStatus](stageId)
   with Externalizable
   with Logging {
 
   protected def this() = this(0, null, null, 0, null)
 
-  @transient private val preferredLocs: Seq[String] = if (locs == null) Nil else locs.toSet.toSeq
-
-  {
-    // DEBUG code
-    preferredLocs.foreach (hostPort => Utils.checkHost(Utils.parseHostPort(hostPort)._1, "preferredLocs : " + preferredLocs))
+  @transient private val preferredLocs: Seq[TaskLocation] = {
+    if (locs == null) Nil else locs.toSet.toSeq
   }
 
   var split = if (rdd == null) null else rdd.partitions(partition)
@@ -112,7 +109,7 @@ private[spark] class ShuffleMapTask(
       out.writeInt(bytes.length)
       out.write(bytes)
       out.writeInt(partition)
-      out.writeLong(generation)
+      out.writeLong(epoch)
       out.writeObject(split)
     }
   }
@@ -126,7 +123,7 @@ private[spark] class ShuffleMapTask(
     rdd = rdd_
     dep = dep_
     partition = in.readInt()
-    generation = in.readLong()
+    epoch = in.readLong()
     split = in.readObject().asInstanceOf[Partition]
   }
 
@@ -186,7 +183,7 @@ private[spark] class ShuffleMapTask(
     }
   }
 
-  override def preferredLocations: Seq[String] = preferredLocs
+  override def preferredLocations: Seq[TaskLocation] = preferredLocs
 
   override def toString = "ShuffleMapTask(%d, %d)".format(stageId, partition)
 }
diff --git a/core/src/main/scala/spark/scheduler/Task.scala b/core/src/main/scala/spark/scheduler/Task.scala
index 50768d43e02baf953d7e6773dc1fb1f9b877a498..0ab2ae6cfe9f655ea74420a693f1938c8352beea 100644
--- a/core/src/main/scala/spark/scheduler/Task.scala
+++ b/core/src/main/scala/spark/scheduler/Task.scala
@@ -30,9 +30,9 @@ import spark.executor.TaskMetrics
  */
 private[spark] abstract class Task[T](val stageId: Int) extends Serializable {
   def run(attemptId: Long): T
-  def preferredLocations: Seq[String] = Nil
+  def preferredLocations: Seq[TaskLocation] = Nil
 
-  var generation: Long = -1   // Map output tracker generation. Will be set by TaskScheduler.
+  var epoch: Long = -1   // Map output tracker epoch. Will be set by TaskScheduler.
 
   var metrics: Option[TaskMetrics] = None
 
diff --git a/core/src/main/scala/spark/scheduler/TaskLocation.scala b/core/src/main/scala/spark/scheduler/TaskLocation.scala
new file mode 100644
index 0000000000000000000000000000000000000000..fea117e956d454e7eaa1bea3aa2c7506a6423f08
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/TaskLocation.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.scheduler
+
+/**
+ * A location where a task should run. This can either be a host or a (host, executorID) pair.
+ * In the latter case, we will prefer to launch the task on that executorID, but our next level
+ * of preference will be executors on the same host if this is not possible.
+ */
+private[spark]
+class TaskLocation private (val host: String, val executorId: Option[String]) extends Serializable {
+  override def toString: String = "TaskLocation(" + host + ", " + executorId + ")"
+}
+
+private[spark] object TaskLocation {
+  def apply(host: String, executorId: String) = new TaskLocation(host, Some(executorId))
+
+  def apply(host: String) = new TaskLocation(host, None)
+}
diff --git a/core/src/main/scala/spark/scheduler/TaskResult.scala b/core/src/main/scala/spark/scheduler/TaskResult.scala
index 89793e0e8287839f62300488c0fe7ccdbcc25885..fc4856756b127d4cee32e34231c4b55365b15f54 100644
--- a/core/src/main/scala/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/spark/scheduler/TaskResult.scala
@@ -28,7 +28,9 @@ import java.nio.ByteBuffer
 // TODO: Use of distributed cache to return result is a hack to get around
 // what seems to be a bug with messages over 60KB in libprocess; fix it
 private[spark]
-class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics) extends Externalizable {
+class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics)
+  extends Externalizable
+{
   def this() = this(null.asInstanceOf[T], null, null)
 
   override def writeExternal(out: ObjectOutput) {
diff --git a/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala b/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
index 2cdeb1c8c0c6ca874b8e79dcda9285cab04db280..64be50b2d02b8e13b2795024c933379f5312830c 100644
--- a/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
+++ b/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
@@ -35,7 +35,7 @@ private[spark] trait TaskSchedulerListener {
                 taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit
 
   // A node was added to the cluster.
-  def executorGained(execId: String, hostPort: String): Unit
+  def executorGained(execId: String, host: String): Unit
 
   // A node was lost from the cluster.
   def executorLost(execId: String): Unit
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 96568e0d276cdb57311654b098d5b54b5af639b7..679d899b472a280805f29abcb901548ddef253c8 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -33,45 +33,27 @@ import java.util.{TimerTask, Timer}
 
 /**
  * The main TaskScheduler implementation, for running tasks on a cluster. Clients should first call
- * start(), then submit task sets through the runTasks method.
+ * initialize() and start(), then submit task sets through the runTasks method.
+ *
+ * This class can work with multiple types of clusters by acting through a SchedulerBackend.
+ * It handles common logic, like determining a scheduling order across jobs, waking up to launch
+ * speculative tasks, etc.
+ *
+ * THREADING: SchedulerBackends and task-submitting clients can call this class from multiple
+ * threads, so it needs locks in public API methods to maintain its state. In addition, some
+ * SchedulerBackends sycnchronize on themselves when they want to send events here, and then
+ * acquire a lock on us, so we need to make sure that we don't try to lock the backend while
+ * we are holding a lock on ourselves.
  */
 private[spark] class ClusterScheduler(val sc: SparkContext)
   extends TaskScheduler
-  with Logging {
-
+  with Logging
+{
   // How often to check for speculative tasks
   val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong
+
   // Threshold above which we warn user initial TaskSet may be starved
   val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "15000").toLong
-  // How often to revive offers in case there are pending tasks - that is how often to try to get
-  // tasks scheduled in case there are nodes available : default 0 is to disable it - to preserve existing behavior
-  // Note that this is required due to delayed scheduling due to data locality waits, etc.
-  // TODO: rename property ?
-  val TASK_REVIVAL_INTERVAL = System.getProperty("spark.tasks.revive.interval", "0").toLong
-
-  /*
-   This property controls how aggressive we should be to modulate waiting for node local task scheduling.
-   To elaborate, currently there is a time limit (3 sec def) to ensure that spark attempts to wait for node locality of tasks before
-   scheduling on other nodes. We have modified this in yarn branch such that offers to task set happen in prioritized order :
-   node-local, rack-local and then others
-   But once all available node local (and no pref) tasks are scheduled, instead of waiting for 3 sec before
-   scheduling to other nodes (which degrades performance for time sensitive tasks and on larger clusters), we can
-   modulate that : to also allow rack local nodes or any node. The default is still set to HOST - so that previous behavior is
-   maintained. This is to allow tuning the tension between pulling rdd data off node and scheduling computation asap.
-
-   TODO: rename property ? The value is one of
-   - NODE_LOCAL (default, no change w.r.t current behavior),
-   - RACK_LOCAL and
-   - ANY
-
-   Note that this property makes more sense when used in conjugation with spark.tasks.revive.interval > 0 : else it is not very effective.
-
-   Additional Note: For non trivial clusters, there is a 4x - 5x reduction in running time (in some of our experiments) based on whether
-   it is left at default NODE_LOCAL, RACK_LOCAL (if cluster is configured to be rack aware) or ANY.
-   If cluster is rack aware, then setting it to RACK_LOCAL gives best tradeoff and a 3x - 4x performance improvement while minimizing IO impact.
-   Also, it brings down the variance in running time drastically.
-    */
-  val TASK_SCHEDULING_AGGRESSION = TaskLocality.parse(System.getProperty("spark.tasks.schedule.aggression", "NODE_LOCAL"))
 
   val activeTaskSets = new HashMap[String, TaskSetManager]
 
@@ -89,16 +71,11 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   // Which executor IDs we have executors on
   val activeExecutorIds = new HashSet[String]
 
-  // TODO: We might want to remove this and merge it with execId datastructures - but later.
-  // Which hosts in the cluster are alive (contains hostPort's) - used for process local and node local task locality.
-  private val hostPortsAlive = new HashSet[String]
-  private val hostToAliveHostPorts = new HashMap[String, HashSet[String]]
-
   // The set of executors we have on each host; this is used to compute hostsAlive, which
   // in turn is used to decide when we can attain data locality on a given host
-  private val executorsByHostPort = new HashMap[String, HashSet[String]]
+  private val executorsByHost = new HashMap[String, HashSet[String]]
 
-  private val executorIdToHostPort = new HashMap[String, String]
+  private val executorIdToHost = new HashMap[String, String]
 
   // JAR server, if any JARs were added by the user to the SparkContext
   var jarServer: HttpServer = null
@@ -136,23 +113,14 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       }
     }
     schedulableBuilder.buildPools()
-    // resolve executorId to hostPort mapping.
-    def executorToHostPort(executorId: String, defaultHostPort: String): String = {
-      executorIdToHostPort.getOrElse(executorId, defaultHostPort)
-    }
-
-    // Unfortunately, this means that SparkEnv is indirectly referencing ClusterScheduler
-    // Will that be a design violation ?
-    SparkEnv.get.executorIdToHostPort = Some(executorToHostPort)
   }
 
-
   def newTaskId(): Long = nextTaskId.getAndIncrement()
 
   override def start() {
     backend.start()
 
-    if (JBoolean.getBoolean("spark.speculation")) {
+    if (System.getProperty("spark.speculation", "false").toBoolean) {
       new Thread("ClusterScheduler speculation check") {
         setDaemon(true)
 
@@ -169,27 +137,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
         }
       }.start()
     }
-
-
-    // Change to always run with some default if TASK_REVIVAL_INTERVAL <= 0 ?
-    if (TASK_REVIVAL_INTERVAL > 0) {
-      new Thread("ClusterScheduler task offer revival check") {
-        setDaemon(true)
-
-        override def run() {
-          logInfo("Starting speculative task offer revival thread")
-          while (true) {
-            try {
-              Thread.sleep(TASK_REVIVAL_INTERVAL)
-            } catch {
-              case e: InterruptedException => {}
-            }
-
-            if (hasPendingTasks()) backend.reviveOffers()
-          }
-        }
-      }.start()
-    }
   }
 
   override def submitTasks(taskSet: TaskSet) {
@@ -201,7 +148,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
       taskSetTaskIds(taskSet.id) = new HashSet[Long]()
 
-      if (hasReceivedTask == false) {
+      if (!hasReceivedTask) {
         starvationTimer.scheduleAtFixedRate(new TimerTask() {
           override def run() {
             if (!hasLaunchedTask) {
@@ -214,7 +161,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
           }
         }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
       }
-      hasReceivedTask = true;
+      hasReceivedTask = true
     }
     backend.reviveOffers()
   }
@@ -235,172 +182,55 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
    * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
    * that tasks are balanced across the cluster.
    */
-  def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = {
-    synchronized {
-      SparkEnv.set(sc.env)
-      // Mark each slave as alive and remember its hostname
-      for (o <- offers) {
-        // DEBUG Code
-        Utils.checkHostPort(o.hostPort)
-
-        executorIdToHostPort(o.executorId) = o.hostPort
-        if (! executorsByHostPort.contains(o.hostPort)) {
-          executorsByHostPort(o.hostPort) = new HashSet[String]()
-        }
-
-        hostPortsAlive += o.hostPort
-        hostToAliveHostPorts.getOrElseUpdate(Utils.parseHostPort(o.hostPort)._1, new HashSet[String]).add(o.hostPort)
-        executorGained(o.executorId, o.hostPort)
+  def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
+    SparkEnv.set(sc.env)
+
+    // Mark each slave as alive and remember its hostname
+    for (o <- offers) {
+      executorIdToHost(o.executorId) = o.host
+      if (!executorsByHost.contains(o.host)) {
+        executorsByHost(o.host) = new HashSet[String]()
+        executorGained(o.executorId, o.host)
       }
-      // Build a list of tasks to assign to each slave
-      val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
-      // merge availableCpus into nodeToAvailableCpus block ?
-      val availableCpus = offers.map(o => o.cores).toArray
-      val nodeToAvailableCpus = {
-        val map = new HashMap[String, Int]()
-        for (offer <- offers) {
-          val hostPort = offer.hostPort
-          val cores = offer.cores
-          // DEBUG code
-          Utils.checkHostPort(hostPort)
-
-          val host = Utils.parseHostPort(hostPort)._1
-
-          map.put(host, map.getOrElse(host, 0) + cores)
-        }
-
-        map
-      }
-      var launchedTask = false
-      val sortedTaskSetQueue = rootPool.getSortedTaskSetQueue()
-
-      for (manager <- sortedTaskSetQueue) {
-        logDebug("parentName:%s, name:%s, runningTasks:%s".format(
-          manager.parent.name, manager.name, manager.runningTasks))
-      }
-
-      for (manager <- sortedTaskSetQueue) {
+    }
 
-        // Split offers based on node local, rack local and off-rack tasks.
-        val processLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
-        val nodeLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
-        val rackLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
-        val otherOffers = new HashMap[String, ArrayBuffer[Int]]()
+    // Build a list of tasks to assign to each worker
+    val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
+    val availableCpus = offers.map(o => o.cores).toArray
+    val sortedTaskSets = rootPool.getSortedTaskSetQueue()
+    for (taskSet <- sortedTaskSets) {
+      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
+        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
+    }
 
+    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
+    // of locality levels so that it gets a chance to launch local tasks on all of them.
+    var launchedTask = false
+    for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
+      do {
+        launchedTask = false
         for (i <- 0 until offers.size) {
-          val hostPort = offers(i).hostPort
-          // DEBUG code
-          Utils.checkHostPort(hostPort)
-
-          val numProcessLocalTasks =  math.max(0, math.min(manager.numPendingTasksForHostPort(hostPort), availableCpus(i)))
-          if (numProcessLocalTasks > 0){
-            val list = processLocalOffers.getOrElseUpdate(hostPort, new ArrayBuffer[Int])
-            for (j <- 0 until numProcessLocalTasks) list += i
-          }
-
-          val host = Utils.parseHostPort(hostPort)._1
-          val numNodeLocalTasks =  math.max(0,
-            // Remove process local tasks (which are also host local btw !) from this
-            math.min(manager.numPendingTasksForHost(hostPort) - numProcessLocalTasks, nodeToAvailableCpus(host)))
-          if (numNodeLocalTasks > 0){
-            val list = nodeLocalOffers.getOrElseUpdate(host, new ArrayBuffer[Int])
-            for (j <- 0 until numNodeLocalTasks) list += i
-          }
-
-          val numRackLocalTasks =  math.max(0,
-            // Remove node local tasks (which are also rack local btw !) from this
-            math.min(manager.numRackLocalPendingTasksForHost(hostPort) - numProcessLocalTasks - numNodeLocalTasks, nodeToAvailableCpus(host)))
-          if (numRackLocalTasks > 0){
-            val list = rackLocalOffers.getOrElseUpdate(host, new ArrayBuffer[Int])
-            for (j <- 0 until numRackLocalTasks) list += i
-          }
-          if (numNodeLocalTasks <= 0 && numRackLocalTasks <= 0){
-            // add to others list - spread even this across cluster.
-            val list = otherOffers.getOrElseUpdate(host, new ArrayBuffer[Int])
-            list += i
+          val execId = offers(i).executorId
+          val host = offers(i).host
+          for (task <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) {
+            tasks(i) += task
+            val tid = task.taskId
+            taskIdToTaskSetId(tid) = taskSet.taskSet.id
+            taskSetTaskIds(taskSet.taskSet.id) += tid
+            taskIdToExecutorId(tid) = execId
+            activeExecutorIds += execId
+            executorsByHost(host) += execId
+            availableCpus(i) -= 1
+            launchedTask = true
           }
         }
+      } while (launchedTask)
+    }
 
-        val offersPriorityList = new ArrayBuffer[Int](
-          processLocalOffers.size + nodeLocalOffers.size + rackLocalOffers.size + otherOffers.size)
-
-        // First process local, then host local, then rack, then others
-
-        // numNodeLocalOffers contains count of both process local and host offers.
-        val numNodeLocalOffers = {
-          val processLocalPriorityList = ClusterScheduler.prioritizeContainers(processLocalOffers)
-          offersPriorityList ++= processLocalPriorityList
-
-          val nodeLocalPriorityList = ClusterScheduler.prioritizeContainers(nodeLocalOffers)
-          offersPriorityList ++= nodeLocalPriorityList
-
-          processLocalPriorityList.size + nodeLocalPriorityList.size
-        }
-        val numRackLocalOffers = {
-          val rackLocalPriorityList = ClusterScheduler.prioritizeContainers(rackLocalOffers)
-          offersPriorityList ++= rackLocalPriorityList
-          rackLocalPriorityList.size
-        }
-        offersPriorityList ++= ClusterScheduler.prioritizeContainers(otherOffers)
-
-        var lastLoop = false
-        val lastLoopIndex = TASK_SCHEDULING_AGGRESSION match {
-          case TaskLocality.NODE_LOCAL => numNodeLocalOffers
-          case TaskLocality.RACK_LOCAL => numRackLocalOffers + numNodeLocalOffers
-          case TaskLocality.ANY => offersPriorityList.size
-        }
-
-        do {
-          launchedTask = false
-          var loopCount = 0
-          for (i <- offersPriorityList) {
-            val execId = offers(i).executorId
-            val hostPort = offers(i).hostPort
-
-            // If last loop and within the lastLoopIndex, expand scope - else use null (which will use default/existing)
-            val overrideLocality = if (lastLoop && loopCount < lastLoopIndex) TASK_SCHEDULING_AGGRESSION else null
-
-            // If last loop, override waiting for host locality - we scheduled all local tasks already and there might be more available ...
-            loopCount += 1
-
-            manager.slaveOffer(execId, hostPort, availableCpus(i), overrideLocality) match {
-              case Some(task) =>
-                tasks(i) += task
-                val tid = task.taskId
-                taskIdToTaskSetId(tid) = manager.taskSet.id
-                taskSetTaskIds(manager.taskSet.id) += tid
-                taskIdToExecutorId(tid) = execId
-                activeExecutorIds += execId
-                executorsByHostPort(hostPort) += execId
-                availableCpus(i) -= 1
-                launchedTask = true
-
-              case None => {}
-              }
-            }
-          // Loop once more - when lastLoop = true, then we try to schedule task on all nodes irrespective of
-          // data locality (we still go in order of priority : but that would not change anything since
-          // if data local tasks had been available, we would have scheduled them already)
-          if (lastLoop) {
-            // prevent more looping
-            launchedTask = false
-          } else if (!lastLoop && !launchedTask) {
-            // Do this only if TASK_SCHEDULING_AGGRESSION != NODE_LOCAL
-            if (TASK_SCHEDULING_AGGRESSION != TaskLocality.NODE_LOCAL) {
-              // fudge launchedTask to ensure we loop once more
-              launchedTask = true
-              // dont loop anymore
-              lastLoop = true
-            }
-          }
-        } while (launchedTask)
-      }
-
-      if (tasks.size > 0) {
-        hasLaunchedTask = true
-      }
-      return tasks
+    if (tasks.size > 0) {
+      hasLaunchedTask = true
     }
+    return tasks
   }
 
   def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
@@ -448,7 +278,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       backend.reviveOffers()
     }
     if (taskFailed) {
-
       // Also revive offers if a task had failed for some reason other than host lost
       backend.reviveOffers()
     }
@@ -503,7 +332,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   }
 
   // Check for pending tasks in all our active jobs.
-  def hasPendingTasks(): Boolean = {
+  def hasPendingTasks: Boolean = {
     synchronized {
       rootPool.hasPendingTasks()
     }
@@ -514,7 +343,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
 
     synchronized {
       if (activeExecutorIds.contains(executorId)) {
-        val hostPort = executorIdToHostPort(executorId)
+        val hostPort = executorIdToHost(executorId)
         logError("Lost executor %s on %s: %s".format(executorId, hostPort, reason))
         removeExecutor(executorId)
         failedExecutor = Some(executorId)
@@ -536,88 +365,63 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   /** Remove an executor from all our data structures and mark it as lost */
   private def removeExecutor(executorId: String) {
     activeExecutorIds -= executorId
-    val hostPort = executorIdToHostPort(executorId)
-    if (hostPortsAlive.contains(hostPort)) {
-      // DEBUG Code
-      Utils.checkHostPort(hostPort)
-
-      hostPortsAlive -= hostPort
-      hostToAliveHostPorts.getOrElseUpdate(Utils.parseHostPort(hostPort)._1, new HashSet[String]).remove(hostPort)
-    }
-
-    val execs = executorsByHostPort.getOrElse(hostPort, new HashSet)
+    val host = executorIdToHost(executorId)
+    val execs = executorsByHost.getOrElse(host, new HashSet)
     execs -= executorId
     if (execs.isEmpty) {
-      executorsByHostPort -= hostPort
+      executorsByHost -= host
     }
-    executorIdToHostPort -= executorId
-    rootPool.executorLost(executorId, hostPort)
+    executorIdToHost -= executorId
+    rootPool.executorLost(executorId, host)
   }
 
-  def executorGained(execId: String, hostPort: String) {
-    listener.executorGained(execId, hostPort)
+  def executorGained(execId: String, host: String) {
+    listener.executorGained(execId, host)
   }
 
-  def getExecutorsAliveOnHost(host: String): Option[Set[String]] = {
-    Utils.checkHost(host)
-
-    val retval = hostToAliveHostPorts.get(host)
-    if (retval.isDefined) {
-      return Some(retval.get.toSet)
-    }
+  def getExecutorsAliveOnHost(host: String): Option[Set[String]] = synchronized {
+    executorsByHost.get(host).map(_.toSet)
+  }
 
-    None
+  def hasExecutorsAliveOnHost(host: String): Boolean = synchronized {
+    executorsByHost.contains(host)
   }
 
-  def isExecutorAliveOnHostPort(hostPort: String): Boolean = {
-    // Even if hostPort is a host, it does not matter - it is just a specific check.
-    // But we do have to ensure that only hostPort get into hostPortsAlive !
-    // So no check against Utils.checkHostPort
-    hostPortsAlive.contains(hostPort)
+  def isExecutorAlive(execId: String): Boolean = synchronized {
+    activeExecutorIds.contains(execId)
   }
 
   // By default, rack is unknown
   def getRackForHost(value: String): Option[String] = None
-
-  // By default, (cached) hosts for rack is unknown
-  def getCachedHostsForRack(rack: String): Option[Set[String]] = None
 }
 
 
 object ClusterScheduler {
-
-  // Used to 'spray' available containers across the available set to ensure too many containers on same host
-  // are not used up. Used in yarn mode and in task scheduling (when there are multiple containers available
-  // to execute a task)
-  // For example: yarn can returns more containers than we would have requested under ANY, this method
-  // prioritizes how to use the allocated containers.
-  // flatten the map such that the array buffer entries are spread out across the returned value.
-  // given <host, list[container]> == <h1, [c1 .. c5]>, <h2, [c1 .. c3]>, <h3, [c1, c2]>, <h4, c1>, <h5, c1>, i
-  // the return value would be something like : h1c1, h2c1, h3c1, h4c1, h5c1, h1c2, h2c2, h3c2, h1c3, h2c3, h1c4, h1c5
-  // We then 'use' the containers in this order (consuming only the top K from this list where
-  // K = number to be user). This is to ensure that if we have multiple eligible allocations,
-  // they dont end up allocating all containers on a small number of hosts - increasing probability of
-  // multiple container failure when a host goes down.
-  // Note, there is bias for keys with higher number of entries in value to be picked first (by design)
-  // Also note that invocation of this method is expected to have containers of same 'type'
-  // (host-local, rack-local, off-rack) and not across types : so that reordering is simply better from
-  // the available list - everything else being same.
-  // That is, we we first consume data local, then rack local and finally off rack nodes. So the
-  // prioritization from this method applies to within each category
+  /**
+   * Used to balance containers across hosts.
+   *
+   * Accepts a map of hosts to resource offers for that host, and returns a prioritized list of
+   * resource offers representing the order in which the offers should be used.  The resource
+   * offers are ordered such that we'll allocate one container on each host before allocating a
+   * second container on any host, and so on, in order to reduce the damage if a host fails.
+   *
+   * For example, given <h1, [o1, o2, o3]>, <h2, [o4]>, <h1, [o5, o6]>, returns
+   * [o1, o5, o4, 02, o6, o3]
+   */
   def prioritizeContainers[K, T] (map: HashMap[K, ArrayBuffer[T]]): List[T] = {
     val _keyList = new ArrayBuffer[K](map.size)
     _keyList ++= map.keys
 
     // order keyList based on population of value in map
     val keyList = _keyList.sortWith(
-      (left, right) => map.get(left).getOrElse(Set()).size > map.get(right).getOrElse(Set()).size
+      (left, right) => map(left).size > map(right).size
     )
 
     val retval = new ArrayBuffer[T](keyList.size * 2)
     var index = 0
     var found = true
 
-    while (found){
+    while (found) {
       found = false
       for (key <- keyList) {
         val containerList: ArrayBuffer[T] = map.get(key).getOrElse(null)
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
index 7f855cd345b7f6ab4ea600d0c9aed39f86574f87..a4d6880abb55d7b6d2c35c98fb6574a31c45439c 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -29,60 +29,33 @@ import scala.math.min
 import spark.{FetchFailed, Logging, Resubmitted, SparkEnv, Success, TaskEndReason, TaskState, Utils}
 import spark.{ExceptionFailure, SparkException, TaskResultTooBigFailure}
 import spark.TaskState.TaskState
-import spark.scheduler.{ShuffleMapTask, Task, TaskResult, TaskSet}
+import spark.scheduler._
+import scala.Some
+import spark.FetchFailed
+import spark.ExceptionFailure
+import spark.TaskResultTooBigFailure
+import spark.util.{SystemClock, Clock}
 
 
-private[spark] object TaskLocality
-  extends Enumeration("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY") with Logging {
-
-  // process local is expected to be used ONLY within tasksetmanager for now.
-  val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
-
-  type TaskLocality = Value
-
-  def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = {
-
-    // Must not be the constraint.
-    assert (constraint != TaskLocality.PROCESS_LOCAL)
-
-    constraint match {
-      case TaskLocality.NODE_LOCAL =>
-        condition == TaskLocality.NODE_LOCAL
-      case TaskLocality.RACK_LOCAL =>
-        condition == TaskLocality.NODE_LOCAL || condition == TaskLocality.RACK_LOCAL
-      // For anything else, allow
-      case _ => true
-    }
-  }
-
-  def parse(str: String): TaskLocality = {
-    // better way to do this ?
-    try {
-      val retval = TaskLocality.withName(str)
-      // Must not specify PROCESS_LOCAL !
-      assert (retval != TaskLocality.PROCESS_LOCAL)
-      retval
-    } catch {
-      case nEx: NoSuchElementException => {
-        logWarning("Invalid task locality specified '" + str + "', defaulting to NODE_LOCAL")
-        // default to preserve earlier behavior
-        NODE_LOCAL
-      }
-    }
-  }
-}
-
 /**
- * Schedules the tasks within a single TaskSet in the ClusterScheduler.
+ * Schedules the tasks within a single TaskSet in the ClusterScheduler. This class keeps track of
+ * the status of each task, retries tasks if they fail (up to a limited number of times), and
+ * handles locality-aware scheduling for this TaskSet via delay scheduling. The main interfaces
+ * to it are resourceOffer, which asks the TaskSet whether it wants to run a task on one node,
+ * and statusUpdate, which tells it that one of its tasks changed state (e.g. finished).
+ *
+ * THREADING: This class is designed to only be called from code with a lock on the
+ * ClusterScheduler (e.g. its event handlers). It should not be called from other threads.
  */
-private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet: TaskSet)
-  extends TaskSetManager with Logging {
-
-  // Maximum time to wait to run a task in a preferred location (in ms)
-  val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
-
+private[spark] class ClusterTaskSetManager(
+    sched: ClusterScheduler,
+    val taskSet: TaskSet,
+    clock: Clock = SystemClock)
+  extends TaskSetManager
+  with Logging
+{
   // CPUs to request per task
-  val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toDouble
+  val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toInt
 
   // Maximum times a task is allowed to fail before failing the job
   val MAX_TASK_FAILURES = System.getProperty("spark.task.maxFailures", "4").toInt
@@ -110,31 +83,27 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
   var stageId = taskSet.stageId
   var name = "TaskSet_"+taskSet.stageId.toString
   var parent: Schedulable = null
-  // Last time when we launched a preferred task (for delay scheduling)
-  var lastPreferredLaunchTime = System.currentTimeMillis
 
-  // List of pending tasks for each node (process local to container).
-  // These collections are actually
+  // Set of pending tasks for each executor. These collections are actually
   // treated as stacks, in which new tasks are added to the end of the
   // ArrayBuffer and removed from the end. This makes it faster to detect
   // tasks that repeatedly fail because whenever a task failed, it is put
   // back at the head of the stack. They are also only cleaned up lazily;
   // when a task is launched, it remains in all the pending lists except
   // the one that it was launched from, but gets removed from them later.
-  private val pendingTasksForHostPort = new HashMap[String, ArrayBuffer[Int]]
+  private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
 
-  // List of pending tasks for each node.
-  // Essentially, similar to pendingTasksForHostPort, except at host level
+  // Set of pending tasks for each host. Similar to pendingTasksForExecutor,
+  // but at host level.
   private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]
 
-  // List of pending tasks for each node based on rack locality.
-  // Essentially, similar to pendingTasksForHost, except at rack level
-  private val pendingRackLocalTasksForHost = new HashMap[String, ArrayBuffer[Int]]
+  // Set of pending tasks for each rack -- similar to the above.
+  private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]
 
-  // List containing pending tasks with no locality preferences
+  // Set containing pending tasks with no locality preferences.
   val pendingTasksWithNoPrefs = new ArrayBuffer[Int]
 
-  // List containing all pending tasks (also used as a stack, as above)
+  // Set containing all pending tasks (also used as a stack, as above).
   val allPendingTasks = new ArrayBuffer[Int]
 
   // Tasks that can be speculated. Since these will be a small fraction of total
@@ -144,25 +113,24 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
   // Task index, start and finish time for each task attempt (indexed by task ID)
   val taskInfos = new HashMap[Long, TaskInfo]
 
-  // Did the job fail?
+  // Did the TaskSet fail?
   var failed = false
   var causeOfFailure = ""
 
   // How frequently to reprint duplicate exceptions in full, in milliseconds
   val EXCEPTION_PRINT_INTERVAL =
     System.getProperty("spark.logging.exceptionPrintInterval", "10000").toLong
-  // Map of recent exceptions (identified by string representation and
-  // top stack frame) to duplicate count (how many times the same
-  // exception has appeared) and time the full exception was
-  // printed. This should ideally be an LRU map that can drop old
-  // exceptions automatically.
+
+  // Map of recent exceptions (identified by string representation and top stack frame) to
+  // duplicate count (how many times the same exception has appeared) and time the full exception
+  // was printed. This should ideally be an LRU map that can drop old exceptions automatically.
   val recentExceptions = HashMap[String, (Int, Long)]()
 
-  // Figure out the current map output tracker generation and set it on all tasks
-  val generation = sched.mapOutputTracker.getGeneration
-  logDebug("Generation for " + taskSet.id + ": " + generation)
+  // Figure out the current map output tracker epoch and set it on all tasks
+  val epoch = sched.mapOutputTracker.getEpoch
+  logDebug("Epoch for " + taskSet + ": " + epoch)
   for (t <- tasks) {
-    t.generation = generation
+    t.epoch = epoch
   }
 
   // Add all our tasks to the pending lists. We do this in reverse order
@@ -171,166 +139,86 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
     addPendingTask(i)
   }
 
-  // Note that it follows the hierarchy.
-  // if we search for NODE_LOCAL, the output will include PROCESS_LOCAL and
-  // if we search for RACK_LOCAL, it will include PROCESS_LOCAL & NODE_LOCAL
-  private def findPreferredLocations(
-      _taskPreferredLocations: Seq[String],
-      scheduler: ClusterScheduler,
-      taskLocality: TaskLocality.TaskLocality): HashSet[String] =
-  {
-    if (TaskLocality.PROCESS_LOCAL == taskLocality) {
-      // straight forward comparison ! Special case it.
-      val retval = new HashSet[String]()
-      scheduler.synchronized {
-        for (location <- _taskPreferredLocations) {
-          if (scheduler.isExecutorAliveOnHostPort(location)) {
-            retval += location
-          }
-        }
-      }
-
-      return retval
-    }
-
-    val taskPreferredLocations = {
-      if (TaskLocality.NODE_LOCAL == taskLocality) {
-        _taskPreferredLocations
-      } else {
-        assert (TaskLocality.RACK_LOCAL == taskLocality)
-        // Expand set to include all 'seen' rack local hosts.
-        // This works since container allocation/management happens within master -
-        //  so any rack locality information is updated in msater.
-        // Best case effort, and maybe sort of kludge for now ... rework it later ?
-        val hosts = new HashSet[String]
-        _taskPreferredLocations.foreach(h => {
-          val rackOpt = scheduler.getRackForHost(h)
-          if (rackOpt.isDefined) {
-            val hostsOpt = scheduler.getCachedHostsForRack(rackOpt.get)
-            if (hostsOpt.isDefined) {
-              hosts ++= hostsOpt.get
-            }
-          }
+  // Figure out which locality levels we have in our TaskSet, so we can do delay scheduling
+  val myLocalityLevels = computeValidLocalityLevels()
+  val localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at each level
 
-          // Ensure that irrespective of what scheduler says, host is always added !
-          hosts += h
-        })
+  // Delay scheduling variables: we keep track of our current locality level and the time we
+  // last launched a task at that level, and move up a level when localityWaits[curLevel] expires.
+  // We then move down if we manage to launch a "more local" task.
+  var currentLocalityIndex = 0    // Index of our current locality level in validLocalityLevels
+  var lastLaunchTime = clock.getTime()  // Time we last launched a task at this level
 
-        hosts
+  /**
+   * Add a task to all the pending-task lists that it should be on. If readding is set, we are
+   * re-adding the task so only include it in each list if it's not already there.
+   */
+  private def addPendingTask(index: Int, readding: Boolean = false) {
+    // Utility method that adds `index` to a list only if readding=false or it's not already there
+    def addTo(list: ArrayBuffer[Int]) {
+      if (!readding || !list.contains(index)) {
+        list += index
       }
     }
 
-    val retval = new HashSet[String]
-    scheduler.synchronized {
-      for (prefLocation <- taskPreferredLocations) {
-        val aliveLocationsOpt = scheduler.getExecutorsAliveOnHost(Utils.parseHostPort(prefLocation)._1)
-        if (aliveLocationsOpt.isDefined) {
-          retval ++= aliveLocationsOpt.get
+    var hadAliveLocations = false
+    for (loc <- tasks(index).preferredLocations) {
+      for (execId <- loc.executorId) {
+        if (sched.isExecutorAlive(execId)) {
+          addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer))
+          hadAliveLocations = true
         }
       }
-    }
-
-    retval
-  }
-
-  // Add a task to all the pending-task lists that it should be on.
-  private def addPendingTask(index: Int) {
-    // We can infer hostLocalLocations from rackLocalLocations by joining it against
-    // tasks(index).preferredLocations (with appropriate hostPort <-> host conversion).
-    // But not doing it for simplicity sake. If this becomes a performance issue, modify it.
-    val locs = tasks(index).preferredLocations
-    val processLocalLocations = findPreferredLocations(locs, sched, TaskLocality.PROCESS_LOCAL)
-    val hostLocalLocations = findPreferredLocations(locs, sched, TaskLocality.NODE_LOCAL)
-    val rackLocalLocations = findPreferredLocations(locs, sched, TaskLocality.RACK_LOCAL)
-
-    if (rackLocalLocations.size == 0) {
-      // Current impl ensures this.
-      assert (processLocalLocations.size == 0)
-      assert (hostLocalLocations.size == 0)
-      pendingTasksWithNoPrefs += index
-    } else {
-
-      // process local locality
-      for (hostPort <- processLocalLocations) {
-        // DEBUG Code
-        Utils.checkHostPort(hostPort)
-
-        val hostPortList = pendingTasksForHostPort.getOrElseUpdate(hostPort, ArrayBuffer())
-        hostPortList += index
-      }
-
-      // host locality (includes process local)
-      for (hostPort <- hostLocalLocations) {
-        // DEBUG Code
-        Utils.checkHostPort(hostPort)
-
-        val host = Utils.parseHostPort(hostPort)._1
-        val hostList = pendingTasksForHost.getOrElseUpdate(host, ArrayBuffer())
-        hostList += index
+      if (sched.hasExecutorsAliveOnHost(loc.host)) {
+        addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
+        for (rack <- sched.getRackForHost(loc.host)) {
+          addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
+        }
+        hadAliveLocations = true
       }
+    }
 
-      // rack locality (includes process local and host local)
-      for (rackLocalHostPort <- rackLocalLocations) {
-        // DEBUG Code
-        Utils.checkHostPort(rackLocalHostPort)
-
-        val rackLocalHost = Utils.parseHostPort(rackLocalHostPort)._1
-        val list = pendingRackLocalTasksForHost.getOrElseUpdate(rackLocalHost, ArrayBuffer())
-        list += index
-      }
+    if (!hadAliveLocations) {
+      // Even though the task might've had preferred locations, all of those hosts or executors
+      // are dead; put it in the no-prefs list so we can schedule it elsewhere right away.
+      addTo(pendingTasksWithNoPrefs)
     }
 
-    allPendingTasks += index
+    if (!readding) {
+      allPendingTasks += index  // No point scanning this whole list to find the old task there
+    }
   }
 
-  // Return the pending tasks list for a given host port (process local), or an empty list if
-  // there is no map entry for that host
-  private def getPendingTasksForHostPort(hostPort: String): ArrayBuffer[Int] = {
-    // DEBUG Code
-    Utils.checkHostPort(hostPort)
-    pendingTasksForHostPort.getOrElse(hostPort, ArrayBuffer())
+  /**
+   * Return the pending tasks list for a given executor ID, or an empty list if
+   * there is no map entry for that host
+   */
+  private def getPendingTasksForExecutor(executorId: String): ArrayBuffer[Int] = {
+    pendingTasksForExecutor.getOrElse(executorId, ArrayBuffer())
   }
 
-  // Return the pending tasks list for a given host, or an empty list if
-  // there is no map entry for that host
-  private def getPendingTasksForHost(hostPort: String): ArrayBuffer[Int] = {
-    val host = Utils.parseHostPort(hostPort)._1
+  /**
+   * Return the pending tasks list for a given host, or an empty list if
+   * there is no map entry for that host
+   */
+  private def getPendingTasksForHost(host: String): ArrayBuffer[Int] = {
     pendingTasksForHost.getOrElse(host, ArrayBuffer())
   }
 
-  // Return the pending tasks (rack level) list for a given host, or an empty list if
-  // there is no map entry for that host
-  private def getRackLocalPendingTasksForHost(hostPort: String): ArrayBuffer[Int] = {
-    val host = Utils.parseHostPort(hostPort)._1
-    pendingRackLocalTasksForHost.getOrElse(host, ArrayBuffer())
-  }
-
-  // Number of pending tasks for a given host Port (which would be process local)
-  override def numPendingTasksForHostPort(hostPort: String): Int = {
-    getPendingTasksForHostPort(hostPort).count { index =>
-      copiesRunning(index) == 0 && !finished(index)
-    }
-  }
-
-  // Number of pending tasks for a given host (which would be data local)
-  override def numPendingTasksForHost(hostPort: String): Int = {
-    getPendingTasksForHost(hostPort).count { index =>
-      copiesRunning(index) == 0 && !finished(index)
-    }
-  }
-
-  // Number of pending rack local tasks for a given host
-  override def numRackLocalPendingTasksForHost(hostPort: String): Int = {
-    getRackLocalPendingTasksForHost(hostPort).count { index =>
-      copiesRunning(index) == 0 && !finished(index)
-    }
+  /**
+   * Return the pending rack-local task list for a given rack, or an empty list if
+   * there is no map entry for that rack
+   */
+  private def getPendingTasksForRack(rack: String): ArrayBuffer[Int] = {
+    pendingTasksForRack.getOrElse(rack, ArrayBuffer())
   }
 
-
-  // Dequeue a pending task from the given list and return its index.
-  // Return None if the list is empty.
-  // This method also cleans up any tasks in the list that have already
-  // been launched, since we want that to happen lazily.
+  /**
+   * Dequeue a pending task from the given list and return its index.
+   * Return None if the list is empty.
+   * This method also cleans up any tasks in the list that have already
+   * been launched, since we want that to happen lazily.
+   */
   private def findTaskFromList(list: ArrayBuffer[Int]): Option[Int] = {
     while (!list.isEmpty) {
       val index = list.last
@@ -342,191 +230,158 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
     return None
   }
 
-  // Return a speculative task for a given host if any are available. The task should not have an
-  // attempt running on this host, in case the host is slow. In addition, if locality is set, the
-  // task must have a preference for this host/rack/no preferred locations at all.
-  private def findSpeculativeTask(hostPort: String, locality: TaskLocality.TaskLocality): Option[Int] = {
+  /** Check whether a task is currently running an attempt on a given host */
+  private def hasAttemptOnHost(taskIndex: Int, host: String): Boolean = {
+    !taskAttempts(taskIndex).exists(_.host == host)
+  }
 
-    assert (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL))
+  /**
+   * Return a speculative task for a given executor if any are available. The task should not have
+   * an attempt running on this host, in case the host is slow. In addition, the task should meet
+   * the given locality constraint.
+   */
+  private def findSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value)
+    : Option[(Int, TaskLocality.Value)] =
+  {
     speculatableTasks.retain(index => !finished(index)) // Remove finished tasks from set
 
-    if (speculatableTasks.size > 0) {
-      val localTask = speculatableTasks.find { index =>
-        val locations = findPreferredLocations(tasks(index).preferredLocations, sched,
-          TaskLocality.NODE_LOCAL)
-        val attemptLocs = taskAttempts(index).map(_.hostPort)
-        (locations.size == 0 || locations.contains(hostPort)) && !attemptLocs.contains(hostPort)
+    if (!speculatableTasks.isEmpty) {
+      // Check for process-local or preference-less tasks; note that tasks can be process-local
+      // on multiple nodes when we replicate cached blocks, as in Spark Streaming
+      for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
+        val prefs = tasks(index).preferredLocations
+        val executors = prefs.flatMap(_.executorId)
+        if (prefs.size == 0 || executors.contains(execId)) {
+          speculatableTasks -= index
+          return Some((index, TaskLocality.PROCESS_LOCAL))
+        }
       }
 
-      if (localTask != None) {
-        speculatableTasks -= localTask.get
-        return localTask
+      // Check for node-local tasks
+      if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
+        for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
+          val locations = tasks(index).preferredLocations.map(_.host)
+          if (locations.contains(host)) {
+            speculatableTasks -= index
+            return Some((index, TaskLocality.NODE_LOCAL))
+          }
+        }
       }
 
-      // check for rack locality
+      // Check for rack-local tasks
       if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
-        val rackTask = speculatableTasks.find { index =>
-          val locations = findPreferredLocations(tasks(index).preferredLocations, sched,
-            TaskLocality.RACK_LOCAL)
-          val attemptLocs = taskAttempts(index).map(_.hostPort)
-          locations.contains(hostPort) && !attemptLocs.contains(hostPort)
-        }
-
-        if (rackTask != None) {
-          speculatableTasks -= rackTask.get
-          return rackTask
+        for (rack <- sched.getRackForHost(host)) {
+          for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
+            val racks = tasks(index).preferredLocations.map(_.host).map(sched.getRackForHost)
+            if (racks.contains(rack)) {
+              speculatableTasks -= index
+              return Some((index, TaskLocality.RACK_LOCAL))
+            }
+          }
         }
       }
 
-      // Any task ...
+      // Check for non-local tasks
       if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
-        // Check for attemptLocs also ?
-        val nonLocalTask = speculatableTasks.find { i =>
-          !taskAttempts(i).map(_.hostPort).contains(hostPort)
-        }
-        if (nonLocalTask != None) {
-          speculatableTasks -= nonLocalTask.get
-          return nonLocalTask
+        for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
+          speculatableTasks -= index
+          return Some((index, TaskLocality.ANY))
         }
       }
     }
+
     return None
   }
 
-  // Dequeue a pending task for a given node and return its index.
-  // If localOnly is set to false, allow non-local tasks as well.
-  private def findTask(hostPort: String, locality: TaskLocality.TaskLocality): Option[Int] = {
-    val processLocalTask = findTaskFromList(getPendingTasksForHostPort(hostPort))
-    if (processLocalTask != None) {
-      return processLocalTask
+  /**
+   * Dequeue a pending task for a given node and return its index and locality level.
+   * Only search for tasks matching the given locality constraint.
+   */
+  private def findTask(execId: String, host: String, locality: TaskLocality.Value)
+    : Option[(Int, TaskLocality.Value)] =
+  {
+    for (index <- findTaskFromList(getPendingTasksForExecutor(execId))) {
+      return Some((index, TaskLocality.PROCESS_LOCAL))
     }
 
-    val localTask = findTaskFromList(getPendingTasksForHost(hostPort))
-    if (localTask != None) {
-      return localTask
+    if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
+      for (index <- findTaskFromList(getPendingTasksForHost(host))) {
+        return Some((index, TaskLocality.NODE_LOCAL))
+      }
     }
 
     if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
-      val rackLocalTask = findTaskFromList(getRackLocalPendingTasksForHost(hostPort))
-      if (rackLocalTask != None) {
-        return rackLocalTask
+      for {
+        rack <- sched.getRackForHost(host)
+        index <- findTaskFromList(getPendingTasksForRack(rack))
+      } {
+        return Some((index, TaskLocality.RACK_LOCAL))
       }
     }
 
-    // Look for no pref tasks AFTER rack local tasks - this has side effect that we will get to
-    // failed tasks later rather than sooner.
-    // TODO: That code path needs to be revisited (adding to no prefs list when host:port goes down).
-    val noPrefTask = findTaskFromList(pendingTasksWithNoPrefs)
-    if (noPrefTask != None) {
-      return noPrefTask
+    // Look for no-pref tasks after rack-local tasks since they can run anywhere.
+    for (index <- findTaskFromList(pendingTasksWithNoPrefs)) {
+      return Some((index, TaskLocality.PROCESS_LOCAL))
     }
 
     if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
-      val nonLocalTask = findTaskFromList(allPendingTasks)
-      if (nonLocalTask != None) {
-        return nonLocalTask
+      for (index <- findTaskFromList(allPendingTasks)) {
+        return Some((index, TaskLocality.ANY))
       }
     }
 
     // Finally, if all else has failed, find a speculative task
-    return findSpeculativeTask(hostPort, locality)
-  }
-
-  private def isProcessLocalLocation(task: Task[_], hostPort: String): Boolean = {
-    Utils.checkHostPort(hostPort)
-
-    val locs = task.preferredLocations
-
-    locs.contains(hostPort)
-  }
-
-  private def isHostLocalLocation(task: Task[_], hostPort: String): Boolean = {
-    val locs = task.preferredLocations
-
-    // If no preference, consider it as host local
-    if (locs.isEmpty) return true
-
-    val host = Utils.parseHostPort(hostPort)._1
-    locs.find(h => Utils.parseHostPort(h)._1 == host).isDefined
-  }
-
-  // Does a host count as a rack local preferred location for a task?
-  // (assumes host is NOT preferred location).
-  // This is true if either the task has preferred locations and this host is one, or it has
-  // no preferred locations (in which we still count the launch as preferred).
-  private def isRackLocalLocation(task: Task[_], hostPort: String): Boolean = {
-
-    val locs = task.preferredLocations
-
-    val preferredRacks = new HashSet[String]()
-    for (preferredHost <- locs) {
-      val rack = sched.getRackForHost(preferredHost)
-      if (None != rack) preferredRacks += rack.get
-    }
-
-    if (preferredRacks.isEmpty) return false
-
-    val hostRack = sched.getRackForHost(hostPort)
-
-    return None != hostRack && preferredRacks.contains(hostRack.get)
+    return findSpeculativeTask(execId, host, locality)
   }
 
-  // Respond to an offer of a single slave from the scheduler by finding a task
-  override def slaveOffer(
+  /**
+   * Respond to an offer of a single slave from the scheduler by finding a task
+   */
+  override def resourceOffer(
       execId: String,
-      hostPort: String,
-      availableCpus: Double,
-      overrideLocality: TaskLocality.TaskLocality = null): Option[TaskDescription] =
+      host: String,
+      availableCpus: Int,
+      maxLocality: TaskLocality.TaskLocality)
+    : Option[TaskDescription] =
   {
     if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
-      // If explicitly specified, use that
-      val locality = if (overrideLocality != null) overrideLocality else {
-        // expand only if we have waited for more than LOCALITY_WAIT for a host local task ...
-        val time = System.currentTimeMillis
-        if (time - lastPreferredLaunchTime < LOCALITY_WAIT) {
-          TaskLocality.NODE_LOCAL
-        } else {
-          TaskLocality.ANY
-        }
+      val curTime = clock.getTime()
+
+      var allowedLocality = getAllowedLocalityLevel(curTime)
+      if (allowedLocality > maxLocality) {
+        allowedLocality = maxLocality   // We're not allowed to search for farther-away tasks
       }
 
-      findTask(hostPort, locality) match {
-        case Some(index) => {
-          // Found a task; do some bookkeeping and return a Mesos task for it
+      findTask(execId, host, allowedLocality) match {
+        case Some((index, taskLocality)) => {
+          // Found a task; do some bookkeeping and return a task description
           val task = tasks(index)
           val taskId = sched.newTaskId()
           // Figure out whether this should count as a preferred launch
-          val taskLocality =
-            if (isProcessLocalLocation(task, hostPort)) TaskLocality.PROCESS_LOCAL
-            else if (isHostLocalLocation(task, hostPort)) TaskLocality.NODE_LOCAL
-            else if (isRackLocalLocation(task, hostPort)) TaskLocality.RACK_LOCAL
-            else  TaskLocality.ANY
-          val prefStr = taskLocality.toString
           logInfo("Starting task %s:%d as TID %s on slave %s: %s (%s)".format(
-            taskSet.id, index, taskId, execId, hostPort, prefStr))
+            taskSet.id, index, taskId, execId, host, taskLocality))
           // Do various bookkeeping
           copiesRunning(index) += 1
-          val time = System.currentTimeMillis
-          val info = new TaskInfo(taskId, index, time, execId, hostPort, taskLocality)
+          val info = new TaskInfo(taskId, index, curTime, execId, host, taskLocality)
           taskInfos(taskId) = info
           taskAttempts(index) = info :: taskAttempts(index)
-          if (taskLocality == TaskLocality.PROCESS_LOCAL || taskLocality == TaskLocality.NODE_LOCAL) {
-            lastPreferredLaunchTime = time
-          }
+          // Update our locality level for delay scheduling
+          currentLocalityIndex = getLocalityIndex(taskLocality)
+          lastLaunchTime = curTime
           // Serialize and return the task
-          val startTime = System.currentTimeMillis
+          val startTime = clock.getTime()
           // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
           // we assume the task can be serialized without exceptions.
           val serializedTask = Task.serializeWithDependencies(
             task, sched.sc.addedFiles, sched.sc.addedJars, ser)
-          val timeTaken = System.currentTimeMillis - startTime
+          val timeTaken = clock.getTime() - startTime
           increaseRunningTasks(1)
           logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
             taskSet.id, index, serializedTask.limit, timeTaken))
           val taskName = "task %s:%d".format(taskSet.id, index)
           if (taskAttempts(index).size == 1)
             taskStarted(task,info)
-          return Some(new TaskDescription(taskId, execId, taskName, serializedTask))
+          return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask))
         }
         case _ =>
       }
@@ -534,6 +389,35 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
     return None
   }
 
+  /**
+   * Get the level we can launch tasks according to delay scheduling, based on current wait time.
+   */
+  private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
+    while (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex) &&
+        currentLocalityIndex < myLocalityLevels.length - 1)
+    {
+      // Jump to the next locality level, and remove our waiting time for the current one since
+      // we don't want to count it again on the next one
+      lastLaunchTime += localityWaits(currentLocalityIndex)
+      currentLocalityIndex += 1
+    }
+    myLocalityLevels(currentLocalityIndex)
+  }
+
+  /**
+   * Find the index in myLocalityLevels for a given locality. This is also designed to work with
+   * localities that are not in myLocalityLevels (in case we somehow get those) by returning the
+   * next-biggest level we have. Uses the fact that the last value in myLocalityLevels is ANY.
+   */
+  def getLocalityIndex(locality: TaskLocality.TaskLocality): Int = {
+    var index = 0
+    while (locality > myLocalityLevels(index)) {
+      index += 1
+    }
+    index
+  }
+
+  /** Called by cluster scheduler when one of our tasks changes state */
   override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
     SparkEnv.set(env)
     state match {
@@ -566,7 +450,7 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
     if (!finished(index)) {
       tasksFinished += 1
       logInfo("Finished TID %s in %d ms on %s (progress: %d/%d)".format(
-        tid, info.duration, info.hostPort, tasksFinished, numTasks))
+        tid, info.duration, info.host, tasksFinished, numTasks))
       // Deserialize task result and pass it to the scheduler
       try {
         val result = ser.deserialize[TaskResult[_]](serializedData)
@@ -626,7 +510,7 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
           case ef: ExceptionFailure =>
             sched.listener.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
             val key = ef.description
-            val now = System.currentTimeMillis
+            val now = clock.getTime()
             val (printFull, dupCount) = {
               if (recentExceptions.contains(key)) {
                 val (dupCount, printTime) = recentExceptions(key)
@@ -698,44 +582,33 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
     }
   }
 
-  // TODO(xiajunluan): for now we just find Pool not TaskSetManager
-  // we can extend this function in future if needed
   override def getSchedulableByName(name: String): Schedulable = {
     return null
   }
 
-  override def addSchedulable(schedulable:Schedulable) {
-    // nothing
-  }
+  override def addSchedulable(schedulable: Schedulable) {}
 
-  override def removeSchedulable(schedulable:Schedulable) {
-    // nothing
-  }
+  override def removeSchedulable(schedulable: Schedulable) {}
 
   override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
-    var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
+    var sortedTaskSetQueue = ArrayBuffer[TaskSetManager](this)
     sortedTaskSetQueue += this
     return sortedTaskSetQueue
   }
 
-  override def executorLost(execId: String, hostPort: String) {
+  /** Called by cluster scheduler when an executor is lost so we can re-enqueue our tasks */
+  override def executorLost(execId: String, host: String) {
     logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)
 
-    // If some task has preferred locations only on hostname, and there are no more executors there,
-    // put it in the no-prefs list to avoid the wait from delay scheduling
-
-    // host local tasks - should we push this to rack local or no pref list ? For now, preserving
-    // behavior and moving to no prefs list. Note, this was done due to impliations related to
-    // 'waiting' for data local tasks, etc.
-    // Note: NOT checking process local list - since host local list is super set of that. We need
-    // to ad to no prefs only if there is no host local node for the task (not if there is no
-    // process local node for the task)
-    for (index <- getPendingTasksForHost(Utils.parseHostPort(hostPort)._1)) {
-      val newLocs = findPreferredLocations(
-        tasks(index).preferredLocations, sched, TaskLocality.NODE_LOCAL)
-      if (newLocs.isEmpty) {
-        pendingTasksWithNoPrefs += index
-      }
+    // Re-enqueue pending tasks for this host based on the status of the cluster -- for example, a
+    // task that used to have locations on only this host might now go to the no-prefs list. Note
+    // that it's okay if we add a task to the same queue twice (if it had multiple preferred
+    // locations), because findTaskFromList will skip already-running tasks.
+    for (index <- getPendingTasksForExecutor(execId)) {
+      addPendingTask(index, readding=true)
+    }
+    for (index <- getPendingTasksForHost(host)) {
+      addPendingTask(index, readding=true)
     }
 
     // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage
@@ -775,7 +648,7 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
     val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
     logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
     if (tasksFinished >= minFinishedForSpeculation) {
-      val time = System.currentTimeMillis()
+      val time = clock.getTime()
       val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray
       Arrays.sort(durations)
       val medianDuration = durations(min((0.5 * numTasks).round.toInt, durations.size - 1))
@@ -789,7 +662,7 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
           !speculatableTasks.contains(index)) {
           logInfo(
             "Marking task %s:%d (on %s) as speculatable because it ran more than %.0f ms".format(
-              taskSet.id, index, info.hostPort, threshold))
+              taskSet.id, index, info.host, threshold))
           speculatableTasks += index
           foundTasks = true
         }
@@ -801,4 +674,39 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
   override def hasPendingTasks(): Boolean = {
     numTasks > 0 && tasksFinished < numTasks
   }
+
+  private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
+    val defaultWait = System.getProperty("spark.locality.wait", "3000")
+    level match {
+      case TaskLocality.PROCESS_LOCAL =>
+        System.getProperty("spark.locality.wait.process", defaultWait).toLong
+      case TaskLocality.NODE_LOCAL =>
+        System.getProperty("spark.locality.wait.node", defaultWait).toLong
+      case TaskLocality.RACK_LOCAL =>
+        System.getProperty("spark.locality.wait.rack", defaultWait).toLong
+      case TaskLocality.ANY =>
+        0L
+    }
+  }
+
+  /**
+   * Compute the locality levels used in this TaskSet. Assumes that all tasks have already been
+   * added to queues using addPendingTask.
+   */
+  private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
+    import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY}
+    val levels = new ArrayBuffer[TaskLocality.TaskLocality]
+    if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0) {
+      levels += PROCESS_LOCAL
+    }
+    if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0) {
+      levels += NODE_LOCAL
+    }
+    if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0) {
+      levels += RACK_LOCAL
+    }
+    levels += ANY
+    logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
+    levels.toArray
+  }
 }
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 075a7cbf7e2d6c01b7d5b509960bdc22eead7f80..3203be10294d375d860e0e14ed24912a2edb31d0 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -26,6 +26,7 @@ import akka.dispatch.Await
 import akka.pattern.ask
 import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
 import akka.util.Duration
+import akka.util.duration._
 
 import spark.{Utils, SparkException, Logging, TaskState}
 import spark.scheduler.cluster.StandaloneClusterMessages._
@@ -37,15 +38,15 @@ import spark.scheduler.cluster.StandaloneClusterMessages._
  */
 private[spark]
 class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
-  extends SchedulerBackend with Logging {
-
+  extends SchedulerBackend with Logging
+{
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
 
   class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
     private val executorActor = new HashMap[String, ActorRef]
     private val executorAddress = new HashMap[String, Address]
-    private val executorHostPort = new HashMap[String, String]
+    private val executorHost = new HashMap[String, String]
     private val freeCores = new HashMap[String, Int]
     private val actorToExecutorId = new HashMap[ActorRef, String]
     private val addressToExecutorId = new HashMap[Address, String]
@@ -53,6 +54,10 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
     override def preStart() {
       // Listen for remote client disconnection events, since they don't go through Akka's watch()
       context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+
+      // Periodically revive offers to allow delay scheduling to work
+      val reviveInterval = System.getProperty("spark.scheduler.revive.interval", "1000").toLong
+      context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
     }
 
     def receive = {
@@ -65,7 +70,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
           sender ! RegisteredExecutor(sparkProperties)
           context.watch(sender)
           executorActor(executorId) = sender
-          executorHostPort(executorId) = hostPort
+          executorHost(executorId) = Utils.parseHostPort(hostPort)._1
           freeCores(executorId) = cores
           executorAddress(executorId) = sender.path.address
           actorToExecutorId(sender) = executorId
@@ -105,13 +110,13 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
     // Make fake resource offers on all executors
     def makeOffers() {
       launchTasks(scheduler.resourceOffers(
-        executorHostPort.toArray.map {case (id, hostPort) => new WorkerOffer(id, hostPort, freeCores(id))}))
+        executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
     }
 
     // Make fake resource offers on just one executor
     def makeOffers(executorId: String) {
       launchTasks(scheduler.resourceOffers(
-        Seq(new WorkerOffer(executorId, executorHostPort(executorId), freeCores(executorId)))))
+        Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
     }
 
     // Launch tasks returned by a set of resource offers
@@ -130,9 +135,8 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
         actorToExecutorId -= executorActor(executorId)
         addressToExecutorId -= executorAddress(executorId)
         executorActor -= executorId
-        executorHostPort -= executorId
+        executorHost -= executorId
         freeCores -= executorId
-        executorHostPort -= executorId
         totalCoreCount.addAndGet(-numCores)
         scheduler.executorLost(executorId, SlaveLost(reason))
       }
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
index 761fdf691982dac161232aab1023249edb51a48e..187553233fb42a28ec85c95fc4f8710eccb08c2a 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
@@ -24,6 +24,7 @@ private[spark] class TaskDescription(
     val taskId: Long,
     val executorId: String,
     val name: String,
+    val index: Int,    // Index within this task's TaskSet
     _serializedTask: ByteBuffer)
   extends Serializable {
 
@@ -31,4 +32,6 @@ private[spark] class TaskDescription(
   private val buffer = new SerializableBuffer(_serializedTask)
 
   def serializedTask: ByteBuffer = buffer.value
+
+  override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)
 }
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala b/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
index c693b722ac3d6c604ed2b664da884d19ea873318..c2c5522686f74d62384a5acb0d25d50d7e988085 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
@@ -28,11 +28,9 @@ class TaskInfo(
     val index: Int,
     val launchTime: Long,
     val executorId: String,
-    val hostPort: String,
+    val host: String,
     val taskLocality: TaskLocality.TaskLocality) {
 
-  Utils.checkHostPort(hostPort, "Expected hostport")
-
   var finishTime: Long = 0
   var failed = false
 
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskLocality.scala b/core/src/main/scala/spark/scheduler/cluster/TaskLocality.scala
new file mode 100644
index 0000000000000000000000000000000000000000..1c33e41f87e81d082bd3151c4f55fd39df32f32f
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskLocality.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.scheduler.cluster
+
+
+private[spark] object TaskLocality
+  extends Enumeration("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")
+{
+  // process local is expected to be used ONLY within tasksetmanager for now.
+  val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
+
+  type TaskLocality = Value
+
+  def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = {
+    condition <= constraint
+  }
+}
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 1a92a5ed6fa06bb1a2dcf212db1c7de99f7e402d..0248830b7a24c250dbdd6b41d2c9a4068f692f88 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -22,6 +22,15 @@ import java.nio.ByteBuffer
 import spark.TaskState.TaskState
 import spark.scheduler.TaskSet
 
+/**
+ * Tracks and schedules the tasks within a single TaskSet. This class keeps track of the status of
+ * each task and is responsible for retries on failure and locality. The main interfaces to it
+ * are resourceOffer, which asks the TaskSet whether it wants to run a task on one node, and
+ * statusUpdate, which tells it that one of its tasks changed state (e.g. finished).
+ *
+ * THREADING: This class is designed to only be called from code with a lock on the TaskScheduler
+ * (e.g. its event handlers). It should not be called from other threads.
+ */
 private[spark] trait TaskSetManager extends Schedulable {
   def schedulableQueue = null
   
@@ -29,17 +38,12 @@ private[spark] trait TaskSetManager extends Schedulable {
   
   def taskSet: TaskSet
 
-  def slaveOffer(
+  def resourceOffer(
       execId: String,
-      hostPort: String,
-      availableCpus: Double,
-      overrideLocality: TaskLocality.TaskLocality = null): Option[TaskDescription]
-
-  def numPendingTasksForHostPort(hostPort: String): Int
-
-  def numRackLocalPendingTasksForHost(hostPort: String): Int
-
-  def numPendingTasksForHost(hostPort: String): Int
+      host: String,
+      availableCpus: Int,
+      maxLocality: TaskLocality.TaskLocality)
+    : Option[TaskDescription]
 
   def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer)
 
diff --git a/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala b/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala
index 06d1203f70b0fd00a8397fc76e34f3a52bac70c3..1d09bd9b034193f71bf22e2aa8bb5c44424c2994 100644
--- a/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala
@@ -21,5 +21,4 @@ package spark.scheduler.cluster
  * Represents free resources available on an executor.
  */
 private[spark]
-class WorkerOffer(val executorId: String, val hostPort: String, val cores: Int) {
-}
+class WorkerOffer(val executorId: String, val host: String, val cores: Int)
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index 6c43928bc855ea7b16fa429c28310a22292fcb6b..5be4dbd9f0b527489eeba69ac234cbc8b8593ad8 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -141,7 +141,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
       for (manager <- sortedTaskSetQueue) {
         do {
           launchTask = false
-          manager.slaveOffer(null, null, freeCpuCores) match {
+          manager.resourceOffer(null, null, freeCpuCores, null) match {
             case Some(task) =>
               tasks += task
               taskIdToTaskSetId(task.taskId) = manager.taskSet.id
diff --git a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
index c38eeb9e11eb96b23fad0efd906cfc927ce179af..e237f289e367bd31062ad0e99eff8d7b571ea1b7 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
@@ -98,14 +98,15 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
     return None
   }
 
-  override def slaveOffer(
+  override def resourceOffer(
       execId: String,
-      hostPort: String,
-      availableCpus: Double,
-      overrideLocality: TaskLocality.TaskLocality = null): Option[TaskDescription] =
+      host: String,
+      availableCpus: Int,
+      maxLocality: TaskLocality.TaskLocality)
+    : Option[TaskDescription] =
   {
     SparkEnv.set(sched.env)
-    logDebug("availableCpus:%d,numFinished:%d,numTasks:%d".format(
+    logDebug("availableCpus:%d, numFinished:%d, numTasks:%d".format(
       availableCpus.toInt, numFinished, numTasks))
     if (availableCpus > 0 && numFinished < numTasks) {
       findTask() match {
@@ -124,25 +125,13 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
           copiesRunning(index) += 1
           increaseRunningTasks(1)
           taskStarted(task, info)
-          return Some(new TaskDescription(taskId, null, taskName, bytes))
+          return Some(new TaskDescription(taskId, null, taskName, index, bytes))
         case None => {}
       }
     }
     return None
   }
 
-  override def numPendingTasksForHostPort(hostPort: String): Int = {
-    return 0
-  }
-
-  override def numRackLocalPendingTasksForHost(hostPort :String): Int = {
-    return 0
-  }
-
-  override def numPendingTasksForHost(hostPort: String): Int = {
-    return 0
-  }
-
   override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
     SparkEnv.set(env)
     state match {
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 3a72474419a05c03764889a2f10219731b709425..2a6ec2a55dddd893a2b14e2843fc1bf0e22dc63b 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -1004,43 +1004,43 @@ private[spark] object BlockManager extends Logging {
     }
   }
 
-  def blockIdsToExecutorLocations(blockIds: Array[String], env: SparkEnv, blockManagerMaster: BlockManagerMaster = null): HashMap[String, List[String]] = {
+  def blockIdsToBlockManagers(
+      blockIds: Array[String],
+      env: SparkEnv,
+      blockManagerMaster: BlockManagerMaster = null)
+  : Map[String, Seq[BlockManagerId]] =
+  {
     // env == null and blockManagerMaster != null is used in tests
     assert (env != null || blockManagerMaster != null)
-    val locationBlockIds: Seq[Seq[BlockManagerId]] =
-      if (env != null) {
-        env.blockManager.getLocationBlockIds(blockIds)
-      } else {
-        blockManagerMaster.getLocations(blockIds)
-      }
+    val blockLocations: Seq[Seq[BlockManagerId]] = if (env != null) {
+      env.blockManager.getLocationBlockIds(blockIds)
+    } else {
+      blockManagerMaster.getLocations(blockIds)
+    }
 
-    // Convert from block master locations to executor locations (we need that for task scheduling)
-    val executorLocations = new HashMap[String, List[String]]()
+    val blockManagers = new HashMap[String, Seq[BlockManagerId]]
     for (i <- 0 until blockIds.length) {
-      val blockId = blockIds(i)
-      val blockLocations = locationBlockIds(i)
-
-      val executors = new HashSet[String]()
-
-      if (env != null) {
-        for (bkLocation <- blockLocations) {
-          val executorHostPort = env.resolveExecutorIdToHostPort(bkLocation.executorId, bkLocation.host)
-          executors += executorHostPort
-          // logInfo("bkLocation = " + bkLocation + ", executorHostPort = " + executorHostPort)
-        }
-      } else {
-        // Typically while testing, etc - revert to simply using host.
-        for (bkLocation <- blockLocations) {
-          executors += bkLocation.host
-          // logInfo("bkLocation = " + bkLocation + ", executorHostPort = " + executorHostPort)
-        }
-      }
-
-      executorLocations.put(blockId, executors.toSeq.toList)
+      blockManagers(blockIds(i)) = blockLocations(i)
     }
+    blockManagers.toMap
+  }
 
-    executorLocations
+  def blockIdsToExecutorIds(
+      blockIds: Array[String],
+      env: SparkEnv,
+      blockManagerMaster: BlockManagerMaster = null)
+    : Map[String, Seq[String]] =
+  {
+    blockIdsToBlockManagers(blockIds, env, blockManagerMaster).mapValues(s => s.map(_.executorId))
   }
 
+  def blockIdsToHosts(
+      blockIds: Array[String],
+      env: SparkEnv,
+      blockManagerMaster: BlockManagerMaster = null)
+    : Map[String, Seq[String]] =
+  {
+    blockIdsToBlockManagers(blockIds, env, blockManagerMaster).mapValues(s => s.map(_.host))
+  }
 }
 
diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala
index 797513f26639ad73dfb1cad233d087d39ad60e50..6948ea4dd907bbc26f15f25bbf88244df1029634 100644
--- a/core/src/main/scala/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala
@@ -156,7 +156,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
       <td>{info.taskId}</td>
       <td>{info.status}</td>
       <td>{info.taskLocality}</td>
-      <td>{info.hostPort}</td>
+      <td>{info.host}</td>
       <td>{dateFmt.format(new Date(info.launchTime))}</td>
       <td sorttable_customkey={duration.toString}>
         {formatDuration}
diff --git a/core/src/main/scala/spark/util/Clock.scala b/core/src/main/scala/spark/util/Clock.scala
new file mode 100644
index 0000000000000000000000000000000000000000..aa71a5b4424640bff202c2efbc9d631bd0c9ee95
--- /dev/null
+++ b/core/src/main/scala/spark/util/Clock.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.util
+
+/**
+ * An interface to represent clocks, so that they can be mocked out in unit tests.
+ */
+private[spark] trait Clock {
+  def getTime(): Long
+}
+
+private[spark] object SystemClock extends Clock {
+  def getTime(): Long = System.currentTimeMillis()
+}
diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
index ce6cec0451e3ed08ccc432344154b6db8462df15..c21f3331d0876b769092e3454f496b96b7cda49a 100644
--- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
@@ -112,22 +112,22 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
         "akka://spark@localhost:" + boundPort + "/user/MapOutputTracker")
 
     masterTracker.registerShuffle(10, 1)
-    masterTracker.incrementGeneration()
-    slaveTracker.updateGeneration(masterTracker.getGeneration)
+    masterTracker.incrementEpoch()
+    slaveTracker.updateEpoch(masterTracker.getEpoch)
     intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
 
     val compressedSize1000 = MapOutputTracker.compressSize(1000L)
     val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
     masterTracker.registerMapOutput(10, 0, new MapStatus(
       BlockManagerId("a", "hostA", 1000, 0), Array(compressedSize1000)))
-    masterTracker.incrementGeneration()
-    slaveTracker.updateGeneration(masterTracker.getGeneration)
+    masterTracker.incrementEpoch()
+    slaveTracker.updateEpoch(masterTracker.getEpoch)
     assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
            Seq((BlockManagerId("a", "hostA", 1000, 0), size1000)))
 
     masterTracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000, 0))
-    masterTracker.incrementGeneration()
-    slaveTracker.updateGeneration(masterTracker.getGeneration)
+    masterTracker.incrementEpoch()
+    slaveTracker.updateEpoch(masterTracker.getEpoch)
     intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
 
     // failure should be cached
diff --git a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
index caaf3209fd660f7f88b2b1eace053732d4e8bb51..3b4a0d52fc79e98ba4bf0d8e28078dae53cf07d1 100644
--- a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
@@ -59,7 +59,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     override def stop() = {}
     override def submitTasks(taskSet: TaskSet) = {
       // normally done by TaskSetManager
-      taskSet.tasks.foreach(_.generation = mapOutputTracker.getGeneration)
+      taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
       taskSets += taskSet
     }
     override def setListener(listener: TaskSchedulerListener) = {}
@@ -299,10 +299,10 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     val reduceRdd = makeRdd(2, List(shuffleDep))
     submit(reduceRdd, Array(0, 1))
     // pretend we were told hostA went away
-    val oldGeneration = mapOutputTracker.getGeneration
+    val oldEpoch = mapOutputTracker.getEpoch
     runEvent(ExecutorLost("exec-hostA"))
-    val newGeneration = mapOutputTracker.getGeneration
-    assert(newGeneration > oldGeneration)
+    val newEpoch = mapOutputTracker.getEpoch
+    assert(newEpoch > oldEpoch)
     val noAccum = Map[Long, Any]()
     val taskSet = taskSets(0)
     // should be ignored for being too old
@@ -311,8 +311,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), noAccum, null, null))
     // should be ignored for being too old
     runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum, null, null))
-    // should work because it's a new generation
-    taskSet.tasks(1).generation = newGeneration
+    // should work because it's a new epoch
+    taskSet.tasks(1).epoch = newEpoch
     runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum, null, null))
     assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
            Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
@@ -401,12 +401,14 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     assert(results === Map(0 -> 42))
   }
 
-  /** Assert that the supplied TaskSet has exactly the given preferredLocations. Note, converts taskSet's locations to host only. */
-  private def assertLocations(taskSet: TaskSet, locations: Seq[Seq[String]]) {
-    assert(locations.size === taskSet.tasks.size)
-    for ((expectLocs, taskLocs) <-
-            taskSet.tasks.map(_.preferredLocations).zip(locations)) {
-      assert(expectLocs.map(loc => spark.Utils.parseHostPort(loc)._1) === taskLocs)
+  /**
+   * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
+   * Note that this checks only the host and not the executor ID.
+   */
+  private def assertLocations(taskSet: TaskSet, hosts: Seq[Seq[String]]) {
+    assert(hosts.size === taskSet.tasks.size)
+    for ((taskLocs, expectedLocs) <- taskSet.tasks.map(_.preferredLocations).zip(hosts)) {
+      assert(taskLocs.map(_.host) === expectedLocs)
     }
   }
 
diff --git a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/cluster/ClusterSchedulerSuite.scala
similarity index 92%
rename from core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
rename to core/src/test/scala/spark/scheduler/cluster/ClusterSchedulerSuite.scala
index 05afcd656760bff60f54c1111cea4f03cc4b22b9..abfdabf5fe634f5c135a7f1c04467120a4c8cc4c 100644
--- a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/cluster/ClusterSchedulerSuite.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package spark.scheduler
+package spark.scheduler.cluster
 
 import org.scalatest.FunSuite
 import org.scalatest.BeforeAndAfter
@@ -27,7 +27,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import java.util.Properties
 
-class DummyTaskSetManager(
+class FakeTaskSetManager(
     initPriority: Int,
     initStageId: Int,
     initNumTasks: Int,
@@ -72,10 +72,16 @@ class DummyTaskSetManager(
   override def executorLost(executorId: String, host: String): Unit = {
   }
 
-  override def slaveOffer(execId: String, host: String, avaiableCpus: Double, overrideLocality: TaskLocality.TaskLocality = null): Option[TaskDescription] = {
+  override def resourceOffer(
+      execId: String,
+      host: String,
+      availableCpus: Int,
+      maxLocality: TaskLocality.TaskLocality)
+    : Option[TaskDescription] =
+  {
     if (tasksFinished + runningTasks < numTasks) {
       increaseRunningTasks(1)
-      return Some(new TaskDescription(0, execId, "task 0:0", null))
+      return Some(new TaskDescription(0, execId, "task 0:0", 0, null))
     }
     return None
   }
@@ -98,17 +104,10 @@ class DummyTaskSetManager(
   }
 }
 
-class DummyTask(stageId: Int) extends Task[Int](stageId)
-{
-  def run(attemptId: Long): Int = {
-    return 0
-  }
-}
-
 class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
 
-  def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler, taskSet: TaskSet): DummyTaskSetManager = {
-    new DummyTaskSetManager(priority, stage, numTasks, cs , taskSet)
+  def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler, taskSet: TaskSet): FakeTaskSetManager = {
+    new FakeTaskSetManager(priority, stage, numTasks, cs , taskSet)
   }
 
   def resourceOffer(rootPool: Pool): Int = {
@@ -118,7 +117,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
        logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
     }
     for (taskSet <- taskSetQueue) {
-      taskSet.slaveOffer("execId_1", "hostname_1", 1) match {
+      taskSet.resourceOffer("execId_1", "hostname_1", 1, TaskLocality.ANY) match {
         case Some(task) =>
           return taskSet.stageId
         case None => {}
@@ -135,7 +134,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
     sc = new SparkContext("local", "ClusterSchedulerSuite")
     val clusterScheduler = new ClusterScheduler(sc)
     var tasks = ArrayBuffer[Task[_]]()
-    val task = new DummyTask(0)
+    val task = new FakeTask(0)
     tasks += task
     val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
 
@@ -162,7 +161,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
     sc = new SparkContext("local", "ClusterSchedulerSuite")
     val clusterScheduler = new ClusterScheduler(sc)
     var tasks = ArrayBuffer[Task[_]]()
-    val task = new DummyTask(0)
+    val task = new FakeTask(0)
     tasks += task
     val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
 
@@ -219,7 +218,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
     sc = new SparkContext("local", "ClusterSchedulerSuite")
     val clusterScheduler = new ClusterScheduler(sc)
     var tasks = ArrayBuffer[Task[_]]()
-    val task = new DummyTask(0)
+    val task = new FakeTask(0)
     tasks += task
     val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
 
diff --git a/core/src/test/scala/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..5a0b949ef5f65e87595b294d18df0e7e14e19ebd
--- /dev/null
+++ b/core/src/test/scala/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.scheduler.cluster
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import org.scalatest.FunSuite
+
+import spark._
+import spark.scheduler._
+import spark.executor.TaskMetrics
+import java.nio.ByteBuffer
+import spark.util.FakeClock
+
+/**
+ * A mock ClusterScheduler implementation that just remembers information about tasks started and
+ * feedback received from the TaskSetManagers. Note that it's important to initialize this with
+ * a list of "live" executors and their hostnames for isExecutorAlive and hasExecutorsAliveOnHost
+ * to work, and these are required for locality in ClusterTaskSetManager.
+ */
+class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */)
+  extends ClusterScheduler(sc)
+{
+  val startedTasks = new ArrayBuffer[Long]
+  val endedTasks = new mutable.HashMap[Long, TaskEndReason]
+  val finishedManagers = new ArrayBuffer[TaskSetManager]
+
+  val executors = new mutable.HashMap[String, String] ++ liveExecutors
+
+  listener = new TaskSchedulerListener {
+    def taskStarted(task: Task[_], taskInfo: TaskInfo) {
+      startedTasks += taskInfo.index
+    }
+
+    def taskEnded(
+        task: Task[_],
+        reason: TaskEndReason,
+        result: Any,
+        accumUpdates: mutable.Map[Long, Any],
+        taskInfo: TaskInfo,
+        taskMetrics: TaskMetrics)
+    {
+      endedTasks(taskInfo.index) = reason
+    }
+
+    def executorGained(execId: String, host: String) {}
+
+    def executorLost(execId: String) {}
+
+    def taskSetFailed(taskSet: TaskSet, reason: String) {}
+  }
+
+  def removeExecutor(execId: String): Unit = executors -= execId
+
+  override def taskSetFinished(manager: TaskSetManager): Unit = finishedManagers += manager
+
+  override def isExecutorAlive(execId: String): Boolean = executors.contains(execId)
+
+  override def hasExecutorsAliveOnHost(host: String): Boolean = executors.values.exists(_ == host)
+}
+
+class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
+  import TaskLocality.{ANY, PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL}
+
+  val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
+
+  test("TaskSet with no preferences") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
+    val taskSet = createTaskSet(1)
+    val manager = new ClusterTaskSetManager(sched, taskSet)
+
+    // Offer a host with no CPUs
+    assert(manager.resourceOffer("exec1", "host1", 0, ANY) === None)
+
+    // Offer a host with process-local as the constraint; this should work because the TaskSet
+    // above won't have any locality preferences
+    val taskOption = manager.resourceOffer("exec1", "host1", 2, TaskLocality.PROCESS_LOCAL)
+    assert(taskOption.isDefined)
+    val task = taskOption.get
+    assert(task.executorId === "exec1")
+    assert(sched.startedTasks.contains(0))
+
+    // Re-offer the host -- now we should get no more tasks
+    assert(manager.resourceOffer("exec1", "host1", 2, PROCESS_LOCAL) === None)
+
+    // Tell it the task has finished
+    manager.statusUpdate(0, TaskState.FINISHED, createTaskResult(0))
+    assert(sched.endedTasks(0) === Success)
+    assert(sched.finishedManagers.contains(manager))
+  }
+
+  test("multiple offers with no preferences") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
+    val taskSet = createTaskSet(3)
+    val manager = new ClusterTaskSetManager(sched, taskSet)
+
+    // First three offers should all find tasks
+    for (i <- 0 until 3) {
+      val taskOption = manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL)
+      assert(taskOption.isDefined)
+      val task = taskOption.get
+      assert(task.executorId === "exec1")
+    }
+    assert(sched.startedTasks.toSet === Set(0, 1, 2))
+
+    // Re-offer the host -- now we should get no more tasks
+    assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
+
+    // Finish the first two tasks
+    manager.statusUpdate(0, TaskState.FINISHED, createTaskResult(0))
+    manager.statusUpdate(1, TaskState.FINISHED, createTaskResult(1))
+    assert(sched.endedTasks(0) === Success)
+    assert(sched.endedTasks(1) === Success)
+    assert(!sched.finishedManagers.contains(manager))
+
+    // Finish the last task
+    manager.statusUpdate(2, TaskState.FINISHED, createTaskResult(2))
+    assert(sched.endedTasks(2) === Success)
+    assert(sched.finishedManagers.contains(manager))
+  }
+
+  test("basic delay scheduling") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    val taskSet = createTaskSet(4,
+      Seq(TaskLocation("host1", "exec1")),
+      Seq(TaskLocation("host2", "exec2")),
+      Seq(TaskLocation("host1"), TaskLocation("host2", "exec2")),
+      Seq()   // Last task has no locality prefs
+    )
+    val clock = new FakeClock
+    val manager = new ClusterTaskSetManager(sched, taskSet, clock)
+
+    // First offer host1, exec1: first task should be chosen
+    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
+
+    // Offer host1, exec1 again: the last task, which has no prefs, should be chosen
+    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 3)
+
+    // Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen
+    assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
+
+    clock.advance(LOCALITY_WAIT)
+
+    // Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen
+    assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
+
+    // Offer host1, exec1 again, at NODE_LOCAL level: we should choose task 2
+    assert(manager.resourceOffer("exec1", "host1", 1, NODE_LOCAL).get.index == 2)
+
+    // Offer host1, exec1 again, at NODE_LOCAL level: nothing should get chosen
+    assert(manager.resourceOffer("exec1", "host1", 1, NODE_LOCAL) === None)
+
+    // Offer host1, exec1 again, at ANY level: nothing should get chosen
+    assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
+
+    clock.advance(LOCALITY_WAIT)
+
+    // Offer host1, exec1 again, at ANY level: task 1 should get chosen
+    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
+
+    // Offer host1, exec1 again, at ANY level: nothing should be chosen as we've launched all tasks
+    assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
+  }
+
+  test("delay scheduling with fallback") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeClusterScheduler(sc,
+      ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
+    val taskSet = createTaskSet(5,
+      Seq(TaskLocation("host1")),
+      Seq(TaskLocation("host2")),
+      Seq(TaskLocation("host2")),
+      Seq(TaskLocation("host3")),
+      Seq(TaskLocation("host2"))
+    )
+    val clock = new FakeClock
+    val manager = new ClusterTaskSetManager(sched, taskSet, clock)
+
+    // First offer host1: first task should be chosen
+    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
+
+    // Offer host1 again: nothing should get chosen
+    assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
+
+    clock.advance(LOCALITY_WAIT)
+
+    // Offer host1 again: second task (on host2) should get chosen
+    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
+
+    // Offer host1 again: third task (on host2) should get chosen
+    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 2)
+
+    // Offer host2: fifth task (also on host2) should get chosen
+    assert(manager.resourceOffer("exec2", "host2", 1, ANY).get.index === 4)
+
+    // Now that we've launched a local task, we should no longer launch the task for host3
+    assert(manager.resourceOffer("exec2", "host2", 1, ANY) === None)
+
+    clock.advance(LOCALITY_WAIT)
+
+    // After another delay, we can go ahead and launch that task non-locally
+    assert(manager.resourceOffer("exec2", "host2", 1, ANY).get.index === 3)
+  }
+
+  test("delay scheduling with failed hosts") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    val taskSet = createTaskSet(3,
+      Seq(TaskLocation("host1")),
+      Seq(TaskLocation("host2")),
+      Seq(TaskLocation("host3"))
+    )
+    val clock = new FakeClock
+    val manager = new ClusterTaskSetManager(sched, taskSet, clock)
+
+    // First offer host1: first task should be chosen
+    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
+
+    // Offer host1 again: third task should be chosen immediately because host3 is not up
+    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 2)
+
+    // After this, nothing should get chosen
+    assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
+
+    // Now mark host2 as dead
+    sched.removeExecutor("exec2")
+    manager.executorLost("exec2", "host2")
+
+    // Task 1 should immediately be launched on host1 because its original host is gone
+    assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
+
+    // Now that all tasks have launched, nothing new should be launched anywhere else
+    assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
+    assert(manager.resourceOffer("exec2", "host2", 1, ANY) === None)
+  }
+
+  /**
+   * Utility method to create a TaskSet, potentially setting a particular sequence of preferred
+   * locations for each task (given as varargs) if this sequence is not empty.
+   */
+  def createTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
+    if (prefLocs.size != 0 && prefLocs.size != numTasks) {
+      throw new IllegalArgumentException("Wrong number of task locations")
+    }
+    val tasks = Array.tabulate[Task[_]](numTasks) { i =>
+      new FakeTask(i, if (prefLocs.size != 0) prefLocs(i) else Nil)
+    }
+    new TaskSet(tasks, 0, 0, 0, null)
+  }
+
+  def createTaskResult(id: Int): ByteBuffer = {
+    ByteBuffer.wrap(Utils.serialize(new TaskResult[Int](id, mutable.Map.empty, new TaskMetrics)))
+  }
+}
diff --git a/core/src/test/scala/spark/scheduler/cluster/FakeTask.scala b/core/src/test/scala/spark/scheduler/cluster/FakeTask.scala
new file mode 100644
index 0000000000000000000000000000000000000000..de9e66be20ae6116828bae17650dd0d0bb06f768
--- /dev/null
+++ b/core/src/test/scala/spark/scheduler/cluster/FakeTask.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.scheduler.cluster
+
+import spark.scheduler.{TaskLocation, Task}
+
+class FakeTask(stageId: Int, prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId) {
+  override def run(attemptId: Long): Int = 0
+
+  override def preferredLocations: Seq[TaskLocation] = prefLocs
+}
diff --git a/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/local/LocalSchedulerSuite.scala
similarity index 99%
rename from core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
rename to core/src/test/scala/spark/scheduler/local/LocalSchedulerSuite.scala
index a79b8bf256064e8834174ea1ba7ff916ff7c6d0a..d28ee47fa335ef81e512b943f674c8a4f6ea5a8d 100644
--- a/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/local/LocalSchedulerSuite.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package spark.scheduler
+package spark.scheduler.local
 
 import org.scalatest.FunSuite
 import org.scalatest.BeforeAndAfter
diff --git a/core/src/test/scala/spark/util/FakeClock.scala b/core/src/test/scala/spark/util/FakeClock.scala
new file mode 100644
index 0000000000000000000000000000000000000000..236706317e351c7b5fadbf90754473a82cc6e95d
--- /dev/null
+++ b/core/src/test/scala/spark/util/FakeClock.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.util
+
+class FakeClock extends Clock {
+  private var time = 0L
+
+  def advance(millis: Long): Unit = time += millis
+
+  def getTime(): Long = time
+}
diff --git a/docs/configuration.md b/docs/configuration.md
index 99624a44aa963f1e17260a679a958b44229dc37e..dff08a06f5651b5de2a8bf08f93d4abcccd80b7a 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -243,8 +243,34 @@ Apart from these, the following properties are also available, and may be useful
   <td>3000</td>
   <td>
     Number of milliseconds to wait to launch a data-local task before giving up and launching it
-    in a non-data-local location. You should increase this if your tasks are long and you are seeing
-    poor data locality, but the default generally works well.
+    on a less-local node. The same wait will be used to step through multiple locality levels
+    (process-local, node-local, rack-local and then any). It is also possible to customize the
+    waiting time for each level by setting <code>spark.locality.wait.node</code>, etc.
+    You should increase this setting if your tasks are long and see poor locality, but the
+    default usually works well.
+  </td>
+</tr>
+<tr>
+  <td>spark.locality.wait.process</td>
+  <td>spark.locality.wait</td>
+  <td>
+    Customize the locality wait for process locality. This affects tasks that attempt to access
+    cached data in a particular executor process.
+  </td>
+</tr>
+<tr>
+  <td>spark.locality.wait.node</td>
+  <td>spark.locality.wait</td>
+  <td>
+    Customize the locality wait for node locality. For example, you can set this to 0 to skip
+    node locality and search immediately for rack locality (if your cluster has rack information).
+  </td>
+</tr>
+<tr>
+  <td>spark.locality.wait.rack</td>
+  <td>spark.locality.wait</td>
+  <td>
+    Customize the locality wait for rack locality.
   </td>
 </tr>
 <tr>