diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 028f4d3283a6a51a0f29e45dee805b566d0e9436..e88edc5b2ac3f7689f19f3d95f468e149ff99d41 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -184,27 +184,29 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       }
     }
 
-    // Build a list of tasks to assign to each slave
+    // Build a list of tasks to assign to each worker
     val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
     val availableCpus = offers.map(o => o.cores).toArray
-    val sortedTaskSetQueue = rootPool.getSortedTaskSetQueue()
-    for (manager <- sortedTaskSetQueue) {
+    val sortedTaskSets = rootPool.getSortedTaskSetQueue()
+    for (taskSet <- sortedTaskSets) {
       logDebug("parentName: %s, name: %s, runningTasks: %s".format(
-        manager.parent.name, manager.name, manager.runningTasks))
+        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
     }
 
+    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
+    // of locality levels so that it gets a chance to launch local tasks on all of them.
     var launchedTask = false
-    for (manager <- sortedTaskSetQueue; offer <- offers) {
+    for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
       do {
         launchedTask = false
         for (i <- 0 until offers.size) {
           val execId = offers(i).executorId
           val host = offers(i).host
-          for (task <- manager.resourceOffer(execId, host, availableCpus(i))) {
+          for (task <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) {
             tasks(i) += task
             val tid = task.taskId
-            taskIdToTaskSetId(tid) = manager.taskSet.id
-            taskSetTaskIds(manager.taskSet.id) += tid
+            taskIdToTaskSetId(tid) = taskSet.taskSet.id
+            taskSetTaskIds(taskSet.taskSet.id) += tid
             taskIdToExecutorId(tid) = execId
             activeExecutorIds += execId
             executorsByHost(host) += execId
@@ -402,8 +404,7 @@ object ClusterScheduler {
 
     // order keyList based on population of value in map
     val keyList = _keyList.sortWith(
-      // TODO(matei): not sure why we're using getOrElse if keyList = map.keys... see if it matters
-      (left, right) => map.get(left).getOrElse(Set()).size > map.get(right).getOrElse(Set()).size
+      (left, right) => map(left).size > map(right).size
     )
 
     val retval = new ArrayBuffer[T](keyList.size * 2)
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
index 5316a7aed107a565cd213859ee0afdb91d2462f7..91de25254c073077493c792ee4e86831209016f9 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -43,7 +43,7 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
   extends TaskSetManager with Logging {
 
   // CPUs to request per task
-  val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toDouble
+  val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toInt
 
   // Maximum times a task is allowed to fail before failing the job
   val MAX_TASK_FAILURES = System.getProperty("spark.task.maxFailures", "4").toInt
@@ -325,15 +325,22 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
   /**
    * Respond to an offer of a single slave from the scheduler by finding a task
    */
-  override def resourceOffer(execId: String, host: String, availableCpus: Double)
+  override def resourceOffer(
+      execId: String,
+      host: String,
+      availableCpus: Int,
+      maxLocality: TaskLocality.TaskLocality)
     : Option[TaskDescription] =
   {
     if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
       val curTime = System.currentTimeMillis()
 
-      val locality = getAllowedLocalityLevel(curTime)
+      var allowedLocality = getAllowedLocalityLevel(curTime)
+      if (allowedLocality > maxLocality) {
+        allowedLocality = maxLocality   // We're not allowed to search for farther-away tasks
+      }
 
-      findTask(execId, host, locality) match {
+      findTask(execId, host, allowedLocality) match {
         case Some((index, taskLocality)) => {
           // Found a task; do some bookkeeping and return a task description
           val task = tasks(index)
@@ -347,7 +354,7 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
           taskInfos(taskId) = info
           taskAttempts(index) = info :: taskAttempts(index)
           // Update our locality level for delay scheduling
-          currentLocalityIndex = getLocalityIndex(locality)
+          currentLocalityIndex = getLocalityIndex(allowedLocality)
           lastLaunchTime = curTime
           // Serialize and return the task
           val startTime = System.currentTimeMillis()
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 277654edc076c6825bd6fdca0e29e6de382727bb..5ab6ab9aadbbad7d65394f542f35e6f9a0b3ae81 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -29,7 +29,11 @@ private[spark] trait TaskSetManager extends Schedulable {
   
   def taskSet: TaskSet
 
-  def resourceOffer(execId: String, hostPort: String, availableCpus: Double)
+  def resourceOffer(
+      execId: String,
+      host: String,
+      availableCpus: Int,
+      maxLocality: TaskLocality.TaskLocality)
     : Option[TaskDescription]
 
   def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer)
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index a4f5f46777a40de92f8ea4d841e3887b20375432..5be4dbd9f0b527489eeba69ac234cbc8b8593ad8 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -141,8 +141,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
       for (manager <- sortedTaskSetQueue) {
         do {
           launchTask = false
-          // TODO(matei): don't pass null here?
-          manager.resourceOffer(null, null, freeCpuCores) match {
+          manager.resourceOffer(null, null, freeCpuCores, null) match {
             case Some(task) =>
               tasks += task
               taskIdToTaskSetId(task.taskId) = manager.taskSet.id
diff --git a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
index 698c777bec6a3044a94d989ed6a7af71c7c8b1f6..3ef636ff075c7883d0ffe39f442866d787f88b1e 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
@@ -98,7 +98,11 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
     return None
   }
 
-  override def resourceOffer(execId: String, host: String, availableCpus: Double)
+  override def resourceOffer(
+      execId: String,
+      host: String,
+      availableCpus: Int,
+      maxLocality: TaskLocality.TaskLocality)
     : Option[TaskDescription] =
   {
     SparkEnv.set(sched.env)
diff --git a/core/src/test/scala/spark/scheduler/cluster/ClusterSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/cluster/ClusterSchedulerSuite.scala
index 8618009ea64861d585099a16b2c4f43e9111ba36..aeeed147864b9657b865f67c6c9822f3d47c1351 100644
--- a/core/src/test/scala/spark/scheduler/cluster/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/cluster/ClusterSchedulerSuite.scala
@@ -72,7 +72,11 @@ class DummyTaskSetManager(
   override def executorLost(executorId: String, host: String): Unit = {
   }
 
-  override def resourceOffer(execId: String, host: String, availableCpus: Double)
+  override def resourceOffer(
+      execId: String,
+      host: String,
+      availableCpus: Int,
+      maxLocality: TaskLocality.TaskLocality)
     : Option[TaskDescription] =
   {
     if (tasksFinished + runningTasks < numTasks) {
@@ -120,7 +124,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
        logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
     }
     for (taskSet <- taskSetQueue) {
-      taskSet.resourceOffer("execId_1", "hostname_1", 1) match {
+      taskSet.resourceOffer("execId_1", "hostname_1", 1, TaskLocality.ANY) match {
         case Some(task) =>
           return taskSet.stageId
         case None => {}
diff --git a/docs/configuration.md b/docs/configuration.md
index 99624a44aa963f1e17260a679a958b44229dc37e..dff08a06f5651b5de2a8bf08f93d4abcccd80b7a 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -243,8 +243,34 @@ Apart from these, the following properties are also available, and may be useful
   <td>3000</td>
   <td>
     Number of milliseconds to wait to launch a data-local task before giving up and launching it
-    in a non-data-local location. You should increase this if your tasks are long and you are seeing
-    poor data locality, but the default generally works well.
+    on a less-local node. The same wait will be used to step through multiple locality levels
+    (process-local, node-local, rack-local and then any). It is also possible to customize the
+    waiting time for each level by setting <code>spark.locality.wait.node</code>, etc.
+    You should increase this setting if your tasks are long and see poor locality, but the
+    default usually works well.
+  </td>
+</tr>
+<tr>
+  <td>spark.locality.wait.process</td>
+  <td>spark.locality.wait</td>
+  <td>
+    Customize the locality wait for process locality. This affects tasks that attempt to access
+    cached data in a particular executor process.
+  </td>
+</tr>
+<tr>
+  <td>spark.locality.wait.node</td>
+  <td>spark.locality.wait</td>
+  <td>
+    Customize the locality wait for node locality. For example, you can set this to 0 to skip
+    node locality and search immediately for rack locality (if your cluster has rack information).
+  </td>
+</tr>
+<tr>
+  <td>spark.locality.wait.rack</td>
+  <td>spark.locality.wait</td>
+  <td>
+    Customize the locality wait for rack locality.
   </td>
 </tr>
 <tr>