From 26f092d4e32cc1f7e279646075eaf1e495395923 Mon Sep 17 00:00:00 2001
From: Andrew Or <andrew@databricks.com>
Date: Thu, 30 Oct 2014 15:31:23 -0700
Subject: [PATCH] [SPARK-4138][SPARK-4139] Improve dynamic allocation settings

This should be merged after #2746 (SPARK-3795).

**SPARK-4138**. If the user sets both the number of executors and `spark.dynamicAllocation.enabled`, we should throw an exception.

**SPARK-4139**. If the user sets `spark.dynamicAllocation.enabled`, we should use the max number of executors as the starting number of executors because the first job is likely to run immediately after application startup. If the latter is not set, throw an exception.

Author: Andrew Or <andrew@databricks.com>

Closes #3002 from andrewor14/yarn-set-executors and squashes the following commits:

c528fce [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-set-executors
55d4699 [Andrew Or] Bug fix: `isDynamicAllocationEnabled` was always false
2b0ccec [Andrew Or] Start the number of executors at the max
022bfde [Andrew Or] Guard against incompatible settings of number of executors
---
 .../yarn/ApplicationMasterArguments.scala     |  3 +-
 .../spark/deploy/yarn/ClientArguments.scala   | 30 ++++++++++++++-----
 .../deploy/yarn/YarnSparkHadoopUtil.scala     |  2 ++
 .../cluster/YarnClusterSchedulerBackend.scala |  4 +--
 4 files changed, 29 insertions(+), 10 deletions(-)

diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index 5c54e34003..104db4f65f 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.deploy.yarn
 
 import org.apache.spark.util.{MemoryParam, IntParam}
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import collection.mutable.ArrayBuffer
 
 class ApplicationMasterArguments(val args: Array[String]) {
@@ -26,7 +27,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
   var userArgs: Seq[String] = Seq[String]()
   var executorMemory = 1024
   var executorCores = 1
-  var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
+  var numExecutors = DEFAULT_NUMBER_EXECUTORS
 
   parseArgs(args.toList)
 
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index a12f82d2fb..4d859450ef 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -20,8 +20,8 @@ package org.apache.spark.deploy.yarn
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.SparkConf
-import org.apache.spark.util.{Utils, IntParam, MemoryParam}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
+import org.apache.spark.util.{Utils, IntParam, MemoryParam}
 
 // TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
 private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) {
@@ -33,23 +33,25 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
   var userArgs: Seq[String] = Seq[String]()
   var executorMemory = 1024 // MB
   var executorCores = 1
-  var numExecutors = 2
+  var numExecutors = DEFAULT_NUMBER_EXECUTORS
   var amQueue = sparkConf.get("spark.yarn.queue", "default")
   var amMemory: Int = 512 // MB
   var appName: String = "Spark"
   var priority = 0
 
-  parseArgs(args.toList)
-  loadEnvironmentArgs()
-
   // Additional memory to allocate to containers
   // For now, use driver's memory overhead as our AM container's memory overhead
-  val amMemoryOverhead = sparkConf.getInt("spark.yarn.driver.memoryOverhead", 
+  val amMemoryOverhead = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
     math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toInt, MEMORY_OVERHEAD_MIN))
 
-  val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead", 
+  val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
     math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
 
+  private val isDynamicAllocationEnabled =
+    sparkConf.getBoolean("spark.dynamicAllocation.enabled", false)
+
+  parseArgs(args.toList)
+  loadEnvironmentArgs()
   validateArgs()
 
   /** Load any default arguments provided through environment variables and Spark properties. */
@@ -64,6 +66,15 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
       .orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES"))
       .orElse(sparkConf.getOption("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p)))
       .orNull
+    // If dynamic allocation is enabled, start at the max number of executors
+    if (isDynamicAllocationEnabled) {
+      val maxExecutorsConf = "spark.dynamicAllocation.maxExecutors"
+      if (!sparkConf.contains(maxExecutorsConf)) {
+        throw new IllegalArgumentException(
+          s"$maxExecutorsConf must be set if dynamic allocation is enabled!")
+      }
+      numExecutors = sparkConf.get(maxExecutorsConf).toInt
+    }
   }
 
   /**
@@ -113,6 +124,11 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
           if (args(0) == "--num-workers") {
             println("--num-workers is deprecated. Use --num-executors instead.")
           }
+          // Dynamic allocation is not compatible with this option
+          if (isDynamicAllocationEnabled) {
+            throw new IllegalArgumentException("Explicitly setting the number " +
+              "of executors is not compatible with spark.dynamicAllocation.enabled!")
+          }
           numExecutors = value
           args = tail
 
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index e1e0144f46..7d453ecb79 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -93,6 +93,8 @@ object YarnSparkHadoopUtil {
 
   val ANY_HOST = "*"
 
+  val DEFAULT_NUMBER_EXECUTORS = 2
+
   // All RM requests are issued with same priority : we do not (yet) have any distinction between
   // request types (like map/reduce in hadoop for example)
   val RM_REQUEST_PRIORITY = 1
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index a96a54f668..b1de81e6a8 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.scheduler.cluster
 
 import org.apache.spark.SparkContext
-import org.apache.spark.deploy.yarn.ApplicationMasterArguments
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.util.IntParam
 
@@ -29,7 +29,7 @@ private[spark] class YarnClusterSchedulerBackend(
 
   override def start() {
     super.start()
-    totalExpectedExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
+    totalExpectedExecutors = DEFAULT_NUMBER_EXECUTORS
     if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
       totalExpectedExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES"))
         .getOrElse(totalExpectedExecutors)
-- 
GitLab