Commit ad42b283 authored by Sandy Ryza, committed by Andrew Or

SPARK-4214. With dynamic allocation, avoid outstanding requests for more executors than pending tasks need.

WIP. Still need to add and fix tests.

Author: Sandy Ryza <sandy@cloudera.com>

Closes #3204 from sryza/sandy-spark-4214 and squashes the following commits:

35cf0e0 [Sandy Ryza] Add comment
13b53df [Sandy Ryza] Review feedback
067465f [Sandy Ryza] Whitespace fix
6ae080c [Sandy Ryza] Add tests and get num pending tasks from ExecutorAllocationListener
531e2b6 [Sandy Ryza] SPARK-4214. With dynamic allocation, avoid outstanding requests for more executors than pending tasks need.
parent 37482ce5
@@ -28,7 +28,9 @@ import org.apache.spark.scheduler._
* the scheduler queue is not drained in N seconds, then new executors are added. If the queue
* persists for another M seconds, then more executors are added and so on. The number added
* in each round increases exponentially from the previous round until an upper bound on the
* number of executors has been reached.
* number of executors has been reached. The upper bound is based both on a configured property
* and on the number of tasks pending: the policy will never increase the number of executor
* requests past the number needed to handle all pending tasks.
*
* The rationale for the exponential increase is twofold: (1) Executors should be added slowly
* in the beginning in case the number of extra executors needed turns out to be small. Otherwise,
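To make the ramp-up concrete, here is a small standalone sketch (names such as rampUpSchedule are hypothetical and not part of this patch) of how the per-round request doubles until the configured upper bound truncates it:

// Standalone sketch of the ramp-up described in the comment above: each round doubles
// the number of executors requested, and the running total never exceeds the configured
// maximum. Names here are illustrative, not identifiers from the patch.
object RampUpSketch {
  def rampUpSchedule(maxNumExecutors: Int): Seq[Int] = {
    var numExecutorsToAdd = 1            // doubles after every successful round
    var total = 0                        // executors requested so far
    val rounds = Seq.newBuilder[Int]
    while (total < maxNumExecutors) {
      val added = math.min(numExecutorsToAdd, maxNumExecutors - total)
      rounds += added
      total += added
      numExecutorsToAdd *= 2
    }
    rounds.result()
  }

  def main(args: Array[String]): Unit = {
    // With an upper bound of 10 executors the rounds are 1, 2, 4, 3: the last round
    // is truncated by the bound, matching the existing "add executors" test below.
    println(rampUpSchedule(10).mkString(", "))
  }
}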
@@ -82,6 +84,12 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
// During testing, the methods to actually kill and add executors are mocked out
private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
// TODO: The default value of 1 for spark.executor.cores works right now because dynamic
// allocation is only supported for YARN and the default number of cores per executor in YARN is
// 1, but it might need to be obtained differently for different cluster managers
private val tasksPerExecutor =
conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)
validateSettings()
// Number of executors to add in the next round
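For a concrete sense of the division a few lines up, a tiny worked example (the values are illustrative, not Spark defaults):

// Illustrative values only: how tasksPerExecutor falls out of the two configs.
val executorCores = 4                               // spark.executor.cores
val taskCpus = 2                                    // spark.task.cpus
val tasksPerExecutor = executorCores / taskCpus     // 4 / 2 = 2 concurrent tasks per executor
// With YARN's default of 1 core per executor and 1 CPU per task this is 1 / 1 = 1.
// If spark.task.cpus exceeded spark.executor.cores, the integer division would yield 0,
// which is exactly the misconfiguration validateSettings() rejects further down.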
@@ -110,6 +118,9 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
// Clock used to schedule when executors should be added and removed
private var clock: Clock = new RealClock
// Listener for Spark events that impact the allocation policy
private val listener = new ExecutorAllocationListener(this)
/**
* Verify that the settings specified through the config are valid.
* If not, throw an appropriate exception.
@@ -141,6 +152,9 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
throw new SparkException("Dynamic allocation of executors requires the external " +
"shuffle service. You may enable this through spark.shuffle.service.enabled.")
}
if (tasksPerExecutor == 0) {
throw new SparkException("spark.executor.cores must not be less than spark.task.cpus.cores")
}
}
/**
@@ -154,7 +168,6 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
* Register for scheduler callbacks to decide when to add and remove executors.
*/
def start(): Unit = {
val listener = new ExecutorAllocationListener(this)
sc.addSparkListener(listener)
startPolling()
}
@@ -218,13 +231,27 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
return 0
}
// Request executors with respect to the upper bound
val actualNumExecutorsToAdd =
if (numExistingExecutors + numExecutorsToAdd <= maxNumExecutors) {
numExecutorsToAdd
} else {
maxNumExecutors - numExistingExecutors
}
// The number of executors needed to satisfy all pending tasks is the number of tasks pending
// divided by the number of tasks each executor can fit, rounded up.
val maxNumExecutorsPending =
(listener.totalPendingTasks() + tasksPerExecutor - 1) / tasksPerExecutor
if (numExecutorsPending >= maxNumExecutorsPending) {
logDebug(s"Not adding executors because there are already $numExecutorsPending " +
s"pending and pending tasks could only fill $maxNumExecutorsPending")
numExecutorsToAdd = 1
return 0
}
// It's never useful to request more executors than could satisfy all the pending tasks, so
// cap request at that amount.
// Also cap request with respect to the configured upper bound.
val maxNumExecutorsToAdd = math.min(
maxNumExecutorsPending - numExecutorsPending,
maxNumExecutors - numExistingExecutors)
assert(maxNumExecutorsToAdd > 0)
val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxNumExecutorsToAdd)
val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
val addRequestAcknowledged = testing || sc.requestExecutors(actualNumExecutorsToAdd)
if (addRequestAcknowledged) {
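The capping arithmetic above can be checked in isolation; the sketch below (a hypothetical pure helper, not code from the patch) reproduces the ceil-division against pending tasks and the min against the configured bound:

// Sketch of the capping logic above, factored into a pure function so the arithmetic
// can be verified by hand. The helper name and parameter order are hypothetical.
def executorsToRequest(
    pendingTasks: Int,
    tasksPerExecutor: Int,
    numExistingExecutors: Int,
    numExecutorsPending: Int,
    numExecutorsToAdd: Int,
    maxNumExecutors: Int): Int = {
  // Executors needed to run every pending task, rounded up.
  val maxNumExecutorsPending = (pendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
  if (numExecutorsPending >= maxNumExecutorsPending) {
    0  // outstanding requests already cover all pending tasks
  } else {
    val maxNumExecutorsToAdd = math.min(
      maxNumExecutorsPending - numExecutorsPending,   // cap by pending tasks
      maxNumExecutors - numExistingExecutors)         // cap by configured upper bound
    math.min(numExecutorsToAdd, maxNumExecutorsToAdd)
  }
}

// Mirroring the new test below: 5 pending tasks, 1 task per executor, 3 requests already
// outstanding, a round size of 4, and a configured max of 10. ceil(5 / 1) = 5 executors
// are needed, only 5 - 3 = 2 more are useful, so 2 are requested rather than 4:
// executorsToRequest(5, 1, 0, 3, 4, 10) == 2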
@@ -445,6 +472,16 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = {
allocationManager.onExecutorRemoved(blockManagerRemoved.blockManagerId.executorId)
}
/**
* An estimate of the total number of pending tasks remaining for currently running stages. Does
* not account for tasks which may have failed and been resubmitted.
*/
def totalPendingTasks(): Int = {
stageIdToNumTasks.map { case (stageId, numTasks) =>
numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0)
}.sum
}
}
}
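The estimate above is simply "tasks submitted per stage minus tasks that have already started"; a minimal sketch with made-up bookkeeping maps (illustrative data, not the listener's real state):

// Minimal sketch of the pending-task estimate, using plain maps in place of the
// listener's internal bookkeeping. Tracking task *indices* rather than a count is
// what keeps a resubmitted task from being subtracted twice.
import scala.collection.mutable

val stageIdToNumTasks = mutable.Map(0 -> 1000, 1 -> 3)        // tasks submitted per stage
val stageIdToTaskIndices = mutable.Map(1 -> mutable.Set(0))   // task indices that have started

val totalPending = stageIdToNumTasks.map { case (stageId, numTasks) =>
  numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0)
}.sum
// 1000 pending in stage 0, plus 3 - 1 = 2 pending in stage 1, so totalPending == 1002.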
......
@@ -76,6 +76,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
test("add executors") {
sc = createSparkContext(1, 10)
val manager = sc.executorAllocationManager.get
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
// Keep adding until the limit is reached
assert(numExecutorsPending(manager) === 0)
@@ -117,6 +118,51 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(numExecutorsToAdd(manager) === 1)
}
test("add executors capped by num pending tasks") {
sc = createSparkContext(1, 10)
val manager = sc.executorAllocationManager.get
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 5)))
// Verify that we're capped at the number of tasks in the stage
assert(numExecutorsPending(manager) === 0)
assert(numExecutorsToAdd(manager) === 1)
assert(addExecutors(manager) === 1)
assert(numExecutorsPending(manager) === 1)
assert(numExecutorsToAdd(manager) === 2)
assert(addExecutors(manager) === 2)
assert(numExecutorsPending(manager) === 3)
assert(numExecutorsToAdd(manager) === 4)
assert(addExecutors(manager) === 2)
assert(numExecutorsPending(manager) === 5)
assert(numExecutorsToAdd(manager) === 1)
// Verify that running a task reduces the cap
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 3)))
sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1")))
assert(addExecutors(manager) === 1)
assert(numExecutorsPending(manager) === 6)
assert(numExecutorsToAdd(manager) === 2)
assert(addExecutors(manager) === 1)
assert(numExecutorsPending(manager) === 7)
assert(numExecutorsToAdd(manager) === 1)
// Verify that re-running a task doesn't reduce the cap further
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 3)))
sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 0, "executor-1")))
sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(1, 0, "executor-1")))
assert(addExecutors(manager) === 1)
assert(numExecutorsPending(manager) === 8)
assert(numExecutorsToAdd(manager) === 2)
assert(addExecutors(manager) === 1)
assert(numExecutorsPending(manager) === 9)
assert(numExecutorsToAdd(manager) === 1)
// Verify that running a task once we're at our limit doesn't blow things up
sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 1, "executor-1")))
assert(addExecutors(manager) === 0)
assert(numExecutorsPending(manager) === 9)
}
test("remove executors") {
sc = createSparkContext(5, 10)
val manager = sc.executorAllocationManager.get
@@ -170,6 +216,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
test ("interleaving add and remove") {
sc = createSparkContext(5, 10)
val manager = sc.executorAllocationManager.get
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
// Add a few executors
assert(addExecutors(manager) === 1)
@@ -343,6 +390,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
val clock = new TestClock(2020L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
// Scheduler queue backlogged
onSchedulerBacklogged(manager)
......