From 569f77a11819523bdf5dc2c6429fc3399cbb6519 Mon Sep 17 00:00:00 2001
From: Kishor Patil <kpatil@yahoo-inc.com>
Date: Thu, 3 Nov 2016 16:10:26 -0500
Subject: [PATCH] [SPARK-18099][YARN] Fail if same files added to distributed
 cache for --files and --archives
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## What changes were proposed in this pull request?

During spark-submit, if yarn dist cache is instructed to add same file under --files and --archives, This code change ensures the spark yarn distributed cache behaviour is retained i.e. to warn and fail if same files is mentioned in both --files and --archives.
## How was this patch tested?

Manually tested:
1. if same jar is mentioned in --jars and --files it will continue to submit the job.
- basically functionality [SPARK-14423] #12203 is unchanged
  1. if same file is mentioned in --files and --archives it will fail to submit the job.

Please review https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before opening a pull request.

… under archives and files

Author: Kishor Patil <kpatil@yahoo-inc.com>

Closes #15627 from kishorvpatil/spark18099.

(cherry picked from commit 098e4ca9c7af61e64839a50c65be449749af6482)
Signed-off-by: Tom Graves <tgraves@yahoo-inc.com>
---
 .../org/apache/spark/deploy/yarn/Client.scala | 12 +++++-
 .../spark/deploy/yarn/ClientSuite.scala       | 42 +++++++++++++++++++
 2 files changed, 52 insertions(+), 2 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 053a78617d..172fb46c98 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -598,8 +598,16 @@ private[spark] class Client(
     ).foreach { case (flist, resType, addToClasspath) =>
       flist.foreach { file =>
         val (_, localizedPath) = distribute(file, resType = resType)
-        if (addToClasspath && localizedPath != null) {
-          cachedSecondaryJarLinks += localizedPath
+        // If addToClassPath, we ignore adding jar multiple times to distitrbuted cache.
+        if (addToClasspath) {
+          if (localizedPath != null) {
+            cachedSecondaryJarLinks += localizedPath
+          }
+        } else {
+          if (localizedPath != null) {
+            throw new IllegalArgumentException(s"Attempt to add ($file) multiple times" +
+              " to the distributed cache.")
+          }
         }
       }
     }
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 0a4f291e25..06516c1baf 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -282,6 +282,48 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     }
   }
 
+  test("distribute archive multiple times") {
+    val libs = Utils.createTempDir()
+    // Create jars dir and RELEASE file to avoid IllegalStateException.
+    val jarsDir = new File(libs, "jars")
+    assert(jarsDir.mkdir())
+    new FileOutputStream(new File(libs, "RELEASE")).close()
+
+    val userLib1 = Utils.createTempDir()
+    val testJar = TestUtils.createJarWithFiles(Map(), userLib1)
+
+    // Case 1:  FILES_TO_DISTRIBUTE and ARCHIVES_TO_DISTRIBUTE can't have duplicate files
+    val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath))
+      .set(FILES_TO_DISTRIBUTE, Seq(testJar.getPath))
+      .set(ARCHIVES_TO_DISTRIBUTE, Seq(testJar.getPath))
+
+    val client = createClient(sparkConf)
+    val tempDir = Utils.createTempDir()
+    intercept[IllegalArgumentException] {
+      client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil)
+    }
+
+    // Case 2: FILES_TO_DISTRIBUTE can't have duplicate files.
+    val sparkConfFiles = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath))
+      .set(FILES_TO_DISTRIBUTE, Seq(testJar.getPath, testJar.getPath))
+
+    val clientFiles = createClient(sparkConfFiles)
+    val tempDirForFiles = Utils.createTempDir()
+    intercept[IllegalArgumentException] {
+      clientFiles.prepareLocalResources(new Path(tempDirForFiles.getAbsolutePath()), Nil)
+    }
+
+    // Case 3: ARCHIVES_TO_DISTRIBUTE can't have duplicate files.
+    val sparkConfArchives = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath))
+      .set(ARCHIVES_TO_DISTRIBUTE, Seq(testJar.getPath, testJar.getPath))
+
+    val clientArchives = createClient(sparkConfArchives)
+    val tempDirForArchives = Utils.createTempDir()
+    intercept[IllegalArgumentException] {
+      clientArchives.prepareLocalResources(new Path(tempDirForArchives.getAbsolutePath()), Nil)
+    }
+  }
+
   test("distribute local spark jars") {
     val temp = Utils.createTempDir()
     val jarsDir = new File(temp, "jars")
-- 
GitLab