diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index d25d830fd4349024b10028d8cce1443984a8f196..9fcfe362a3ba29e8256daf9ee87c032cd9cd4a5f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1212,7 +1212,7 @@ object Client extends Logging {
         } else {
           getMainJarUri(sparkConf.getOption(CONF_SPARK_USER_JAR))
         }
-      mainJar.foreach(addFileToClasspath(sparkConf, _, APP_JAR, env))
+      mainJar.foreach(addFileToClasspath(sparkConf, conf, _, APP_JAR, env))
 
       val secondaryJars =
         if (args != null) {
@@ -1221,10 +1221,10 @@ object Client extends Logging {
           getSecondaryJarUris(sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS))
         }
       secondaryJars.foreach { x =>
-        addFileToClasspath(sparkConf, x, null, env)
+        addFileToClasspath(sparkConf, conf, x, null, env)
       }
     }
-    addFileToClasspath(sparkConf, new URI(sparkJar(sparkConf)), SPARK_JAR, env)
+    addFileToClasspath(sparkConf, conf, new URI(sparkJar(sparkConf)), SPARK_JAR, env)
     populateHadoopClasspath(conf, env)
     sys.env.get(ENV_DIST_CLASSPATH).foreach { cp =>
       addClasspathEntry(getClusterPath(sparkConf, cp), env)
@@ -1259,15 +1259,17 @@ object Client extends Logging {
    * If an alternate name for the file is given, and it's not a "local:" file, the alternate
    * name will be added to the classpath (relative to the job's work directory).
    *
-   * If not a "local:" file and no alternate name, the environment is not modified.
+   * If not a "local:" file and no alternate name, the linkName will be added to the classpath.
    *
-   * @param conf      Spark configuration.
-   * @param uri       URI to add to classpath (optional).
-   * @param fileName  Alternate name for the file (optional).
-   * @param env       Map holding the environment variables.
+   * @param conf        Spark configuration.
+   * @param hadoopConf  Hadoop configuration.
+   * @param uri         URI to add to classpath (optional).
+   * @param fileName    Alternate name for the file (optional).
+   * @param env         Map holding the environment variables.
    */
   private def addFileToClasspath(
       conf: SparkConf,
+      hadoopConf: Configuration,
       uri: URI,
       fileName: String,
       env: HashMap[String, String]): Unit = {
@@ -1276,6 +1278,11 @@ object Client extends Logging {
     } else if (fileName != null) {
       addClasspathEntry(buildPath(
         YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), fileName), env)
+    } else if (uri != null) {
+      val localPath = getQualifiedLocalPath(uri, hadoopConf)
+      val linkName = Option(uri.getFragment()).getOrElse(localPath.getName())
+      addClasspathEntry(buildPath(
+        YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), linkName), env)
     }
   }