From 222c8971285190761354456c2fe07f5c31edf330 Mon Sep 17 00:00:00 2001
From: Matei Zaharia <matei@eecs.berkeley.edu>
Date: Wed, 14 Aug 2013 13:56:40 -0700
Subject: [PATCH] Comment cleanup (via Kay) and some debug messages

---
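Note on the rewritten prioritizeContainers doc comment below: it describes a
round-robin interleaving of resource offers across hosts. A minimal standalone
sketch of that ordering, with illustrative names (not the patched
implementation itself):

    import scala.collection.mutable.{ArrayBuffer, HashMap}

    // Interleave offers round-robin across hosts: take each host's first
    // remaining offer before any host's second, visiting hosts with more
    // offers first, so no single host is drained ahead of the others.
    def roundRobinOffers[K, T](map: HashMap[K, ArrayBuffer[T]]): List[T] = {
      val keys = map.keys.toList.sortBy(k => -map(k).size)  // larger hosts first
      val maxLen = keys.headOption.map(k => map(k).size).getOrElse(0)
      val result = new ArrayBuffer[T]
      for (i <- 0 until maxLen; k <- keys if i < map(k).size) {
        result += map(k)(i)
      }
      result.toList
    }

Given <h1, [o1, o2, o3]>, <h2, [o4]>, <h3, [o5, o6]>, this yields
[o1, o5, o4, o2, o6, o3], matching the example in the new comment.
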
 .../scheduler/cluster/ClusterScheduler.scala  | 30 +++++++------------
 .../cluster/ClusterTaskSetManager.scala       |  5 ++--
 .../{ => cluster}/ClusterSchedulerSuite.scala |  2 +-
 .../{ => local}/LocalSchedulerSuite.scala     |  2 +-
 4 files changed, 16 insertions(+), 23 deletions(-)
 rename core/src/test/scala/spark/scheduler/{ => cluster}/ClusterSchedulerSuite.scala (99%)
 rename core/src/test/scala/spark/scheduler/{ => local}/LocalSchedulerSuite.scala (99%)
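
The ClusterTaskSetManager changes touch the delay-scheduling comment: a task
set stays at its current locality level until localityWaits(curLevel) expires
without a launch, then moves up a level. A toy sketch of that rule
(hypothetical helper, not the actual scheduler code):

    // If no task was launched at the current level within its wait time,
    // relax to the next (less local) level; otherwise stay put.
    def advanceLevel(curLevel: Int, lastLaunchTime: Long, now: Long,
                     localityWaits: Array[Long]): Int = {
      if (curLevel < localityWaits.length - 1 &&
          now - lastLaunchTime >= localityWaits(curLevel)) {
        curLevel + 1
      } else {
        curLevel
      }
    }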

diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index ec76e90185..028f4d3283 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -385,25 +385,17 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
 
 
 object ClusterScheduler {
-
-  // Used to 'spray' available containers across the available set to ensure too many containers on same host
-  // are not used up. Used in yarn mode and in task scheduling (when there are multiple containers available
-  // to execute a task)
-  // For example: yarn can returns more containers than we would have requested under ANY, this method
-  // prioritizes how to use the allocated containers.
-  // flatten the map such that the array buffer entries are spread out across the returned value.
-  // given <host, list[container]> == <h1, [c1 .. c5]>, <h2, [c1 .. c3]>, <h3, [c1, c2]>, <h4, c1>, <h5, c1>, i
-  // the return value would be something like : h1c1, h2c1, h3c1, h4c1, h5c1, h1c2, h2c2, h3c2, h1c3, h2c3, h1c4, h1c5
-  // We then 'use' the containers in this order (consuming only the top K from this list where
-  // K = number to be user). This is to ensure that if we have multiple eligible allocations,
-  // they dont end up allocating all containers on a small number of hosts - increasing probability of
-  // multiple container failure when a host goes down.
-  // Note, there is bias for keys with higher number of entries in value to be picked first (by design)
-  // Also note that invocation of this method is expected to have containers of same 'type'
-  // (host-local, rack-local, off-rack) and not across types : so that reordering is simply better from
-  // the available list - everything else being same.
-  // That is, we we first consume data local, then rack local and finally off rack nodes. So the
-  // prioritization from this method applies to within each category
+  /**
+   * Used to balance containers across hosts.
+   *
+   * Accepts a map of hosts to resource offers for that host, and returns a prioritized list of
+   * resource offers representing the order in which the offers should be used.  The resource
+   * offers are ordered such that we'll allocate one container on each host before allocating a
+   * second container on any host, and so on, in order to reduce the damage if a host fails.
+   *
+   * For example, given <h1, [o1, o2, o3]>, <h2, [o4]>, <h3, [o5, o6]>, returns
+   * [o1, o5, o4, o2, o6, o3]
+   */
   def prioritizeContainers[K, T] (map: HashMap[K, ArrayBuffer[T]]): List[T] = {
     val _keyList = new ArrayBuffer[K](map.size)
     _keyList ++= map.keys
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
index cf406f876f..5316a7aed1 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -116,7 +116,7 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
 
   // Figure out the current map output tracker epoch and set it on all tasks
   val epoch = sched.mapOutputTracker.getEpoch
-  logDebug("Epoch for " + taskSet.id + ": " + epoch)
+  logDebug("Epoch for " + taskSet + ": " + epoch)
   for (t <- tasks) {
     t.epoch = epoch
   }
@@ -129,7 +129,7 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
 
   // Figure out which locality levels we have in our TaskSet, so we can do delay scheduling
   val myLocalityLevels = computeValidLocalityLevels()
-  val localityWaits = myLocalityLevels.map(getLocalityWait)  // spark.locality.wait
+  val localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at each level
 
   // Delay scheduling variables: we keep track of our current locality level and the time we
   // last launched a task at that level, and move up a level when localityWaits[curLevel] expires.
@@ -687,6 +687,7 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
       levels += RACK_LOCAL
     }
     levels += ANY
+    logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
     levels.toArray
   }
 }
diff --git a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/cluster/ClusterSchedulerSuite.scala
similarity index 99%
rename from core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
rename to core/src/test/scala/spark/scheduler/cluster/ClusterSchedulerSuite.scala
index 6327155157..8618009ea6 100644
--- a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/cluster/ClusterSchedulerSuite.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package spark.scheduler
+package spark.scheduler.cluster
 
 import org.scalatest.FunSuite
 import org.scalatest.BeforeAndAfter
diff --git a/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/local/LocalSchedulerSuite.scala
similarity index 99%
rename from core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
rename to core/src/test/scala/spark/scheduler/local/LocalSchedulerSuite.scala
index a79b8bf256..d28ee47fa3 100644
--- a/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/local/LocalSchedulerSuite.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package spark.scheduler
+package spark.scheduler.local
 
 import org.scalatest.FunSuite
 import org.scalatest.BeforeAndAfter
-- 
GitLab