From 52834d761b059264214dfc6a1f9c70b8bc7ec089 Mon Sep 17 00:00:00 2001
From: Aaron Davidson <aaron@databricks.com>
Date: Sun, 9 Mar 2014 11:08:39 -0700
Subject: [PATCH] SPARK-929: Fully deprecate usage of SPARK_MEM

(Continued from old repo, prior discussion at https://github.com/apache/incubator-spark/pull/615)

This patch cements our deprecation of the SPARK_MEM environment variable by replacing it with three more specialized variables:
SPARK_DAEMON_MEMORY, SPARK_EXECUTOR_MEMORY, and SPARK_DRIVER_MEMORY

The creation of the latter two variables means that we can safely set driver/job memory without accidentally setting the executor memory. Neither is public.

SPARK_EXECUTOR_MEMORY is only used by the Mesos scheduler (and set within SparkContext). The proper way of configuring executor memory is through the "spark.executor.memory" property.

SPARK_DRIVER_MEMORY is the new way of specifying the amount of memory run by jobs launched by spark-class, without possibly affecting executor memory.

Other memory considerations:
- The repl's memory can be set through the "--drivermem" command-line option, which really just sets SPARK_DRIVER_MEMORY.
- run-example doesn't use spark-class, so the only way to modify examples' memory is actually an unusual use of SPARK_JAVA_OPTS (which is normally overriden in all cases by spark-class).

This patch also fixes a lurking bug where spark-shell misused spark-class (the first argument is supposed to be the main class name, not java options), as well as a bug in the Windows spark-class2.cmd. I have not yet tested this patch on either Windows or Mesos, however.

Author: Aaron Davidson <aaron@databricks.com>

Closes #99 from aarondav/sparkmem and squashes the following commits:

9df4c68 [Aaron Davidson] SPARK-929: Fully deprecate usage of SPARK_MEM
---
 bin/spark-class                               | 48 +++++++++++--------
 bin/spark-class2.cmd                          | 47 +++++++++++++-----
 bin/spark-shell                               | 28 +++++------
 .../scala/org/apache/spark/SparkContext.scala | 20 ++++----
 .../scala/org/apache/spark/util/Utils.scala   |  2 -
 docs/tuning.md                                |  2 +-
 python/pyspark/java_gateway.py                |  2 +-
 7 files changed, 90 insertions(+), 59 deletions(-)

diff --git a/bin/spark-class b/bin/spark-class
index c4225a392d..229ae2cebb 100755
--- a/bin/spark-class
+++ b/bin/spark-class
@@ -40,34 +40,46 @@ if [ -z "$1" ]; then
   exit 1
 fi
 
-# If this is a standalone cluster daemon, reset SPARK_JAVA_OPTS and SPARK_MEM to reasonable
-# values for that; it doesn't need a lot
-if [ "$1" = "org.apache.spark.deploy.master.Master" -o "$1" = "org.apache.spark.deploy.worker.Worker" ]; then
-  SPARK_MEM=${SPARK_DAEMON_MEMORY:-512m}
-  SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true"
-  # Do not overwrite SPARK_JAVA_OPTS environment variable in this script
-  OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS"   # Empty by default
-else
-  OUR_JAVA_OPTS="$SPARK_JAVA_OPTS"
+if [ -n "$SPARK_MEM" ]; then
+  echo "Warning: SPARK_MEM is deprecated, please use a more specific config option"
+  echo "(e.g., spark.executor.memory or SPARK_DRIVER_MEMORY)."
 fi
 
+# Use SPARK_MEM or 512m as the default memory, to be overridden by specific options
+DEFAULT_MEM=${SPARK_MEM:-512m}
+
+SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true"
 
-# Add java opts for master, worker, executor. The opts maybe null
+# Add java opts and memory settings for master, worker, executors, and repl.
 case "$1" in
+  # Master and Worker use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
   'org.apache.spark.deploy.master.Master')
-    OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_MASTER_OPTS"
+    OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_MASTER_OPTS"
+    OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
     ;;
   'org.apache.spark.deploy.worker.Worker')
-    OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_WORKER_OPTS"
+    OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_WORKER_OPTS"
+    OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
     ;;
+
+  # Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
   'org.apache.spark.executor.CoarseGrainedExecutorBackend')
-    OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
+    OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
+    OUR_JAVA_MEM=${SPARK_EXECUTOR_MEMORY:-$DEFAULT_MEM}
     ;;
   'org.apache.spark.executor.MesosExecutorBackend')
-    OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
+    OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
+    OUR_JAVA_MEM=${SPARK_EXECUTOR_MEMORY:-$DEFAULT_MEM}
     ;;
+
+  # All drivers use SPARK_JAVA_OPTS + SPARK_DRIVER_MEMORY. The repl also uses SPARK_REPL_OPTS.
   'org.apache.spark.repl.Main')
-    OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_REPL_OPTS"
+    OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_REPL_OPTS"
+    OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM}
+    ;;
+  *)
+    OUR_JAVA_OPTS="$SPARK_JAVA_OPTS"
+    OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM}
     ;;
 esac
 
@@ -83,14 +95,10 @@ else
   fi
 fi
 
-# Set SPARK_MEM if it isn't already set since we also use it for this process
-SPARK_MEM=${SPARK_MEM:-512m}
-export SPARK_MEM
-
 # Set JAVA_OPTS to be able to load native libraries and to set heap size
 JAVA_OPTS="$OUR_JAVA_OPTS"
 JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
-JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM"
+JAVA_OPTS="$JAVA_OPTS -Xms$OUR_JAVA_MEM -Xmx$OUR_JAVA_MEM"
 # Load extra JAVA_OPTS from conf/java-opts, if it exists
 if [ -e "$FWDIR/conf/java-opts" ] ; then
   JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`"
diff --git a/bin/spark-class2.cmd b/bin/spark-class2.cmd
index 80818c78ec..f488cfdbec 100755
--- a/bin/spark-class2.cmd
+++ b/bin/spark-class2.cmd
@@ -34,22 +34,45 @@ if not "x%1"=="x" goto arg_given
   goto exit
 :arg_given
 
-set RUNNING_DAEMON=0
-if "%1"=="spark.deploy.master.Master" set RUNNING_DAEMON=1
-if "%1"=="spark.deploy.worker.Worker" set RUNNING_DAEMON=1
-if "x%SPARK_DAEMON_MEMORY%" == "x" set SPARK_DAEMON_MEMORY=512m
+if not "x%SPARK_MEM%"=="x" (
+  echo Warning: SPARK_MEM is deprecated, please use a more specific config option
+  echo e.g., spark.executor.memory or SPARK_DRIVER_MEMORY.
+)
+
+rem Use SPARK_MEM or 512m as the default memory, to be overridden by specific options
+set OUR_JAVA_MEM=%SPARK_MEM%
+if "x%OUR_JAVA_MEM%"=="x" set OUR_JAVA_MEM=512m
+
 set SPARK_DAEMON_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% -Dspark.akka.logLifecycleEvents=true
-if "%RUNNING_DAEMON%"=="1" set SPARK_MEM=%SPARK_DAEMON_MEMORY%
-rem Do not overwrite SPARK_JAVA_OPTS environment variable in this script
-if "%RUNNING_DAEMON%"=="0" set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS%
-if "%RUNNING_DAEMON%"=="1" set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS%
 
-rem Figure out how much memory to use per executor and set it as an environment
-rem variable so that our process sees it and can report it to Mesos
-if "x%SPARK_MEM%"=="x" set SPARK_MEM=512m
+rem Add java opts and memory settings for master, worker, executors, and repl.
+rem Master and Worker use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
+if "%1"=="org.apache.spark.deploy.master.Master" (
+  set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_MASTER_OPTS%
+  if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%
+) else if "%1"=="org.apache.spark.deploy.worker.Worker" (
+  set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_WORKER_OPTS%
+  if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%
+
+rem Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
+) else if "%1"=="org.apache.spark.executor.CoarseGrainedExecutorBackend" (
+  set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_EXECUTOR_OPTS%
+  if not "x%SPARK_EXECUTOR_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_EXECUTOR_MEMORY%
+) else if "%1"=="org.apache.spark.executor.MesosExecutorBackend" (
+  set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_EXECUTOR_OPTS%
+  if not "x%SPARK_EXECUTOR_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_EXECUTOR_MEMORY%
+
+rem All drivers use SPARK_JAVA_OPTS + SPARK_DRIVER_MEMORY. The repl also uses SPARK_REPL_OPTS.
+) else if "%1"=="org.apache.spark.repl.Main" (
+  set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_REPL_OPTS%
+  if not "x%SPARK_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DRIVER_MEMORY%
+) else (
+  set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS%
+  if not "x%SPARK_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DRIVER_MEMORY%
+)
 
 rem Set JAVA_OPTS to be able to load native libraries and to set heap size
-set JAVA_OPTS=%OUR_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%SPARK_MEM% -Xmx%SPARK_MEM%
+set JAVA_OPTS=%OUR_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
 rem Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in ExecutorRunner.scala!
 
 rem Test whether the user has built Spark
diff --git a/bin/spark-shell b/bin/spark-shell
index 2bff06cf70..7d3fe3aca7 100755
--- a/bin/spark-shell
+++ b/bin/spark-shell
@@ -45,13 +45,11 @@ if [ "$1" = "--help" ] || [ "$1" = "-h" ]; then
 	exit
 fi
 
-SPARK_SHELL_OPTS=""
-
 for o in "$@"; do
   if [ "$1" = "-c" -o "$1" = "--cores" ]; then
     shift
     if [[ "$1" =~ $CORE_PATTERN ]]; then
-      SPARK_SHELL_OPTS="$SPARK_SHELL_OPTS -Dspark.cores.max=$1"
+      SPARK_REPL_OPTS="$SPARK_REPL_OPTS -Dspark.cores.max=$1"
       shift
     else
       echo "ERROR: wrong format for -c/--cores"
@@ -61,7 +59,7 @@ for o in "$@"; do
   if [ "$1" = "-em" -o "$1" = "--execmem" ]; then
     shift
     if [[ $1 =~ $MEM_PATTERN ]]; then
-      SPARK_SHELL_OPTS="$SPARK_SHELL_OPTS -Dspark.executor.memory=$1"
+      SPARK_REPL_OPTS="$SPARK_REPL_OPTS -Dspark.executor.memory=$1"
       shift
     else
       echo "ERROR: wrong format for --execmem/-em"
@@ -71,7 +69,7 @@ for o in "$@"; do
   if [ "$1" = "-dm" -o "$1" = "--drivermem" ]; then
     shift
     if [[ $1 =~ $MEM_PATTERN ]]; then
-      export SPARK_MEM=$1
+      export SPARK_DRIVER_MEMORY=$1
       shift
     else
       echo "ERROR: wrong format for --drivermem/-dm"
@@ -125,16 +123,18 @@ if [[ ! $? ]]; then
 fi
 
 if $cygwin; then
-    # Workaround for issue involving JLine and Cygwin
-    # (see http://sourceforge.net/p/jline/bugs/40/).
-    # If you're using the Mintty terminal emulator in Cygwin, may need to set the
-    # "Backspace sends ^H" setting in "Keys" section of the Mintty options
-    # (see https://github.com/sbt/sbt/issues/562).
-    stty -icanon min 1 -echo > /dev/null 2>&1
-    $FWDIR/bin/spark-class -Djline.terminal=unix $SPARK_SHELL_OPTS org.apache.spark.repl.Main "$@"
-    stty icanon echo > /dev/null 2>&1
+  # Workaround for issue involving JLine and Cygwin
+  # (see http://sourceforge.net/p/jline/bugs/40/).
+  # If you're using the Mintty terminal emulator in Cygwin, may need to set the
+  # "Backspace sends ^H" setting in "Keys" section of the Mintty options
+  # (see https://github.com/sbt/sbt/issues/562).
+  stty -icanon min 1 -echo > /dev/null 2>&1
+  export SPARK_REPL_OPTS="$SPARK_REPL_OPTS -Djline.terminal=unix"
+  $FWDIR/bin/spark-class org.apache.spark.repl.Main "$@"
+  stty icanon echo > /dev/null 2>&1
 else
-    $FWDIR/bin/spark-class $SPARK_SHELL_OPTS org.apache.spark.repl.Main "$@"
+  export SPARK_REPL_OPTS
+  $FWDIR/bin/spark-class org.apache.spark.repl.Main "$@"
 fi
 
 # record the exit status lest it be overwritten:
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ce25573834..cdc0e5a342 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -162,19 +162,20 @@ class SparkContext(
     jars.foreach(addJar)
   }
 
+  def warnSparkMem(value: String): String = {
+    logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " +
+      "deprecated, please use spark.executor.memory instead.")
+    value
+  }
+
   private[spark] val executorMemory = conf.getOption("spark.executor.memory")
-    .orElse(Option(System.getenv("SPARK_MEM")))
+    .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
+    .orElse(Option(System.getenv("SPARK_MEM")).map(warnSparkMem))
     .map(Utils.memoryStringToMb)
     .getOrElse(512)
 
-  if (!conf.contains("spark.executor.memory") && sys.env.contains("SPARK_MEM")) {
-    logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " +
-      "deprecated, instead use spark.executor.memory")
-  }
-
   // Environment variables to pass to our executors
   private[spark] val executorEnvs = HashMap[String, String]()
-  // Note: SPARK_MEM is included for Mesos, but overwritten for standalone mode in ExecutorRunner
   for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS");
       value <- Option(System.getenv(key))) {
     executorEnvs(key) = value
@@ -185,8 +186,9 @@ class SparkContext(
     value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
     executorEnvs(envKey) = value
   }
-  // Since memory can be set with a system property too, use that
-  executorEnvs("SPARK_MEM") = executorMemory + "m"
+  // The Mesos scheduler backend relies on this environment variable to set executor memory.
+  // TODO: Set this only in the Mesos scheduler.
+  executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
   executorEnvs ++= conf.getExecutorEnv
 
   // Set SPARK_USER for user who is running SparkContext.
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 0eb2f78b73..53458b6660 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -532,8 +532,6 @@ private[spark] object Utils extends Logging {
 
   /**
    * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
-   * This is used to figure out how much memory to claim from Mesos based on the SPARK_MEM
-   * environment variable.
    */
   def memoryStringToMb(str: String): Int = {
     val lower = str.toLowerCase
diff --git a/docs/tuning.md b/docs/tuning.md
index 26ff1325bb..093df3187a 100644
--- a/docs/tuning.md
+++ b/docs/tuning.md
@@ -163,7 +163,7 @@ their work directories), *not* on your driver program.
 **Cache Size Tuning**
 
 One important configuration parameter for GC is the amount of memory that should be used for caching RDDs.
-By default, Spark uses 60% of the configured executor memory (`spark.executor.memory` or `SPARK_MEM`) to
+By default, Spark uses 60% of the configured executor memory (`spark.executor.memory`) to
 cache RDDs. This means that 40% of memory is available for any objects created during task execution.
 
 In case your tasks slow down and you find that your JVM is garbage-collecting frequently or running out of
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index c15add5237..6a16756e05 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -29,7 +29,7 @@ SPARK_HOME = os.environ["SPARK_HOME"]
 
 def launch_gateway():
     # Launch the Py4j gateway using Spark's run command so that we pick up the
-    # proper classpath and SPARK_MEM settings from spark-env.sh
+    # proper classpath and settings from spark-env.sh
     on_windows = platform.system() == "Windows"
     script = "./bin/spark-class.cmd" if on_windows else "./bin/spark-class"
     command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer",
-- 
GitLab