Skip to content
Snippets Groups Projects
Commit cf52a3cb authored by Y.CORP.YAHOO.COM\tgraves's avatar Y.CORP.YAHOO.COM\tgraves
Browse files

Allow for Executors to have different directories then the Spark Master for Yarn

parent 6dd64e8b
No related branches found
No related tags found
No related merge requests found
......@@ -60,6 +60,13 @@ private[spark] class Executor(
System.setProperty(key, value)
}
// If we are in yarn mode, systems can have different disk layouts so we must set it
// to what Yarn on this system said was available. This will be used later when SparkEnv
// created.
if (java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))) {
System.setProperty("spark.local.dir", getYarnLocalDirs())
}
// Create our ClassLoader and set it on this thread
private val urlClassLoader = createClassLoader()
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
......@@ -107,6 +114,24 @@ private[spark] class Executor(
threadPool.execute(new TaskRunner(context, taskId, serializedTask))
}
/** Get the Yarn approved local directories. */
private def getYarnLocalDirs(): String = {
// Hadoop 0.23 and 2.x have different Environment variable names for the
// local dirs, so lets check both. We assume one of the 2 is set.
// LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
var localDirs = System.getenv("LOCAL_DIRS")
val yarnLocalSysDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
yarnLocalSysDirs match {
case Some(s) => localDirs = s
case None => {
if ((localDirs == null) || (localDirs.isEmpty())) {
throw new Exception("Yarn Local dirs can't be empty")
}
}
}
return localDirs
}
class TaskRunner(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)
extends Runnable {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment