From bc36df127d3b9f56b4edaeb5eca7697d4aef761a Mon Sep 17 00:00:00 2001
From: Devaraj K <devaraj@apache.org>
Date: Tue, 5 Apr 2016 14:12:00 -0500
Subject: [PATCH] [SPARK-13063][YARN] Make the Spark YARN staging dir
 configurable
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## What changes were proposed in this pull request?
Made the Spark YARN staging directory configurable via the configuration 'spark.yarn.stagingDir'.

## How was this patch tested?

I have verified it manually by running applications on YARN. If 'spark.yarn.stagingDir' is configured, its value is used as the staging directory; otherwise the default value is used, i.e. the file system's home directory for the user.

Author: Devaraj K <devaraj@apache.org>

Closes #12082 from devaraj-kavali/SPARK-13063.
---
 docs/running-on-yarn.md                        |  7 +++++++
 .../org/apache/spark/deploy/yarn/Client.scala  | 18 +++++++++++++++---
 .../org/apache/spark/deploy/yarn/config.scala  |  5 +++++
 3 files changed, 27 insertions(+), 3 deletions(-)

diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index bb83272ec8..ddc75a70b9 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -159,6 +159,13 @@ If you need a reference to the proper location to put log files in the YARN so t
     HDFS replication level for the files uploaded into HDFS for the application. These include things like the Spark jar, the app jar, and any distributed cache files/archives.
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.stagingDir</code></td>
+  <td>Current user's home directory in the filesystem</td>
+  <td>
+    Staging directory used while submitting applications.
+  </td>
+</tr>
 <tr>
   <td><code>spark.yarn.preserve.staging.files</code></td>
   <td><code>false</code></td>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 336e29fc6b..5e7e3be08d 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -182,8 +182,8 @@ private[spark] class Client(
     val appStagingDir = getAppStagingDir(appId)
     try {
       val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
-      val stagingDirPath = new Path(appStagingDir)
       val fs = FileSystem.get(hadoopConf)
+      val stagingDirPath = getAppStagingDirPath(sparkConf, fs, appStagingDir)
       if (!preserveFiles && fs.exists(stagingDirPath)) {
         logInfo("Deleting staging directory " + stagingDirPath)
         fs.delete(stagingDirPath, true)
@@ -357,7 +357,7 @@ private[spark] class Client(
     // Upload Spark and the application JAR to the remote file system if necessary,
     // and add them as local resources to the application master.
     val fs = FileSystem.get(hadoopConf)
-    val dst = new Path(fs.getHomeDirectory(), appStagingDir)
+    val dst = getAppStagingDirPath(sparkConf, fs, appStagingDir)
     val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
     YarnSparkHadoopUtil.get.obtainTokensForNamenodes(nns, hadoopConf, credentials)
     // Used to keep track of URIs added to the distributed cache. If the same URI is added
@@ -668,7 +668,7 @@ private[spark] class Client(
     env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
     if (loginFromKeytab) {
       val remoteFs = FileSystem.get(hadoopConf)
-      val stagingDirPath = new Path(remoteFs.getHomeDirectory, stagingDir)
+      val stagingDirPath = getAppStagingDirPath(sparkConf, remoteFs, stagingDir)
       val credentialsFile = "credentials-" + UUID.randomUUID().toString
       sparkConf.set(CREDENTIALS_FILE_PATH, new Path(stagingDirPath, credentialsFile).toString)
       logInfo(s"Credentials file set to: $credentialsFile")
@@ -1438,4 +1438,16 @@ private object Client extends Logging {
     uri.startsWith(s"$LOCAL_SCHEME:")
   }
 
+  /**
+   * Returns the app staging dir based on the STAGING_DIR configuration if configured,
+   * otherwise based on the user's home directory.
+   */
+  private def getAppStagingDirPath(
+      conf: SparkConf,
+      fs: FileSystem,
+      appStagingDir: String): Path = {
+    val baseDir = conf.get(STAGING_DIR).map { new Path(_) }.getOrElse(fs.getHomeDirectory())
+    new Path(baseDir, appStagingDir)
+  }
+
 }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index a3b9134b58..5188a3e229 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -108,6 +108,11 @@ package object config {
     .intConf
     .optional
 
+  private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
+    .doc("Staging directory used while submitting applications.")
+    .stringConf
+    .optional
+
   /* Cluster-mode launcher configuration. */
 
   private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.yarn.submit.waitAppCompletion")
-- 
GitLab