From 03d0b858c807339b4221bedffa29ac76eef5352e Mon Sep 17 00:00:00 2001
From: Matei Zaharia <matei@eecs.berkeley.edu>
Date: Sun, 30 Jun 2013 15:38:58 -0700
Subject: [PATCH] Made use of spark.executor.memory setting consistent and
 documented it

Conflicts:

	core/src/main/scala/spark/SparkContext.scala
---
 core/src/main/scala/spark/SparkContext.scala  | 24 +++++++++-----
 .../scheduler/cluster/SchedulerBackend.scala  | 11 ++-----
 docs/configuration.md                         | 31 ++++++++++++-------
 docs/ec2-scripts.md                           |  5 ++-
 docs/tuning.md                                |  6 ++--
 5 files changed, 43 insertions(+), 34 deletions(-)

diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 70a9d7698c..366afb2a2a 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -115,13 +115,14 @@ class SparkContext(
   // Environment variables to pass to our executors
   private[spark] val executorEnvs = HashMap[String, String]()
   // Note: SPARK_MEM is included for Mesos, but overwritten for standalone mode in ExecutorRunner
-  for (key <- Seq("SPARK_MEM", "SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS",
-      "SPARK_TESTING")) {
+  for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS", "SPARK_TESTING")) {
     val value = System.getenv(key)
     if (value != null) {
       executorEnvs(key) = value
     }
   }
+  // Since memory can be set with a system property too, use that
+  executorEnvs("SPARK_MEM") = SparkContext.executorMemoryRequested + "m"
   if (environment != null) {
     executorEnvs ++= environment
   }
@@ -156,14 +157,12 @@ class SparkContext(
         scheduler
 
       case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
-        // Check to make sure SPARK_MEM <= memoryPerSlave. Otherwise Spark will just hang.
+        // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
         val memoryPerSlaveInt = memoryPerSlave.toInt
-        val sparkMemEnv = System.getenv("SPARK_MEM")
-        val sparkMemEnvInt = if (sparkMemEnv != null) Utils.memoryStringToMb(sparkMemEnv) else 512
-        if (sparkMemEnvInt > memoryPerSlaveInt) {
+        if (SparkContext.executorMemoryRequested > memoryPerSlaveInt) {
           throw new SparkException(
-            "Slave memory (%d MB) cannot be smaller than SPARK_MEM (%d MB)".format(
-              memoryPerSlaveInt, sparkMemEnvInt))
+            "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
+              memoryPerSlaveInt, SparkContext.executorMemoryRequested))
         }
 
         val scheduler = new ClusterScheduler(this)
@@ -881,6 +880,15 @@ object SparkContext {
 
   /** Find the JAR that contains the class of a particular object */
   def jarOfObject(obj: AnyRef): Seq[String] = jarOfClass(obj.getClass)
+
+  /** Get the amount of memory per executor requested through system properties or SPARK_MEM */
+  private[spark] val executorMemoryRequested = {
+    // TODO: Might need to add some extra memory for the non-heap parts of the JVM
+    Option(System.getProperty("spark.executor.memory"))
+      .orElse(Option(System.getenv("SPARK_MEM")))
+      .map(Utils.memoryStringToMb)
+      .getOrElse(512)
+  }
 }
 
 
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala
index 9ac875de3a..8844057a5c 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala
@@ -1,6 +1,6 @@
 package spark.scheduler.cluster
 
-import spark.Utils
+import spark.{SparkContext, Utils}
 
 /**
  * A backend interface for cluster scheduling systems that allows plugging in different ones under
@@ -14,14 +14,7 @@ private[spark] trait SchedulerBackend {
   def defaultParallelism(): Int
 
   // Memory used by each executor (in megabytes)
-  protected val executorMemory = {
-    // TODO: Might need to add some extra memory for the non-heap parts of the JVM
-    Option(System.getProperty("spark.executor.memory"))
-      .orElse(Option(System.getenv("SPARK_MEM")))
-      .map(Utils.memoryStringToMb)
-      .getOrElse(512)
-  }
-
+  protected val executorMemory: Int = SparkContext.executorMemoryRequested
 
   // TODO: Probably want to add a killTask too
 }
diff --git a/docs/configuration.md b/docs/configuration.md
index 2de512f896..ae61769e31 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -25,23 +25,25 @@ Inside `spark-env.sh`, you *must* set at least the following two variables:
 * `SCALA_HOME`, to point to your Scala installation.
 * `MESOS_NATIVE_LIBRARY`, if you are [running on a Mesos cluster](running-on-mesos.html).
 
-In addition, there are four other variables that control execution. These can be set *either in `spark-env.sh`
-or in each job's driver program*, because they will automatically be propagated to workers from the driver.
-For a multi-user environment, we recommend setting the in the driver program instead of `spark-env.sh`, so
-that different user jobs can use different amounts of memory, JVM options, etc.
+In addition, there are four other variables that control execution. These should be set *in the environment that
+launches the job's driver program* instead of `spark-env.sh`, because they will be automatically propagated to
+workers. Setting these per-job instead of in `spark-env.sh` ensures that different jobs can have different settings
+for these variables.
 
-* `SPARK_MEM`, to set the amount of memory used per node (this should be in the same format as the 
-   JVM's -Xmx option, e.g. `300m` or `1g`)
 * `SPARK_JAVA_OPTS`, to add JVM options. This includes any system properties that you'd like to pass with `-D`.
 * `SPARK_CLASSPATH`, to add elements to Spark's classpath.
 * `SPARK_LIBRARY_PATH`, to add search directories for native libraries.
+* `SPARK_MEM`, to set the amount of memory used per node. This should be in the same format as the 
+   JVM's -Xmx option, e.g. `300m` or `1g`. Note that this option will soon be deprecated in favor of
+   the `spark.executor.memory` system property, so we recommend using that in new code.
 
-Note that if you do set these in `spark-env.sh`, they will override the values set by user programs, which
-is undesirable; you can choose to have `spark-env.sh` set them only if the user program hasn't, as follows:
+Beware that if you do set these variables in `spark-env.sh`, they will override the values set by user programs,
+which is undesirable; if you prefer, you can choose to have `spark-env.sh` set them only if the user program
+hasn't, as follows:
 
 {% highlight bash %}
-if [ -z "$SPARK_MEM" ] ; then
-  SPARK_MEM="1g"
+if [ -z "$SPARK_JAVA_OPTS" ] ; then
+  SPARK_JAVA_OPTS="-verbose:gc"
 fi
 {% endhighlight %}
 
@@ -55,10 +57,17 @@ val sc = new SparkContext(...)
 {% endhighlight %}
 
 Most of the configurable system properties control internal settings that have reasonable default values. However,
-there are at least four properties that you will commonly want to control:
+there are at least five properties that you will commonly want to control:
 
 <table class="table">
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr>
+  <td>spark.executor.memory</td>
+  <td>512m</td>
+  <td>
+    Amount of memory to use per executor process, in the same format as JVM memory strings (e.g. `512m`, `2g`).
+  </td>
+</tr>
 <tr>
   <td>spark.serializer</td>
   <td>spark.JavaSerializer</td>
diff --git a/docs/ec2-scripts.md b/docs/ec2-scripts.md
index dc57035eba..eab8a0ff20 100644
--- a/docs/ec2-scripts.md
+++ b/docs/ec2-scripts.md
@@ -106,9 +106,8 @@ permissions on your private key file, you can run `launch` with the
 # Configuration
 
 You can edit `/root/spark/conf/spark-env.sh` on each machine to set Spark configuration options, such
-as JVM options and, most crucially, the amount of memory to use per machine (`SPARK_MEM`).
-This file needs to be copied to **every machine** to reflect the change. The easiest way to do this
-is to use a script we provide called `copy-dir`. First edit your `spark-env.sh` file on the master, 
+as JVM options. This file needs to be copied to **every machine** to reflect the change. The easiest way to
+do this is to use a script we provide called `copy-dir`. First edit your `spark-env.sh` file on the master, 
 then run `~/spark-ec2/copy-dir /root/spark/conf` to RSYNC it to all the workers.
 
 The [configuration guide](configuration.html) describes the available configuration options.
diff --git a/docs/tuning.md b/docs/tuning.md
index 32c7ab86e9..5ffca54481 100644
--- a/docs/tuning.md
+++ b/docs/tuning.md
@@ -157,9 +157,9 @@ their work directories), *not* on your driver program.
 
 **Cache Size Tuning**
 
-One important configuration parameter for GC is the amount of memory that should be used for
-caching RDDs. By default, Spark uses 66% of the configured memory (`SPARK_MEM`) to cache RDDs. This means that
- 33% of memory is available for any objects created during task execution.
+One important configuration parameter for GC is the amount of memory that should be used for caching RDDs.
+By default, Spark uses 66% of the configured executor memory (`spark.executor.memory` or `SPARK_MEM`) to
+cache RDDs. This means that 33% of memory is available for any objects created during task execution.
 
 In case your tasks slow down and you find that your JVM is garbage-collecting frequently or running out of
 memory, lowering this value will help reduce the memory consumption. To change this to say 50%, you can call
-- 
GitLab