Commit dba31402 authored by Xiangrui Meng, committed by Tathagata Das

[SPARK-1870] Make spark-submit --jars work in yarn-cluster mode.

Send secondary jars to the distributed cache of all containers and add the cached jars to the classpath before executors start. Tested on a YARN cluster (CDH-5.0).
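
For example, with a submission like the following (hypothetical class and jar names), the jars listed in `--jars` are now shipped to each container's distributed cache and show up on the executor classpath:

```
spark-submit \
  --master yarn-cluster \
  --class com.example.MyApp \
  --jars /local/path/mylib.jar,/local/path/util.jar \
  /local/path/my-app.jar
```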

`spark-submit --jars` also works in standalone server mode and `yarn-client` mode. Thanks to @andrewor14 for testing!

I removed the sentence "Doesn't work for drivers in standalone mode with \"cluster\" deploy mode." from `spark-submit`'s help message, though we haven't tested Mesos yet.

CC: @dbtsai @sryza

Author: Xiangrui Meng <meng@databricks.com>

Closes #848 from mengxr/yarn-classpath and squashes the following commits:

23e7df4 [Xiangrui Meng] rename spark.jar to __spark__.jar and app.jar to __app__.jar to avoid conflicts; append $CWD/ and $CWD/* to the classpath (ordering sketched below); remove unused methods
a40f6ed [Xiangrui Meng] standalone -> cluster
65e04ad [Xiangrui Meng] update spark-submit help message and add a comment for yarn-client
11e5354 [Xiangrui Meng] minor changes
3e7e1c4 [Xiangrui Meng] use sparkConf instead of hadoop conf
dc3c825 [Xiangrui Meng] add secondary jars to classpath in yarn
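
As a rough illustration of the resulting default container classpath (a sketch inferred from the commit message and the diff below, with a hypothetical secondary jar `mylib.jar` — not code from this patch):

```scala
// Sketch only: reproduces the intended ordering, not Spark's actual implementation.
object ClasspathOrderSketch extends App {
  val pwd = "{{PWD}}"                      // expanded by YARN to the container work dir
  val hadoopClasspath = Seq("<hadoop-cp>") // normally taken from the YARN/MR config
  val secondaryJars = Seq("mylib.jar")     // hypothetical --jars entry, by its link name

  // Default order: Spark jar first, Hadoop classpath, then the app jar and its
  // secondary jars, and finally $CWD/ and $CWD/* as catch-all entries.
  val entries = Seq(s"$pwd/__spark__.jar") ++ hadoopClasspath ++
    Seq(s"$pwd/__app__.jar") ++ secondaryJars.map(j => s"$pwd/$j") ++
    Seq(pwd, s"$pwd/*")

  println(entries.mkString(java.io.File.pathSeparator))
}
```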
parent 2a948e7e
@@ -326,8 +326,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
         |  --class CLASS_NAME         Your application's main class (for Java / Scala apps).
         |  --name NAME                A name of your application.
         |  --jars JARS                Comma-separated list of local jars to include on the driver
-        |                             and executor classpaths. Doesn't work for drivers in
-        |                             standalone mode with "cluster" deploy mode.
+        |                             and executor classpaths.
         |  --py-files PY_FILES        Comma-separated list of .zip or .egg files to place on the
         |                             PYTHONPATH for Python apps.
         |  --files FILES              Comma-separated list of files to be placed in the working
@@ -44,7 +44,7 @@ import org.apache.spark.{Logging, SparkConf, SparkContext}
  * Client submits an application to the YARN ResourceManager.
  *
  * Depending on the deployment mode this will launch one of two application master classes:
- * 1. In standalone mode, it will launch an [[org.apache.spark.deploy.yarn.ApplicationMaster]]
+ * 1. In cluster mode, it will launch an [[org.apache.spark.deploy.yarn.ApplicationMaster]]
  *      which launches a driver program inside of the cluster.
  * 2. In client mode, it will launch an [[org.apache.spark.deploy.yarn.ExecutorLauncher]] to
  *      request executors on behalf of a driver running outside of the cluster.
@@ -220,10 +220,11 @@ trait ClientBase extends Logging {
       }
     }
+    var cachedSecondaryJarLinks = ListBuffer.empty[String]
     val fileLists = List( (args.addJars, LocalResourceType.FILE, true),
       (args.files, LocalResourceType.FILE, false),
       (args.archives, LocalResourceType.ARCHIVE, false) )
-    fileLists.foreach { case (flist, resType, appMasterOnly) =>
+    fileLists.foreach { case (flist, resType, addToClasspath) =>
       if (flist != null && !flist.isEmpty()) {
         flist.split(',').foreach { case file: String =>
           val localURI = new URI(file.trim())
@@ -232,11 +233,15 @@ trait ClientBase extends Logging {
             val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
             val destPath = copyRemoteFile(dst, localPath, replication)
             distCacheMgr.addResource(fs, conf, destPath, localResources, resType,
-              linkname, statCache, appMasterOnly)
+              linkname, statCache)
+            if (addToClasspath) {
+              cachedSecondaryJarLinks += linkname
+            }
           }
         }
       }
     }
+    sparkConf.set(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
     UserGroupInformation.getCurrentUser().addCredentials(credentials)
     localResources
@@ -374,11 +379,12 @@ trait ClientBase extends Logging {
   }
 }

 object ClientBase {
-  val SPARK_JAR: String = "spark.jar"
-  val APP_JAR: String = "app.jar"
+  val SPARK_JAR: String = "__spark__.jar"
+  val APP_JAR: String = "__app__.jar"
   val LOG4J_PROP: String = "log4j.properties"
   val LOG4J_CONF_ENV_KEY: String = "SPARK_LOG4J_CONF"
   val LOCAL_SCHEME = "local"
+  val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"

   def getSparkJar = sys.env.get("SPARK_JAR").getOrElse(SparkContext.jarOfClass(this.getClass).head)
@@ -479,66 +485,25 @@ object ClientBase {
     extraClassPath.foreach(addClasspathEntry)
     addClasspathEntry(Environment.PWD.$())
+    val cachedSecondaryJarLinks =
+      sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",")
     // Normally the users app.jar is last in case conflicts with spark jars
     if (sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean) {
       addPwdClasspathEntry(APP_JAR)
+      cachedSecondaryJarLinks.foreach(addPwdClasspathEntry)
       addPwdClasspathEntry(SPARK_JAR)
       ClientBase.populateHadoopClasspath(conf, env)
     } else {
       addPwdClasspathEntry(SPARK_JAR)
       ClientBase.populateHadoopClasspath(conf, env)
       addPwdClasspathEntry(APP_JAR)
+      cachedSecondaryJarLinks.foreach(addPwdClasspathEntry)
     }
+    // Append all class files and jar files under the working directory to the classpath.
+    addClasspathEntry(Environment.PWD.$())
+    addPwdClasspathEntry("*")
   }

-  /**
-   * Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly
-   * to the classpath.
-   */
-  private def addUserClasspath(args: ClientArguments, env: HashMap[String, String]) = {
-    if (args != null) {
-      addClasspathEntry(args.userJar, APP_JAR, env)
-    }
-    if (args != null && args.addJars != null) {
-      args.addJars.split(",").foreach { case file: String =>
-        addClasspathEntry(file, null, env)
-      }
-    }
-  }
-
-  /**
-   * Adds the given path to the classpath, handling "local:" URIs correctly.
-   *
-   * If an alternate name for the file is given, and it's not a "local:" file, the alternate
-   * name will be added to the classpath (relative to the job's work directory).
-   *
-   * If not a "local:" file and no alternate name, the environment is not modified.
-   *
-   * @param path Path to add to classpath (optional).
-   * @param fileName Alternate name for the file (optional).
-   * @param env Map holding the environment variables.
-   */
-  private def addClasspathEntry(path: String, fileName: String,
-      env: HashMap[String, String]) : Unit = {
-    if (path != null) {
-      scala.util.control.Exception.ignoring(classOf[URISyntaxException]) {
-        val localPath = getLocalPath(path)
-        if (localPath != null) {
-          YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, localPath,
-            File.pathSeparator)
-          return
-        }
-      }
-    }
-    if (fileName != null) {
-      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name,
-        Environment.PWD.$() + Path.SEPARATOR + fileName, File.pathSeparator);
-    }
-  }
-
   /**
    * Returns the local path if the URI is a "local:" URI, or null otherwise.
    */
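
The change above can be summarized by a minimal round-trip sketch (simplified stand-ins for `SparkConf` and the YARN environment, not the actual Spark classes, with hypothetical jar names): the client joins the distributed-cache link names of all `--jars` entries into a single configuration value, and the container side later splits that value and adds each link, relative to the working directory, to the classpath.

```scala
import scala.collection.mutable

object SecondaryJarsRoundTrip {
  val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"

  def main(args: Array[String]): Unit = {
    val conf = mutable.Map.empty[String, String]               // stand-in for SparkConf

    // Client side: record the link name of every jar passed via --jars.
    val cachedSecondaryJarLinks = Seq("mylib.jar", "util.jar") // hypothetical links
    conf(CONF_SPARK_YARN_SECONDARY_JARS) = cachedSecondaryJarLinks.mkString(",")

    // Container side: re-read the links and place each one on the classpath
    // after the app jar, relative to the container working directory.
    val pwd = "{{PWD}}"                                        // expanded by YARN per container
    val links = conf.getOrElse(CONF_SPARK_YARN_SECONDARY_JARS, "")
      .split(',').filter(_.nonEmpty)
    val classpath = (s"$pwd/__app__.jar" +: links.map(l => s"$pwd/$l"))
      .mkString(java.io.File.pathSeparator)
    println(classpath) // {{PWD}}/__app__.jar:{{PWD}}/mylib.jar:{{PWD}}/util.jar
  }
}
```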
@@ -52,7 +52,7 @@ private[spark] class YarnClientSchedulerBackend(
     val argsArrayBuf = new ArrayBuffer[String]()
     argsArrayBuf += (
       "--class", "notused",
-      "--jar", null,
+      "--jar", null, // The primary jar will be added dynamically in SparkContext.
       "--args", hostport,
       "--am-class", classOf[ExecutorLauncher].getName
     )