diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 053a78617d4e069a0d4c6393000143549eaa7f00..172fb46c986c68fd46048195533cdb1540caf198 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -598,8 +598,16 @@ private[spark] class Client( ).foreach { case (flist, resType, addToClasspath) => flist.foreach { file => val (_, localizedPath) = distribute(file, resType = resType) - if (addToClasspath && localizedPath != null) { - cachedSecondaryJarLinks += localizedPath + // If addToClassPath, we ignore adding jar multiple times to distitrbuted cache. + if (addToClasspath) { + if (localizedPath != null) { + cachedSecondaryJarLinks += localizedPath + } + } else { + if (localizedPath != null) { + throw new IllegalArgumentException(s"Attempt to add ($file) multiple times" + + " to the distributed cache.") + } } } } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index 0a4f291e25fb0fc1bd871a4d6a1fc8387a85031d..06516c1baf1cc9fbe24baea9931d75ff1cc0b145 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -282,6 +282,48 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll } } + test("distribute archive multiple times") { + val libs = Utils.createTempDir() + // Create jars dir and RELEASE file to avoid IllegalStateException. + val jarsDir = new File(libs, "jars") + assert(jarsDir.mkdir()) + new FileOutputStream(new File(libs, "RELEASE")).close() + + val userLib1 = Utils.createTempDir() + val testJar = TestUtils.createJarWithFiles(Map(), userLib1) + + // Case 1: FILES_TO_DISTRIBUTE and ARCHIVES_TO_DISTRIBUTE can't have duplicate files + val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath)) + .set(FILES_TO_DISTRIBUTE, Seq(testJar.getPath)) + .set(ARCHIVES_TO_DISTRIBUTE, Seq(testJar.getPath)) + + val client = createClient(sparkConf) + val tempDir = Utils.createTempDir() + intercept[IllegalArgumentException] { + client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil) + } + + // Case 2: FILES_TO_DISTRIBUTE can't have duplicate files. + val sparkConfFiles = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath)) + .set(FILES_TO_DISTRIBUTE, Seq(testJar.getPath, testJar.getPath)) + + val clientFiles = createClient(sparkConfFiles) + val tempDirForFiles = Utils.createTempDir() + intercept[IllegalArgumentException] { + clientFiles.prepareLocalResources(new Path(tempDirForFiles.getAbsolutePath()), Nil) + } + + // Case 3: ARCHIVES_TO_DISTRIBUTE can't have duplicate files. + val sparkConfArchives = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath)) + .set(ARCHIVES_TO_DISTRIBUTE, Seq(testJar.getPath, testJar.getPath)) + + val clientArchives = createClient(sparkConfArchives) + val tempDirForArchives = Utils.createTempDir() + intercept[IllegalArgumentException] { + clientArchives.prepareLocalResources(new Path(tempDirForArchives.getAbsolutePath()), Nil) + } + } + test("distribute local spark jars") { val temp = Utils.createTempDir() val jarsDir = new File(temp, "jars")