Commit 601a237b authored by Marcelo Vanzin

[SPARK-9825][YARN] Do not overwrite final Hadoop config entries.

When localizing the gateway config files in a YARN application, avoid
overwriting final configs by distributing the gateway files to a separate
directory, and explicitly loading them into the Hadoop config, instead
of placing those files before the cluster's files in the classpath.

This is done by saving the gateway's config to a separate XML file
distributed with the rest of the Spark app's config, and loading that
file when creating a new config through `YarnSparkHadoopUtil`.

Tested with existing unit tests, and by verifying the behavior in a YARN
cluster (final values are not overridden, non-final values are).

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #18370 from vanzin/SPARK-9825.
parent cb8d5cc9
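Background for the fix: Hadoop's `Configuration` honors `<final>true</final>` markers, so a resource added later cannot override a property that an earlier resource declared final. Loading the gateway file as an overlay, after the cluster's own files, therefore preserves final cluster settings, which the old approach defeated by shadowing the cluster's files on the classpath. A minimal sketch of that semantics (the paths and property names below are hypothetical, not part of this change):

import java.net.URL
import org.apache.hadoop.conf.Configuration

object FinalOverlayDemo {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration(false)  // start empty; skip the default resources
    // Cluster file loaded first; suppose it marks some.final.prop with <final>true</final>.
    conf.addResource(new URL("file:///etc/hadoop/conf/core-site.xml"))
    // Gateway file overlaid second, mirroring how __spark_hadoop_conf__.xml is loaded below:
    // non-final values are replaced, final values are kept (Hadoop logs the refused override).
    conf.addResource(new URL("file:///tmp/gateway-core-site.xml"))
    println(conf.get("some.final.prop"))    // still the cluster's value
    println(conf.get("some.regular.prop"))  // now the gateway's value
  }
}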
@@ -683,21 +683,6 @@ private[spark] class Client(
   private def createConfArchive(): File = {
     val hadoopConfFiles = new HashMap[String, File]()
-
-    // Uploading $SPARK_CONF_DIR/log4j.properties file to the distributed cache to make sure that
-    // the executors will use the latest configurations instead of the default values. This is
-    // required when user changes log4j.properties directly to set the log configurations. If
-    // configuration file is provided through --files then executors will be taking configurations
-    // from --files instead of $SPARK_CONF_DIR/log4j.properties.
-    // Also uploading metrics.properties to distributed cache if exists in classpath.
-    // If user specify this file using --files then executors will use the one
-    // from --files instead.
-    for { prop <- Seq("log4j.properties", "metrics.properties")
-          url <- Option(Utils.getContextOrSparkClassLoader.getResource(prop))
-          if url.getProtocol == "file" } {
-      hadoopConfFiles(prop) = new File(url.getPath)
-    }
-
     Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey =>
       sys.env.get(envKey).foreach { path =>
         val dir = new File(path)
@@ -722,14 +707,43 @@ private[spark] class Client(
     try {
       confStream.setLevel(0)
+
+      // Upload $SPARK_CONF_DIR/log4j.properties file to the distributed cache to make sure that
+      // the executors will use the latest configurations instead of the default values. This is
+      // required when user changes log4j.properties directly to set the log configurations. If
+      // configuration file is provided through --files then executors will be taking configurations
+      // from --files instead of $SPARK_CONF_DIR/log4j.properties.
+      // Also upload metrics.properties to distributed cache if exists in classpath.
+      // If user specify this file using --files then executors will use the one
+      // from --files instead.
+      for { prop <- Seq("log4j.properties", "metrics.properties")
+            url <- Option(Utils.getContextOrSparkClassLoader.getResource(prop))
+            if url.getProtocol == "file" } {
+        val file = new File(url.getPath())
+        confStream.putNextEntry(new ZipEntry(file.getName()))
+        Files.copy(file, confStream)
+        confStream.closeEntry()
+      }
+
+      // Save the Hadoop config files under a separate directory in the archive. This directory
+      // is appended to the classpath so that the cluster-provided configuration takes precedence.
+      confStream.putNextEntry(new ZipEntry(s"$LOCALIZED_HADOOP_CONF_DIR/"))
+      confStream.closeEntry()
       hadoopConfFiles.foreach { case (name, file) =>
         if (file.canRead()) {
-          confStream.putNextEntry(new ZipEntry(name))
+          confStream.putNextEntry(new ZipEntry(s"$LOCALIZED_HADOOP_CONF_DIR/$name"))
           Files.copy(file, confStream)
           confStream.closeEntry()
         }
       }
+
+      // Save the YARN configuration into a separate file that will be overlayed on top of the
+      // cluster's Hadoop conf.
+      confStream.putNextEntry(new ZipEntry(SPARK_HADOOP_CONF_FILE))
+      yarnConf.writeXml(confStream)
+      confStream.closeEntry()
+
       // Save Spark configuration to a file in the archive.
       val props = new Properties()
       sparkConf.getAll.foreach { case (k, v) => props.setProperty(k, v) }
@@ -1196,12 +1210,19 @@ private object Client extends Logging {
   // Subdirectory where the user's Spark and Hadoop config files will be placed.
   val LOCALIZED_CONF_DIR = "__spark_conf__"
+
+  // Subdirectory in the conf directory containing Hadoop config files.
+  val LOCALIZED_HADOOP_CONF_DIR = "__hadoop_conf__"
+
   // File containing the conf archive in the AM. See prepareLocalResources().
   val LOCALIZED_CONF_ARCHIVE = LOCALIZED_CONF_DIR + ".zip"

   // Name of the file in the conf archive containing Spark configuration.
   val SPARK_CONF_FILE = "__spark_conf__.properties"
+
+  // Name of the file containing the gateway's Hadoop configuration, to be overlayed on top of the
+  // cluster's Hadoop config.
+  val SPARK_HADOOP_CONF_FILE = "__spark_hadoop_conf__.xml"
+
   // Subdirectory where the user's python files (not archives) will be placed.
   val LOCALIZED_PYTHON_DIR = "__pyfiles__"
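Put together, the conf archive uploaded to the distributed cache now has roughly this layout (illustrative only; the files under __hadoop_conf__/ depend on what the gateway's HADOOP_CONF_DIR and YARN_CONF_DIR actually contain):

__spark_conf__.zip
  log4j.properties
  metrics.properties
  __spark_conf__.properties
  __spark_hadoop_conf__.xml
  __hadoop_conf__/
    core-site.xml
    hdfs-site.xml
    yarn-site.xml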
@@ -1307,6 +1328,12 @@
     sys.env.get(ENV_DIST_CLASSPATH).foreach { cp =>
       addClasspathEntry(getClusterPath(sparkConf, cp), env)
     }
+
+    // Add the localized Hadoop config at the end of the classpath, in case it contains other
+    // files (such as configuration files for different services) that are not part of the
+    // YARN cluster's config.
+    addClasspathEntry(
+      buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, LOCALIZED_HADOOP_CONF_DIR), env)
   }

   /**
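The entry added above lands at the very end of the container's classpath. A sketch of the value it expands to, assuming `buildPath` simply joins its components with the path separator (as the private helper in `Client` does) and that `Environment.PWD.$$()` expands to the container's working directory:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment

// Hypothetical stand-in for Client.buildPath.
def buildPath(components: String*): String = components.mkString(Path.SEPARATOR)

val entry = buildPath(Environment.PWD.$$(), "__spark_conf__", "__hadoop_conf__")
// => "{{PWD}}/__spark_conf__/__hadoop_conf__": appended last, so config files that
// also exist in the cluster's own directories earlier on the classpath still win.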
@@ -61,8 +61,11 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
   // Return an appropriate (subclass) of Configuration. Creating a config initializes some Hadoop
   // subsystems. Always create a new config, don't reuse yarnConf.
-  override def newConfiguration(conf: SparkConf): Configuration =
-    new YarnConfiguration(super.newConfiguration(conf))
+  override def newConfiguration(conf: SparkConf): Configuration = {
+    val hadoopConf = new YarnConfiguration(super.newConfiguration(conf))
+    hadoopConf.addResource(Client.SPARK_HADOOP_CONF_FILE)
+    hadoopConf
+  }

   // Add any user credentials to the job conf which are necessary for running on a secure Hadoop
   // cluster
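Note that `addResource` receives a bare file name here: a `String` resource is resolved through the classloader rather than the filesystem, so `__spark_hadoop_conf__.xml` is picked up from the localized `__spark_conf__` directory when running on the cluster, and a missing resource is skipped quietly (under Hadoop's default quiet mode) on a gateway that never shipped one. A rough sketch of the lookup:

import org.apache.hadoop.conf.Configuration

val conf = new Configuration()
// Resolved via the context classloader; effectively a no-op when the
// resource is absent, assuming the default quiet mode.
conf.addResource("__spark_hadoop_conf__.xml")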