diff --git a/run b/run
index 8be8f73220cf0474e642d9a5beca03a2010e01dc..627f44a5637c1edd394bd8749f33b4245905d0b1 100755
--- a/run
+++ b/run
@@ -3,14 +3,22 @@
 # Figure out where the Scala framework is installed
 FWDIR=`dirname $0`
 
+# Export this as SPARK_HOME
+export SPARK_HOME="$FWDIR"
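+# (MesosScheduler.getExecutorInfo uses this to locate the spark-executor script)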
+
 # Load environment variables from conf/spark-env.sh, if it exists
 if [ -e $FWDIR/conf/spark-env.sh ] ; then
   . $FWDIR/conf/spark-env.sh
 fi
 
+# Classpath and library path entries for Mesos; set below only if MESOS_HOME is defined
+MESOS_CLASSPATH=""
+MESOS_LIBRARY_PATH=""
+
 if [ "x$MESOS_HOME" != "x" ] ; then
-  SPARK_CLASSPATH="$MESOS_HOME/lib/java/mesos.jar:$SPARK_CLASSPATH"
-  SPARK_LIBRARY_PATH="$MESOS_HOME/lib/java:$SPARK_LIBARY_PATH"
+  MESOS_CLASSPATH="$MESOS_HOME/lib/java/mesos.jar"
+  MESOS_LIBRARY_PATH="$MESOS_HOME/lib/java"
 fi
 
 if [ "x$SPARK_MEM" == "x" ] ; then
@@ -19,7 +27,8 @@ fi
 
 # Set JAVA_OPTS to be able to load native libraries and to set heap size
 JAVA_OPTS="$SPARK_JAVA_OPTS"
-JAVA_OPTS+=" -Djava.library.path=$SPARK_LIBRARY_PATH:$FWDIR/third_party:$FWDIR/src/native"
+JAVA_OPTS+=" -Djava.library.path=$SPARK_LIBRARY_PATH:$FWDIR/third_party:$FWDIR/src/native:$MESOS_LIBRARY_PATH"
 JAVA_OPTS+=" -Xms$SPARK_MEM -Xmx$SPARK_MEM"
 # Load extra JAVA_OPTS from conf/java-opts, if it exists
 if [ -e $FWDIR/conf/java-opts ] ; then
@@ -28,7 +37,7 @@ fi
 export JAVA_OPTS
 
 # Build up classpath
-CLASSPATH="$SPARK_CLASSPATH:$FWDIR/build/classes"
+CLASSPATH="$SPARK_CLASSPATH:$FWDIR/build/classes:$MESOS_CLASSPATH"
 CLASSPATH+=:$FWDIR/conf
 CLASSPATH+=:$FWDIR/third_party/mesos.jar
 CLASSPATH+=:$FWDIR/third_party/asm-3.2/lib/all/asm-all-3.2.jar
diff --git a/src/scala/spark/MesosScheduler.scala b/src/scala/spark/MesosScheduler.scala
index 40680a625f0e7e671ccbbe76e07b2700971cd96b..bc24bf37fd1614ab33b6952b41ac91105c6205d5 100644
--- a/src/scala/spark/MesosScheduler.scala
+++ b/src/scala/spark/MesosScheduler.scala
@@ -28,6 +28,13 @@ private class MesosScheduler(
   master: String, frameworkName: String, execArg: Array[Byte])
 extends MScheduler with spark.Scheduler with Logging
 {
+  // Environment variables to pass to our executors
+  val ENV_VARS_TO_SEND_TO_EXECUTORS = Array(
+    "SPARK_MEM",
+    "SPARK_CLASSPATH",
+    "SPARK_LIBRARY_PATH"
+  )
+
   // Lock used to wait for  scheduler to be registered
   var isRegistered = false
   val registeredLock = new Object()
@@ -70,8 +77,30 @@ extends MScheduler with spark.Scheduler with Logging
 
   override def getFrameworkName(d: SchedulerDriver): String = frameworkName
   
-  override def getExecutorInfo(d: SchedulerDriver): ExecutorInfo =
-    new ExecutorInfo(new File("spark-executor").getCanonicalPath(), execArg)
+  // Get Spark's home location from either the spark.home Java property
+  // or the SPARK_HOME environment variable (in that order of preference).
+  // If neither of these is set, throws an exception.
+  def getSparkHome(): String = {
+    if (System.getProperty("spark.home") != null)
+      System.getProperty("spark.home")
+    else if (System.getenv("SPARK_HOME") != null)
+      System.getenv("SPARK_HOME")
+    else
+      throw new SparkException("Spark home is not set; either set the " +
+        "spark.home system property or the SPARK_HOME environment variable")
+  }
+
+  override def getExecutorInfo(d: SchedulerDriver): ExecutorInfo = {
+    val execScript = new File(getSparkHome(), "spark-executor").getCanonicalPath
+    // Gather the environment variables listed above into a params map
+    val params = new JHashMap[String, String]
+    for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) {
+      if (System.getenv(key) != null)
+        params.put(key, System.getenv(key))
+    }
+    // Pass the params map along so the variables are set for each executor
+    new ExecutorInfo(execScript, execArg, params)
+  }
 
   /**
    * The primary means to submit a job to the scheduler. Given a list of tasks,