Skip to content
Snippets Groups Projects
Commit bcb0258a authored by Marcelo Vanzin's avatar Marcelo Vanzin Committed by Tom Graves
Browse files

[SPARK-16080][YARN] Set correct link name for conf archive in executors.

This makes sure the files are in the executor's classpath as they're
expected to be. Also update the unit test to make sure the files are
there as expected.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #13792 from vanzin/SPARK-16080.
parent 93338807
No related branches found
No related tags found
No related merge requests found
......@@ -160,11 +160,17 @@ private[spark] class ApplicationMaster(
}
// Distribute the conf archive to executors.
sparkConf.get(CACHED_CONF_ARCHIVE).foreach { uri =>
val fs = FileSystem.get(new URI(uri), yarnConf)
sparkConf.get(CACHED_CONF_ARCHIVE).foreach { path =>
val uri = new URI(path)
val fs = FileSystem.get(uri, yarnConf)
val status = fs.getFileStatus(new Path(uri))
setupDistributedCache(uri, LocalResourceType.ARCHIVE, status.getModificationTime().toString,
status.getLen.toString, LocalResourceVisibility.PRIVATE.name())
// SPARK-16080: Make sure to use the correct name for the destination when distributing the
// conf archive to executors.
val destUri = new URI(uri.getScheme(), uri.getRawSchemeSpecificPart(),
Client.LOCALIZED_CONF_DIR)
setupDistributedCache(destUri.toString(), LocalResourceType.ARCHIVE,
status.getModificationTime().toString, status.getLen.toString,
LocalResourceVisibility.PRIVATE.name())
}
// Clean up the configuration so it doesn't show up in the Web UI (since it's really noisy).
......
......@@ -292,6 +292,14 @@ private object YarnClusterDriver extends Logging with Matchers {
sc.stop()
}
// Verify that the config archive is correctly placed in the classpath of all containers.
val confFile = "/" + Client.SPARK_CONF_FILE
assert(getClass().getResource(confFile) != null)
val configFromExecutors = sc.parallelize(1 to 4, 4)
.map { _ => Option(getClass().getResource(confFile)).map(_.toString).orNull }
.collect()
assert(configFromExecutors.find(_ == null) === None)
// verify log urls are present
val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo]
assert(listeners.size === 1)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment