From 2f54543815c0905dc958d444ad638c23a29507c6 Mon Sep 17 00:00:00 2001
From: Andrew Or <andrew@databricks.com>
Date: Thu, 30 Oct 2014 15:44:29 -0700
Subject: [PATCH] [SPARK-3661] Respect spark.*.memory in cluster mode

This also includes a minor re-organization of the code. Tested locally in both client and cluster deploy modes.
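
As a quick illustration (not part of the patch; the values are invented), the precedence this change enforces for the memory settings is: explicit spark-submit flags first, then Spark properties (from --conf or the properties file), then the legacy SPARK_*_MEMORY environment variables as a last fallback:

    // Hypothetical resolution of driver memory under the new precedence
    val fromFlag: String = null                            // no --driver-memory given
    val fromProps = Map("spark.driver.memory" -> "4g")     // e.g. set through --conf
    val fromEnv   = Map("SPARK_DRIVER_MEMORY" -> "2g")
    val driverMemory = Option(fromFlag)
      .orElse(fromProps.get("spark.driver.memory"))
      .orElse(fromEnv.get("SPARK_DRIVER_MEMORY"))
      .orNull                                              // "4g": the property beats the env var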

Author: Andrew Or <andrew@databricks.com>
Author: Andrew Or <andrewor14@gmail.com>

Closes #2697 from andrewor14/memory-cluster-mode and squashes the following commits:

01d78bc [Andrew Or] Merge branch 'master' of github.com:apache/spark into memory-cluster-mode
ccd468b [Andrew Or] Add some comments per Patrick
c956577 [Andrew Or] Tweak wording
2b4afa0 [Andrew Or] Unused import
47a5a88 [Andrew Or] Correct Spark properties precedence order
bf64717 [Andrew Or] Merge branch 'master' of github.com:apache/spark into memory-cluster-mode
dd452d0 [Andrew Or] Respect spark.*.memory in cluster mode
---
 .../org/apache/spark/deploy/SparkSubmit.scala |  8 +-
 .../spark/deploy/SparkSubmitArguments.scala   | 74 +++++++++++--------
 2 files changed, 45 insertions(+), 37 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 0379adeb07..b43e68e40f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -274,17 +274,11 @@ object SparkSubmit {
       }
     }
 
-    // Properties given with --conf are superceded by other options, but take precedence over
-    // properties in the defaults file.
+    // Load any properties specified through --conf and the default properties file
     for ((k, v) <- args.sparkProperties) {
       sysProps.getOrElseUpdate(k, v)
     }
 
-    // Read from default spark properties, if any
-    for ((k, v) <- args.defaultSparkProperties) {
-      sysProps.getOrElseUpdate(k, v)
-    }
-
     // Resolve paths in certain spark properties
     val pathConfigs = Seq(
       "spark.jars",
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 72a452e0ae..f0e9ee67f6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -19,7 +19,6 @@ package org.apache.spark.deploy
 
 import java.util.jar.JarFile
 
-import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 
 import org.apache.spark.util.Utils
@@ -72,39 +71,54 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
     defaultProperties
   }
 
-  // Respect SPARK_*_MEMORY for cluster mode
-  driverMemory = sys.env.get("SPARK_DRIVER_MEMORY").orNull
-  executorMemory = sys.env.get("SPARK_EXECUTOR_MEMORY").orNull
-
+  // Set parameters from command line arguments
   parseOpts(args.toList)
-  mergeSparkProperties()
+  // Populate `sparkProperties` map from properties file
+  mergeDefaultSparkProperties()
+  // Use `sparkProperties` map along with env vars to fill in any missing parameters
+  loadEnvironmentArguments()
+
   checkRequiredArguments()
 
   /**
-   * Fill in any undefined values based on the default properties file or options passed in through
-   * the '--conf' flag.
+   * Merge values from the default properties file with those specified through --conf.
+   * When this is called, `sparkProperties` is already filled with configs from the latter.
    */
-  private def mergeSparkProperties(): Unit = {
+  private def mergeDefaultSparkProperties(): Unit = {
     // Use common defaults file, if not specified by user
     propertiesFile = Option(propertiesFile).getOrElse(Utils.getDefaultPropertiesFile(env))
+    // Honor --conf before the defaults file
+    defaultSparkProperties.foreach { case (k, v) =>
+      if (!sparkProperties.contains(k)) {
+        sparkProperties(k) = v
+      }
+    }
+  }
 
-    val properties = HashMap[String, String]()
-    properties.putAll(defaultSparkProperties)
-    properties.putAll(sparkProperties)
-
-    // Use properties file as fallback for values which have a direct analog to
-    // arguments in this script.
-    master = Option(master).orElse(properties.get("spark.master")).orNull
-    executorMemory = Option(executorMemory).orElse(properties.get("spark.executor.memory")).orNull
-    executorCores = Option(executorCores).orElse(properties.get("spark.executor.cores")).orNull
+  /**
+   * Load arguments from environment variables, Spark properties etc.
+   */
+  private def loadEnvironmentArguments(): Unit = {
+    master = Option(master)
+      .orElse(sparkProperties.get("spark.master"))
+      .orElse(env.get("MASTER"))
+      .orNull
+    driverMemory = Option(driverMemory)
+      .orElse(sparkProperties.get("spark.driver.memory"))
+      .orElse(env.get("SPARK_DRIVER_MEMORY"))
+      .orNull
+    executorMemory = Option(executorMemory)
+      .orElse(sparkProperties.get("spark.executor.memory"))
+      .orElse(env.get("SPARK_EXECUTOR_MEMORY"))
+      .orNull
+    executorCores = Option(executorCores)
+      .orElse(sparkProperties.get("spark.executor.cores"))
+      .orNull
     totalExecutorCores = Option(totalExecutorCores)
-      .orElse(properties.get("spark.cores.max"))
+      .orElse(sparkProperties.get("spark.cores.max"))
       .orNull
-    name = Option(name).orElse(properties.get("spark.app.name")).orNull
-    jars = Option(jars).orElse(properties.get("spark.jars")).orNull
-
-    // This supports env vars in older versions of Spark
-    master = Option(master).orElse(env.get("MASTER")).orNull
+    name = Option(name).orElse(sparkProperties.get("spark.app.name")).orNull
+    jars = Option(jars).orElse(sparkProperties.get("spark.jars")).orNull
     deployMode = Option(deployMode).orElse(env.get("DEPLOY_MODE")).orNull
 
     // Try to set main class from JAR if no --class argument is given
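
To make the new merge order concrete, here is a standalone sketch (values invented) of what mergeDefaultSparkProperties does: entries supplied via --conf are already present in sparkProperties and are therefore not overwritten by the defaults file.

    import scala.collection.mutable
    val sparkProperties = mutable.HashMap("spark.master" -> "yarn-cluster")   // from --conf
    val defaultSparkProperties = Map("spark.master" -> "local[*]",            // from the
                                     "spark.app.name" -> "demo")              // properties file
    defaultSparkProperties.foreach { case (k, v) =>
      if (!sparkProperties.contains(k)) { sparkProperties(k) = v }
    }
    // sparkProperties: spark.master -> yarn-cluster (kept), spark.app.name -> demo (added)
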
@@ -131,7 +145,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
   }
 
   /** Ensure that required fields exists. Call this only once all defaults are loaded. */
-  private def checkRequiredArguments() = {
+  private def checkRequiredArguments(): Unit = {
     if (args.length == 0) {
       printUsageAndExit(-1)
     }
@@ -166,7 +180,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
     }
   }
 
-  override def toString =  {
+  override def toString = {
     s"""Parsed arguments:
     |  master                  $master
     |  deployMode              $deployMode
@@ -174,7 +188,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
     |  executorCores           $executorCores
     |  totalExecutorCores      $totalExecutorCores
     |  propertiesFile          $propertiesFile
-    |  extraSparkProperties    $sparkProperties
     |  driverMemory            $driverMemory
     |  driverCores             $driverCores
     |  driverExtraClassPath    $driverExtraClassPath
@@ -193,8 +206,9 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
     |  jars                    $jars
     |  verbose                 $verbose
     |
-    |Default properties from $propertiesFile:
-    |${defaultSparkProperties.mkString("  ", "\n  ", "\n")}
+    |Spark properties used, including those specified through
+    | --conf and those from the properties file $propertiesFile:
+    |${sparkProperties.mkString("  ", "\n  ", "\n")}
     """.stripMargin
   }
 
@@ -327,7 +341,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
     }
   }
 
-  private def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
+  private def printUsageAndExit(exitCode: Int, unknownParam: Any = null): Unit = {
     val outStream = SparkSubmit.printStream
     if (unknownParam != null) {
       outStream.println("Unknown/unsupported param " + unknownParam)
-- 
GitLab