diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4694790c72cd8f2df0f911c4bd86d76841a5a872..63478c88b057bdf0f1c968600903c500e8632a58 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1716,29 +1716,12 @@ class SparkContext(config: SparkConf) extends Logging {
         key = uri.getScheme match {
           // A JAR file which exists only on the driver node
           case null | "file" =>
-            if (master == "yarn" && deployMode == "cluster") {
-              // In order for this to work in yarn cluster mode the user must specify the
-              // --addJars option to the client to upload the file into the distributed cache
-              // of the AM to make it show up in the current working directory.
-              val fileName = new Path(uri.getPath).getName()
-              try {
-                env.rpcEnv.fileServer.addJar(new File(fileName))
-              } catch {
-                case e: Exception =>
-                  // For now just log an error but allow to go through so spark examples work.
-                  // The spark examples don't really need the jar distributed since its also
-                  // the app jar.
-                  logError("Error adding jar (" + e + "), was the --addJars option used?")
-                  null
-              }
-            } else {
-              try {
-                env.rpcEnv.fileServer.addJar(new File(uri.getPath))
-              } catch {
-                case exc: FileNotFoundException =>
-                  logError(s"Jar not found at $path")
-                  null
-              }
+            try {
+              env.rpcEnv.fileServer.addJar(new File(uri.getPath))
+            } catch {
+              case exc: FileNotFoundException =>
+                logError(s"Jar not found at $path")
+                null
             }
           // A JAR file which exists locally on every worker node
           case "local" =>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 55e4a833b67079a3cdae530edb36d859803a9a4d..053a78617d4e069a0d4c6393000143549eaa7f00 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1202,7 +1202,10 @@ private object Client extends Logging {
     // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes
     System.setProperty("SPARK_YARN_MODE", "true")
     val sparkConf = new SparkConf
-
+    // In yarn mode, SparkSubmit distributes files and jars through the YARN
+    // distributed cache, so remove spark.jars and spark.files from sparkConf here.
+    sparkConf.remove("spark.jars")
+    sparkConf.remove("spark.files")
     val args = new ClientArguments(argStrings)
     new Client(args, sparkConf).run()
   }
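
The effect of the Client change can likewise be shown on its own. Below is a
minimal sketch using the real SparkConf.remove and SparkConf.contains API (the
property values are hypothetical):

    import org.apache.spark.SparkConf

    // In yarn mode SparkSubmit ships jars and files through the YARN
    // distributed cache, so the properties are dropped before
    // ClientArguments is built.
    val sparkConf = new SparkConf(loadDefaults = false)
      .set("spark.jars", "/opt/app/libs/app.jar")      // hypothetical value
      .set("spark.files", "/opt/app/conf/lookup.csv")  // hypothetical value

    sparkConf.remove("spark.jars")
    sparkConf.remove("spark.files")

    assert(!sparkConf.contains("spark.jars"))
    assert(!sparkConf.contains("spark.files"))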