From c1d44be80580f0fad6bb1805bbcf74a34f536d8c Mon Sep 17 00:00:00 2001
From: BlackNiuza <shiyun.wxm@taobao.com>
Date: Tue, 9 Jul 2013 15:18:28 +0800
Subject: [PATCH] Bug fix: SPARK-796

---
 .../spark/deploy/yarn/ApplicationMaster.scala | 49 ++++++++++++-------
 1 file changed, 32 insertions(+), 17 deletions(-)

diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
index f19648ec68..9bc692d480 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
@@ -27,6 +27,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
   private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
 
   private var yarnAllocator: YarnAllocationHandler = null
+  private var isFinished:Boolean = false
 
   def run() {
     
@@ -68,10 +69,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     
     // Wait for the user class to Finish     
     userThread.join()
-     
-    // Finish the ApplicationMaster
-    finishApplicationMaster()
-    // TODO: Exit based on success/failure
+
     System.exit(0)
   }
   
@@ -131,10 +129,17 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
       .getMethod("main", classOf[Array[String]])
     val t = new Thread {
       override def run() {
-        // Copy
-        var mainArgs: Array[String] = new Array[String](args.userArgs.size())
-        args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size())
-        mainMethod.invoke(null, mainArgs)
+        try{
+          // Copy
+          var mainArgs: Array[String] = new Array[String](args.userArgs.size())
+          args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size())
+          mainMethod.invoke(null, mainArgs)
+          ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+        } catch {
+          case th: Throwable =>
+            ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED)
+            logError("Finish ApplicationMaster with ",th)
+        }
       }
     }
     t.start()
@@ -235,14 +240,22 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     }
   }
   */
-  
-  def finishApplicationMaster() { 
-    val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
-      .asInstanceOf[FinishApplicationMasterRequest]
-    finishReq.setAppAttemptId(appAttemptId)
-    // TODO: Check if the application has failed or succeeded
-    finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED)
-    resourceManager.finishApplicationMaster(finishReq)
+
+  def finishApplicationMaster(status: FinalApplicationStatus) {
+
+    synchronized {
+      if(isFinished){
+        return
+      }
+      isFinished = true
+
+      logInfo("finishApplicationMaster with "+status)
+      val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+        .asInstanceOf[FinishApplicationMasterRequest]
+      finishReq.setAppAttemptId(appAttemptId)
+      finishReq.setFinishApplicationStatus(status)
+      resourceManager.finishApplicationMaster(finishReq)
+    }
   }
  
 }
@@ -291,7 +304,9 @@ object ApplicationMaster {
           logInfo("Invoking sc stop from shutdown hook") 
           sc.stop() 
           // best case ...
-          for (master <- applicationMasters) master.finishApplicationMaster
+          for (master <- applicationMasters) {
+            master.finishApplicationMaster(FinalApplicationStatus.KILLED)
+          }
         } 
       } )
     }
-- 
GitLab