From ecd6d75c6a88232c40070baed3dd67bdf77f0c69 Mon Sep 17 00:00:00 2001
From: Andrew xia <junluan.xia@intel.com>
Date: Tue, 21 May 2013 06:49:23 +0800
Subject: [PATCH] fix bug of unit tests

---
 .../scheduler/cluster/ClusterScheduler.scala  |  6 +-
 .../scheduler/ClusterSchedulerSuite.scala     | 72 ++++++++++---------
 2 files changed, 43 insertions(+), 35 deletions(-)

diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 9547f4f6dd..053d4b8e4a 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -352,7 +352,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
                 executorsByHostPort(hostPort) += execId
                 availableCpus(i) -= 1
                 launchedTask = true
-                
+
               case None => {}
               }
             }
@@ -373,7 +373,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
           }
         } while (launchedTask)
       }
-      
+
       if (tasks.size > 0) {
         hasLaunchedTask = true
       }
@@ -522,7 +522,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       hostPortsAlive -= hostPort
       hostToAliveHostPorts.getOrElseUpdate(Utils.parseHostPort(hostPort)._1, new HashSet[String]).remove(hostPort)
     }
-      
+
     val execs = executorsByHostPort.getOrElse(hostPort, new HashSet)
     execs -= executorId
     if (execs.isEmpty) {
diff --git a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
index 7af749fb5c..a39418b716 100644
--- a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
@@ -67,12 +67,6 @@ class DummyTaskSetManager(
     return true
   }
 
-//  override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = {
-//    var leafSchedulableQueue = new ArrayBuffer[Schedulable]
-//    leafSchedulableQueue += this
-//    return leafSchedulableQueue
-//  }
-
   def taskFinished() {
     decreaseRunningTasks(1)
     tasksFinished +=1
@@ -94,17 +88,10 @@ class DummyTask(stageId: Int) extends Task[Int](stageId)
   }
 }
 
-class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
-
-  val sc = new SparkContext("local", "ClusterSchedulerSuite")
-  val clusterScheduler = new ClusterScheduler(sc)
-  var tasks = ArrayBuffer[Task[_]]()
-  val task = new DummyTask(0)
-  val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
-  tasks += task
+class ClusterSchedulerSuite extends FunSuite with LocalSparkContext {
 
-  def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int): DummyTaskSetManager = {
-    new DummyTaskSetManager(priority, stage, numTasks, clusterScheduler, taskSet)
+  def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler, taskSet: TaskSet): DummyTaskSetManager = {
+    new DummyTaskSetManager(priority, stage, numTasks, cs , taskSet)
   }
 
   def resourceOffer(rootPool: Pool): Int = {
@@ -125,13 +112,20 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
   }
 
   test("FIFO Scheduler Test") {
+    sc = new SparkContext("local", "ClusterSchedulerSuite")
+    val clusterScheduler = new ClusterScheduler(sc)
+    var tasks = ArrayBuffer[Task[_]]()
+    val task = new DummyTask(0)
+    tasks += task
+    val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
+
     val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0)
     val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
     schedulableBuilder.buildPools()
 
-    val taskSetManager0 = createDummyTaskSetManager(0, 0, 2)
-    val taskSetManager1 = createDummyTaskSetManager(0, 1, 2)
-    val taskSetManager2 = createDummyTaskSetManager(0, 2, 2)
+    val taskSetManager0 = createDummyTaskSetManager(0, 0, 2, clusterScheduler, taskSet)
+    val taskSetManager1 = createDummyTaskSetManager(0, 1, 2, clusterScheduler, taskSet)
+    val taskSetManager2 = createDummyTaskSetManager(0, 2, 2, clusterScheduler, taskSet)
     schedulableBuilder.addTaskSetManager(taskSetManager0, null)
     schedulableBuilder.addTaskSetManager(taskSetManager1, null)
     schedulableBuilder.addTaskSetManager(taskSetManager2, null)
@@ -145,6 +139,13 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
   }
 
   test("Fair Scheduler Test") {
+    sc = new SparkContext("local", "ClusterSchedulerSuite")
+    val clusterScheduler = new ClusterScheduler(sc)
+    var tasks = ArrayBuffer[Task[_]]()
+    val task = new DummyTask(0)
+    tasks += task
+    val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
+
     val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
     System.setProperty("spark.fairscheduler.allocation.file", xmlPath)
     val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
@@ -167,15 +168,15 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
     val properties2 = new Properties()
     properties2.setProperty("spark.scheduler.cluster.fair.pool","2")
 
-    val taskSetManager10 = createDummyTaskSetManager(1, 0, 1)
-    val taskSetManager11 = createDummyTaskSetManager(1, 1, 1)
-    val taskSetManager12 = createDummyTaskSetManager(1, 2, 2)
+    val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, clusterScheduler, taskSet)
+    val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, clusterScheduler, taskSet)
+    val taskSetManager12 = createDummyTaskSetManager(1, 2, 2, clusterScheduler, taskSet)
     schedulableBuilder.addTaskSetManager(taskSetManager10, properties1)
     schedulableBuilder.addTaskSetManager(taskSetManager11, properties1)
     schedulableBuilder.addTaskSetManager(taskSetManager12, properties1)
 
-    val taskSetManager23 = createDummyTaskSetManager(2, 3, 2)
-    val taskSetManager24 = createDummyTaskSetManager(2, 4, 2)
+    val taskSetManager23 = createDummyTaskSetManager(2, 3, 2, clusterScheduler, taskSet)
+    val taskSetManager24 = createDummyTaskSetManager(2, 4, 2, clusterScheduler, taskSet)
     schedulableBuilder.addTaskSetManager(taskSetManager23, properties2)
     schedulableBuilder.addTaskSetManager(taskSetManager24, properties2)
 
@@ -195,6 +196,13 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
   }
 
   test("Nested Pool Test") {
+    sc = new SparkContext("local", "ClusterSchedulerSuite")
+    val clusterScheduler = new ClusterScheduler(sc)
+    var tasks = ArrayBuffer[Task[_]]()
+    val task = new DummyTask(0)
+    tasks += task
+    val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
+
     val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
     val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1)
     val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1)
@@ -211,23 +219,23 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
     pool1.addSchedulable(pool10)
     pool1.addSchedulable(pool11)
 
-    val taskSetManager000 = createDummyTaskSetManager(0, 0, 5)
-    val taskSetManager001 = createDummyTaskSetManager(0, 1, 5)
+    val taskSetManager000 = createDummyTaskSetManager(0, 0, 5, clusterScheduler, taskSet)
+    val taskSetManager001 = createDummyTaskSetManager(0, 1, 5, clusterScheduler, taskSet)
     pool00.addSchedulable(taskSetManager000)
     pool00.addSchedulable(taskSetManager001)
 
-    val taskSetManager010 = createDummyTaskSetManager(1, 2, 5)
-    val taskSetManager011 = createDummyTaskSetManager(1, 3, 5)
+    val taskSetManager010 = createDummyTaskSetManager(1, 2, 5, clusterScheduler, taskSet)
+    val taskSetManager011 = createDummyTaskSetManager(1, 3, 5, clusterScheduler, taskSet)
     pool01.addSchedulable(taskSetManager010)
     pool01.addSchedulable(taskSetManager011)
 
-    val taskSetManager100 = createDummyTaskSetManager(2, 4, 5)
-    val taskSetManager101 = createDummyTaskSetManager(2, 5, 5)
+    val taskSetManager100 = createDummyTaskSetManager(2, 4, 5, clusterScheduler, taskSet)
+    val taskSetManager101 = createDummyTaskSetManager(2, 5, 5, clusterScheduler, taskSet)
     pool10.addSchedulable(taskSetManager100)
     pool10.addSchedulable(taskSetManager101)
 
-    val taskSetManager110 = createDummyTaskSetManager(3, 6, 5)
-    val taskSetManager111 = createDummyTaskSetManager(3, 7, 5)
+    val taskSetManager110 = createDummyTaskSetManager(3, 6, 5, clusterScheduler, taskSet)
+    val taskSetManager111 = createDummyTaskSetManager(3, 7, 5, clusterScheduler, taskSet)
     pool11.addSchedulable(taskSetManager110)
     pool11.addSchedulable(taskSetManager111)
 
-- 
GitLab