diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 66c226e4913621e11fd11a63898d026d35fa2f5e..1647d904a22474094ed22ae6e02017120f16748e 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -677,10 +677,10 @@ class SparkContext( key = uri.getScheme match { // A JAR file which exists only on the driver node case null | "file" => - if (SparkHadoopUtil.get.isYarnMode()) { - // In order for this to work on yarn the user must specify the --addjars option to - // the client to upload the file into the distributed cache to make it show up in the - // current working directory. + if (SparkHadoopUtil.get.isYarnMode() && master == "yarn-standalone") { + // In order for this to work in yarn standalone mode the user must specify the + // --addjars option to the client to upload the file into the distributed cache + // of the AM to make it show up in the current working directory. val fileName = new Path(uri.getPath).getName() try { env.httpFileServer.addJar(new File(fileName)) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index b20627010798a9fe8461a661c281f12847593eab..3bd62646bab060eba316406fa9a1c7a4983a31b1 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -101,7 +101,19 @@ With this mode, your application is actually run on the remote machine where the With yarn-client mode, the application will be launched locally. Just like running application or spark-shell on Local / Mesos / Standalone mode. The launch method is also the similar with them, just make sure that when you need to specify a master url, use "yarn-client" instead. And you also need to export the env value for SPARK_JAR and SPARK_YARN_APP_JAR -In order to tune worker core/number/memory etc. You need to export SPARK_WORKER_CORES, SPARK_WORKER_MEMORY, SPARK_WORKER_INSTANCES e.g. by ./conf/spark-env.sh +Configuration in yarn-client mode: + +In order to tune worker core/number/memory etc. You need to export environment variables or add them to the spark configuration file (./conf/spark_env.sh). The following are the list of options. + +* `SPARK_YARN_APP_JAR`, Path to your application's JAR file (required) +* `SPARK_WORKER_INSTANCES`, Number of workers to start (Default: 2) +* `SPARK_WORKER_CORES`, Number of cores for the workers (Default: 1). +* `SPARK_WORKER_MEMORY`, Memory per Worker (e.g. 1000M, 2G) (Default: 1G) +* `SPARK_MASTER_MEMORY`, Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb) +* `SPARK_YARN_APP_NAME`, The name of your application (Default: Spark) +* `SPARK_YARN_QUEUE`, The hadoop queue to use for allocation requests (Default: 'default') +* `SPARK_YARN_DIST_FILES`, Comma separated list of files to be distributed with the job. +* `SPARK_YARN_DIST_ARCHIVES`, Comma separated list of archives to be distributed with the job. For example: @@ -114,7 +126,6 @@ For example: SPARK_YARN_APP_JAR=examples/target/scala-{{site.SCALA_VERSION}}/spark-examples-assembly-{{site.SPARK_VERSION}}.jar \ MASTER=yarn-client ./bin/spark-shell -You can also send extra files to yarn cluster for worker to use by exporting SPARK_YARN_DIST_FILES=file1,file2... etc. # Building Spark for Hadoop/YARN 2.2.x diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 2bb11e54c549af037fc75a2dd92300e3588828b4..2e46d750c4a3801f5d4cc381dcd2f8dd759561a3 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -127,14 +127,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, // local dirs, so lets check both. We assume one of the 2 is set. // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X val localDirs = Option(System.getenv("YARN_LOCAL_DIRS")) - .getOrElse(Option(System.getenv("LOCAL_DIRS")) - .getOrElse("")) - - if (localDirs.isEmpty()) { - throw new Exception("Yarn Local dirs can't be empty") + .orElse(Option(System.getenv("LOCAL_DIRS"))) + + localDirs match { + case None => throw new Exception("Yarn Local dirs can't be empty") + case Some(l) => l } - localDirs - } + } private def getApplicationAttemptId(): ApplicationAttemptId = { val envs = System.getenv() diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index ddfec1a4ac6728e8dd0030a6b5fcf164a1dff42c..62b20b8fbaf19da6cc13d2eb531e4ec78aa6f199 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -76,6 +76,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar def run() { + // Setup the directories so things go to yarn approved directories rather + // then user specified and /tmp. + System.setProperty("spark.local.dir", getLocalDirs()) + appAttemptId = getApplicationAttemptId() resourceManager = registerWithResourceManager() val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster() @@ -103,10 +107,12 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse. val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) - // must be <= timeoutInterval/ 2. - // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM. - // so atleast 1 minute or timeoutInterval / 10 - whichever is higher. - val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L)) + // we want to be reasonably responsive without causing too many requests to RM. + val schedulerInterval = + System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong + // must be <= timeoutInterval / 2. + val interval = math.min(timeoutInterval / 2, schedulerInterval) + reporterThread = launchReporterThread(interval) // Wait for the reporter thread to Finish. @@ -119,6 +125,20 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar System.exit(0) } + /** Get the Yarn approved local directories. */ + private def getLocalDirs(): String = { + // Hadoop 0.23 and 2.x have different Environment variable names for the + // local dirs, so lets check both. We assume one of the 2 is set. + // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X + val localDirs = Option(System.getenv("YARN_LOCAL_DIRS")) + .orElse(Option(System.getenv("LOCAL_DIRS"))) + + localDirs match { + case None => throw new Exception("Yarn Local dirs can't be empty") + case Some(l) => l + } + } + private def getApplicationAttemptId(): ApplicationAttemptId = { val envs = System.getenv() val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV) diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 4b1b5da048df4d801dacb24f7df18245e98735ac..22e55e0c60647978d4543d14f78444fdfa0d2e8d 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -22,6 +22,8 @@ import org.apache.spark.{SparkException, Logging, SparkContext} import org.apache.spark.deploy.yarn.{Client, ClientArguments} import org.apache.spark.scheduler.TaskSchedulerImpl +import scala.collection.mutable.ArrayBuffer + private[spark] class YarnClientSchedulerBackend( scheduler: TaskSchedulerImpl, sc: SparkContext) @@ -31,45 +33,47 @@ private[spark] class YarnClientSchedulerBackend( var client: Client = null var appId: ApplicationId = null + private[spark] def addArg(optionName: String, optionalParam: String, arrayBuf: ArrayBuffer[String]) { + Option(System.getenv(optionalParam)) foreach { + optParam => { + arrayBuf += (optionName, optParam) + } + } + } + override def start() { super.start() - val defalutWorkerCores = "2" - val defalutWorkerMemory = "512m" - val defaultWorkerNumber = "1" - val userJar = System.getenv("SPARK_YARN_APP_JAR") - val distFiles = System.getenv("SPARK_YARN_DIST_FILES") - var workerCores = System.getenv("SPARK_WORKER_CORES") - var workerMemory = System.getenv("SPARK_WORKER_MEMORY") - var workerNumber = System.getenv("SPARK_WORKER_INSTANCES") - if (userJar == null) throw new SparkException("env SPARK_YARN_APP_JAR is not set") - if (workerCores == null) - workerCores = defalutWorkerCores - if (workerMemory == null) - workerMemory = defalutWorkerMemory - if (workerNumber == null) - workerNumber = defaultWorkerNumber - val driverHost = conf.get("spark.driver.host") val driverPort = conf.get("spark.driver.port") val hostport = driverHost + ":" + driverPort - val argsArray = Array[String]( + val argsArrayBuf = new ArrayBuffer[String]() + argsArrayBuf += ( "--class", "notused", "--jar", userJar, "--args", hostport, - "--worker-memory", workerMemory, - "--worker-cores", workerCores, - "--num-workers", workerNumber, - "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher", - "--files", distFiles + "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher" ) - val args = new ClientArguments(argsArray, conf) + // process any optional arguments, use the defaults already defined in ClientArguments + // if things aren't specified + Map("--master-memory" -> "SPARK_MASTER_MEMORY", + "--num-workers" -> "SPARK_WORKER_INSTANCES", + "--worker-memory" -> "SPARK_WORKER_MEMORY", + "--worker-cores" -> "SPARK_WORKER_CORES", + "--queue" -> "SPARK_YARN_QUEUE", + "--name" -> "SPARK_YARN_APP_NAME", + "--files" -> "SPARK_YARN_DIST_FILES", + "--archives" -> "SPARK_YARN_DIST_ARCHIVES") + .foreach { case (optName, optParam) => addArg(optName, optParam, argsArrayBuf) } + + logDebug("ClientArguments called with: " + argsArrayBuf) + val args = new ClientArguments(argsArrayBuf.toArray, conf) client = new Client(args, conf) appId = client.runApp() waitForApp() diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 69ae14ce8385cfb3b978481d90b2681d02a2ae80..4b777d5fa7a283e78744923dce484b13b1cc1431 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -116,14 +116,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, // local dirs, so lets check both. We assume one of the 2 is set. // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X val localDirs = Option(System.getenv("YARN_LOCAL_DIRS")) - .getOrElse(Option(System.getenv("LOCAL_DIRS")) - .getOrElse("")) - - if (localDirs.isEmpty()) { - throw new Exception("Yarn Local dirs can't be empty") + .orElse(Option(System.getenv("LOCAL_DIRS"))) + + localDirs match { + case None => throw new Exception("Yarn Local dirs can't be empty") + case Some(l) => l } - localDirs - } + } private def getApplicationAttemptId(): ApplicationAttemptId = { val envs = System.getenv() diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index be323d77835a8892eb3e481eba83f31f7dc3e8b9..952e963389c0aa7f1b9e3a3a4557ff4736920964 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -99,6 +99,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) appContext.setApplicationName(args.appName) appContext.setQueue(args.amQueue) appContext.setAMContainerSpec(amContainer) + appContext.setApplicationType("SPARK") // Memory for the ApplicationMaster. val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource] diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index 49248a8516b9cef6941524500965354d9ac30d08..78353224fa4b8b51aea9ba6d56ab04c2e1a479cd 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -78,6 +78,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar def run() { + // Setup the directories so things go to yarn approved directories rather + // then user specified and /tmp. + System.setProperty("spark.local.dir", getLocalDirs()) + amClient = AMRMClient.createAMRMClient() amClient.init(yarnConf) amClient.start() @@ -94,10 +98,12 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse. val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) - // must be <= timeoutInterval/ 2. - // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM. - // so atleast 1 minute or timeoutInterval / 10 - whichever is higher. - val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval / 10, 60000L)) + // we want to be reasonably responsive without causing too many requests to RM. + val schedulerInterval = + System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong + // must be <= timeoutInterval / 2. + val interval = math.min(timeoutInterval / 2, schedulerInterval) + reporterThread = launchReporterThread(interval) // Wait for the reporter thread to Finish. @@ -110,6 +116,20 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar System.exit(0) } + /** Get the Yarn approved local directories. */ + private def getLocalDirs(): String = { + // Hadoop 0.23 and 2.x have different Environment variable names for the + // local dirs, so lets check both. We assume one of the 2 is set. + // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X + val localDirs = Option(System.getenv("YARN_LOCAL_DIRS")) + .orElse(Option(System.getenv("LOCAL_DIRS"))) + + localDirs match { + case None => throw new Exception("Yarn Local dirs can't be empty") + case Some(l) => l + } + } + private def getApplicationAttemptId(): ApplicationAttemptId = { val envs = System.getenv() val containerIdString = envs.get(ApplicationConstants.Environment.CONTAINER_ID.name())