diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 66bda68088502e02b005c8d58dde8faf774993f9..9514604752640da1108dfce38a8da0ad216df96c 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -91,7 +91,7 @@ private[spark] class ExecutorAllocationManager( // How long there must be backlogged tasks for before an addition is triggered (seconds) private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds( - "spark.dynamicAllocation.schedulerBacklogTimeout", "5s") + "spark.dynamicAllocation.schedulerBacklogTimeout", "1s") // Same as above, but used only after `schedulerBacklogTimeoutS` is exceeded private val sustainedSchedulerBacklogTimeoutS = conf.getTimeAsSeconds( @@ -99,7 +99,7 @@ private[spark] class ExecutorAllocationManager( // How long an executor must be idle for before it is removed (seconds) private val executorIdleTimeoutS = conf.getTimeAsSeconds( - "spark.dynamicAllocation.executorIdleTimeout", "600s") + "spark.dynamicAllocation.executorIdleTimeout", "60s") // During testing, the methods to actually kill and add executors are mocked out private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false) @@ -268,6 +268,8 @@ private[spark] class ExecutorAllocationManager( numExecutorsTarget = math.max(maxNeeded, minNumExecutors) client.requestTotalExecutors(numExecutorsTarget) numExecutorsToAdd = 1 + logInfo(s"Lowering target number of executors to $numExecutorsTarget because " + + s"not all requests are actually needed (previously $oldNumExecutorsTarget)") numExecutorsTarget - oldNumExecutorsTarget } else if (addTime != NOT_SET && now >= addTime) { val delta = addExecutors(maxNeeded) @@ -292,9 +294,8 @@ private[spark] class ExecutorAllocationManager( private def addExecutors(maxNumExecutorsNeeded: Int): Int = { // Do not request more executors if it would put our target over the upper bound if (numExecutorsTarget >= maxNumExecutors) { - val numExecutorsPending = numExecutorsTarget - executorIds.size - logDebug(s"Not adding executors because there are already ${executorIds.size} registered " + - s"and ${numExecutorsPending} pending executor(s) (limit $maxNumExecutors)") + logDebug(s"Not adding executors because our current target total " + + s"is already $numExecutorsTarget (limit $maxNumExecutors)") numExecutorsToAdd = 1 return 0 } @@ -310,10 +311,19 @@ private[spark] class ExecutorAllocationManager( // Ensure that our target fits within configured bounds: numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors) + val delta = numExecutorsTarget - oldNumExecutorsTarget + + // If our target has not changed, do not send a message + // to the cluster manager and reset our exponential growth + if (delta == 0) { + numExecutorsToAdd = 1 + return 0 + } + val addRequestAcknowledged = testing || client.requestTotalExecutors(numExecutorsTarget) if (addRequestAcknowledged) { - val delta = numExecutorsTarget - oldNumExecutorsTarget - logInfo(s"Requesting $delta new executor(s) because tasks are backlogged" + + val executorsString = "executor" + { if (delta > 1) "s" else "" } + logInfo(s"Requesting $delta new $executorsString because tasks are backlogged" + s" (new desired total will be $numExecutorsTarget)") numExecutorsToAdd = if (delta == numExecutorsToAdd) { numExecutorsToAdd * 2 @@ -420,7 +430,7 @@ private[spark] class ExecutorAllocationManager( * This resets all variables used for adding executors. */ private def onSchedulerQueueEmpty(): Unit = synchronized { - logDebug(s"Clearing timer to add executors because there are no more pending tasks") + logDebug("Clearing timer to add executors because there are no more pending tasks") addTime = NOT_SET numExecutorsToAdd = 1 } diff --git a/docs/configuration.md b/docs/configuration.md index 0de824546c75164ed39582df4e032c0f58c0d8f9..30508a617fdd8a5c4237a952db62bb9278779750 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1194,7 +1194,7 @@ Apart from these, the following properties are also available, and may be useful </tr> <tr> <td><code>spark.dynamicAllocation.executorIdleTimeout</code></td> - <td>600s</td> + <td>60s</td> <td> If dynamic allocation is enabled and an executor has been idle for more than this duration, the executor will be removed. For more detail, see this @@ -1224,7 +1224,7 @@ Apart from these, the following properties are also available, and may be useful </tr> <tr> <td><code>spark.dynamicAllocation.schedulerBacklogTimeout</code></td> - <td>5s</td> + <td>1s</td> <td> If dynamic allocation is enabled and there have been pending tasks backlogged for more than this duration, new executors will be requested. For more detail, see this