Commit 3d8760d7 authored by Andrew Or

[SPARK-7771] [SPARK-7779] Dynamic allocation: lower default timeouts further

The default add time of 5s is still too slow for small jobs, and the current default remove time of 10 minutes seems rather high. This patch lowers both and rephrases a few log messages.
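Both values remain configurable per application. Below is a minimal sketch of overriding the new defaults from application code; the app name, the shuffle-service setting, and the chosen values are illustrative, not part of this patch:

import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: restore the previous defaults if 1s/60s are too aggressive for a workload.
val conf = new SparkConf()
  .setAppName("dynamic-allocation-example")       // illustrative app name
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")   // external shuffle service is required for dynamic allocation
  .set("spark.dynamicAllocation.schedulerBacklogTimeout", "5s")  // previous add timeout
  .set("spark.dynamicAllocation.executorIdleTimeout", "600s")    // previous remove timeout
val sc = new SparkContext(conf)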

Author: Andrew Or <andrew@databricks.com>

Closes #6301 from andrewor14/da-minor and squashes the following commits:

6d614a6 [Andrew Or] Lower log level
2811492 [Andrew Or] Log information when requests are canceled
5fcd3eb [Andrew Or] Fix tests
3320710 [Andrew Or] Lower timeouts + rephrase a few log messages
parent 3c130510
ExecutorAllocationManager.scala
@@ -91,7 +91,7 @@ private[spark] class ExecutorAllocationManager(
   // How long there must be backlogged tasks for before an addition is triggered (seconds)
   private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds(
-    "spark.dynamicAllocation.schedulerBacklogTimeout", "5s")
+    "spark.dynamicAllocation.schedulerBacklogTimeout", "1s")
   // Same as above, but used only after `schedulerBacklogTimeoutS` is exceeded
   private val sustainedSchedulerBacklogTimeoutS = conf.getTimeAsSeconds(
@@ -99,7 +99,7 @@ private[spark] class ExecutorAllocationManager(
   // How long an executor must be idle for before it is removed (seconds)
   private val executorIdleTimeoutS = conf.getTimeAsSeconds(
-    "spark.dynamicAllocation.executorIdleTimeout", "600s")
+    "spark.dynamicAllocation.executorIdleTimeout", "60s")
   // During testing, the methods to actually kill and add executors are mocked out
   private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
@@ -268,6 +268,8 @@ private[spark] class ExecutorAllocationManager(
       numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
       client.requestTotalExecutors(numExecutorsTarget)
       numExecutorsToAdd = 1
+      logInfo(s"Lowering target number of executors to $numExecutorsTarget because " +
+        s"not all requests are actually needed (previously $oldNumExecutorsTarget)")
       numExecutorsTarget - oldNumExecutorsTarget
     } else if (addTime != NOT_SET && now >= addTime) {
       val delta = addExecutors(maxNeeded)
@@ -292,9 +294,8 @@ private[spark] class ExecutorAllocationManager(
   private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
     // Do not request more executors if it would put our target over the upper bound
     if (numExecutorsTarget >= maxNumExecutors) {
-      val numExecutorsPending = numExecutorsTarget - executorIds.size
-      logDebug(s"Not adding executors because there are already ${executorIds.size} registered " +
-        s"and ${numExecutorsPending} pending executor(s) (limit $maxNumExecutors)")
+      logDebug(s"Not adding executors because our current target total " +
+        s"is already $numExecutorsTarget (limit $maxNumExecutors)")
       numExecutorsToAdd = 1
       return 0
     }
@@ -310,10 +311,19 @@ private[spark] class ExecutorAllocationManager(
     // Ensure that our target fits within configured bounds:
     numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)
+    val delta = numExecutorsTarget - oldNumExecutorsTarget
+
+    // If our target has not changed, do not send a message
+    // to the cluster manager and reset our exponential growth
+    if (delta == 0) {
+      numExecutorsToAdd = 1
+      return 0
+    }
+
     val addRequestAcknowledged = testing || client.requestTotalExecutors(numExecutorsTarget)
     if (addRequestAcknowledged) {
-      val delta = numExecutorsTarget - oldNumExecutorsTarget
-      logInfo(s"Requesting $delta new executor(s) because tasks are backlogged" +
+      val executorsString = "executor" + { if (delta > 1) "s" else "" }
+      logInfo(s"Requesting $delta new $executorsString because tasks are backlogged" +
         s" (new desired total will be $numExecutorsTarget)")
       numExecutorsToAdd = if (delta == numExecutorsToAdd) {
         numExecutorsToAdd * 2
@@ -420,7 +430,7 @@ private[spark] class ExecutorAllocationManager(
    * This resets all variables used for adding executors.
    */
   private def onSchedulerQueueEmpty(): Unit = synchronized {
-    logDebug(s"Clearing timer to add executors because there are no more pending tasks")
+    logDebug("Clearing timer to add executors because there are no more pending tasks")
     addTime = NOT_SET
     numExecutorsToAdd = 1
   }
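To make the ramp-up that addExecutors() performs easier to follow, here is a simplified standalone model of the exponential growth and its caps. This is an illustrative sketch only, not the actual ExecutorAllocationManager code; the starting values and limits are made up:

object RampUpSketch {
  def main(args: Array[String]): Unit = {
    val maxNeeded = 20        // executors needed to run all backlogged tasks concurrently
    val maxNumExecutors = 16  // stands in for spark.dynamicAllocation.maxExecutors
    var numExecutorsTarget = 2
    var numExecutorsToAdd = 1

    // On each (sustained) backlog timeout tick, the target grows exponentially
    // but is capped by both the real need and the configured upper bound.
    while (numExecutorsTarget < math.min(maxNeeded, maxNumExecutors)) {
      val oldTarget = numExecutorsTarget
      numExecutorsTarget =
        math.min(numExecutorsTarget + numExecutorsToAdd, math.min(maxNeeded, maxNumExecutors))
      val delta = numExecutorsTarget - oldTarget
      println(s"Requesting $delta new executor(s), target is now $numExecutorsTarget")
      // Double the next batch only if the full batch was granted, otherwise reset to 1
      numExecutorsToAdd = if (delta == numExecutorsToAdd) numExecutorsToAdd * 2 else 1
    }
  }
}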
docs/configuration.md
@@ -1194,7 +1194,7 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td><code>spark.dynamicAllocation.executorIdleTimeout</code></td>
-  <td>600s</td>
+  <td>60s</td>
   <td>
     If dynamic allocation is enabled and an executor has been idle for more than this duration,
     the executor will be removed. For more detail, see this
@@ -1224,7 +1224,7 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td><code>spark.dynamicAllocation.schedulerBacklogTimeout</code></td>
-  <td>5s</td>
+  <td>1s</td>
   <td>
     If dynamic allocation is enabled and there have been pending tasks backlogged for more than
     this duration, new executors will be requested. For more detail, see this