Commit ab3cefde authored by Raymond Liu

Add YarnClientClusterScheduler and Backend.

With this scheduler, the user application is launched locally,
while the executors will be launched by YARN on remote nodes.

This enables spark-shell to run on YARN.
parent 2fead510
@@ -226,6 +226,31 @@ class SparkContext(
scheduler.initialize(backend)
scheduler
case "yarn-client" =>
val scheduler = try {
val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
val cons = clazz.getConstructor(classOf[SparkContext])
cons.newInstance(this).asInstanceOf[ClusterScheduler]
} catch {
case th: Throwable => {
throw new SparkException("YARN mode not available ?", th)
}
}
val backend = try {
val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
val cons = clazz.getConstructor(classOf[ClusterScheduler], classOf[SparkContext])
cons.newInstance(scheduler, this).asInstanceOf[CoarseGrainedSchedulerBackend]
} catch {
case th: Throwable => {
throw new SparkException("YARN mode not available ?", th)
}
}
scheduler.initialize(backend)
scheduler
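For reference, a rough sketch of what this reflective construction wires together once the YARN module is on the classpath; the Class.forName indirection keeps Spark core free of a compile-time dependency on YARN. The enclosing package name below is hypothetical (both classes are private[spark], so such a sketch would have to live under the org.apache.spark package):

package org.apache.spark.sketches   // hypothetical sub-package, needed for private[spark] access

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.cluster.{YarnClientClusterScheduler, YarnClientSchedulerBackend}

object YarnClientWiring {
  // Mirrors the "yarn-client" branch above, without the reflection.
  def createScheduler(sc: SparkContext): YarnClientClusterScheduler = {
    val scheduler = new YarnClientClusterScheduler(sc)
    val backend = new YarnClientSchedulerBackend(scheduler, sc)
    scheduler.initialize(backend)
    scheduler
  }
}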
case MESOS_REGEX(mesosUrl) =>
MesosNativeLibrary.load()
val scheduler = new ClusterScheduler(this)
...
@@ -45,6 +45,10 @@ System Properties:
Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client side) configuration files for the hadoop cluster.
This would be used to connect to the cluster, write to the dfs and submit jobs to the resource manager.
There are two scheduler modes that can be used to launch a Spark application on YARN.
## Launch a Spark application with the YARN Client in yarn-standalone mode
The command to launch the YARN Client is as follows:
SPARK_JAR=<SPARK_ASSEMBLY_JAR_FILE> ./spark-class org.apache.spark.deploy.yarn.Client \
@@ -52,6 +56,7 @@ The command to launch the YARN Client is as follows:
--class <APP_MAIN_CLASS> \
--args <APP_MAIN_ARGUMENTS> \
--num-workers <NUMBER_OF_WORKER_MACHINES> \
--master-class <ApplicationMaster_CLASS> \
--master-memory <MEMORY_FOR_MASTER> \
--worker-memory <MEMORY_PER_WORKER> \
--worker-cores <CORES_PER_WORKER> \
@@ -85,11 +90,29 @@ For example:
$ cat $YARN_APP_LOGS_DIR/$YARN_APP_ID/container*_000001/stdout
Pi is roughly 3.13794
The above starts a YARN Client program which starts the default Application Master. SparkPi will then be run as a child thread of the Application Master. The YARN Client will periodically poll the Application Master for status updates and display them in the console. The client will exit once your application has finished running.
In this mode, your application actually runs on the remote machine where the Application Master runs. Thus applications that involve local interaction, such as spark-shell, will not work well.
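A minimal sketch of an application written for this mode, taking the master url from its arguments (so that "yarn-standalone" can be passed in, as the SparkPi example does); the names here are illustrative only:

import org.apache.spark.SparkContext

object SimpleYarnApp {
  def main(args: Array[String]) {
    val sc = new SparkContext(args(0), "SimpleYarnApp")   // args(0) == "yarn-standalone"
    println("count = " + sc.parallelize(1 to 100).count())
    sc.stop()
  }
}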
## Launch a Spark application in yarn-client mode
In yarn-client mode, the application is launched locally, just like running an application or spark-shell in Local / Mesos / Standalone mode. The launch method is also similar; just make sure that when you need to specify a master url, you use "yarn-client" instead. You also need to export the environment variables SPARK_JAR and SPARK_YARN_APP_JAR.
To tune the worker cores / number / memory etc., export SPARK_WORKER_CORES, SPARK_WORKER_MEMORY and SPARK_WORKER_INSTANCES, e.g. in ./conf/spark-env.sh.
For example:
SPARK_JAR=./assembly/target/scala-{{site.SCALA_VERSION}}/spark-assembly-{{site.SPARK_VERSION}}-hadoop2.0.5-alpha.jar \
SPARK_YARN_APP_JAR=examples/target/scala-{{site.SCALA_VERSION}}/spark-examples-assembly-{{site.SPARK_VERSION}}.jar \
./run-example org.apache.spark.examples.SparkPi yarn-client
SPARK_JAR=./assembly/target/scala-{{site.SCALA_VERSION}}/spark-assembly-{{site.SPARK_VERSION}}-hadoop2.0.5-alpha.jar \
SPARK_YARN_APP_JAR=examples/target/scala-{{site.SCALA_VERSION}}/spark-examples-assembly-{{site.SPARK_VERSION}}.jar \
MASTER=yarn-client ./spark-shell
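Beyond spark-shell, a standalone application can simply hard-code the "yarn-client" master url; a minimal sketch (assuming SPARK_JAR and SPARK_YARN_APP_JAR are exported as above):

import org.apache.spark.SparkContext

object SimpleYarnClientApp {
  def main(args: Array[String]) {
    // The driver runs locally; the executors are launched by YARN in remote containers.
    val sc = new SparkContext("yarn-client", "SimpleYarnClientApp")
    println("count = " + sc.parallelize(1 to 100).count())
    sc.stop()
  }
}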
# Important Notes
- When your application instantiates a Spark context it must use a special "yarn-standalone" master url. This starts the scheduler without forcing it to connect to a cluster. A good way to handle this is to pass "yarn-standalone" as an argument to your program, as shown in the example above.
- We do not request container resources based on the number of cores. Thus the numbers of cores given via command line arguments cannot be guaranteed.
- The local directories used by Spark will be the local directories configured for YARN (Hadoop YARN config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it will be ignored.
- The --files and --archives options support specifying file names with the # similar to Hadoop. For example you can specify: --files localtest.txt#appSees.txt and this will upload the file you have locally named localtest.txt into HDFS, but this will be linked to by the name appSees.txt, and your application should use the name appSees.txt to reference it when running on YARN.
...
@@ -54,9 +54,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
// staging directory is private! -> rwx------
val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700:Short)
// app files are world-wide readable and owner writable -> rw-r--r--
val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644:Short)
// for client users who want to monitor the app status themselves.
def runApp() = {
validateArgs()
init(yarnConf)
@@ -78,7 +79,11 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
submitApp(appContext)
appId
}
def run() {
val appId = runApp()
monitorApplication(appId)
System.exit(0)
}
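The run()/runApp() split lets a caller submit the application and manage monitoring itself instead of blocking in run(); this is exactly how YarnClientSchedulerBackend below uses it. A hedged sketch (the object and argument handling are hypothetical; runApp() and getApplicationReport() are the calls shown in this diff):

import org.apache.spark.deploy.yarn.{Client, ClientArguments}

object SubmitAndPoll {
  def main(argStrings: Array[String]) {
    val client = new Client(new ClientArguments(argStrings))
    val appId = client.runApp()                        // submit without blocking
    val report = client.getApplicationReport(appId)    // inherited from YarnClientImpl
    println("YARN application state: " + report.getYarnApplicationState())
  }
}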
@@ -372,7 +377,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val commands = List[String](javaCommand +
" -server " +
JAVA_OPTS +
" " + args.amClass +
" --class " + args.userClass +
" --jar " + args.userJar +
userArgsToString(args) +
...
@@ -35,6 +35,7 @@ class ClientArguments(val args: Array[String]) {
var numWorkers = 2
var amQueue = System.getProperty("QUEUE", "default")
var amMemory: Int = 512
var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
var appName: String = "Spark"
// TODO
var inputFormatInfo: List[InputFormatInfo] = null
@@ -62,18 +63,22 @@ class ClientArguments(val args: Array[String]) {
userArgsBuffer += value
args = tail
case ("--master-memory") :: MemoryParam(value) :: tail => case ("--master-class") :: value :: tail =>
amMemory = value amClass = value
args = tail args = tail
case ("--num-workers") :: IntParam(value) :: tail => case ("--master-memory") :: MemoryParam(value) :: tail =>
numWorkers = value amMemory = value
args = tail args = tail
case ("--worker-memory") :: MemoryParam(value) :: tail => case ("--worker-memory") :: MemoryParam(value) :: tail =>
workerMemory = value workerMemory = value
args = tail args = tail
case ("--num-workers") :: IntParam(value) :: tail =>
numWorkers = value
args = tail
case ("--worker-cores") :: IntParam(value) :: tail => case ("--worker-cores") :: IntParam(value) :: tail =>
workerCores = value workerCores = value
args = tail args = tail
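As a quick illustration of how the parser above consumes the new flag (the jar path and class name below are made up), --master-class simply overrides the default amClass while the other options keep their defaults:

import org.apache.spark.deploy.yarn.ClientArguments

object MasterClassFlagDemo {
  def main(args: Array[String]) {
    val parsed = new ClientArguments(Array(
      "--jar", "myapp.jar",
      "--class", "com.example.MyApp",
      "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher"))
    assert(parsed.amClass == "org.apache.spark.deploy.yarn.WorkerLauncher")
    assert(parsed.numWorkers == 2)   // defaults are untouched
  }
}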
@@ -119,19 +124,20 @@ class ClientArguments(val args: Array[String]) {
System.err.println(
"Usage: org.apache.spark.deploy.yarn.Client [options] \n" +
"Options:\n" +
" --jar JAR_PATH Path to your application's JAR file (required)\n" +
" --class CLASS_NAME Name of your application's main class (required)\n" +
" --args ARGS Arguments to be passed to your application's main class.\n" +
" Multiple invocations are possible, each will be passed in order.\n" +
" --num-workers NUM Number of workers to start (Default: 2)\n" +
" --worker-cores NUM Number of cores for the workers (Default: 1). This is unused right now.\n" +
" --master-class CLASS_NAME Class Name for Master (Default: org.apache.spark.deploy.yarn.ApplicationMaster)\n" +
" --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
" --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
" --name NAME The name of your application (Default: Spark)\n" +
" --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')\n" +
" --addJars jars Comma separated list of local jars that want SparkContext.addJar to work with.\n" +
" --files files Comma separated list of files to be distributed with the job.\n" +
" --archives archives Comma separated list of archives to be distributed with the job."
)
System.exit(exitCode)
}
...
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.yarn
import java.net.Socket
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import akka.actor._
import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
import org.apache.spark.{SparkContext, Logging}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.SplitInfo
class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
def this(args: ApplicationMasterArguments) = this(args, new Configuration())
private val rpc: YarnRPC = YarnRPC.create(conf)
private var resourceManager: AMRMProtocol = null
private var appAttemptId: ApplicationAttemptId = null
private var reporterThread: Thread = null
private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
private var yarnAllocator: YarnAllocationHandler = null
private var driverClosed:Boolean = false
val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0)._1
var actor: ActorRef = null
// This actor just works as a monitor, watching the driver actor.
class MonitorActor(driverUrl: String) extends Actor {
var driver: ActorRef = null
override def preStart() {
logInfo("Listen to driver: " + driverUrl)
driver = context.actorFor(driverUrl)
driver ! "hello"
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
context.watch(driver) // Doesn't work with remote actors, but useful for testing
}
override def receive = {
case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
logInfo("Driver terminated or disconnected! Shutting down.")
driverClosed = true
}
}
def run() {
appAttemptId = getApplicationAttemptId()
resourceManager = registerWithResourceManager()
val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
// Compute number of threads for akka
val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
if (minimumMemory > 0) {
val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
if (numCore > 0) {
// do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
// TODO: Uncomment when hadoop is on a version which has this fixed.
// args.workerCores = numCore
}
}
waitForSparkMaster()
// Allocate all containers
allocateWorkers()
// Launch a progress reporter thread, else the app will get killed after the expiration timeout (default: 10 mins),
// i.e. ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
// must be <= timeoutInterval / 2.
// On the other hand, also ensure that we are reasonably responsive without causing too many requests to the RM,
// so at least 1 minute or timeoutInterval / 10 - whichever is higher.
val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
reporterThread = launchReporterThread(interval)
// Wait for the reporter thread to Finish.
reporterThread.join()
finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
actorSystem.shutdown()
logInfo("Exited")
System.exit(0)
}
private def getApplicationAttemptId(): ApplicationAttemptId = {
val envs = System.getenv()
val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
val containerId = ConverterUtils.toContainerId(containerIdString)
val appAttemptId = containerId.getApplicationAttemptId()
logInfo("ApplicationAttemptId: " + appAttemptId)
return appAttemptId
}
private def registerWithResourceManager(): AMRMProtocol = {
val rmAddress = NetUtils.createSocketAddr(yarnConf.get(
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
logInfo("Connecting to ResourceManager at " + rmAddress)
return rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
}
private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
logInfo("Registering the ApplicationMaster")
val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
.asInstanceOf[RegisterApplicationMasterRequest]
appMasterRequest.setApplicationAttemptId(appAttemptId)
// Setting this to master host,port - so that the ApplicationReport at client has some sensible info.
// Users can then monitor stderr/stdout on that node if required.
appMasterRequest.setHost(Utils.localHostName())
appMasterRequest.setRpcPort(0)
// What do we provide here ? Might make sense to expose something sensible later ?
appMasterRequest.setTrackingUrl("")
return resourceManager.registerApplicationMaster(appMasterRequest)
}
private def waitForSparkMaster() {
logInfo("Waiting for spark driver to be reachable.")
var driverUp = false
val hostport = args.userArgs(0)
val (driverHost, driverPort) = Utils.parseHostPort(hostport)
while(!driverUp) {
try {
val socket = new Socket(driverHost, driverPort)
socket.close()
logInfo("Master now available: " + driverHost + ":" + driverPort)
driverUp = true
} catch {
case e: Exception =>
logError("Failed to connect to driver at " + driverHost + ":" + driverPort)
Thread.sleep(100)
}
}
System.setProperty("spark.driver.host", driverHost)
System.setProperty("spark.driver.port", driverPort.toString)
val driverUrl = "akka://spark@%s:%s/user/%s".format(
driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
}
private def allocateWorkers() {
// FIXME: should get preferredNodeLocationData from SparkContext, just fake an empty one for now.
val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = scala.collection.immutable.Map()
yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args, preferredNodeLocationData)
logInfo("Allocating " + args.numWorkers + " workers.")
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure
while(yarnAllocator.getNumWorkersRunning < args.numWorkers) {
yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
Thread.sleep(100)
}
logInfo("All workers have launched.")
}
// TODO: We might want to extend this to allocate more containers in case they die !
private def launchReporterThread(_sleepTime: Long): Thread = {
val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
val t = new Thread {
override def run() {
while (!driverClosed) {
val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
if (missingWorkerCount > 0) {
logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
yarnAllocator.allocateContainers(missingWorkerCount)
}
else sendProgress()
Thread.sleep(sleepTime)
}
}
}
// setting to daemon status, though this is usually not a good idea.
t.setDaemon(true)
t.start()
logInfo("Started progress reporter thread - sleep time : " + sleepTime)
return t
}
private def sendProgress() {
logDebug("Sending progress")
// simulated with an allocate request with no nodes requested ...
yarnAllocator.allocateContainers(0)
}
def finishApplicationMaster(status: FinalApplicationStatus) {
logInfo("finish ApplicationMaster with " + status)
val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
.asInstanceOf[FinishApplicationMasterRequest]
finishReq.setAppAttemptId(appAttemptId)
finishReq.setFinishApplicationStatus(status)
resourceManager.finishApplicationMaster(finishReq)
}
}
object WorkerLauncher {
def main(argStrings: Array[String]) {
val args = new ApplicationMasterArguments(argStrings)
new WorkerLauncher(args).run()
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster
import org.apache.spark._
import org.apache.hadoop.conf.Configuration
import org.apache.spark.deploy.yarn.YarnAllocationHandler
import org.apache.spark.util.Utils
/**
*
* This scheduler launches workers through YARN, by calling into Client to launch WorkerLauncher as the ApplicationMaster.
*/
private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends ClusterScheduler(sc) {
def this(sc: SparkContext) = this(sc, new Configuration())
// By default, rack is unknown
override def getRackForHost(hostPort: String): Option[String] = {
val host = Utils.parseHostPort(hostPort)._1
val retval = YarnAllocationHandler.lookupRack(conf, host)
if (retval != null) Some(retval) else None
}
override def postStartHook() {
// The yarn application has started, but the workers might not be ready yet.
// Wait a few seconds for the slaves to bootstrap and register with the master - best-effort attempt.
Thread.sleep(2000L)
logInfo("YarnClientClusterScheduler.postStartHook done")
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster
import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.spark.{SparkException, Logging, SparkContext}
import org.apache.spark.deploy.yarn.{Client, ClientArguments}
private[spark] class YarnClientSchedulerBackend(
scheduler: ClusterScheduler,
sc: SparkContext)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
with Logging {
var client: Client = null
var appId: ApplicationId = null
override def start() {
super.start()
val defaultWorkerCores = "2"
val defaultWorkerMemory = "512m"
val defaultWorkerNumber = "1"
val userJar = System.getenv("SPARK_YARN_APP_JAR")
var workerCores = System.getenv("SPARK_WORKER_CORES")
var workerMemory = System.getenv("SPARK_WORKER_MEMORY")
var workerNumber = System.getenv("SPARK_WORKER_INSTANCES")
if (userJar == null)
throw new SparkException("env SPARK_YARN_APP_JAR is not set")
if (workerCores == null)
workerCores = defaultWorkerCores
if (workerMemory == null)
workerMemory = defaultWorkerMemory
if (workerNumber == null)
workerNumber = defaultWorkerNumber
val driverHost = System.getProperty("spark.driver.host")
val driverPort = System.getProperty("spark.driver.port")
val hostport = driverHost + ":" + driverPort
val argsArray = Array[String](
"--class", "notused",
"--jar", userJar,
"--args", hostport,
"--worker-memory", workerMemory,
"--worker-cores", workerCores,
"--num-workers", workerNumber,
"--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher"
)
val args = new ClientArguments(argsArray)
client = new Client(args)
appId = client.runApp()
waitForApp()
}
def waitForApp() {
// TODO : need a better way to find out whether the workers are ready or not
// maybe by resource usage report?
while(true) {
val report = client.getApplicationReport(appId)
logInfo("Application report from ASM: \n" +
"\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
"\t appStartTime: " + report.getStartTime() + "\n" +
"\t yarnAppState: " + report.getYarnApplicationState() + "\n"
)
// Ready to go, or already gone.
val state = report.getYarnApplicationState()
if (state == YarnApplicationState.RUNNING) {
return
} else if (state == YarnApplicationState.FINISHED ||
state == YarnApplicationState.FAILED ||
state == YarnApplicationState.KILLED) {
throw new SparkException("Yarn application already ended," +
"might be killed or not able to launch application master.")
}
Thread.sleep(1000)
}
}
override def stop() {
super.stop()
client.stop()
logInfo("Stoped")
}
}