diff --git a/bin/spark-class b/bin/spark-class
index 79af42c72c76611b80a3372880449ffbf5f9fdff..1b945461fabc8fed4ff0cb5c702785f28e790ee8 100755
--- a/bin/spark-class
+++ b/bin/spark-class
@@ -71,6 +71,8 @@ case "$1" in
   'org.apache.spark.executor.MesosExecutorBackend')
     OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
     OUR_JAVA_MEM=${SPARK_EXECUTOR_MEMORY:-$DEFAULT_MEM}
+    export PYTHONPATH="$FWDIR/python:$PYTHONPATH"
+    export PYTHONPATH="$FWDIR/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
     ;;
 
   # Spark submit uses SPARK_JAVA_OPTS + SPARK_SUBMIT_OPTS +
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index d252fe8595fb8a507023b74a13356149c04b7dd7..79c9051e88691e316755f18d81e062786864bc6e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -30,6 +30,7 @@ import org.apache.mesos._
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState,
   ExecutorInfo => MesosExecutorInfo, _}
 
+import org.apache.spark.executor.MesosExecutorBackend
 import org.apache.spark.{Logging, SparkContext, SparkException, TaskState}
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.scheduler._
@@ -123,14 +124,15 @@ private[spark] class MesosSchedulerBackend(
     val command = CommandInfo.newBuilder()
       .setEnvironment(environment)
     val uri = sc.conf.get("spark.executor.uri", null)
+    val executorBackendName = classOf[MesosExecutorBackend].getName
     if (uri == null) {
-      val executorPath = new File(executorSparkHome, "/sbin/spark-executor").getCanonicalPath
-      command.setValue("%s %s".format(prefixEnv, executorPath))
+      val executorPath = new File(executorSparkHome, "/bin/spark-class").getCanonicalPath
+      command.setValue(s"$prefixEnv $executorPath $executorBackendName")
     } else {
       // Grab everything to the first '.'. We'll use that and '*' to
       // glob the directory "correctly".
       val basename = uri.split('/').last.split('.').head
-      command.setValue("cd %s*; %s ./sbin/spark-executor".format(basename, prefixEnv))
+      command.setValue(s"cd ${basename}*; $prefixEnv ./bin/spark-class $executorBackendName")
       command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
     }
     val cpus = Resource.newBuilder()
diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
index 78a30a40bf19ad33485f63f14fc644bfbfa50319..073814c127edc24ca5cf44751cfb3727f4dbcc81 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.scheduler.mesos
 
+import org.apache.spark.executor.MesosExecutorBackend
 import org.scalatest.FunSuite
 import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext}
 import org.apache.spark.scheduler.{SparkListenerExecutorAdded, LiveListenerBus,
@@ -37,6 +38,37 @@ import scala.collection.mutable.ArrayBuffer
 
 class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with EasyMockSugar {
 
+  test("check spark-class location correctly") {
+    val conf = new SparkConf
+    conf.set("spark.mesos.executor.home" , "/mesos-home")
+
+    val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
+    listenerBus.post(SparkListenerExecutorAdded("s1", new ExecutorInfo("host1", 2)))
+    EasyMock.replay(listenerBus)
+
+    val sc = EasyMock.createMock(classOf[SparkContext])
+    EasyMock.expect(sc.getSparkHome()).andReturn(Option("/spark-home")).anyTimes()
+    EasyMock.expect(sc.conf).andReturn(conf).anyTimes()
+    EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes()
+    EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes()
+    EasyMock.expect(sc.listenerBus).andReturn(listenerBus)
+    EasyMock.replay(sc)
+    val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])
+    EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes()
+    EasyMock.replay(taskScheduler)
+
+    val mesosSchedulerBackend = new MesosSchedulerBackend(taskScheduler, sc, "master")
+
+    // uri is null.
+    val executorInfo = mesosSchedulerBackend.createExecutorInfo("test-id")
+    assert(executorInfo.getCommand.getValue === s" /mesos-home/bin/spark-class ${classOf[MesosExecutorBackend].getName}")
+
+    // uri exists.
+    conf.set("spark.executor.uri", "hdfs:///test-app-1.0.0.tgz")
+    val executorInfo1 = mesosSchedulerBackend.createExecutorInfo("test-id")
+    assert(executorInfo1.getCommand.getValue === s"cd test-app-1*;  ./bin/spark-class ${classOf[MesosExecutorBackend].getName}")
+  }
+
   test("mesos resource offers result in launching tasks") {
     def createOffer(id: Int, mem: Int, cpu: Int) = {
       val builder = Offer.newBuilder()
diff --git a/sbin/spark-executor b/sbin/spark-executor
deleted file mode 100755
index 674ce906d9421cd53cfb3dc28dfdbd18383c54cd..0000000000000000000000000000000000000000
--- a/sbin/spark-executor
+++ /dev/null
@@ -1,26 +0,0 @@
-#!/bin/sh
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
-
-export PYTHONPATH="$FWDIR/python:$PYTHONPATH"
-export PYTHONPATH="$FWDIR/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
-
-echo "Running spark-executor with framework dir = $FWDIR"
-exec "$FWDIR"/bin/spark-class org.apache.spark.executor.MesosExecutorBackend