diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 0933a03a0fce49affd400be9415a833b0ada1a4c..4a888248542b94e59ae311abd55ad6eaca6b039a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -163,11 +163,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   }
 
   def createCommand(offer: Offer, numCores: Int, taskId: String): CommandInfo = {
-    val executorSparkHome = conf.getOption("spark.mesos.executor.home")
-      .orElse(sc.getSparkHome())
-      .getOrElse {
-        throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
-      }
     val environment = Environment.newBuilder()
     val extraClassPath = conf.getOption("spark.executor.extraClassPath")
     extraClassPath.foreach { cp =>
@@ -201,6 +196,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       .orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
 
     if (uri.isEmpty) {
+      val executorSparkHome = conf.getOption("spark.mesos.executor.home")
+        .orElse(sc.getSparkHome())
+        .getOrElse {
+          throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
+        }
       val runScript = new File(executorSparkHome, "./bin/spark-class").getPath
       command.setValue(
         "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend"
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 0e66979901540b3637cd12df4fabe3ced9745720..26a3ad49d0da6b11aa595197ee160e74b847b191 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -370,6 +370,21 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     verify(driverEndpoint, never()).askWithRetry(isA(classOf[RemoveExecutor]))(any[ClassTag[_]])
   }
 
+  test("mesos supports spark.executor.uri") {
+    val url = "spark.spark.spark.com"
+    setBackend(Map(
+      "spark.executor.uri" -> url
+    ), false)
+
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+    val offer1 = createOffer("o1", "s1", mem, cpu)
+    backend.resourceOffers(driver, List(offer1).asJava)
+
+    val launchedTasks = verifyTaskLaunched(driver, "o1")
+    assert(launchedTasks.head.getCommand.getUrisList.asScala(0).getValue == url)
+  }
+
   private def verifyDeclinedOffer(driver: SchedulerDriver,
       offerId: OfferID,
       filter: Boolean = false): Unit = {
@@ -435,13 +450,17 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     backend
   }
 
-  private def setBackend(sparkConfVars: Map[String, String] = null) {
+  private def setBackend(sparkConfVars: Map[String, String] = null,
+      setHome: Boolean = true) {
     sparkConf = (new SparkConf)
       .setMaster("local[*]")
       .setAppName("test-mesos-dynamic-alloc")
-      .setSparkHome("/path")
       .set("spark.mesos.driver.webui.url", "http://webui")
 
+    if (setHome) {
+      sparkConf.setSparkHome("/path")
+    }
+
     if (sparkConfVars != null) {
       sparkConf.setAll(sparkConfVars)
     }