diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 0926d05414ba716f0b9b37d334fba32a27feb81d..932ba16812bbb15b972aed964ed7d2e79aa77b95 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -25,9 +25,10 @@ import scala.util.control.ControlThrowable
 import com.codahale.metrics.{Gauge, MetricRegistry}
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS}
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.scheduler._
-import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 
 /**
  * An agent that dynamically allocates and removes executors based on the workload.
@@ -87,11 +88,9 @@ private[spark] class ExecutorAllocationManager(
   import ExecutorAllocationManager._
 
   // Lower and upper bounds on the number of executors.
-  private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
-  private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors",
-    Integer.MAX_VALUE)
-  private val initialNumExecutors = conf.getInt("spark.dynamicAllocation.initialExecutors",
-    minNumExecutors)
+  private val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
+  private val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
+  private val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
 
   // How long there must be backlogged tasks for before an addition is triggered (seconds)
   private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds(
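Reviewer note, not part of the patch: the hunk above swaps hand-rolled `conf.getInt` lookups for the typed `DYN_ALLOCATION_*` entries and delegates the initial target to `Utils.getDynamicAllocationInitialExecutors` (introduced below). A minimal, self-contained sketch of the resulting bound resolution; plain string keys stand in for the `private[spark]` config entries, and the object name and sample values are illustrative only:

```scala
import org.apache.spark.SparkConf

// Standalone sketch of the bounds ExecutorAllocationManager now derives.
object AllocationBoundsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.dynamicAllocation.minExecutors", "2")
      .set("spark.executor.instances", "6") // what --num-executors 6 sets

    val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
    val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", Int.MaxValue)
    // Initial target: the largest of min, explicit initial, and --num-executors,
    // mirroring Utils.getDynamicAllocationInitialExecutors below.
    val initialNumExecutors = Seq(
      minNumExecutors,
      conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors),
      conf.getInt("spark.executor.instances", 0)).max

    println(s"min=$minNumExecutors max=$maxNumExecutors initial=$initialNumExecutors")
    // min=2 max=2147483647 initial=6
  }
}
```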
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 206c130c763732d18ce6b10a9836228755139776..f1761e7c1ec92febadec8f9db8d386030b8e2c2e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -550,6 +550,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         |                             (Default: 1).
         | --queue QUEUE_NAME          The YARN queue to submit to (Default: "default").
         | --num-executors NUM         Number of executors to launch (Default: 2).
+        |                             If dynamic allocation is enabled, the initial number of
+        |                             executors will be at least NUM.
         | --archives ARCHIVES         Comma separated list of archives to be extracted into the
         |                             working directory of each executor.
         | --principal PRINCIPAL       Principal to be used to login to KDC, while running on
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 17d193b773fd6c7c7f1dd02e5a4e9ecfade213ea..f77cc2f9b7aa0e9c40981d5d1a1e3f4b05df77bc 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -52,6 +52,7 @@ import org.slf4j.Logger
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{DYN_ALLOCATION_INITIAL_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS, EXECUTOR_INSTANCES}
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
 
@@ -2309,21 +2310,24 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Return whether dynamic allocation is enabled in the given conf
-   * Dynamic allocation and explicitly setting the number of executors are inherently
-   * incompatible. In environments where dynamic allocation is turned on by default,
-   * the latter should override the former (SPARK-9092).
+   * Return whether dynamic allocation is enabled in the given conf.
    */
   def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
-    val numExecutor = conf.getInt("spark.executor.instances", 0)
     val dynamicAllocationEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false)
-    if (numExecutor != 0 && dynamicAllocationEnabled) {
-      logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
-    }
-    numExecutor == 0 && dynamicAllocationEnabled &&
+    dynamicAllocationEnabled &&
       (!isLocalMaster(conf) || conf.getBoolean("spark.dynamicAllocation.testing", false))
   }
 
+  /**
+   * Return the initial number of executors for dynamic allocation.
+   */
+  def getDynamicAllocationInitialExecutors(conf: SparkConf): Int = {
+    Seq(
+      conf.get(DYN_ALLOCATION_MIN_EXECUTORS),
+      conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS),
+      conf.get(EXECUTOR_INSTANCES).getOrElse(0)).max
+  }
+
   def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
     val resource = createResource
     try f.apply(resource) finally resource.close()
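Reviewer note, not part of the patch: the behavioral change in `isDynamicAllocationEnabled` is easiest to see side by side. The sketch below is a standalone approximation; it omits the `isLocalMaster`/testing check and the old log warning, and the object name is made up for illustration:

```scala
import org.apache.spark.SparkConf

// Before vs. after this patch, approximated outside Spark's internals.
object DynAllocEnabledSketch {
  // Old rule: any non-zero spark.executor.instances silently disabled
  // dynamic allocation (the SPARK-9092 behavior removed by this patch).
  def oldIsEnabled(conf: SparkConf): Boolean =
    conf.getInt("spark.executor.instances", 0) == 0 &&
      conf.getBoolean("spark.dynamicAllocation.enabled", false)

  // New rule: spark.executor.instances no longer wins; it only raises the
  // initial executor count via getDynamicAllocationInitialExecutors.
  def newIsEnabled(conf: SparkConf): Boolean =
    conf.getBoolean("spark.dynamicAllocation.enabled", false)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.executor.instances", "4")
    println(oldIsEnabled(conf)) // false: instances used to turn it off
    println(newIsEnabled(conf)) // true: allocation stays dynamic
  }
}
```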
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index a5363f0bfd600e6036d2672e6ff8ac02f8cd5888..e3a8e83f3eaa5a8c312218736ff6824a3e5436df 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -761,13 +761,29 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
     assert(Utils.isDynamicAllocationEnabled(
       conf.set("spark.dynamicAllocation.enabled", "true")) === true)
     assert(Utils.isDynamicAllocationEnabled(
-      conf.set("spark.executor.instances", "1")) === false)
+      conf.set("spark.executor.instances", "1")) === true)
     assert(Utils.isDynamicAllocationEnabled(
       conf.set("spark.executor.instances", "0")) === true)
     assert(Utils.isDynamicAllocationEnabled(conf.set("spark.master", "local")) === false)
     assert(Utils.isDynamicAllocationEnabled(conf.set("spark.dynamicAllocation.testing", "true")))
   }
 
+  test("getDynamicAllocationInitialExecutors") {
+    val conf = new SparkConf()
+    assert(Utils.getDynamicAllocationInitialExecutors(conf) === 0)
+    assert(Utils.getDynamicAllocationInitialExecutors(
+      conf.set("spark.dynamicAllocation.minExecutors", "3")) === 3)
+    assert(Utils.getDynamicAllocationInitialExecutors( // should use minExecutors
+      conf.set("spark.executor.instances", "2")) === 3)
+    assert(Utils.getDynamicAllocationInitialExecutors( // should use executor.instances
+      conf.set("spark.executor.instances", "4")) === 4)
+    assert(Utils.getDynamicAllocationInitialExecutors( // should use executor.instances
+      conf.set("spark.dynamicAllocation.initialExecutors", "3")) === 4)
+    assert(Utils.getDynamicAllocationInitialExecutors( // should use initialExecutors
+      conf.set("spark.dynamicAllocation.initialExecutors", "5")) === 5)
+  }
+
+
   test("encodeFileNameToURIRawPath") {
     assert(Utils.encodeFileNameToURIRawPath("abc") === "abc")
     assert(Utils.encodeFileNameToURIRawPath("abc xyz") === "abc%20xyz")
diff --git a/docs/configuration.md b/docs/configuration.md
index fbda91c1096268c2a813bb80e02ca4ace4f43706..cee59cf2aa05f489906aa850f7558c5399a51a24 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1236,6 +1236,9 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.dynamicAllocation.minExecutors</code></td>
   <td>
     Initial number of executors to run if dynamic allocation is enabled.
+    <br /><br />
+    If `--num-executors` (or `spark.executor.instances`) is set and larger than this value, it will
+    be used as the initial number of executors.
   </td>
 </tr>
 <tr>
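Reviewer note, not part of the patch: the sentence added to `configuration.md` cuts both ways, which the tests above only show one step at a time. A standalone illustration; the `initial` helper re-implements the max-of-the-three rule for demonstration and is not Spark's API:

```scala
import org.apache.spark.SparkConf

// Checks the documented rule in both directions.
object DocRuleSketch {
  // Re-implementation of the resolution rule, for illustration only.
  private def initial(conf: SparkConf): Int = Seq(
    conf.getInt("spark.dynamicAllocation.minExecutors", 0),
    conf.getInt("spark.dynamicAllocation.initialExecutors", 0),
    conf.getInt("spark.executor.instances", 0)).max

  def main(args: Array[String]): Unit = {
    val larger = new SparkConf()
      .set("spark.dynamicAllocation.initialExecutors", "3")
      .set("spark.executor.instances", "8")
    println(initial(larger)) // 8: the larger --num-executors wins

    val smaller = new SparkConf()
      .set("spark.dynamicAllocation.initialExecutors", "5")
      .set("spark.executor.instances", "2")
    println(initial(smaller)) // 5: initialExecutors wins when it is larger
  }
}
```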
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 9833806716aab5a1d0aefb05e1b144c1a17f3129..dbd46cc48c14bdf8c7d14c5b21074dc78c4ba08b 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -244,7 +244,7 @@ To use a custom metrics.properties for the application master and executors, upd
   <td><code>spark.executor.instances</code></td>
   <td><code>2</code></td>
   <td>
-    The number of executors. Note that this property is incompatible with <code>spark.dynamicAllocation.enabled</code>. If both <code>spark.dynamicAllocation.enabled</code> and <code>spark.executor.instances</code> are specified, dynamic allocation is turned off and the specified number of <code>spark.executor.instances</code> is used.
+    The number of executors for static allocation. With <code>spark.dynamicAllocation.enabled</code>, the initial set of executors will be at least this large.
   </td>
 </tr>
 <tr>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index de6cd946137cbd0c853a0d7a2781f005f7c6cb9a..156a7a30eaa93dd599aa886383eadebab3abbdc2 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -520,7 +520,7 @@ object YarnSparkHadoopUtil {
       numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = {
     if (Utils.isDynamicAllocationEnabled(conf)) {
       val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
-      val initialNumExecutors = conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS)
+      val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
       val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
       require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
         s"initial executor number $initialNumExecutors must between min executor number " +
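Reviewer note, not part of the patch: one consequence of feeding `getDynamicAllocationInitialExecutors` into the `require` above is that the lower bound can no longer fail, since the computed initial count is by construction at least `minExecutors`; only the `maxExecutors` check can still trip. A standalone sketch of that failure mode, re-implementing the resolution rule rather than calling the YARN code itself:

```scala
import org.apache.spark.SparkConf

// Demonstrates the one bound that can still fail after this patch.
object YarnBoundSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.dynamicAllocation.minExecutors", "1")
      .set("spark.dynamicAllocation.maxExecutors", "4")
      .set("spark.executor.instances", "10") // --num-executors 10

    val initial = Seq(
      conf.getInt("spark.dynamicAllocation.minExecutors", 0),
      conf.getInt("spark.dynamicAllocation.initialExecutors", 0),
      conf.getInt("spark.executor.instances", 0)).max
    val max = conf.getInt("spark.dynamicAllocation.maxExecutors", Int.MaxValue)

    // Mirrors the require in getInitialTargetExecutorNumber: 10 > 4, so this
    // throws IllegalArgumentException, failing the YARN submission fast.
    require(initial <= max,
      s"initial executor number $initial must not exceed max executor number $max")
  }
}
```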