diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala new file mode 100644 index 0000000000000000000000000000000000000000..e8f461e2f56c9e705225a66c8fd298b9e7fb3845 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.util.Properties + +import org.scalatest.FunSuite + +import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext} + +/** + * Tests that pools and the associated scheduling algorithms for FIFO and fair scheduling work + * correctly. + */ +class PoolSuite extends FunSuite with LocalSparkContext { + + def createTaskSetManager(stageId: Int, numTasks: Int, taskScheduler: TaskSchedulerImpl) + : TaskSetManager = { + val tasks = Array.tabulate[Task[_]](numTasks) { i => + new FakeTask(i, Nil) + } + new TaskSetManager(taskScheduler, new TaskSet(tasks, stageId, 0, 0, null), 0) + } + + def scheduleTaskAndVerifyId(taskId: Int, rootPool: Pool, expectedStageId: Int) { + val taskSetQueue = rootPool.getSortedTaskSetQueue + val nextTaskSetToSchedule = + taskSetQueue.find(t => (t.runningTasks + t.tasksSuccessful) < t.numTasks) + assert(nextTaskSetToSchedule.isDefined) + nextTaskSetToSchedule.get.addRunningTask(taskId) + assert(nextTaskSetToSchedule.get.stageId === expectedStageId) + } + + test("FIFO Scheduler Test") { + sc = new SparkContext("local", "TaskSchedulerImplSuite") + val taskScheduler = new TaskSchedulerImpl(sc) + + val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0) + val schedulableBuilder = new FIFOSchedulableBuilder(rootPool) + schedulableBuilder.buildPools() + + val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler) + val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler) + val taskSetManager2 = createTaskSetManager(2, 2, taskScheduler) + schedulableBuilder.addTaskSetManager(taskSetManager0, null) + schedulableBuilder.addTaskSetManager(taskSetManager1, null) + schedulableBuilder.addTaskSetManager(taskSetManager2, null) + + scheduleTaskAndVerifyId(0, rootPool, 0) + scheduleTaskAndVerifyId(1, rootPool, 0) + scheduleTaskAndVerifyId(2, rootPool, 1) + scheduleTaskAndVerifyId(3, rootPool, 1) + scheduleTaskAndVerifyId(4, rootPool, 2) + scheduleTaskAndVerifyId(5, rootPool, 2) + } + + /** + * This test creates three scheduling pools, and creates task set managers in the first + * two scheduling pools. The test verifies that as tasks are scheduled, the fair scheduling + * algorithm properly orders the two scheduling pools. + */ + test("Fair Scheduler Test") { + val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile() + val conf = new SparkConf().set("spark.scheduler.allocation.file", xmlPath) + sc = new SparkContext("local", "TaskSchedulerImplSuite", conf) + val taskScheduler = new TaskSchedulerImpl(sc) + + val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) + val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) + schedulableBuilder.buildPools() + + // Ensure that the XML file was read in correctly. + assert(rootPool.getSchedulableByName("default") != null) + assert(rootPool.getSchedulableByName("1") != null) + assert(rootPool.getSchedulableByName("2") != null) + assert(rootPool.getSchedulableByName("3") != null) + assert(rootPool.getSchedulableByName("1").minShare === 2) + assert(rootPool.getSchedulableByName("1").weight === 1) + assert(rootPool.getSchedulableByName("2").minShare === 3) + assert(rootPool.getSchedulableByName("2").weight === 1) + assert(rootPool.getSchedulableByName("3").minShare === 0) + assert(rootPool.getSchedulableByName("3").weight === 1) + + val properties1 = new Properties() + properties1.setProperty("spark.scheduler.pool","1") + val properties2 = new Properties() + properties2.setProperty("spark.scheduler.pool","2") + + val taskSetManager10 = createTaskSetManager(0, 1, taskScheduler) + val taskSetManager11 = createTaskSetManager(1, 1, taskScheduler) + val taskSetManager12 = createTaskSetManager(2, 2, taskScheduler) + schedulableBuilder.addTaskSetManager(taskSetManager10, properties1) + schedulableBuilder.addTaskSetManager(taskSetManager11, properties1) + schedulableBuilder.addTaskSetManager(taskSetManager12, properties1) + + val taskSetManager23 = createTaskSetManager(3, 2, taskScheduler) + val taskSetManager24 = createTaskSetManager(4, 2, taskScheduler) + schedulableBuilder.addTaskSetManager(taskSetManager23, properties2) + schedulableBuilder.addTaskSetManager(taskSetManager24, properties2) + + // Pool 1 share ratio: 0. Pool 2 share ratio: 0. 1 gets scheduled based on ordering of names. + scheduleTaskAndVerifyId(0, rootPool, 0) + // Pool 1 share ratio: 1/2. Pool 2 share ratio: 0. 2 gets scheduled because ratio is lower. + scheduleTaskAndVerifyId(1, rootPool, 3) + // Pool 1 share ratio: 1/2. Pool 2 share ratio: 1/3. 2 gets scheduled because ratio is lower. + scheduleTaskAndVerifyId(2, rootPool, 3) + // Pool 1 share ratio: 1/2. Pool 2 share ratio: 2/3. 1 gets scheduled because ratio is lower. + scheduleTaskAndVerifyId(3, rootPool, 1) + // Pool 1 share ratio: 1. Pool 2 share ratio: 2/3. 2 gets scheduled because ratio is lower. + scheduleTaskAndVerifyId(4, rootPool, 4) + // Neither pool is needy so ordering is based on number of running tasks. + // Pool 1 running tasks: 2, Pool 2 running tasks: 3. 1 gets scheduled because fewer running + // tasks. + scheduleTaskAndVerifyId(5, rootPool, 2) + // Pool 1 running tasks: 3, Pool 2 running tasks: 3. 1 gets scheduled because of naming + // ordering. + scheduleTaskAndVerifyId(6, rootPool, 2) + // Pool 1 running tasks: 4, Pool 2 running tasks: 3. 2 gets scheduled because fewer running + // tasks. + scheduleTaskAndVerifyId(7, rootPool, 4) + } + + test("Nested Pool Test") { + sc = new SparkContext("local", "TaskSchedulerImplSuite") + val taskScheduler = new TaskSchedulerImpl(sc) + + val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) + val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1) + val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1) + rootPool.addSchedulable(pool0) + rootPool.addSchedulable(pool1) + + val pool00 = new Pool("00", SchedulingMode.FAIR, 2, 2) + val pool01 = new Pool("01", SchedulingMode.FAIR, 1, 1) + pool0.addSchedulable(pool00) + pool0.addSchedulable(pool01) + + val pool10 = new Pool("10", SchedulingMode.FAIR, 2, 2) + val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1) + pool1.addSchedulable(pool10) + pool1.addSchedulable(pool11) + + val taskSetManager000 = createTaskSetManager(0, 5, taskScheduler) + val taskSetManager001 = createTaskSetManager(1, 5, taskScheduler) + pool00.addSchedulable(taskSetManager000) + pool00.addSchedulable(taskSetManager001) + + val taskSetManager010 = createTaskSetManager(2, 5, taskScheduler) + val taskSetManager011 = createTaskSetManager(3, 5, taskScheduler) + pool01.addSchedulable(taskSetManager010) + pool01.addSchedulable(taskSetManager011) + + val taskSetManager100 = createTaskSetManager(4, 5, taskScheduler) + val taskSetManager101 = createTaskSetManager(5, 5, taskScheduler) + pool10.addSchedulable(taskSetManager100) + pool10.addSchedulable(taskSetManager101) + + val taskSetManager110 = createTaskSetManager(6, 5, taskScheduler) + val taskSetManager111 = createTaskSetManager(7, 5, taskScheduler) + pool11.addSchedulable(taskSetManager110) + pool11.addSchedulable(taskSetManager111) + + scheduleTaskAndVerifyId(0, rootPool, 0) + scheduleTaskAndVerifyId(1, rootPool, 4) + scheduleTaskAndVerifyId(2, rootPool, 6) + scheduleTaskAndVerifyId(3, rootPool, 2) + } +} diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 00812e6018d1f6c0f17519266ae4f5caf3ec0d7c..8874cf00e999336819bca053b664a6477b89d3a9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -30,238 +30,8 @@ class FakeSchedulerBackend extends SchedulerBackend { def defaultParallelism() = 1 } -class FakeTaskSetManager( - initPriority: Int, - initStageId: Int, - initNumTasks: Int, - taskScheduler: TaskSchedulerImpl, - taskSet: TaskSet) - extends TaskSetManager(taskScheduler, taskSet, 0) { - - parent = null - weight = 1 - minShare = 2 - priority = initPriority - stageId = initStageId - name = "TaskSet_"+stageId - override val numTasks = initNumTasks - tasksSuccessful = 0 - - var numRunningTasks = 0 - override def runningTasks = numRunningTasks - - def increaseRunningTasks(taskNum: Int) { - numRunningTasks += taskNum - if (parent != null) { - parent.increaseRunningTasks(taskNum) - } - } - - def decreaseRunningTasks(taskNum: Int) { - numRunningTasks -= taskNum - if (parent != null) { - parent.decreaseRunningTasks(taskNum) - } - } - - override def addSchedulable(schedulable: Schedulable) { - } - - override def removeSchedulable(schedulable: Schedulable) { - } - - override def getSchedulableByName(name: String): Schedulable = { - null - } - - override def executorLost(executorId: String, host: String): Unit = { - } - - override def resourceOffer( - execId: String, - host: String, - maxLocality: TaskLocality.TaskLocality) - : Option[TaskDescription] = - { - if (tasksSuccessful + numRunningTasks < numTasks) { - increaseRunningTasks(1) - Some(new TaskDescription(0, execId, "task 0:0", 0, null)) - } else { - None - } - } - - override def checkSpeculatableTasks(): Boolean = { - true - } - - def taskFinished() { - decreaseRunningTasks(1) - tasksSuccessful +=1 - if (tasksSuccessful == numTasks) { - parent.removeSchedulable(this) - } - } - - def abort() { - decreaseRunningTasks(numRunningTasks) - parent.removeSchedulable(this) - } -} - class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Logging { - def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: TaskSchedulerImpl, - taskSet: TaskSet): FakeTaskSetManager = { - new FakeTaskSetManager(priority, stage, numTasks, cs , taskSet) - } - - def resourceOffer(rootPool: Pool): Int = { - val taskSetQueue = rootPool.getSortedTaskSetQueue - /* Just for Test*/ - for (manager <- taskSetQueue) { - logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format( - manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks)) - } - for (taskSet <- taskSetQueue) { - taskSet.resourceOffer("execId_1", "hostname_1", TaskLocality.ANY) match { - case Some(task) => - return taskSet.stageId - case None => {} - } - } - -1 - } - - def checkTaskSetId(rootPool: Pool, expectedTaskSetId: Int) { - assert(resourceOffer(rootPool) === expectedTaskSetId) - } - - test("FIFO Scheduler Test") { - sc = new SparkContext("local", "TaskSchedulerImplSuite") - val taskScheduler = new TaskSchedulerImpl(sc) - val taskSet = FakeTask.createTaskSet(1) - - val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0) - val schedulableBuilder = new FIFOSchedulableBuilder(rootPool) - schedulableBuilder.buildPools() - - val taskSetManager0 = createDummyTaskSetManager(0, 0, 2, taskScheduler, taskSet) - val taskSetManager1 = createDummyTaskSetManager(0, 1, 2, taskScheduler, taskSet) - val taskSetManager2 = createDummyTaskSetManager(0, 2, 2, taskScheduler, taskSet) - schedulableBuilder.addTaskSetManager(taskSetManager0, null) - schedulableBuilder.addTaskSetManager(taskSetManager1, null) - schedulableBuilder.addTaskSetManager(taskSetManager2, null) - - checkTaskSetId(rootPool, 0) - resourceOffer(rootPool) - checkTaskSetId(rootPool, 1) - resourceOffer(rootPool) - taskSetManager1.abort() - checkTaskSetId(rootPool, 2) - } - - test("Fair Scheduler Test") { - val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile() - val conf = new SparkConf().set("spark.scheduler.allocation.file", xmlPath) - sc = new SparkContext("local", "TaskSchedulerImplSuite", conf) - val taskScheduler = new TaskSchedulerImpl(sc) - val taskSet = FakeTask.createTaskSet(1) - - val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) - val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) - schedulableBuilder.buildPools() - - assert(rootPool.getSchedulableByName("default") != null) - assert(rootPool.getSchedulableByName("1") != null) - assert(rootPool.getSchedulableByName("2") != null) - assert(rootPool.getSchedulableByName("3") != null) - assert(rootPool.getSchedulableByName("1").minShare === 2) - assert(rootPool.getSchedulableByName("1").weight === 1) - assert(rootPool.getSchedulableByName("2").minShare === 3) - assert(rootPool.getSchedulableByName("2").weight === 1) - assert(rootPool.getSchedulableByName("3").minShare === 0) - assert(rootPool.getSchedulableByName("3").weight === 1) - - val properties1 = new Properties() - properties1.setProperty("spark.scheduler.pool","1") - val properties2 = new Properties() - properties2.setProperty("spark.scheduler.pool","2") - - val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, taskScheduler, taskSet) - val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, taskScheduler, taskSet) - val taskSetManager12 = createDummyTaskSetManager(1, 2, 2, taskScheduler, taskSet) - schedulableBuilder.addTaskSetManager(taskSetManager10, properties1) - schedulableBuilder.addTaskSetManager(taskSetManager11, properties1) - schedulableBuilder.addTaskSetManager(taskSetManager12, properties1) - - val taskSetManager23 = createDummyTaskSetManager(2, 3, 2, taskScheduler, taskSet) - val taskSetManager24 = createDummyTaskSetManager(2, 4, 2, taskScheduler, taskSet) - schedulableBuilder.addTaskSetManager(taskSetManager23, properties2) - schedulableBuilder.addTaskSetManager(taskSetManager24, properties2) - - checkTaskSetId(rootPool, 0) - checkTaskSetId(rootPool, 3) - checkTaskSetId(rootPool, 3) - checkTaskSetId(rootPool, 1) - checkTaskSetId(rootPool, 4) - checkTaskSetId(rootPool, 2) - checkTaskSetId(rootPool, 2) - checkTaskSetId(rootPool, 4) - - taskSetManager12.taskFinished() - assert(rootPool.getSchedulableByName("1").runningTasks === 3) - taskSetManager24.abort() - assert(rootPool.getSchedulableByName("2").runningTasks === 2) - } - - test("Nested Pool Test") { - sc = new SparkContext("local", "TaskSchedulerImplSuite") - val taskScheduler = new TaskSchedulerImpl(sc) - val taskSet = FakeTask.createTaskSet(1) - - val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) - val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1) - val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1) - rootPool.addSchedulable(pool0) - rootPool.addSchedulable(pool1) - - val pool00 = new Pool("00", SchedulingMode.FAIR, 2, 2) - val pool01 = new Pool("01", SchedulingMode.FAIR, 1, 1) - pool0.addSchedulable(pool00) - pool0.addSchedulable(pool01) - - val pool10 = new Pool("10", SchedulingMode.FAIR, 2, 2) - val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1) - pool1.addSchedulable(pool10) - pool1.addSchedulable(pool11) - - val taskSetManager000 = createDummyTaskSetManager(0, 0, 5, taskScheduler, taskSet) - val taskSetManager001 = createDummyTaskSetManager(0, 1, 5, taskScheduler, taskSet) - pool00.addSchedulable(taskSetManager000) - pool00.addSchedulable(taskSetManager001) - - val taskSetManager010 = createDummyTaskSetManager(1, 2, 5, taskScheduler, taskSet) - val taskSetManager011 = createDummyTaskSetManager(1, 3, 5, taskScheduler, taskSet) - pool01.addSchedulable(taskSetManager010) - pool01.addSchedulable(taskSetManager011) - - val taskSetManager100 = createDummyTaskSetManager(2, 4, 5, taskScheduler, taskSet) - val taskSetManager101 = createDummyTaskSetManager(2, 5, 5, taskScheduler, taskSet) - pool10.addSchedulable(taskSetManager100) - pool10.addSchedulable(taskSetManager101) - - val taskSetManager110 = createDummyTaskSetManager(3, 6, 5, taskScheduler, taskSet) - val taskSetManager111 = createDummyTaskSetManager(3, 7, 5, taskScheduler, taskSet) - pool11.addSchedulable(taskSetManager110) - pool11.addSchedulable(taskSetManager111) - - checkTaskSetId(rootPool, 0) - checkTaskSetId(rootPool, 4) - checkTaskSetId(rootPool, 6) - checkTaskSetId(rootPool, 2) - } - test("Scheduler does not always schedule tasks on the same workers") { sc = new SparkContext("local", "TaskSchedulerImplSuite") val taskScheduler = new TaskSchedulerImpl(sc)