Commit 2f545438 authored by Andrew Or

[SPARK-3661] Respect spark.*.memory in cluster mode

This also includes minor re-organization of the code. Tested locally in both client and cluster deploy modes.

Author: Andrew Or <andrew@databricks.com>
Author: Andrew Or <andrewor14@gmail.com>

Closes #2697 from andrewor14/memory-cluster-mode and squashes the following commits:

01d78bc [Andrew Or] Merge branch 'master' of github.com:apache/spark into memory-cluster-mode
ccd468b [Andrew Or] Add some comments per Patrick
c956577 [Andrew Or] Tweak wording
2b4afa0 [Andrew Or] Unused import
47a5a88 [Andrew Or] Correct Spark properties precedence order
bf64717 [Andrew Or] Merge branch 'master' of github.com:apache/spark into memory-cluster-mode
dd452d0 [Andrew Or] Respect spark.*.memory in cluster mode
parent d3450578
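
For reference, the precedence this change establishes for memory settings in cluster mode is: an explicit command-line flag first, then the spark.driver.memory / spark.executor.memory property (whether set through --conf or the defaults file), then the legacy SPARK_DRIVER_MEMORY / SPARK_EXECUTOR_MEMORY environment variable. A minimal sketch of that chain with hypothetical inputs, not part of the patch itself:

    // Hypothetical inputs mirroring the resolution order described above.
    val fromCommandLine: Option[String] = None                  // e.g. --driver-memory was not given
    val sparkProperties = Map("spark.driver.memory" -> "4g")    // from --conf or the defaults file
    val env = Map("SPARK_DRIVER_MEMORY" -> "2g")                // legacy environment variable

    val driverMemory = fromCommandLine
      .orElse(sparkProperties.get("spark.driver.memory"))
      .orElse(env.get("SPARK_DRIVER_MEMORY"))
      .orNull
    // driverMemory == "4g": the Spark property takes precedence over the env var.
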
@@ -274,17 +274,11 @@ object SparkSubmit {
}
}
// Properties given with --conf are superseded by other options, but take precedence over
// properties in the defaults file.
// Load any properties specified through --conf and the default properties file
for ((k, v) <- args.sparkProperties) {
sysProps.getOrElseUpdate(k, v)
}
// Read from default spark properties, if any
for ((k, v) <- args.defaultSparkProperties) {
sysProps.getOrElseUpdate(k, v)
}
// Resolve paths in certain spark properties
val pathConfigs = Seq(
"spark.jars",
......
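
In the hunk above, sysProps is already populated from explicit spark-submit options by the time the --conf and defaults-file entries are folded in, so getOrElseUpdate only fills keys that are still missing. A small standalone sketch of that behaviour, using made-up property values:

    import scala.collection.mutable

    // Keys derived from explicit options are inserted first...
    val sysProps = mutable.HashMap("spark.executor.memory" -> "8g")

    // ...so later entries from --conf or the defaults file cannot overwrite them.
    val confProps = Map("spark.executor.memory" -> "2g", "spark.eventLog.enabled" -> "true")
    for ((k, v) <- confProps) {
      sysProps.getOrElseUpdate(k, v)
    }
    // spark.executor.memory stays "8g"; spark.eventLog.enabled is newly added.
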
@@ -19,7 +19,6 @@ package org.apache.spark.deploy
import java.util.jar.JarFile
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}
import org.apache.spark.util.Utils
@@ -72,39 +71,54 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
defaultProperties
}
// Respect SPARK_*_MEMORY for cluster mode
driverMemory = sys.env.get("SPARK_DRIVER_MEMORY").orNull
executorMemory = sys.env.get("SPARK_EXECUTOR_MEMORY").orNull
// Set parameters from command line arguments
parseOpts(args.toList)
mergeSparkProperties()
// Populate `sparkProperties` map from properties file
mergeDefaultSparkProperties()
// Use `sparkProperties` map along with env vars to fill in any missing parameters
loadEnvironmentArguments()
checkRequiredArguments()
/**
* Fill in any undefined values based on the default properties file or options passed in through
* the '--conf' flag.
* Merge values from the default properties file with those specified through --conf.
* When this is called, `sparkProperties` is already filled with configs from the latter.
*/
private def mergeSparkProperties(): Unit = {
private def mergeDefaultSparkProperties(): Unit = {
// Use common defaults file, if not specified by user
propertiesFile = Option(propertiesFile).getOrElse(Utils.getDefaultPropertiesFile(env))
// Honor --conf before the defaults file
defaultSparkProperties.foreach { case (k, v) =>
if (!sparkProperties.contains(k)) {
sparkProperties(k) = v
}
}
}
val properties = HashMap[String, String]()
properties.putAll(defaultSparkProperties)
properties.putAll(sparkProperties)
// Use properties file as fallback for values which have a direct analog to
// arguments in this script.
master = Option(master).orElse(properties.get("spark.master")).orNull
executorMemory = Option(executorMemory).orElse(properties.get("spark.executor.memory")).orNull
executorCores = Option(executorCores).orElse(properties.get("spark.executor.cores")).orNull
/**
* Load arguments from environment variables, Spark properties etc.
*/
private def loadEnvironmentArguments(): Unit = {
master = Option(master)
.orElse(sparkProperties.get("spark.master"))
.orElse(env.get("MASTER"))
.orNull
driverMemory = Option(driverMemory)
.orElse(sparkProperties.get("spark.driver.memory"))
.orElse(env.get("SPARK_DRIVER_MEMORY"))
.orNull
executorMemory = Option(executorMemory)
.orElse(sparkProperties.get("spark.executor.memory"))
.orElse(env.get("SPARK_EXECUTOR_MEMORY"))
.orNull
executorCores = Option(executorCores)
.orElse(sparkProperties.get("spark.executor.cores"))
.orNull
totalExecutorCores = Option(totalExecutorCores)
.orElse(properties.get("spark.cores.max"))
.orElse(sparkProperties.get("spark.cores.max"))
.orNull
name = Option(name).orElse(properties.get("spark.app.name")).orNull
jars = Option(jars).orElse(properties.get("spark.jars")).orNull
// This supports env vars in older versions of Spark
master = Option(master).orElse(env.get("MASTER")).orNull
name = Option(name).orElse(sparkProperties.get("spark.app.name")).orNull
jars = Option(jars).orElse(sparkProperties.get("spark.jars")).orNull
deployMode = Option(deployMode).orElse(env.get("DEPLOY_MODE")).orNull
// Try to set main class from JAR if no --class argument is given
@@ -131,7 +145,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
}
/** Ensure that required fields exist. Call this only once all defaults are loaded. */
private def checkRequiredArguments() = {
private def checkRequiredArguments(): Unit = {
if (args.length == 0) {
printUsageAndExit(-1)
}
@@ -166,7 +180,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
}
}
override def toString = {
override def toString = {
s"""Parsed arguments:
| master $master
| deployMode $deployMode
@@ -174,7 +188,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
| executorCores $executorCores
| totalExecutorCores $totalExecutorCores
| propertiesFile $propertiesFile
| extraSparkProperties $sparkProperties
| driverMemory $driverMemory
| driverCores $driverCores
| driverExtraClassPath $driverExtraClassPath
@@ -193,8 +206,9 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
| jars $jars
| verbose $verbose
|
|Default properties from $propertiesFile:
|${defaultSparkProperties.mkString(" ", "\n ", "\n")}
|Spark properties used, including those specified through
| --conf and those from the properties file $propertiesFile:
|${sparkProperties.mkString(" ", "\n ", "\n")}
""".stripMargin
}
@@ -327,7 +341,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
}
}
private def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
private def printUsageAndExit(exitCode: Int, unknownParam: Any = null): Unit = {
val outStream = SparkSubmit.printStream
if (unknownParam != null) {
outStream.println("Unknown/unsupported param " + unknownParam)
......
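
To summarize the reorganized flow in SparkSubmitArguments: --conf values land in sparkProperties during option parsing, and mergeDefaultSparkProperties then adds entries from the defaults file (normally conf/spark-defaults.conf) only for keys that are not already present, which is what makes --conf win over the file. A worked example with illustrative values:

    import scala.collection.mutable

    // sparkProperties already holds --conf values when the defaults file is merged in.
    val sparkProperties = mutable.HashMap("spark.driver.memory" -> "6g")  // from --conf
    val defaultSparkProperties = Map(                                     // hypothetical defaults file
      "spark.driver.memory" -> "3g",
      "spark.master" -> "spark://host:7077")

    defaultSparkProperties.foreach { case (k, v) =>
      if (!sparkProperties.contains(k)) {
        sparkProperties(k) = v
      }
    }
    // spark.driver.memory stays "6g" (--conf wins); spark.master is filled in from the file.
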