Skip to content
Snippets Groups Projects
Commit 046b8d4a authored by erenavsarogullari's avatar erenavsarogullari Committed by Kay Ousterhout
Browse files

[SPARK-18066][CORE][TESTS] Add Pool usage policies test coverage for FIFO & FAIR Schedulers

## What changes were proposed in this pull request?

The following FIFO & FAIR scheduler pool usage cases need unit test coverage:
- FIFO Scheduler just uses **root pool** so even if `spark.scheduler.pool` property is set, related pool is not created and `TaskSetManagers` are added to **root pool**.
- FAIR Scheduler uses `default pool` when `spark.scheduler.pool` property is not set. This can happen when
  - `Properties` object is **null**,
  - `Properties` object is **empty**(`new Properties()`),
  - **default pool** is set(`spark.scheduler.pool=default`).
- FAIR Scheduler creates a **new pool** with **default values** when the `spark.scheduler.pool` property points to a **non-existent** pool. This can happen when the **scheduler allocation file** is not set or it does not contain the related pool.
## How was this patch tested?

New Unit tests are added.

Author: erenavsarogullari <erenavsarogullari@gmail.com>

Closes #15604 from erenavsarogullari/SPARK-18066.
parent 54a3697f
No related branches found
No related tags found
No related merge requests found
......@@ -191,8 +191,11 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(parentPool)
logInfo("Created pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
logWarning(s"A job was submitted with scheduler pool $poolName, which has not been " +
"configured. This can happen when the file that pools are read from isn't set, or " +
s"when that file doesn't contain $poolName. Created $poolName with default " +
s"configuration (schedulingMode: $DEFAULT_SCHEDULING_MODE, " +
s"minShare: $DEFAULT_MINIMUM_SHARE, weight: $DEFAULT_WEIGHT)")
}
}
parentPool.addSchedulable(manager)
......
......@@ -31,6 +31,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
val LOCAL = "local"
val APP_NAME = "PoolSuite"
val SCHEDULER_ALLOCATION_FILE_PROPERTY = "spark.scheduler.allocation.file"
val TEST_POOL = "testPool"
def createTaskSetManager(stageId: Int, numTasks: Int, taskScheduler: TaskSchedulerImpl)
: TaskSetManager = {
......@@ -40,7 +41,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
new TaskSetManager(taskScheduler, new TaskSet(tasks, stageId, 0, 0, null), 0)
}
def scheduleTaskAndVerifyId(taskId: Int, rootPool: Pool, expectedStageId: Int) {
def scheduleTaskAndVerifyId(taskId: Int, rootPool: Pool, expectedStageId: Int): Unit = {
val taskSetQueue = rootPool.getSortedTaskSetQueue
val nextTaskSetToSchedule =
taskSetQueue.find(t => (t.runningTasks + t.tasksSuccessful) < t.numTasks)
......@@ -201,12 +202,96 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
verifyPool(rootPool, "pool_with_surrounded_whitespace", 3, 2, FAIR)
}
/**
 * The spark.scheduler.pool property should be ignored for the FIFO scheduler,
 * because pools are only needed for fair scheduling.
 */
test("FIFO scheduler uses root pool and not spark.scheduler.pool property") {
  // Use the suite-level LOCAL / APP_NAME constants for consistency with the other tests.
  sc = new SparkContext(LOCAL, APP_NAME)
  val taskScheduler = new TaskSchedulerImpl(sc)

  val rootPool = new Pool("", SchedulingMode.FIFO, initMinShare = 0, initWeight = 0)
  val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)

  val taskSetManager0 = createTaskSetManager(stageId = 0, numTasks = 1, taskScheduler)
  val taskSetManager1 = createTaskSetManager(stageId = 1, numTasks = 1, taskScheduler)
  val properties = new Properties()
  properties.setProperty("spark.scheduler.pool", TEST_POOL)

  // When the FIFO scheduler is used and task sets are submitted, they should be added
  // directly to the root pool, and no pool named by spark.scheduler.pool should be
  // created (the property must be ignored entirely).
  schedulableBuilder.addTaskSetManager(taskSetManager0, properties)
  schedulableBuilder.addTaskSetManager(taskSetManager1, properties)

  assert(rootPool.getSchedulableByName(TEST_POOL) === null)
  assert(rootPool.schedulableQueue.size === 2)
  assert(rootPool.getSchedulableByName(taskSetManager0.name) === taskSetManager0)
  assert(rootPool.getSchedulableByName(taskSetManager1.name) === taskSetManager1)
}
/**
 * When spark.scheduler.pool is not set (null or empty Properties), the FAIR scheduler
 * should place submitted task set managers in the default pool.
 */
test("FAIR Scheduler uses default pool when spark.scheduler.pool property is not set") {
  // Use the suite-level LOCAL / APP_NAME constants for consistency with the other tests.
  sc = new SparkContext(LOCAL, APP_NAME)
  val taskScheduler = new TaskSchedulerImpl(sc)

  val rootPool = new Pool("", SchedulingMode.FAIR, initMinShare = 0, initWeight = 0)
  val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
  schedulableBuilder.buildPools()

  // Submit a new task set manager with pool properties set to null. This should result
  // in the task set manager getting added to the default pool.
  val taskSetManager0 = createTaskSetManager(stageId = 0, numTasks = 1, taskScheduler)
  schedulableBuilder.addTaskSetManager(taskSetManager0, null)

  val defaultPool = rootPool.getSchedulableByName(schedulableBuilder.DEFAULT_POOL_NAME)
  assert(defaultPool !== null)
  assert(defaultPool.schedulableQueue.size === 1)
  assert(defaultPool.getSchedulableByName(taskSetManager0.name) === taskSetManager0)

  // When a task set manager is submitted with empty Properties (spark.scheduler.pool
  // unset), it should also be added to the default pool, alongside the first manager.
  val taskSetManager1 = createTaskSetManager(stageId = 1, numTasks = 1, taskScheduler)
  schedulableBuilder.addTaskSetManager(taskSetManager1, new Properties())

  assert(defaultPool.schedulableQueue.size === 2)
  assert(defaultPool.getSchedulableByName(taskSetManager1.name) === taskSetManager1)
}
/**
 * When spark.scheduler.pool names a pool that was not defined by the allocation file,
 * the FAIR scheduler should create that pool on the fly with default configuration.
 */
test("FAIR Scheduler creates a new pool when spark.scheduler.pool property points to " +
    "a non-existent pool") {
  // Use the suite-level LOCAL / APP_NAME constants for consistency with the other tests.
  sc = new SparkContext(LOCAL, APP_NAME)
  val taskScheduler = new TaskSchedulerImpl(sc)

  val rootPool = new Pool("", SchedulingMode.FAIR, initMinShare = 0, initWeight = 0)
  val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
  schedulableBuilder.buildPools()

  // Precondition: the pool does not exist before the task set manager is submitted.
  assert(rootPool.getSchedulableByName(TEST_POOL) === null)

  val taskSetManager = createTaskSetManager(stageId = 0, numTasks = 1, taskScheduler)
  val properties = new Properties()
  properties.setProperty(schedulableBuilder.FAIR_SCHEDULER_PROPERTIES, TEST_POOL)

  // The fair scheduler should create a new pool with default values when spark.scheduler.pool
  // points to a pool that doesn't exist yet (this can happen when the file that pools are read
  // from isn't set, or when that file doesn't contain the pool name specified
  // by spark.scheduler.pool).
  schedulableBuilder.addTaskSetManager(taskSetManager, properties)

  verifyPool(rootPool, TEST_POOL, schedulableBuilder.DEFAULT_MINIMUM_SHARE,
    schedulableBuilder.DEFAULT_WEIGHT, schedulableBuilder.DEFAULT_SCHEDULING_MODE)
  val testPool = rootPool.getSchedulableByName(TEST_POOL)
  assert(testPool.getSchedulableByName(taskSetManager.name) === taskSetManager)
}
/**
 * Asserts that a pool with the given name exists under rootPool and was created with
 * the expected minShare, weight, and scheduling mode.
 */
private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int,
    expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = {
  // Look the pool up once instead of repeating getSchedulableByName for every assertion
  // (the diffed version duplicated each check; keep only the single-lookup form).
  val selectedPool = rootPool.getSchedulableByName(poolName)
  assert(selectedPool !== null)
  assert(selectedPool.minShare === expectedInitMinShare)
  assert(selectedPool.weight === expectedInitWeight)
  assert(selectedPool.schedulingMode === expectedSchedulingMode)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment