Skip to content
Snippets Groups Projects
Commit b6aa5573 authored by Kay Ousterhout's avatar Kay Ousterhout Committed by Patrick Wendell
Browse files

[SPARK-1143] Separate pool tests into their own suite.

The current TaskSchedulerImplSuite includes some tests that are
actually for the TaskSchedulerImpl, but the remainder of the tests avoid using
the TaskSchedulerImpl entirely, and actually test the pool and scheduling
algorithm mechanisms. This commit separates the pool/scheduling algorithm
tests into their own suite, and also simplifies those tests.

The pull request replaces #339.

Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #3967 from kayousterhout/SPARK-1143 and squashes the following commits:

8a898c4 [Kay Ousterhout] [SPARK-1143] Separate pool tests into their own suite.
parent 1790b386
No related branches found
No related tags found
No related merge requests found
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler
import java.util.Properties
import org.scalatest.FunSuite
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext}
/**
 * Tests that pools and the associated scheduling algorithms for FIFO and fair scheduling work
 * correctly.
 *
 * The tests never launch real tasks. They build a pool hierarchy by hand, register task set
 * managers with it and then simulate scheduling by marking tasks as running on whichever task
 * set manager the root pool sorts first.
 */
class PoolSuite extends FunSuite with LocalSparkContext {

  /** Application name given to every SparkContext created by this suite. */
  private val AppName = "PoolSuite"

  /**
   * Creates a TaskSetManager for a task set made of `numTasks` fake tasks.
   *
   * @param stageId stage id given to the underlying TaskSet
   * @param numTasks number of FakeTask instances placed in the TaskSet
   * @param taskScheduler scheduler handed to the TaskSetManager constructor
   * @return a new TaskSetManager; the remaining TaskSet and TaskSetManager constructor
   *         arguments are fixed placeholder values (0 / null)
   */
  def createTaskSetManager(stageId: Int, numTasks: Int, taskScheduler: TaskSchedulerImpl)
    : TaskSetManager = {
    val tasks = Array.tabulate[Task[_]](numTasks) { i =>
      new FakeTask(i, Nil)
    }
    new TaskSetManager(taskScheduler, new TaskSet(tasks, stageId, 0, 0, null), 0)
  }

  /**
   * Simulates scheduling one task: picks the first task set in the root pool's sorted queue
   * that still has tasks which are neither running nor successful, marks `taskId` as running
   * on it, and verifies that the chosen task set belongs to `expectedStageId`.
   *
   * @param taskId id recorded as running on the selected task set manager
   * @param rootPool pool whose sorted task set queue determines the scheduling order
   * @param expectedStageId stage id the selected task set manager is expected to have
   */
  def scheduleTaskAndVerifyId(taskId: Int, rootPool: Pool, expectedStageId: Int): Unit = {
    val taskSetQueue = rootPool.getSortedTaskSetQueue
    // Fail with a descriptive message instead of calling Option.get when nothing is schedulable.
    val nextTaskSetToSchedule = taskSetQueue
      .find(t => (t.runningTasks + t.tasksSuccessful) < t.numTasks)
      .getOrElse(fail("No task set with unscheduled tasks is available for task " + taskId))
    // The task is marked as running before the stage id is checked, so the pool state advances
    // in the same way whether or not the assertion below holds.
    nextTaskSetToSchedule.addRunningTask(taskId)
    assert(nextTaskSetToSchedule.stageId === expectedStageId)
  }

  /**
   * Asserts that `rootPool` has a schedulable named `poolName` with the given minimum share
   * and weight.
   */
  private def verifyPool(
      rootPool: Pool,
      poolName: String,
      expectedMinShare: Int,
      expectedWeight: Int): Unit = {
    val pool = rootPool.getSchedulableByName(poolName)
    assert(pool != null)
    assert(pool.minShare === expectedMinShare)
    assert(pool.weight === expectedWeight)
  }

  test("FIFO Scheduler Test") {
    sc = new SparkContext("local", AppName)
    val taskScheduler = new TaskSchedulerImpl(sc)

    val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0)
    val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
    schedulableBuilder.buildPools()

    val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler)
    val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler)
    val taskSetManager2 = createTaskSetManager(2, 2, taskScheduler)
    schedulableBuilder.addTaskSetManager(taskSetManager0, null)
    schedulableBuilder.addTaskSetManager(taskSetManager1, null)
    schedulableBuilder.addTaskSetManager(taskSetManager2, null)

    // Each task set has two tasks, so stages are expected to be drained in order: 0, 0, 1, 1, 2, 2.
    scheduleTaskAndVerifyId(0, rootPool, 0)
    scheduleTaskAndVerifyId(1, rootPool, 0)
    scheduleTaskAndVerifyId(2, rootPool, 1)
    scheduleTaskAndVerifyId(3, rootPool, 1)
    scheduleTaskAndVerifyId(4, rootPool, 2)
    scheduleTaskAndVerifyId(5, rootPool, 2)
  }

  /**
   * This test creates three scheduling pools, and creates task set managers in the first
   * two scheduling pools. The test verifies that as tasks are scheduled, the fair scheduling
   * algorithm properly orders the two scheduling pools.
   */
  test("Fair Scheduler Test") {
    val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
    val conf = new SparkConf().set("spark.scheduler.allocation.file", xmlPath)
    sc = new SparkContext("local", AppName, conf)
    val taskScheduler = new TaskSchedulerImpl(sc)

    val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
    schedulableBuilder.buildPools()

    // Ensure that the XML file was read in correctly.
    assert(rootPool.getSchedulableByName("default") != null)
    verifyPool(rootPool, "1", expectedMinShare = 2, expectedWeight = 1)
    verifyPool(rootPool, "2", expectedMinShare = 3, expectedWeight = 1)
    verifyPool(rootPool, "3", expectedMinShare = 0, expectedWeight = 1)

    val properties1 = new Properties()
    properties1.setProperty("spark.scheduler.pool", "1")
    val properties2 = new Properties()
    properties2.setProperty("spark.scheduler.pool", "2")

    val taskSetManager10 = createTaskSetManager(0, 1, taskScheduler)
    val taskSetManager11 = createTaskSetManager(1, 1, taskScheduler)
    val taskSetManager12 = createTaskSetManager(2, 2, taskScheduler)
    schedulableBuilder.addTaskSetManager(taskSetManager10, properties1)
    schedulableBuilder.addTaskSetManager(taskSetManager11, properties1)
    schedulableBuilder.addTaskSetManager(taskSetManager12, properties1)

    val taskSetManager23 = createTaskSetManager(3, 2, taskScheduler)
    val taskSetManager24 = createTaskSetManager(4, 2, taskScheduler)
    schedulableBuilder.addTaskSetManager(taskSetManager23, properties2)
    schedulableBuilder.addTaskSetManager(taskSetManager24, properties2)

    // Pool 1 share ratio: 0. Pool 2 share ratio: 0. 1 gets scheduled based on ordering of names.
    scheduleTaskAndVerifyId(0, rootPool, 0)
    // Pool 1 share ratio: 1/2. Pool 2 share ratio: 0. 2 gets scheduled because ratio is lower.
    scheduleTaskAndVerifyId(1, rootPool, 3)
    // Pool 1 share ratio: 1/2. Pool 2 share ratio: 1/3. 2 gets scheduled because ratio is lower.
    scheduleTaskAndVerifyId(2, rootPool, 3)
    // Pool 1 share ratio: 1/2. Pool 2 share ratio: 2/3. 1 gets scheduled because ratio is lower.
    scheduleTaskAndVerifyId(3, rootPool, 1)
    // Pool 1 share ratio: 1. Pool 2 share ratio: 2/3. 2 gets scheduled because ratio is lower.
    scheduleTaskAndVerifyId(4, rootPool, 4)
    // Neither pool is needy so ordering is based on number of running tasks.
    // Pool 1 running tasks: 2, Pool 2 running tasks: 3. 1 gets scheduled because fewer running
    // tasks.
    scheduleTaskAndVerifyId(5, rootPool, 2)
    // Pool 1 running tasks: 3, Pool 2 running tasks: 3. 1 gets scheduled because of naming
    // ordering.
    scheduleTaskAndVerifyId(6, rootPool, 2)
    // Pool 1 running tasks: 4, Pool 2 running tasks: 3. 2 gets scheduled because fewer running
    // tasks.
    scheduleTaskAndVerifyId(7, rootPool, 4)
  }

  /**
   * Builds a three-level hierarchy by hand (root -> pools "0"/"1" -> leaf pools "00", "01",
   * "10", "11"), puts two task set managers in every leaf pool and verifies which stage is
   * selected for each of the first four scheduled tasks.
   */
  test("Nested Pool Test") {
    sc = new SparkContext("local", AppName)
    val taskScheduler = new TaskSchedulerImpl(sc)

    val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
    val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1)
    val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1)
    rootPool.addSchedulable(pool0)
    rootPool.addSchedulable(pool1)

    val pool00 = new Pool("00", SchedulingMode.FAIR, 2, 2)
    val pool01 = new Pool("01", SchedulingMode.FAIR, 1, 1)
    pool0.addSchedulable(pool00)
    pool0.addSchedulable(pool01)

    val pool10 = new Pool("10", SchedulingMode.FAIR, 2, 2)
    val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1)
    pool1.addSchedulable(pool10)
    pool1.addSchedulable(pool11)

    val taskSetManager000 = createTaskSetManager(0, 5, taskScheduler)
    val taskSetManager001 = createTaskSetManager(1, 5, taskScheduler)
    pool00.addSchedulable(taskSetManager000)
    pool00.addSchedulable(taskSetManager001)

    val taskSetManager010 = createTaskSetManager(2, 5, taskScheduler)
    val taskSetManager011 = createTaskSetManager(3, 5, taskScheduler)
    pool01.addSchedulable(taskSetManager010)
    pool01.addSchedulable(taskSetManager011)

    val taskSetManager100 = createTaskSetManager(4, 5, taskScheduler)
    val taskSetManager101 = createTaskSetManager(5, 5, taskScheduler)
    pool10.addSchedulable(taskSetManager100)
    pool10.addSchedulable(taskSetManager101)

    val taskSetManager110 = createTaskSetManager(6, 5, taskScheduler)
    val taskSetManager111 = createTaskSetManager(7, 5, taskScheduler)
    pool11.addSchedulable(taskSetManager110)
    pool11.addSchedulable(taskSetManager111)

    // Expected stage for each of the first four tasks scheduled through the nested pools.
    scheduleTaskAndVerifyId(0, rootPool, 0)
    scheduleTaskAndVerifyId(1, rootPool, 4)
    scheduleTaskAndVerifyId(2, rootPool, 6)
    scheduleTaskAndVerifyId(3, rootPool, 2)
  }
}
...@@ -30,238 +30,8 @@ class FakeSchedulerBackend extends SchedulerBackend { ...@@ -30,238 +30,8 @@ class FakeSchedulerBackend extends SchedulerBackend {
def defaultParallelism() = 1 def defaultParallelism() = 1
} }
/**
 * Test double for TaskSetManager that keeps its own running-task counter and hands out a
 * dummy TaskDescription for every accepted resource offer.
 *
 * @param initPriority value assigned to `priority`
 * @param initStageId value assigned to `stageId` (also used to build `name`)
 * @param initNumTasks value returned by `numTasks`
 * @param taskScheduler scheduler passed through to the TaskSetManager constructor
 * @param taskSet task set passed through to the TaskSetManager constructor
 */
class FakeTaskSetManager(
    initPriority: Int,
    initStageId: Int,
    initNumTasks: Int,
    taskScheduler: TaskSchedulerImpl,
    taskSet: TaskSet)
  extends TaskSetManager(taskScheduler, taskSet, 0) {

  // Overwrite the scheduling state inherited from TaskSetManager with fixed test values.
  parent = null
  weight = 1
  minShare = 2
  priority = initPriority
  stageId = initStageId
  name = "TaskSet_"+stageId
  override val numTasks = initNumTasks
  tasksSuccessful = 0

  /** Number of tasks this fake manager currently counts as running. */
  var numRunningTasks = 0

  override def runningTasks: Int = numRunningTasks

  /** Adds `taskNum` to the running-task count of this manager and, if attached, its parent. */
  def increaseRunningTasks(taskNum: Int): Unit = {
    numRunningTasks += taskNum
    if (parent != null) {
      parent.increaseRunningTasks(taskNum)
    }
  }

  /** Subtracts `taskNum` from the running-task count of this manager and, if attached, its parent. */
  def decreaseRunningTasks(taskNum: Int): Unit = {
    numRunningTasks -= taskNum
    if (parent != null) {
      parent.decreaseRunningTasks(taskNum)
    }
  }

  // A task set manager is treated as a leaf here: child management is a no-op.
  override def addSchedulable(schedulable: Schedulable): Unit = {
  }

  override def removeSchedulable(schedulable: Schedulable): Unit = {
  }

  override def getSchedulableByName(name: String): Schedulable = {
    null
  }

  override def executorLost(executorId: String, host: String): Unit = {
  }

  /**
   * Accepts the offer while fewer than `numTasks` tasks are running or successful, counting
   * one more running task and returning a dummy TaskDescription; otherwise returns None.
   */
  override def resourceOffer(
      execId: String,
      host: String,
      maxLocality: TaskLocality.TaskLocality)
    : Option[TaskDescription] =
  {
    if (tasksSuccessful + numRunningTasks < numTasks) {
      increaseRunningTasks(1)
      Some(new TaskDescription(0, execId, "task 0:0", 0, null))
    } else {
      None
    }
  }

  override def checkSpeculatableTasks(): Boolean = {
    true
  }

  /**
   * Records one running task as successfully finished and detaches this manager from its
   * parent once every task has succeeded.
   */
  def taskFinished(): Unit = {
    decreaseRunningTasks(1)
    tasksSuccessful += 1
    if (tasksSuccessful == numTasks) {
      removeFromParent()
    }
  }

  /** Drops all running tasks and detaches this manager from its parent. */
  def abort(): Unit = {
    decreaseRunningTasks(numRunningTasks)
    removeFromParent()
  }

  /**
   * Removes this manager from its parent. `parent` starts out as null and the running-task
   * helpers above already tolerate that, so this does the same instead of throwing a
   * NullPointerException when the manager was never attached to a pool.
   */
  private def removeFromParent(): Unit = {
    if (parent != null) {
      parent.removeSchedulable(this)
    }
  }
}
class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Logging { class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Logging {
/**
 * Builds a FakeTaskSetManager, forwarding every argument to the matching constructor
 * parameter.
 */
def createDummyTaskSetManager(
    priority: Int,
    stage: Int,
    numTasks: Int,
    cs: TaskSchedulerImpl,
    taskSet: TaskSet): FakeTaskSetManager =
  new FakeTaskSetManager(
    initPriority = priority,
    initStageId = stage,
    initNumTasks = numTasks,
    taskScheduler = cs,
    taskSet = taskSet)
/**
 * Offers a fixed executor/host to the task sets of `rootPool` in sorted order and reports
 * which one accepted.
 *
 * @param rootPool pool whose sorted task set queue is offered the resource
 * @return the stage id of the first task set that returns a task for the offer, or -1 if
 *         none of them does
 */
def resourceOffer(rootPool: Pool): Int = {
  val taskSetQueue = rootPool.getSortedTaskSetQueue
  // Diagnostic output only: log the state of every queued task set before making the offer.
  taskSetQueue.foreach { manager =>
    logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(
      manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
  }
  // `find` stops at the first accepting task set, so later task sets never see the offer.
  // This replaces a `return` from inside a `for` body, which is a non-local return.
  taskSetQueue
    .find(_.resourceOffer("execId_1", "hostname_1", TaskLocality.ANY).isDefined)
    .map(_.stageId)
    .getOrElse(-1)
}
/**
 * Makes one resource offer to `rootPool` and asserts that the stage id reported by
 * `resourceOffer` equals `expectedTaskSetId`.
 */
def checkTaskSetId(rootPool: Pool, expectedTaskSetId: Int): Unit = {
  val offeredStageId = resourceOffer(rootPool)
  assert(offeredStageId === expectedTaskSetId)
}
// Registers three two-task managers (stages 0, 1, 2) with a FIFO pool and checks which stage
// accepts each successive resource offer, including after stage 1 is aborted.
test("FIFO Scheduler Test") {
  sc = new SparkContext("local", "TaskSchedulerImplSuite")
  val scheduler = new TaskSchedulerImpl(sc)
  val dummyTaskSet = FakeTask.createTaskSet(1)

  val fifoRoot = new Pool("", SchedulingMode.FIFO, 0, 0)
  val builder = new FIFOSchedulableBuilder(fifoRoot)
  builder.buildPools()

  // All managers are created first and only then registered, one per stage id.
  val managers = (0 to 2).map { stage =>
    createDummyTaskSetManager(0, stage, 2, scheduler, dummyTaskSet)
  }
  managers.foreach(manager => builder.addTaskSetManager(manager, null))

  checkTaskSetId(fifoRoot, 0)
  // Unchecked offer: consumes one more slot before the next verification.
  resourceOffer(fifoRoot)
  checkTaskSetId(fifoRoot, 1)
  resourceOffer(fifoRoot)
  managers(1).abort()
  checkTaskSetId(fifoRoot, 2)
}
// Loads pools from fairscheduler.xml, registers managers in pools "1" and "2" and checks the
// stage chosen for each successive resource offer, then the per-pool running-task counts
// after one task finishes and one manager is aborted.
test("Fair Scheduler Test") {
  val allocationFile = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
  val conf = new SparkConf().set("spark.scheduler.allocation.file", allocationFile)
  sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
  val scheduler = new TaskSchedulerImpl(sc)
  val dummyTaskSet = FakeTask.createTaskSet(1)

  val fairRoot = new Pool("", SchedulingMode.FAIR, 0, 0)
  val builder = new FairSchedulableBuilder(fairRoot, sc.conf)
  builder.buildPools()

  // Every pool must exist before its settings are inspected.
  Seq("default", "1", "2", "3").foreach { poolName =>
    assert(fairRoot.getSchedulableByName(poolName) != null)
  }
  // (pool name, expected minShare, expected weight)
  Seq(("1", 2, 1), ("2", 3, 1), ("3", 0, 1)).foreach {
    case (poolName, expectedMinShare, expectedWeight) =>
      assert(fairRoot.getSchedulableByName(poolName).minShare === expectedMinShare)
      assert(fairRoot.getSchedulableByName(poolName).weight === expectedWeight)
  }

  // Builds the properties that route a task set manager to the named pool.
  def poolProperties(poolName: String): Properties = {
    val props = new Properties()
    props.setProperty("spark.scheduler.pool", poolName)
    props
  }
  val pool1Properties = poolProperties("1")
  val pool2Properties = poolProperties("2")

  // (stage id, number of tasks) for the managers placed in pool "1".
  val pool1Managers = Seq((0, 1), (1, 1), (2, 2)).map { case (stage, taskCount) =>
    createDummyTaskSetManager(1, stage, taskCount, scheduler, dummyTaskSet)
  }
  pool1Managers.foreach(manager => builder.addTaskSetManager(manager, pool1Properties))

  // Stage ids for the two-task managers placed in pool "2".
  val pool2Managers = Seq(3, 4).map { stage =>
    createDummyTaskSetManager(2, stage, 2, scheduler, dummyTaskSet)
  }
  pool2Managers.foreach(manager => builder.addTaskSetManager(manager, pool2Properties))

  // Expected stage id for each successive resource offer.
  Seq(0, 3, 3, 1, 4, 2, 2, 4).foreach { expectedStage =>
    checkTaskSetId(fairRoot, expectedStage)
  }

  // Stage 2 manager (pool "1") finishes one task.
  pool1Managers(2).taskFinished()
  assert(fairRoot.getSchedulableByName("1").runningTasks === 3)
  // Stage 4 manager (pool "2") is aborted.
  pool2Managers(1).abort()
  assert(fairRoot.getSchedulableByName("2").runningTasks === 2)
}
// Builds a hand-made hierarchy (root -> "0"/"1" -> "00", "01", "10", "11"), places two
// five-task managers in every leaf pool and checks the stage chosen for the first four offers.
test("Nested Pool Test") {
  sc = new SparkContext("local", "TaskSchedulerImplSuite")
  val scheduler = new TaskSchedulerImpl(sc)
  val dummyTaskSet = FakeTask.createTaskSet(1)

  val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)

  // Second level.
  val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1)
  val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1)
  Seq(pool0, pool1).foreach(child => rootPool.addSchedulable(child))

  // Leaves under pool "0".
  val pool00 = new Pool("00", SchedulingMode.FAIR, 2, 2)
  val pool01 = new Pool("01", SchedulingMode.FAIR, 1, 1)
  Seq(pool00, pool01).foreach(child => pool0.addSchedulable(child))

  // Leaves under pool "1".
  val pool10 = new Pool("10", SchedulingMode.FAIR, 2, 2)
  val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1)
  Seq(pool10, pool11).foreach(child => pool1.addSchedulable(child))

  // (leaf pool, priority given to its managers, stage ids of its managers)
  val leafLayout = Seq(
    (pool00, 0, Seq(0, 1)),
    (pool01, 1, Seq(2, 3)),
    (pool10, 2, Seq(4, 5)),
    (pool11, 3, Seq(6, 7)))
  leafLayout.foreach { case (leafPool, priority, stages) =>
    // Both managers of a leaf are created before either is attached.
    val managers = stages.map { stage =>
      createDummyTaskSetManager(priority, stage, 5, scheduler, dummyTaskSet)
    }
    managers.foreach(manager => leafPool.addSchedulable(manager))
  }

  // Expected stage id for each successive resource offer.
  Seq(0, 4, 6, 2).foreach { expectedStage =>
    checkTaskSetId(rootPool, expectedStage)
  }
}
test("Scheduler does not always schedule tasks on the same workers") { test("Scheduler does not always schedule tasks on the same workers") {
sc = new SparkContext("local", "TaskSchedulerImplSuite") sc = new SparkContext("local", "TaskSchedulerImplSuite")
val taskScheduler = new TaskSchedulerImpl(sc) val taskScheduler = new TaskSchedulerImpl(sc)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment