diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 036c7191adcffd1383c05c184105ff50b15f662f..9e0356a711beb6d5372b3e37010622e11690d776 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -60,6 +60,13 @@ private[spark] class Executor(
     System.setProperty(key, value)
   }
 
+  // In YARN mode, nodes can have different disk layouts, so we must set spark.local.dir
+  // to the directories that YARN reported as available on this node. This property is
+  // read later, when SparkEnv is created.
+  if (java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))) {
+    System.setProperty("spark.local.dir", getYarnLocalDirs())
+  }
+
   // Create our ClassLoader and set it on this thread
   private val urlClassLoader = createClassLoader()
   private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
@@ -107,6 +114,20 @@ private[spark] class Executor(
     threadPool.execute(new TaskRunner(context, taskId, serializedTask))
   }
 
+  /** Get the Yarn approved local directories. */
+  private def getYarnLocalDirs(): String = {
+    // Hadoop 0.23 and 2.x use different environment variable names for the
+    // local dirs, so let's check both. We assume at least one of the two is set.
+    // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
+    val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
+      .orElse(Option(System.getenv("LOCAL_DIRS")))
+      .getOrElse("")
+    if (localDirs.isEmpty()) {
+      throw new Exception("Yarn Local dirs can't be empty")
+    }
+    localDirs
+  }
+
   class TaskRunner(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)
     extends Runnable {
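
A minimal standalone sketch of the lookup order getYarnLocalDirs() implements,
runnable outside the patch. YarnLocalDirsSketch and resolveLocalDirs are
hypothetical names, and the Map stands in for System.getenv:

    object YarnLocalDirsSketch {
      // Mirrors the patch: YARN_LOCAL_DIRS (Hadoop 0.23.x) takes precedence over
      // LOCAL_DIRS (Hadoop 2.x); an unset or empty result is treated as an error.
      def resolveLocalDirs(env: Map[String, String]): String = {
        val localDirs = env.get("YARN_LOCAL_DIRS")
          .orElse(env.get("LOCAL_DIRS"))
          .getOrElse("")
        if (localDirs.isEmpty) {
          throw new Exception("Yarn Local dirs can't be empty")
        }
        localDirs
      }

      def main(args: Array[String]): Unit = {
        // Hadoop 2.x style environment
        println(resolveLocalDirs(Map("LOCAL_DIRS" -> "/data1/yarn,/data2/yarn")))
        // When both variables are set, the Hadoop 0.23.x one wins
        println(resolveLocalDirs(Map(
          "YARN_LOCAL_DIRS" -> "/grid/yarn",
          "LOCAL_DIRS" -> "/data1/yarn")))
      }
    }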