From 2a4ed10210f9ee32f472e2465094d88561c0ff18 Mon Sep 17 00:00:00 2001
From: Matei Zaharia <matei@eecs.berkeley.edu>
Date: Thu, 15 Aug 2013 17:22:49 -0700
Subject: [PATCH] Address some review comments:

- When a resourceOffers() call has multiple offers, force the TaskSets
  to consider them in increasing order of locality levels so that they
  get a chance to launch tasks locally across all offers (see the sketch
  below)

- Simplify ClusterScheduler.prioritizeContainers

- Add docs on the new configuration options
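
As a rough, self-contained sketch of the first change (the Offer and ToyTaskSet
types below are illustrative stand-ins, not the real Spark classes): each TaskSet
is taken in scheduling order, and for each locality level from most to least
local, every offer is cycled through until no more tasks can be launched at that
level.

    import scala.collection.mutable.ArrayBuffer

    object LocalitySketch {
      // Locality levels, most local first; iterating `values` preserves this order.
      object TaskLocality extends Enumeration {
        val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
      }

      // Illustrative stand-ins for WorkerOffer and TaskSetManager.
      case class Offer(executorId: String, host: String, var freeCores: Int)
      trait ToyTaskSet {
        // Returns the id of a launched task, if one could be launched on this
        // executor/host without exceeding the given locality level.
        def resourceOffer(execId: String, host: String, cores: Int,
                          maxLocality: TaskLocality.Value): Option[Long]
      }

      def makeOffers(sortedTaskSets: Seq[ToyTaskSet], offers: Seq[Offer]): Seq[Long] = {
        val launched = new ArrayBuffer[Long]
        for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
          var launchedTask = true
          while (launchedTask) {        // keep going while this level still launches tasks
            launchedTask = false
            for (offer <- offers if offer.freeCores > 0) {
              for (tid <- taskSet.resourceOffer(offer.executorId, offer.host,
                                                offer.freeCores, maxLocality)) {
                launched += tid
                offer.freeCores -= 1    // assume one core per task for the sketch
                launchedTask = true
              }
            }
          }
        }
        launched
      }
    }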
---
 .../scheduler/cluster/ClusterScheduler.scala  | 21 ++++++-------
 .../cluster/ClusterTaskSetManager.scala       | 17 +++++++----
 .../scheduler/cluster/TaskSetManager.scala    |  6 +++-
 .../scheduler/local/LocalScheduler.scala      |  3 +-
 .../scheduler/local/LocalTaskSetManager.scala |  6 +++-
 .../cluster/ClusterSchedulerSuite.scala       |  8 +++--
 docs/configuration.md                         | 30 +++++++++++++++++--
 7 files changed, 68 insertions(+), 23 deletions(-)
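
Usage note for the new docs entries below: the locality waits are plain Java
system properties, so they can be tuned before the SparkContext is created,
roughly as follows (the master URL and wait values are illustrative only):

    // Illustrative values only: tune delay scheduling before creating the SparkContext.
    System.setProperty("spark.locality.wait", "5000")       // base wait per level, in ms
    System.setProperty("spark.locality.wait.node", "0")     // skip node-local, go straight to rack
    System.setProperty("spark.locality.wait.rack", "10000") // wait longer for rack-local slots

    val sc = new spark.SparkContext("spark://master:7077", "LocalityWaitExample")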

diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 028f4d3283..e88edc5b2a 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -184,27 +184,29 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       }
     }
 
-    // Build a list of tasks to assign to each slave
+    // Build a list of tasks to assign to each worker
     val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
     val availableCpus = offers.map(o => o.cores).toArray
-    val sortedTaskSetQueue = rootPool.getSortedTaskSetQueue()
-    for (manager <- sortedTaskSetQueue) {
+    val sortedTaskSets = rootPool.getSortedTaskSetQueue()
+    for (taskSet <- sortedTaskSets) {
       logDebug("parentName: %s, name: %s, runningTasks: %s".format(
-        manager.parent.name, manager.name, manager.runningTasks))
+        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
     }
 
+    // Take each TaskSet in our scheduling order, then offer it to each node in increasing order
+    // of locality levels so that it gets a chance to launch local tasks on all of them.
     var launchedTask = false
-    for (manager <- sortedTaskSetQueue; offer <- offers) {
+    for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
       do {
         launchedTask = false
         for (i <- 0 until offers.size) {
           val execId = offers(i).executorId
           val host = offers(i).host
-          for (task <- manager.resourceOffer(execId, host, availableCpus(i))) {
+          for (task <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) {
             tasks(i) += task
             val tid = task.taskId
-            taskIdToTaskSetId(tid) = manager.taskSet.id
-            taskSetTaskIds(manager.taskSet.id) += tid
+            taskIdToTaskSetId(tid) = taskSet.taskSet.id
+            taskSetTaskIds(taskSet.taskSet.id) += tid
             taskIdToExecutorId(tid) = execId
             activeExecutorIds += execId
             executorsByHost(host) += execId
@@ -402,8 +404,7 @@ object ClusterScheduler {
 
     // order keyList based on population of value in map
     val keyList = _keyList.sortWith(
-      // TODO(matei): not sure why we're using getOrElse if keyList = map.keys... see if it matters
-      (left, right) => map.get(left).getOrElse(Set()).size > map.get(right).getOrElse(Set()).size
+      (left, right) => map(left).size > map(right).size
     )
 
     val retval = new ArrayBuffer[T](keyList.size * 2)
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
index 5316a7aed1..91de25254c 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -43,7 +43,7 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
   extends TaskSetManager with Logging {
 
   // CPUs to request per task
-  val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toDouble
+  val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toInt
 
   // Maximum times a task is allowed to fail before failing the job
   val MAX_TASK_FAILURES = System.getProperty("spark.task.maxFailures", "4").toInt
@@ -325,15 +325,22 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
   /**
    * Respond to an offer of a single slave from the scheduler by finding a task
    */
-  override def resourceOffer(execId: String, host: String, availableCpus: Double)
+  override def resourceOffer(
+      execId: String,
+      host: String,
+      availableCpus: Int,
+      maxLocality: TaskLocality.TaskLocality)
     : Option[TaskDescription] =
   {
     if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
       val curTime = System.currentTimeMillis()
 
-      val locality = getAllowedLocalityLevel(curTime)
+      var allowedLocality = getAllowedLocalityLevel(curTime)
+      if (allowedLocality > maxLocality) {
+        allowedLocality = maxLocality   // We're not allowed to search for farther-away tasks
+      }
 
-      findTask(execId, host, locality) match {
+      findTask(execId, host, allowedLocality) match {
         case Some((index, taskLocality)) => {
           // Found a task; do some bookkeeping and return a task description
           val task = tasks(index)
@@ -347,7 +354,7 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
           taskInfos(taskId) = info
           taskAttempts(index) = info :: taskAttempts(index)
           // Update our locality level for delay scheduling
-          currentLocalityIndex = getLocalityIndex(locality)
+          currentLocalityIndex = getLocalityIndex(allowedLocality)
           lastLaunchTime = curTime
           // Serialize and return the task
           val startTime = System.currentTimeMillis()
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 277654edc0..5ab6ab9aad 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -29,7 +29,11 @@ private[spark] trait TaskSetManager extends Schedulable {
   
   def taskSet: TaskSet
 
-  def resourceOffer(execId: String, hostPort: String, availableCpus: Double)
+  def resourceOffer(
+      execId: String,
+      host: String,
+      availableCpus: Int,
+      maxLocality: TaskLocality.TaskLocality)
     : Option[TaskDescription]
 
   def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer)
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index a4f5f46777..5be4dbd9f0 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -141,8 +141,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
       for (manager <- sortedTaskSetQueue) {
         do {
           launchTask = false
-          // TODO(matei): don't pass null here?
-          manager.resourceOffer(null, null, freeCpuCores) match {
+          manager.resourceOffer(null, null, freeCpuCores, null) match {
             case Some(task) =>
               tasks += task
               taskIdToTaskSetId(task.taskId) = manager.taskSet.id
diff --git a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
index 698c777bec..3ef636ff07 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
@@ -98,7 +98,11 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
     return None
   }
 
-  override def resourceOffer(execId: String, host: String, availableCpus: Double)
+  override def resourceOffer(
+      execId: String,
+      host: String,
+      availableCpus: Int,
+      maxLocality: TaskLocality.TaskLocality)
     : Option[TaskDescription] =
   {
     SparkEnv.set(sched.env)
diff --git a/core/src/test/scala/spark/scheduler/cluster/ClusterSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/cluster/ClusterSchedulerSuite.scala
index 8618009ea6..aeeed14786 100644
--- a/core/src/test/scala/spark/scheduler/cluster/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/cluster/ClusterSchedulerSuite.scala
@@ -72,7 +72,11 @@ class DummyTaskSetManager(
   override def executorLost(executorId: String, host: String): Unit = {
   }
 
-  override def resourceOffer(execId: String, host: String, availableCpus: Double)
+  override def resourceOffer(
+      execId: String,
+      host: String,
+      availableCpus: Int,
+      maxLocality: TaskLocality.TaskLocality)
     : Option[TaskDescription] =
   {
     if (tasksFinished + runningTasks < numTasks) {
@@ -120,7 +124,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
        logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
     }
     for (taskSet <- taskSetQueue) {
-      taskSet.resourceOffer("execId_1", "hostname_1", 1) match {
+      taskSet.resourceOffer("execId_1", "hostname_1", 1, TaskLocality.ANY) match {
         case Some(task) =>
           return taskSet.stageId
         case None => {}
diff --git a/docs/configuration.md b/docs/configuration.md
index 99624a44aa..dff08a06f5 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -243,8 +243,34 @@ Apart from these, the following properties are also available, and may be useful
   <td>3000</td>
   <td>
     Number of milliseconds to wait to launch a data-local task before giving up and launching it
-    in a non-data-local location. You should increase this if your tasks are long and you are seeing
-    poor data locality, but the default generally works well.
+    on a less-local node. The same wait will be used to step through multiple locality levels
+    (process-local, node-local, rack-local and then any). It is also possible to customize the
+    waiting time for each level by setting <code>spark.locality.wait.node</code>, etc.
+    You should increase this setting if your tasks are long and you see poor locality, but the
+    default usually works well.
+  </td>
+</tr>
+<tr>
+  <td>spark.locality.wait.process</td>
+  <td>spark.locality.wait</td>
+  <td>
+    Customize the locality wait for process locality. This affects tasks that attempt to access
+    cached data in a particular executor process.
+  </td>
+</tr>
+<tr>
+  <td>spark.locality.wait.node</td>
+  <td>spark.locality.wait</td>
+  <td>
+    Customize the locality wait for node locality. For example, you can set this to 0 to skip
+    node locality and search immediately for rack locality (if your cluster has rack information).
+  </td>
+</tr>
+<tr>
+  <td>spark.locality.wait.rack</td>
+  <td>spark.locality.wait</td>
+  <td>
+    Customize the locality wait for rack locality.
   </td>
 </tr>
 <tr>
-- 
GitLab