Commit d6fb485d authored by jerryshao, committed by Marcelo Vanzin

[SPARK-14423][YARN] Avoid same name files added to distributed cache again

## What changes were proposed in this pull request?

In the current assembly-free Spark deployment, the jars under `assembly/target/scala-xxx/jars` are uploaded to the distributed cache by default. There is a chance that these jars' names conflict with the names of jars specified via `--jars`, which causes an exception when starting the application:

```
client token: N/A
	 diagnostics: Application application_1459907402325_0004 failed 2 times due to AM Container for appattempt_1459907402325_0004_000002 exited with  exitCode: -1000
For more detailed output, check application tracking page:http://hw12100.local:8088/proxy/application_1459907402325_0004/Then, click on links to logs of each attempt.
Diagnostics: Resource hdfs://localhost:8020/user/sshao/.sparkStaging/application_1459907402325_0004/avro-mapred-1.7.7-hadoop2.jar changed on src filesystem (expected 1459909780508, was 1459909782590
java.io.IOException: Resource hdfs://localhost:8020/user/sshao/.sparkStaging/application_1459907402325_0004/avro-mapred-1.7.7-hadoop2.jar changed on src filesystem (expected 1459909780508, was 1459909782590
	at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253)
	at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:61)
	at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
	at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:357)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
	at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:356)
	at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:60)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
```
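To make the failure mode concrete: the assembly jar and the user jar live at different paths but share a base name, and YARN localizes every resource into the flat `.sparkStaging` application directory under that one name, so the second upload invalidates the timestamp recorded for the first. A minimal sketch of the collision (the paths here are hypothetical):

```scala
import java.io.File
import java.net.URI

object NameClashSketch extends App {
  // Hypothetical locations: one copy ships with Spark, one comes from --jars.
  val assemblyJar = new URI("file:/opt/spark/jars/avro-mapred-1.7.7-hadoop2.jar")
  val userJar = new URI("hdfs://localhost:8020/user/sshao/libs/avro-mapred-1.7.7-hadoop2.jar")

  // Different URIs, but the same base name: both map to the single staging
  // entry avro-mapred-1.7.7-hadoop2.jar, so the later upload changes the
  // timestamp that YARN's FSDownload recorded for the earlier one.
  val clash = new File(assemblyJar.getPath).getName == new File(userJar.getPath).getName
  println(clash)  // true
}
```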

So this patch checks the file name of each resource to avoid uploading files with the same name more than once.
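A simplified, self-contained sketch of the check (the real version is a nested helper inside `Client.prepareLocalResources`, shown in the diff below; `logWarning` is replaced with `println` here so the snippet runs standalone):

```scala
import java.io.File
import java.net.URI
import scala.collection.mutable

object DedupSketch extends App {
  val distributedUris = mutable.HashSet.empty[String]
  val distributedNames = mutable.HashSet.empty[String]

  // Returns true only if neither this exact URI nor another file with the
  // same base name has been added to the distributed cache before.
  def addDistributedUri(uri: URI): Boolean = {
    val uriStr = uri.toString()
    val fileName = new File(uri.getPath).getName
    if (distributedUris.contains(uriStr)) {
      println(s"Same path resource $uri added multiple times to distributed cache.")
      false
    } else if (distributedNames.contains(fileName)) {
      println(s"Same name resource $uri added multiple times to distributed cache")
      false
    } else {
      distributedUris += uriStr
      distributedNames += fileName
      true
    }
  }

  println(addDistributedUri(new URI("file:/opt/spark/jars/foo.jar")))      // true: first time
  println(addDistributedUri(new URI("file:/opt/spark/jars/foo.jar")))      // false: same path
  println(addDistributedUri(new URI("hdfs://nn:8020/user/libs/foo.jar")))  // false: same name
}
```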

## How was this patch tested?

Unit tests (see the new `ignore same name jars` case in `ClientSuite` below) and a manual integration test were run locally.

Author: jerryshao <sshao@hortonworks.com>

Closes #12203 from jerryshao/SPARK-14423.
Parent: 1a396647
Client.scala:

```
@@ -364,6 +364,10 @@ private[spark] class Client(
     // multiple times, YARN will fail to launch containers for the app with an internal
     // error.
     val distributedUris = new HashSet[String]
+    // Used to keep track of URIs(files) added to the distribute cache have the same name. If
+    // same name but different path files are added multiple time, YARN will fail to launch
+    // containers for the app with an internal error.
+    val distributedNames = new HashSet[String]

     YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(sparkConf, hadoopConf, credentials)
     YarnSparkHadoopUtil.get.obtainTokenForHBase(sparkConf, hadoopConf, credentials)
@@ -376,11 +380,16 @@ private[spark] class Client(
     def addDistributedUri(uri: URI): Boolean = {
       val uriStr = uri.toString()
+      val fileName = new File(uri.getPath).getName
       if (distributedUris.contains(uriStr)) {
-        logWarning(s"Resource $uri added multiple times to distributed cache.")
+        logWarning(s"Same path resource $uri added multiple times to distributed cache.")
+        false
+      } else if (distributedNames.contains(fileName)) {
+        logWarning(s"Same name resource $uri added multiple times to distributed cache")
         false
       } else {
         distributedUris += uriStr
+        distributedNames += fileName
         true
       }
     }
@@ -519,8 +528,7 @@ private[spark] class Client(
     ).foreach { case (flist, resType, addToClasspath) =>
       flist.foreach { file =>
         val (_, localizedPath) = distribute(file, resType = resType)
-        require(localizedPath != null)
-        if (addToClasspath) {
+        if (addToClasspath && localizedPath != null) {
           cachedSecondaryJarLinks += localizedPath
         }
       }
```
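Note the last hunk: now that a same-name jar can be legitimately skipped, `distribute` may return a null localized path for it, so the hard `require(localizedPath != null)` is replaced with a null guard before the jar is added to the secondary classpath links.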
ClientSuite.scala:

```
@@ -17,7 +17,7 @@
 package org.apache.spark.deploy.yarn

-import java.io.{File, FileOutputStream}
+import java.io.{File, FileInputStream, FileOutputStream}
 import java.net.URI
 import java.util.Properties
@@ -285,6 +285,36 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
   }

+  test("ignore same name jars") {
+    val libs = Utils.createTempDir()
+    val jarsDir = new File(libs, "jars")
+    assert(jarsDir.mkdir())
+    new FileOutputStream(new File(libs, "RELEASE")).close()
+    val userLibs = Utils.createTempDir()
+
+    val jar1 = TestUtils.createJarWithFiles(Map(), jarsDir)
+    val jar2 = TestUtils.createJarWithFiles(Map(), userLibs)
+    // Copy jar2 to jar3 with same name
+    val jar3 = {
+      val target = new File(userLibs, new File(jar1.toURI).getName)
+      val input = new FileInputStream(jar2.getPath)
+      val output = new FileOutputStream(target)
+      Utils.copyStream(input, output, closeStreams = true)
+      target.toURI.toURL
+    }
+
+    val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath))
+      .set(JARS_TO_DISTRIBUTE, Seq(jar2.getPath, jar3.getPath))
+
+    val client = createClient(sparkConf)
+    val tempDir = Utils.createTempDir()
+    client.prepareLocalResources(tempDir.getAbsolutePath(), Nil)
+
+    // Only jar2 will be added to SECONDARY_JARS, jar3 which has the same name with jar1 will be
+    // ignored.
+    sparkConf.get(SECONDARY_JARS) should be (Some(Seq(new File(jar2.toURI).getName)))
+  }
+
   object Fixtures {
     val knownDefYarnAppCP: Seq[String] =
```