From 626aab79c9b4d4ac9d65bf5fa45b81dd9cbc609c Mon Sep 17 00:00:00 2001
From: Lianhui Wang <lianhuiwang09@gmail.com>
Date: Tue, 13 Oct 2015 08:29:47 -0500
Subject: [PATCH] [SPARK-11026] [YARN] spark.yarn.user.classpath.first does
 work for 'spark-submit --jars hdfs://user/foo.jar'

when spark.yarn.user.classpath.first=true and using 'spark-submit --jars hdfs://user/foo.jar', it can not put foo.jar to system classpath. so we need to put yarn's linkNames of jars to the system classpath. vanzin tgravescs

Author: Lianhui Wang <lianhuiwang09@gmail.com>

Closes #9045 from lianhuiwang/spark-11026.
---
 .../org/apache/spark/deploy/yarn/Client.scala | 23 ++++++++++++-------
 1 file changed, 15 insertions(+), 8 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index d25d830fd4..9fcfe362a3 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1212,7 +1212,7 @@ object Client extends Logging {
         } else {
           getMainJarUri(sparkConf.getOption(CONF_SPARK_USER_JAR))
         }
-      mainJar.foreach(addFileToClasspath(sparkConf, _, APP_JAR, env))
+      mainJar.foreach(addFileToClasspath(sparkConf, conf, _, APP_JAR, env))
 
       val secondaryJars =
         if (args != null) {
@@ -1221,10 +1221,10 @@ object Client extends Logging {
           getSecondaryJarUris(sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS))
         }
       secondaryJars.foreach { x =>
-        addFileToClasspath(sparkConf, x, null, env)
+        addFileToClasspath(sparkConf, conf, x, null, env)
       }
     }
-    addFileToClasspath(sparkConf, new URI(sparkJar(sparkConf)), SPARK_JAR, env)
+    addFileToClasspath(sparkConf, conf, new URI(sparkJar(sparkConf)), SPARK_JAR, env)
     populateHadoopClasspath(conf, env)
     sys.env.get(ENV_DIST_CLASSPATH).foreach { cp =>
       addClasspathEntry(getClusterPath(sparkConf, cp), env)
@@ -1259,15 +1259,17 @@ object Client extends Logging {
    * If an alternate name for the file is given, and it's not a "local:" file, the alternate
    * name will be added to the classpath (relative to the job's work directory).
    *
-   * If not a "local:" file and no alternate name, the environment is not modified.
+   * If not a "local:" file and no alternate name, the linkName will be added to the classpath.
    *
-   * @param conf      Spark configuration.
-   * @param uri       URI to add to classpath (optional).
-   * @param fileName  Alternate name for the file (optional).
-   * @param env       Map holding the environment variables.
+   * @param conf        Spark configuration.
+   * @param hadoopConf  Hadoop configuration.
+   * @param uri         URI to add to classpath (optional).
+   * @param fileName    Alternate name for the file (optional).
+   * @param env         Map holding the environment variables.
    */
   private def addFileToClasspath(
       conf: SparkConf,
+      hadoopConf: Configuration,
       uri: URI,
       fileName: String,
       env: HashMap[String, String]): Unit = {
@@ -1276,6 +1278,11 @@ object Client extends Logging {
     } else if (fileName != null) {
       addClasspathEntry(buildPath(
         YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), fileName), env)
+    } else if (uri != null) {
+      val localPath = getQualifiedLocalPath(uri, hadoopConf)
+      val linkName = Option(uri.getFragment()).getOrElse(localPath.getName())
+      addClasspathEntry(buildPath(
+        YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), linkName), env)
     }
   }
 
-- 
GitLab