diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 8739c8bb6d4d8a7dea08621f2cb79aa0a95bf5a3..6b81a6f259efd2786ad1bdb926bd1f47b8a3fea8 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -92,6 +92,12 @@ class SparkContext(master: String, jobName: String, val sparkHome: String, jars:
   private[spark] val addedFiles = HashMap[String, Long]()
   private[spark] val addedJars = HashMap[String, Long]()
 
+  // Environment variables to pass to our executors. Unset variables are stored
+  // as null and filtered out (with a log message) before the executor launches.
+  private[spark] val executorEnvs = HashMap[String, String]()
+  Seq("SPARK_MEM", "SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS",
+    "SPARK_TESTING").foreach { key => executorEnvs.put(key, System.getenv(key)) }
+
   // Add each JAR given through the constructor
   jars.foreach { addJar(_) }
 
@@ -433,6 +439,16 @@ class SparkContext(master: String, jobName: String, val sparkHome: String, jars:
     addedJars.clear()
   }
 
+  /** Sets an environment variable that will be passed to the executors. */
+  def putExecutorEnv(key: String, value: String) {
+    logInfo("Setting executor environment variable " + key + "=" + value)
+    executorEnvs.put(key, value)
+  }
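+
+  // Usage sketch from driver code; the variable name and value below are
+  // hypothetical, shown for illustration only:
+  //   sc.putExecutorEnv("SPARK_USER_OPTS", "-verbose:gc")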
+
   /** Shut down the SparkContext. */
   def stop() {
     dagScheduler.stop()
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 73722a82e0c3899417bf5636ef5fcc4eaac1493e..637e763c9e192bc6d8e9ea313fb65acb7b27c0d3 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -116,10 +116,14 @@ private[spark] class ExecutorRunner(
       val builder = new ProcessBuilder(command: _*).directory(executorDir)
       val env = builder.environment()
       for ((key, value) <- jobDesc.command.environment) {
-        env.put(key, value)
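+        // The environment map may contain nulls for variables that were never
+        // set; skip them, since ProcessBuilder's environment rejects null values.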
+        if (value == null) {
+          logInfo("Environment variable not set: " + key)
+        } else {
+          env.put(key, value)
+        }
       }
-      env.put("SPARK_CORES", cores.toString)
-      env.put("SPARK_MEMORY", memory.toString)
       // In case we are running this from within the Spark Shell
       // so we are not creating a parent process.
       env.put("SPARK_LAUNCH_WITH_SCALA", "0")
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 88cb11454480abd73dcb7963ebee0e8b09a35e30..7aba7324ab00992702a283caf164f0f631d06299 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -20,15 +20,6 @@ private[spark] class SparkDeploySchedulerBackend(
 
   val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
 
-  // Environment variables to pass to our executors
-  val ENV_VARS_TO_SEND_TO_EXECUTORS = Array(
-    "SPARK_MEM",
-    "SPARK_CLASSPATH",
-    "SPARK_LIBRARY_PATH",
-    "SPARK_JAVA_OPTS",
-    "SPARK_TESTING"
-  )
-
   // Memory used by each executor (in megabytes)
   val executorMemory = {
     if (System.getenv("SPARK_MEM") != null) {
@@ -42,17 +33,13 @@
   override def start() {
     super.start()
 
-    val environment = new HashMap[String, String]
-    for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) {
-      if (System.getenv(key) != null) {
-        environment(key) = System.getenv(key)
-      }
-    }
     val masterUrl = "akka://spark@%s:%s/user/%s".format(
       System.getProperty("spark.master.host"), System.getProperty("spark.master.port"),
       StandaloneSchedulerBackend.ACTOR_NAME)
     val args = Seq(masterUrl, "{{SLAVEID}}", "{{HOSTNAME}}", "{{CORES}}")
-    val command = Command("spark.executor.StandaloneExecutorBackend", args, environment)
+    val command = Command("spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
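+    // Entries in executorEnvs whose value is null are filtered out later by
+    // ExecutorRunner on the worker, before the executor process is launched.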
     val jobDesc = new JobDescription(jobName, maxCores, executorMemory, command)
 
     client = new Client(sc.env.actorSystem, master, jobDesc, this)
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
index e6d8b9d8226226e7823e80ec571adf483a967ec6..29dd36be15344df084a6c106af6ab185077fbbc7 100644
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -33,15 +33,6 @@ private[spark] class CoarseMesosSchedulerBackend(
   with MScheduler
   with Logging {
 
-  // Environment variables to pass to our executors
-  val ENV_VARS_TO_SEND_TO_EXECUTORS = Array(
-    "SPARK_MEM",
-    "SPARK_CLASSPATH",
-    "SPARK_LIBRARY_PATH",
-    "SPARK_JAVA_OPTS",
-    "SPARK_TESTING"
-  )
-
   val MAX_SLAVE_FAILURES = 2     // Blacklist a slave after this many failures
 
   // Memory used by each executor (in megabytes)
@@ -123,13 +114,17 @@ private[spark] class CoarseMesosSchedulerBackend(
     val command = "\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
       runScript, masterUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)
     val environment = Environment.newBuilder()
-    for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) {
-      if (System.getenv(key) != null) {
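+    // Unset variables are stored as null in executorEnvs; skip them, since the
+    // generated protobuf setters (such as setValue) reject null arguments.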
+    sc.executorEnvs.foreach { case (key, value) =>
+      if (value == null) {
+        logInfo("Environment variable not set: " + key)
+      } else {
         environment.addVariables(Environment.Variable.newBuilder()
           .setName(key)
-          .setValue(System.getenv(key))
+          .setValue(value)
           .build())
-      }
+      }
     }
     return CommandInfo.newBuilder().setValue(command).setEnvironment(environment).build()
   }
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index 6f01c8c09dafeafe25cbcc425496b6c1eedb71b3..c4aee5c9cb7d361f0f3f00443fa7b50c827bec2d 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -29,15 +29,6 @@ private[spark] class MesosSchedulerBackend(
   with MScheduler
   with Logging {
 
-  // Environment variables to pass to our executors
-  val ENV_VARS_TO_SEND_TO_EXECUTORS = Array(
-    "SPARK_MEM",
-    "SPARK_CLASSPATH",
-    "SPARK_LIBRARY_PATH",
-    "SPARK_JAVA_OPTS",
-    "SPARK_TESTING"
-  )
-
   // Memory used by each executor (in megabytes)
   val EXECUTOR_MEMORY = {
     if (System.getenv("SPARK_MEM") != null) {
@@ -94,13 +85,17 @@ private[spark] class MesosSchedulerBackend(
     }
     val execScript = new File(sparkHome, "spark-executor").getCanonicalPath
     val environment = Environment.newBuilder()
-    for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) {
-      if (System.getenv(key) != null) {
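+    // As in CoarseMesosSchedulerBackend, skip null-valued entries rather than
+    // passing them to the protobuf builder.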
+    sc.executorEnvs.foreach { case (key, value) =>
+      if (value == null) {
+        logInfo("Environment variable not set: " + key)
+      } else {
         environment.addVariables(Environment.Variable.newBuilder()
           .setName(key)
-          .setValue(System.getenv(key))
+          .setValue(value)
           .build())
-      }
+      }
     }
     val memory = Resource.newBuilder()
       .setName("mem")