From c6e2770bfe940a4f4f26f75c9ba228faea7316f0 Mon Sep 17 00:00:00 2001
From: Andrew xia <junluan.xia@intel.com>
Date: Fri, 17 May 2013 05:10:38 +0800
Subject: [PATCH] Fix ClusterScheduler bug to avoid allocating tasks to the
 same slave

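The old resourceOffers() loop iterated over the executors first and, for
each executor, kept calling rootPool.receiveOffer() until that executor
ran out of cores. The pool was therefore drained against one slave at a
time, and a TaskSet's tasks tended to pile up on the same slave even
when other slaves had free cores.

The scheduler now asks the root pool once per round for its leaf
schedulables (the TaskSetManagers) sorted by the pool's scheduling
algorithm, via the new Schedulable.getSortedLeafSchedulable(), and then
offers every executor's remaining cores to each schedulable in turn,
repeating until that schedulable stops launching tasks. In outline (the
task bookkeeping is elided here):

    var launchedTask = false
    val sortedLeafSchedulable = rootPool.getSortedLeafSchedulable()
    for (schedulable <- sortedLeafSchedulable) {
      do {
        launchedTask = false
        for (i <- 0 until offers.size) {
          // offer this executor's remaining cores to the current leaf
          schedulable.slaveOffer(offers(i).executorId, offers(i).hostname,
            availableCpus(i)) match {
            case Some(task) =>
              // record the task and decrement availableCpus(i)
              launchedTask = true
            case None =>
          }
        }
      } while (launchedTask)
    }

receiveOffer() is renamed to slaveOffer(); Pool.slaveOffer() now simply
returns None, since offers go straight to the leaf TaskSetManagers. The
DummyTaskSetManager methods in ClusterSchedulerSuite are marked as
overrides, and the test helper now walks the sorted leaf list itself and
returns the launched task's taskSetId (or -1 when nothing is launched).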
---
 .../scheduler/cluster/ClusterScheduler.scala  | 47 +++++++++++--------
 .../scala/spark/scheduler/cluster/Pool.scala  | 20 ++++----
 .../spark/scheduler/cluster/Schedulable.scala |  3 +-
 .../scheduler/cluster/TaskSetManager.scala    |  8 +++-
 .../scheduler/ClusterSchedulerSuite.scala     | 46 +++++++++++-------
 5 files changed, 74 insertions(+), 50 deletions(-)

diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 1a300c9e8c..4caafcc1d3 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -164,27 +164,34 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       // Build a list of tasks to assign to each slave
       val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
       val availableCpus = offers.map(o => o.cores).toArray
-      for (i <- 0 until offers.size) {
-        var launchedTask = true
-        val execId = offers(i).executorId
-        val host = offers(i).hostname
-        while (availableCpus(i) > 0 && launchedTask) {
+      var launchedTask = false
+      val sortedLeafSchedulable = rootPool.getSortedLeafSchedulable()
+      for (schedulable <- sortedLeafSchedulable)
+      {
+        logDebug("parentName:%s,name:%s,runningTasks:%s".format(schedulable.parent.name,schedulable.name,schedulable.runningTasks))
+      }
+      for (schedulable <- sortedLeafSchedulable) {
+        do {
           launchedTask = false
-          rootPool.receiveOffer(execId,host,availableCpus(i)) match {
-          case Some(task) =>
-            tasks(i) += task
-            val tid = task.taskId
-            taskIdToTaskSetId(tid) = task.taskSetId
-            taskSetTaskIds(task.taskSetId) += tid
-            taskIdToExecutorId(tid) = execId
-            activeExecutorIds += execId
-            executorsByHost(host) += execId
-            availableCpus(i) -= 1
-            launchedTask = true
-
-          case None => {}
-          }
-        }
+          for (i <- 0 until offers.size) {
+            val execId = offers(i).executorId
+            val host = offers(i).hostname
+            schedulable.slaveOffer(execId, host, availableCpus(i)) match {
+              case Some(task) =>
+                tasks(i) += task
+                val tid = task.taskId
+                taskIdToTaskSetId(tid) = task.taskSetId
+                taskSetTaskIds(task.taskSetId) += tid
+                taskIdToExecutorId(tid) = execId
+                activeExecutorIds += execId
+                executorsByHost(host) += execId
+                availableCpus(i) -= 1
+                launchedTask = true
+
+              case None => {}
+            }
+          }
+        } while (launchedTask)
       }
       if (tasks.size > 0) {
         hasLaunchedTask = true
diff --git a/core/src/main/scala/spark/scheduler/cluster/Pool.scala b/core/src/main/scala/spark/scheduler/cluster/Pool.scala
index d5482f71ad..ae603e7dd9 100644
--- a/core/src/main/scala/spark/scheduler/cluster/Pool.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/Pool.scala
@@ -75,19 +75,17 @@ private[spark] class Pool(
     return shouldRevive
   }
 
-  override def receiveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
+  override def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
+    return None
+  }
+
+  override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = {
+    val leafSchedulableQueue = new ArrayBuffer[Schedulable]
     val sortedSchedulableQueue = schedulableQueue.sortWith(taskSetSchedulingAlgorithm.comparator)
-    for (manager <- sortedSchedulableQueue) {
-      logInfo("parentName:%s,schedulableName:%s,minShares:%d,weight:%d,runningTasks:%d".format(
-        manager.parent.name, manager.name, manager.minShare, manager.weight, manager.runningTasks))
+    for (schedulable <- sortedSchedulableQueue) {
+      leafSchedulableQueue ++= schedulable.getSortedLeafSchedulable()
     }
-    for (manager <- sortedSchedulableQueue) {
-      val task = manager.receiveOffer(execId, host, availableCpus)
-      if (task != None) {
-        return task
-      }
-    }
-    return None
+    return leafSchedulableQueue
   }
 
   override def increaseRunningTasks(taskNum: Int) {
diff --git a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
index 54e8ae95f9..c620588e14 100644
--- a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
@@ -21,6 +21,7 @@ private[spark] trait Schedulable {
   def removeSchedulable(schedulable: Schedulable): Unit
   def getSchedulableByName(name: String): Schedulable
   def executorLost(executorId: String, host: String): Unit
-  def receiveOffer(execId: String, host: String, avaiableCpus: Double): Option[TaskDescription]
+  def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription]
   def checkSpeculatableTasks(): Boolean
+  def getSortedLeafSchedulable(): ArrayBuffer[Schedulable]
 }
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index baaaa41a37..80edbe77a1 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -198,7 +198,7 @@ private[spark] class TaskSetManager(
   }
 
   // Respond to an offer of a single slave from the scheduler by finding a task
-  override def receiveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
+  override def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
     if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
       val time = System.currentTimeMillis
       val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
@@ -398,6 +398,12 @@ private[spark] class TaskSetManager(
     //nothing
   }
 
+  override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = {
+    val leafSchedulableQueue = new ArrayBuffer[Schedulable]
+    leafSchedulableQueue += this
+    return leafSchedulableQueue
+  }
+
   override def executorLost(execId: String, hostname: String) {
     logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)
     val newHostsAlive = sched.hostsAlive
diff --git a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
index 2eda48196b..8426be7575 100644
--- a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
@@ -6,6 +6,7 @@ import org.scalatest.BeforeAndAfter
 import spark._
 import spark.scheduler._
 import spark.scheduler.cluster._
+import scala.collection.mutable.ArrayBuffer
 
 import java.util.Properties
 
@@ -25,34 +26,34 @@ class DummyTaskSetManager(
   var numTasks = initNumTasks
   var tasksFinished = 0
 
-  def increaseRunningTasks(taskNum: Int) {
+  override def increaseRunningTasks(taskNum: Int) {
     runningTasks += taskNum
     if (parent != null) {
       parent.increaseRunningTasks(taskNum)
     }
   }
 
-  def decreaseRunningTasks(taskNum: Int) {
+  override def decreaseRunningTasks(taskNum: Int) {
     runningTasks -= taskNum
     if (parent != null) {
       parent.decreaseRunningTasks(taskNum)
     }
   }
 
-  def addSchedulable(schedulable: Schedulable) {
+  override def addSchedulable(schedulable: Schedulable) {
   } 
   
-  def removeSchedulable(schedulable: Schedulable) {
+  override def removeSchedulable(schedulable: Schedulable) {
   }
   
-  def getSchedulableByName(name: String): Schedulable = {
+  override def getSchedulableByName(name: String): Schedulable = {
     return null
   }
 
-  def executorLost(executorId: String, host: String): Unit = {
+  override def executorLost(executorId: String, host: String): Unit = {
   }
 
-  def receiveOffer(execId: String, host: String, avaiableCpus: Double): Option[TaskDescription] = {
+  override def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
     if (tasksFinished + runningTasks < numTasks) {
       increaseRunningTasks(1)
       return Some(new TaskDescription(0, stageId.toString, execId, "task 0:0", null))
@@ -60,10 +61,16 @@ class DummyTaskSetManager(
     return None
   }
 
-  def checkSpeculatableTasks(): Boolean = {
+  override def checkSpeculatableTasks(): Boolean = {
     return true
   }
 
+  override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = {
+    val leafSchedulableQueue = new ArrayBuffer[Schedulable]
+    leafSchedulableQueue += this
+    return leafSchedulableQueue
+  }
+
   def taskFinished() {
     decreaseRunningTasks(1)
     tasksFinished +=1
@@ -80,16 +87,21 @@ class DummyTaskSetManager(
 
 class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
   
-  def receiveOffer(rootPool: Pool) : Option[TaskDescription] = {
-    rootPool.receiveOffer("execId_1", "hostname_1", 1)
+  def resourceOffer(rootPool: Pool): Int = {
+    val taskSetQueue = rootPool.getSortedLeafSchedulable()
+    for (taskSet <- taskSetQueue)
+    {
+      taskSet.slaveOffer("execId_1", "hostname_1", 1) match {
+        case Some(task) =>
+          return task.taskSetId.toInt
+        case None => {}
+      }
+    }
+    -1
   }
 
   def checkTaskSetId(rootPool: Pool, expectedTaskSetId: Int) {
-    receiveOffer(rootPool) match {
-      case Some(task) =>
-        assert(task.taskSetId.toInt === expectedTaskSetId)
-      case _ =>
-    }
+    assert(resourceOffer(rootPool) === expectedTaskSetId)
   }
 
   test("FIFO Scheduler Test") {
@@ -105,9 +117,9 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
     schedulableBuilder.addTaskSetManager(taskSetManager2, null)
     
     checkTaskSetId(rootPool, 0)
-    receiveOffer(rootPool)
+    resourceOffer(rootPool)
     checkTaskSetId(rootPool, 1)
-    receiveOffer(rootPool)
+    resourceOffer(rootPool)
     taskSetManager1.abort()
     checkTaskSetId(rootPool, 2)
   }
-- 
GitLab