diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 7b29c1ae4d888ee29536bb16b7f6c149b364e850..f0f13a16e091ddfe04eee33c1b6dcc2337fedaf5 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -351,14 +351,6 @@ private[spark] class Client(
 
     val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
 
-    val oldLog4jConf = Option(System.getenv("SPARK_LOG4J_CONF"))
-    if (oldLog4jConf.isDefined) {
-      logWarning(
-        "SPARK_LOG4J_CONF detected in the system environment. This variable has been " +
-          "deprecated. Please refer to the \"Launching Spark on YARN\" documentation " +
-          "for alternatives.")
-    }
-
     def addDistributedUri(uri: URI): Boolean = {
       val uriStr = uri.toString()
       if (distributedUris.contains(uriStr)) {
@@ -479,25 +471,16 @@ private[spark] class Client(
     }
 
     /**
-     * Copy a few resources to the distributed cache if their scheme is not "local".
+     * Copy the user jar to the distributed cache if its scheme is not "local".
      * Otherwise, set the corresponding key in our SparkConf to handle it downstream.
-     * Each resource is represented by a 3-tuple of:
-     *   (1) destination resource name,
-     *   (2) local path to the resource,
-     *   (3) Spark property key to set if the scheme is not local
      */
-    List(
-      (APP_JAR_NAME, args.userJar, APP_JAR),
-      ("log4j.properties", oldLog4jConf.orNull, null)
-    ).foreach { case (destName, path, confKey) =>
-      if (path != null && !path.trim().isEmpty()) {
-        val (isLocal, localizedPath) = distribute(path, destName = Some(destName))
-        if (isLocal && confKey != null) {
-          require(localizedPath != null, s"Path $path already distributed.")
-          // If the resource is intended for local use only, handle this downstream
-          // by setting the appropriate property
-          sparkConf.set(confKey, localizedPath)
-        }
+    Option(args.userJar).filter(_.trim.nonEmpty).foreach { jar =>
+      val (isLocal, localizedPath) = distribute(jar, destName = Some(APP_JAR_NAME))
+      if (isLocal) {
+        require(localizedPath != null, s"Path $jar already distributed")
+        // If the resource is intended for local use only, handle this downstream
+        // by setting the appropriate property
+        sparkConf.set(APP_JAR, localizedPath)
       }
     }
 
@@ -541,11 +524,10 @@ private[spark] class Client(
       distribute(f, targetDir = targetDir)
     }
 
-    // Distribute an archive with Hadoop and Spark configuration for the AM.
+    // Distribute an archive with Hadoop and Spark configuration for the AM and executors.
     val (_, confLocalizedPath) = distribute(createConfArchive().toURI().getPath(),
       resType = LocalResourceType.ARCHIVE,
-      destName = Some(LOCALIZED_CONF_DIR),
-      appMasterOnly = true)
+      destName = Some(LOCALIZED_CONF_DIR))
     require(confLocalizedPath != null)
 
     localResources
@@ -554,10 +536,10 @@ private[spark] class Client(
   /**
    * Create an archive with the config files for distribution.
    *
-   * These are only used by the AM, since executors will use the configuration object broadcast by
-   * the driver. The files are zipped and added to the job as an archive, so that YARN will explode
-   * it when distributing to the AM. This directory is then added to the classpath of the AM
-   * process, just to make sure that everybody is using the same default config.
+   * These will be used by the AM and executors. The files are zipped and added to the job as an
+   * archive, so that YARN will explode it when distributing to the AM and executors. This
This directory + * is then added to the classpath of AM and executor process, just to make sure that everybody + * is using the same default config. * * This follows the order of precedence set by the startup scripts, in which HADOOP_CONF_DIR * shows up in the classpath before YARN_CONF_DIR. @@ -576,11 +558,14 @@ private[spark] class Client( // required when user changes log4j.properties directly to set the log configurations. If // configuration file is provided through --files then executors will be taking configurations // from --files instead of $SPARK_CONF_DIR/log4j.properties. - val log4jFileName = "log4j.properties" - Option(Utils.getContextOrSparkClassLoader.getResource(log4jFileName)).foreach { url => - if (url.getProtocol == "file") { - hadoopConfFiles(log4jFileName) = new File(url.getPath) - } + + // Also uploading metrics.properties to distributed cache if exists in classpath. + // If user specify this file using --files then executors will use the one + // from --files instead. + for { prop <- Seq("log4j.properties", "metrics.properties") + url <- Option(Utils.getContextOrSparkClassLoader.getResource(prop)) + if url.getProtocol == "file" } { + hadoopConfFiles(prop) = new File(url.getPath) } Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey => @@ -659,7 +644,7 @@ private[spark] class Client( pySparkArchives: Seq[String]): HashMap[String, String] = { logInfo("Setting up the launch environment for our AM container") val env = new HashMap[String, String]() - populateClasspath(args, yarnConf, sparkConf, env, true, sparkConf.get(DRIVER_CLASS_PATH)) + populateClasspath(args, yarnConf, sparkConf, env, sparkConf.get(DRIVER_CLASS_PATH)) env("SPARK_YARN_MODE") = "true" env("SPARK_YARN_STAGING_DIR") = stagingDir env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName() @@ -1236,18 +1221,16 @@ object Client extends Logging { conf: Configuration, sparkConf: SparkConf, env: HashMap[String, String], - isAM: Boolean, extraClassPath: Option[String] = None): Unit = { extraClassPath.foreach { cp => addClasspathEntry(getClusterPath(sparkConf, cp), env) } + addClasspathEntry(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env) - if (isAM) { - addClasspathEntry( - YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR + - LOCALIZED_CONF_DIR, env) - } + addClasspathEntry( + YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR + + LOCALIZED_CONF_DIR, env) if (sparkConf.get(USER_CLASS_PATH_FIRST)) { // in order to properly add the app jar when user classpath is first diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index f956a4d1d5953c03113bbdb6cb62f768a118fc28..7b55d781f86e8772bee716833ecf3889362d2c79 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -289,8 +289,7 @@ private[yarn] class ExecutorRunnable( private def prepareEnvironment(container: Container): HashMap[String, String] = { val env = new HashMap[String, String]() - Client.populateClasspath(null, yarnConf, sparkConf, env, false, - sparkConf.get(EXECUTOR_CLASS_PATH)) + Client.populateClasspath(null, yarnConf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH)) sparkConf.getExecutorEnv.foreach { case (key, value) => // This assumes each executor environment variable set here is a path diff --git 
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index e3613a93ed05f79669fb0ea730e928dd53ccea04..64723c361c0aba8567dd47777c61c511d05548d0 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -121,7 +121,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     val env = new MutableHashMap[String, String]()
     val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
 
-    populateClasspath(args, conf, sparkConf, env, true)
+    populateClasspath(args, conf, sparkConf, env)
 
     val cp = env("CLASSPATH").split(":|;|<CPS>")
     s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
@@ -178,8 +178,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
       "/remotePath/1:/remotePath/2")
 
     val env = new MutableHashMap[String, String]()
-    populateClasspath(null, conf, sparkConf, env, false,
-      extraClassPath = Some("/localPath/my1.jar"))
+    populateClasspath(null, conf, sparkConf, env, extraClassPath = Some("/localPath/my1.jar"))
     val cp = classpath(env)
     cp should contain ("/remotePath/spark.jar")
     cp should contain ("/remotePath/my1.jar")
@@ -356,7 +355,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
 
   private def classpath(client: Client): Array[String] = {
     val env = new MutableHashMap[String, String]()
-    populateClasspath(null, client.hadoopConf, client.sparkConf, env, false)
+    populateClasspath(null, client.hadoopConf, client.sparkConf, env)
     classpath(env)
   }
 
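
The for-comprehension added to createConfArchive() is what drives the new behavior: each well-known config file is resolved through the classloader and picked up only when it resolves to a plain file on local disk. Below is a minimal standalone sketch of that lookup pattern, runnable outside Spark; getClass.getClassLoader stands in for Spark's Utils.getContextOrSparkClassLoader, and the object name is invented for illustration.

import java.io.File
import scala.collection.mutable.HashMap

object ConfigFileLookupSketch {
  def main(args: Array[String]): Unit = {
    val hadoopConfFiles = new HashMap[String, File]()
    // For each well-known config file, resolve it through the classloader and
    // keep it only when it is a plain file on local disk (protocol "file").
    for { prop <- Seq("log4j.properties", "metrics.properties")
          url <- Option(getClass.getClassLoader.getResource(prop))
          if url.getProtocol == "file" } {
      hadoopConfFiles(prop) = new File(url.getPath)
    }
    hadoopConfFiles.foreach { case (name, file) => println(s"$name -> $file") }
  }
}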
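
Similarly, the Option(...).filter(_.trim.nonEmpty).foreach guard in the Client.scala hunk replaces the old null/empty-string check on args.userJar. A rough sketch of why the two are equivalent, with a stubbed distribute() (the real Client.distribute() is not shown in this patch) and an illustrative destName value:

object UserJarGuardSketch {
  // Hypothetical stub standing in for Client.distribute(); it pretends every
  // path is "local" and returns the name it would be localized under.
  def distribute(path: String, destName: Option[String]): (Boolean, String) =
    (true, destName.getOrElse(path))

  def run(userJar: String): Unit = {
    // Option(...) absorbs null and filter(...) drops blank strings, so the
    // body runs only for a real, non-empty path; this matches the old
    // `path != null && !path.trim().isEmpty()` check.
    Option(userJar).filter(_.trim.nonEmpty).foreach { jar =>
      val (isLocal, localizedPath) = distribute(jar, destName = Some("app.jar"))
      if (isLocal) {
        require(localizedPath != null, s"Path $jar already distributed")
        println(s"would set the APP_JAR conf key to $localizedPath")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    run(null)            // no-op: null absorbed by Option(...)
    run("   ")           // no-op: blank string filtered out
    run("/tmp/app.jar")  // prints the would-be localized path
  }
}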