diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 71650cd773bcfb5904120f0475f003652bc7607f..71d7385b08eb99fd6d13425f62dcaee2b002decb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -111,13 +111,14 @@ private[spark] class ExecutorRunner(
     case "{{EXECUTOR_ID}}" => execId.toString
     case "{{HOSTNAME}}" => host
     case "{{CORES}}" => cores.toString
+    case "{{APP_ID}}" => appId
     case other => other
   }
 
   def getCommandSeq = {
     val command = Command(
       appDesc.command.mainClass,
-      appDesc.command.arguments.map(substituteVariables) ++ Seq(appId),
+      appDesc.command.arguments.map(substituteVariables),
       appDesc.command.environment,
       appDesc.command.classPathEntries,
       appDesc.command.libraryPathEntries,
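
Taken together, the two hunks above move the application ID from a positional
argument that ExecutorRunner appended itself to a "{{APP_ID}}" template variable,
resolved the same way as the executor ID, hostname, and core count. A minimal,
self-contained sketch of that substitution (the values below are illustrative
stand-ins, not the actual ExecutorRunner state):

    object SubstitutionSketch {
      // Illustrative values; in ExecutorRunner these come from the constructor.
      val execId = 1
      val host = "worker321"
      val cores = 8
      val appId = "app-20140101000000-0000"

      def substituteVariables(argument: String): String = argument match {
        case "{{EXECUTOR_ID}}" => execId.toString
        case "{{HOSTNAME}}"    => host
        case "{{CORES}}"       => cores.toString
        case "{{APP_ID}}"      => appId
        case other             => other // non-template arguments pass through unchanged
      }

      def main(args: Array[String]): Unit = {
        val templated =
          Seq("driverUrl", "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{APP_ID}}")
        // Prints: List(driverUrl, 1, worker321, 8, app-20140101000000-0000)
        println(templated.map(substituteVariables))
      }
    }
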
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 06061edfc084497f9ccdc58c5e3bc0d52b7e6ca2..c40a3e16675ad5cfe81450898c8c660eb326a8c2 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -152,6 +152,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
           "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> " +
           "<cores> <appid> [<workerUrl>] ")
         System.exit(1)
+
+      // NB: These arguments are provided by SparkDeploySchedulerBackend (for standalone mode)
+      // and CoarseMesosSchedulerBackend (for Mesos mode).
       case 5 =>
         run(args(0), args(1), args(2), args(3).toInt, args(4), None)
       case x if x > 5 =>
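
With the application ID now a required fifth positional argument, the dispatch on
args.length looks roughly like the sketch below (hedged: run(...) is stubbed out
here; the real method bootstraps the executor):

    object ArgDispatchSketch {
      // Stub with the same shape as CoarseGrainedExecutorBackend.run after this change.
      def run(driverUrl: String, executorId: String, hostname: String, cores: Int,
          appId: String, workerUrl: Option[String]): Unit =
        println(s"run($driverUrl, $executorId, $hostname, $cores, $appId, $workerUrl)")

      def main(args: Array[String]): Unit = args.length match {
        case 5 =>
          // <driverUrl> <executorId> <hostname> <cores> <appid>
          run(args(0), args(1), args(2), args(3).toInt, args(4), None)
        case x if x > 5 =>
          // The optional trailing <workerUrl> is only supplied in standalone mode.
          run(args(0), args(1), args(2), args(3).toInt, args(4), Some(args(5)))
        case _ =>
          System.err.println("Usage: <driverUrl> <executorId> <hostname> <cores> " +
            "<appid> [<workerUrl>]")
          System.exit(1)
      }
    }
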
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index ed209d195ec9d5c385a535e183264c0f9ed836cb..8c7de75600b5f317df1051720eb65601887e1359 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -51,7 +51,8 @@ private[spark] class SparkDeploySchedulerBackend(
       conf.get("spark.driver.host"),
       conf.get("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
-    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
+    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{APP_ID}}",
+      "{{WORKER_URL}}")
     val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
       .map(Utils.splitCommandString).getOrElse(Seq.empty)
     val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp =>
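
On the standalone side, the template vector now lines up position for position with
the usage string in CoarseGrainedExecutorBackend. An illustrative round trip (the
driver URL is invented, and "{{WORKER_URL}}" is presumably resolved by a sibling
case of substituteVariables that this patch does not touch):

    object RoundTripSketch extends App {
      // What SparkDeploySchedulerBackend hands to the worker:
      val templated = Seq("akka.tcp://sparkDriver@driver:7077/user/CoarseGrainedScheduler",
        "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{APP_ID}}", "{{WORKER_URL}}")
      // After substitution on the worker it becomes, e.g.:
      //   Seq(<driverUrl>, "1", "worker321", "8", "app-20140101000000-0000", <workerUrl>)
      templated.foreach(println)
    }
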
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 90828578cd88f22476afe523b6ae436e8a199153..d7f88de4b40aa596056e9899d96821545cfe747d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -150,17 +150,17 @@ private[spark] class CoarseMesosSchedulerBackend(
     if (uri == null) {
       val runScript = new File(executorSparkHome, "./bin/spark-class").getCanonicalPath
       command.setValue(
-        "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d".format(
-          runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+        "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s".format(
+          runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores, appId))
     } else {
       // Grab everything to the first '.'. We'll use that and '*' to
       // glob the directory "correctly".
       val basename = uri.split('/').last.split('.').head
       command.setValue(
         ("cd %s*; " +
-          "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d")
+          "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s")
           .format(basename, driverUrl, offer.getSlaveId.getValue,
-            offer.getHostname, numCores))
+            offer.getHostname, numCores, appId))
       command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
     }
     command.build()
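
Mesos builds a single shell command string rather than an argument vector, so the
application ID is appended as one more %s. A quick way to preview the resulting
command line (every value below is made up):

    object MesosCommandSketch extends App {
      val runScript = "/opt/spark/bin/spark-class" // hypothetical install path
      val driverUrl = "akka.tcp://sparkDriver@driver:7077/user/CoarseGrainedScheduler"
      val slaveId   = "20140101-000000-1-S0"
      val hostname  = "host1"
      val numCores  = 8
      val appId     = "app-20140101000000-0000"

      // Same format string as the uri == null branch above:
      println("\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s"
        .format(runScript, driverUrl, slaveId, hostname, numCores, appId))
    }
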
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index 39ab53cf0b5b1b0dd1321bbad3c37df0560e588f..5e2592e8d2e8d13454ab37dfef60de9de416d9ff 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -26,14 +26,12 @@ import org.apache.spark.SparkConf
 
 class ExecutorRunnerTest extends FunSuite {
   test("command includes appId") {
-    def f(s:String) = new File(s)
+    val appId = "12345-worker321-9876"
     val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
     val appDesc = new ApplicationDescription("app name", Some(8), 500,
-      Command("foo", Seq(), Map(), Seq(), Seq(), Seq()), "appUiUrl")
-    val appId = "12345-worker321-9876"
-    val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome),
-      f("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)
-
+      Command("foo", Seq("{{APP_ID}}"), Map(), Seq(), Seq(), Seq()), "appUiUrl")
+    val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321",
+      new File(sparkHome), new File("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)
     assert(er.getCommandSeq.last === appId)
   }
 }
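
Since the test Command carries "{{APP_ID}}" as its only argument, getCommandSeq.last
is exactly the substituted application ID; if the new "{{APP_ID}}" case of
substituteVariables regressed, the raw placeholder string would fall through
"case other" and the assertion would fail.
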