From cf52a3cba6cce34d5d4da3da285c9d49d4ed3ceb Mon Sep 17 00:00:00 2001
From: "Y.CORP.YAHOO.COM\\tgraves" <tgraves@thatenemy-lm.champ.corp.yahoo.com>
Date: Tue, 27 Aug 2013 11:00:21 -0500
Subject: [PATCH] Allow for Executors to have different directories then the
 Spark Master for Yarn

---
 .../main/scala/spark/executor/Executor.scala  | 25 +++++++++++++++++++
 1 file changed, 25 insertions(+)

diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 036c7191ad..9e0356a711 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -60,6 +60,13 @@ private[spark] class Executor(
     System.setProperty(key, value)
   }
 
+  // If we are in yarn mode, systems can have different disk layouts so we must set it
+  // to what Yarn on this system said was available. This will be used later when SparkEnv
+  // created.
+  if (java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))) {
+    System.setProperty("spark.local.dir", getYarnLocalDirs())
+  }
+
   // Create our ClassLoader and set it on this thread
   private val urlClassLoader = createClassLoader()
   private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
@@ -107,6 +114,24 @@ private[spark] class Executor(
     threadPool.execute(new TaskRunner(context, taskId, serializedTask))
   }
 
+  /** Get the Yarn approved local directories. */
+  private def getYarnLocalDirs(): String = {
+    // Hadoop 0.23 and 2.x have different Environment variable names for the
+    // local dirs, so lets check both. We assume one of the 2 is set.
+    // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
+    var localDirs = System.getenv("LOCAL_DIRS")
+    val yarnLocalSysDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
+    yarnLocalSysDirs match {
+        case Some(s) => localDirs = s
+        case None => {
+          if ((localDirs == null) || (localDirs.isEmpty())) {
+            throw new Exception("Yarn Local dirs can't be empty")
+          }
+        }
+    }
+    return localDirs
+  }
+
   class TaskRunner(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)
     extends Runnable {
 
-- 
GitLab