Skip to content
Snippets Groups Projects
Commit 238d0e68 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge branch 'master' of github.com:mesos/spark

parents ed7fd501 c7877d5e
No related branches found
No related tags found
No related merge requests found
...@@ -27,6 +27,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e ...@@ -27,6 +27,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
private val yarnConf: YarnConfiguration = new YarnConfiguration(conf) private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
private var yarnAllocator: YarnAllocationHandler = null private var yarnAllocator: YarnAllocationHandler = null
private var isFinished:Boolean = false
def run() { def run() {
...@@ -68,10 +69,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e ...@@ -68,10 +69,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// Wait for the user class to finish // Wait for the user class to finish
userThread.join() userThread.join()
// Finish the ApplicationMaster
finishApplicationMaster()
// TODO: Exit based on success/failure
System.exit(0) System.exit(0)
} }
...@@ -124,17 +122,30 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e ...@@ -124,17 +122,30 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
} }
} }
} }
private def startUserClass(): Thread = { private def startUserClass(): Thread = {
logInfo("Starting the user JAR in a separate Thread") logInfo("Starting the user JAR in a separate Thread")
val mainMethod = Class.forName(args.userClass, false, Thread.currentThread.getContextClassLoader) val mainMethod = Class.forName(args.userClass, false, Thread.currentThread.getContextClassLoader)
.getMethod("main", classOf[Array[String]]) .getMethod("main", classOf[Array[String]])
val t = new Thread { val t = new Thread {
override def run() { override def run() {
// Copy var successed = false
var mainArgs: Array[String] = new Array[String](args.userArgs.size()) try {
args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size()) // Copy
mainMethod.invoke(null, mainArgs) var mainArgs: Array[String] = new Array[String](args.userArgs.size())
args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size())
mainMethod.invoke(null, mainArgs)
// Some job scripts call "System.exit(0)" at the end (for example SparkPi, SparkLR),
// so the user thread stops here unless an uncaught exception is thrown.
// In that case a shutdown hook is needed to set the status to SUCCEEDED.
successed = true
} finally {
if (successed) {
ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
} else {
ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED)
}
}
} }
} }
t.start() t.start()
...@@ -179,7 +190,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e ...@@ -179,7 +190,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
logInfo("All workers have launched.") logInfo("All workers have launched.")
// Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
if (userThread.isAlive){ if (userThread.isAlive) {
// ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse. // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
...@@ -197,7 +208,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e ...@@ -197,7 +208,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
val t = new Thread { val t = new Thread {
override def run() { override def run() {
while (userThread.isAlive){ while (userThread.isAlive) {
val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
if (missingWorkerCount > 0) { if (missingWorkerCount > 0) {
logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers") logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
...@@ -235,14 +246,23 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e ...@@ -235,14 +246,23 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
} }
} }
*/ */
def finishApplicationMaster() { def finishApplicationMaster(status: FinalApplicationStatus) {
synchronized {
if (isFinished) {
return
}
isFinished = true
}
logInfo("finishApplicationMaster with " + status)
val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest]) val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
.asInstanceOf[FinishApplicationMasterRequest] .asInstanceOf[FinishApplicationMasterRequest]
finishReq.setAppAttemptId(appAttemptId) finishReq.setAppAttemptId(appAttemptId)
// TODO: Check if the application has failed or succeeded finishReq.setFinishApplicationStatus(status)
finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED)
resourceManager.finishApplicationMaster(finishReq) resourceManager.finishApplicationMaster(finishReq)
} }
} }
...@@ -256,7 +276,7 @@ object ApplicationMaster { ...@@ -256,7 +276,7 @@ object ApplicationMaster {
private val ALLOCATOR_LOOP_WAIT_COUNT = 30 private val ALLOCATOR_LOOP_WAIT_COUNT = 30
def incrementAllocatorLoop(by: Int) { def incrementAllocatorLoop(by: Int) {
val count = yarnAllocatorLoop.getAndAdd(by) val count = yarnAllocatorLoop.getAndAdd(by)
if (count >= ALLOCATOR_LOOP_WAIT_COUNT){ if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
yarnAllocatorLoop.synchronized { yarnAllocatorLoop.synchronized {
// to wake threads off wait ... // to wake threads off wait ...
yarnAllocatorLoop.notifyAll() yarnAllocatorLoop.notifyAll()
...@@ -291,14 +311,16 @@ object ApplicationMaster { ...@@ -291,14 +311,16 @@ object ApplicationMaster {
logInfo("Invoking sc stop from shutdown hook") logInfo("Invoking sc stop from shutdown hook")
sc.stop() sc.stop()
// best case ... // best case ...
for (master <- applicationMasters) master.finishApplicationMaster for (master <- applicationMasters) {
master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
}
} }
} ) } )
} }
// Wait for initialization to complete and for at least 'some' nodes to be allocated // Wait for initialization to complete and for at least 'some' nodes to be allocated
yarnAllocatorLoop.synchronized { yarnAllocatorLoop.synchronized {
while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT){ while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
yarnAllocatorLoop.wait(1000L) yarnAllocatorLoop.wait(1000L)
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment