Skip to content
Snippets Groups Projects
Commit c1d44be8 authored by BlackNiuza's avatar BlackNiuza
Browse files

Bug fix: SPARK-796

parent 3c131783
No related branches found
No related tags found
No related merge requests found
...@@ -27,6 +27,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e ...@@ -27,6 +27,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
private val yarnConf: YarnConfiguration = new YarnConfiguration(conf) private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
private var yarnAllocator: YarnAllocationHandler = null private var yarnAllocator: YarnAllocationHandler = null
private var isFinished:Boolean = false
def run() { def run() {
...@@ -68,10 +69,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e ...@@ -68,10 +69,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// Wait for the user class to Finish // Wait for the user class to Finish
userThread.join() userThread.join()
// Finish the ApplicationMaster
finishApplicationMaster()
// TODO: Exit based on success/failure
System.exit(0) System.exit(0)
} }
...@@ -131,10 +129,17 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e ...@@ -131,10 +129,17 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
.getMethod("main", classOf[Array[String]]) .getMethod("main", classOf[Array[String]])
val t = new Thread { val t = new Thread {
override def run() { override def run() {
// Copy try{
var mainArgs: Array[String] = new Array[String](args.userArgs.size()) // Copy
args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size()) var mainArgs: Array[String] = new Array[String](args.userArgs.size())
mainMethod.invoke(null, mainArgs) args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size())
mainMethod.invoke(null, mainArgs)
ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
} catch {
case th: Throwable =>
ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED)
logError("Finish ApplicationMaster with ",th)
}
} }
} }
t.start() t.start()
...@@ -235,14 +240,22 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e ...@@ -235,14 +240,22 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
} }
} }
*/ */
def finishApplicationMaster() { def finishApplicationMaster(status: FinalApplicationStatus) {
val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
.asInstanceOf[FinishApplicationMasterRequest] synchronized {
finishReq.setAppAttemptId(appAttemptId) if(isFinished){
// TODO: Check if the application has failed or succeeded return
finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED) }
resourceManager.finishApplicationMaster(finishReq) isFinished = true
logInfo("finishApplicationMaster with "+status)
val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
.asInstanceOf[FinishApplicationMasterRequest]
finishReq.setAppAttemptId(appAttemptId)
finishReq.setFinishApplicationStatus(status)
resourceManager.finishApplicationMaster(finishReq)
}
} }
} }
...@@ -291,7 +304,9 @@ object ApplicationMaster { ...@@ -291,7 +304,9 @@ object ApplicationMaster {
logInfo("Invoking sc stop from shutdown hook") logInfo("Invoking sc stop from shutdown hook")
sc.stop() sc.stop()
// best case ... // best case ...
for (master <- applicationMasters) master.finishApplicationMaster for (master <- applicationMasters) {
master.finishApplicationMaster(FinalApplicationStatus.KILLED)
}
} }
} ) } )
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment