diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
index 1fe956320a1b8b05035526c5dc582051a957a9a9..957a928bc402b5a14dd0e53a9757b40e7b2cb7c3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -392,15 +392,14 @@ private[spark] object RestSubmissionClient {
       mainClass: String,
       appArgs: Array[String],
       conf: SparkConf,
-      env: Map[String, String] = sys.env): SubmitRestProtocolResponse = {
+      env: Map[String, String] = Map()): SubmitRestProtocolResponse = {
     val master = conf.getOption("spark.master").getOrElse {
       throw new IllegalArgumentException("'spark.master' must be set.")
     }
     val sparkProperties = conf.getAll.toMap
-    val environmentVariables = env.filter { case (k, _) => k.startsWith("SPARK_") }
     val client = new RestSubmissionClient(master)
     val submitRequest = client.constructSubmitRequest(
-      appResource, mainClass, appArgs, sparkProperties, environmentVariables)
+      appResource, mainClass, appArgs, sparkProperties, env)
     client.createSubmission(submitRequest)
   }
 
@@ -413,6 +412,16 @@ private[spark] object RestSubmissionClient {
     val mainClass = args(1)
     val appArgs = args.slice(2, args.size)
     val conf = new SparkConf
-    run(appResource, mainClass, appArgs, conf)
+    val env = filterSystemEnvironment(sys.env)
+    run(appResource, mainClass, appArgs, conf, env)
+  }
+
+  /**
+   * Filter non-spark environment variables from any environment.
+   */
+  private[rest] def filterSystemEnvironment(env: Map[String, String]): Map[String, String] = {
+    env.filter { case (k, _) =>
+      (k.startsWith("SPARK_") && k != "SPARK_ENV_LOADED") || k.startsWith("MESOS_")
+    }
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
index 96e456d889ac36483789345ae291c8d5990afa1f..9693e32bf6af606a500e083be97d03bb31447be7 100644
--- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -366,6 +366,18 @@ class StandaloneRestSubmitSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(conn3.getResponseCode === HttpServletResponse.SC_INTERNAL_SERVER_ERROR)
   }
 
+  test("client does not send 'SPARK_ENV_LOADED' env var by default") {
+    val environmentVariables = Map("SPARK_VAR" -> "1", "SPARK_ENV_LOADED" -> "1")
+    val filteredVariables = RestSubmissionClient.filterSystemEnvironment(environmentVariables)
+    assert(filteredVariables == Map("SPARK_VAR" -> "1"))
+  }
+
+  test("client includes mesos env vars") {
+    val environmentVariables = Map("SPARK_VAR" -> "1", "MESOS_VAR" -> "1", "OTHER_VAR" -> "1")
+    val filteredVariables = RestSubmissionClient.filterSystemEnvironment(environmentVariables)
+    assert(filteredVariables == Map("SPARK_VAR" -> "1", "MESOS_VAR" -> "1"))
+  }
+
   /* --------------------- *
    |     Helper methods    |
    * --------------------- */