diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
index 9bc692d480d74870c4771c1c8773f23684103010..776db201f957c49dbc16d0d1ce806cd6d842f948 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
@@ -122,23 +122,26 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
       }
     }
   }
-  
+
   private def startUserClass(): Thread  = {
     logInfo("Starting the user JAR in a separate Thread")
     val mainMethod = Class.forName(args.userClass, false, Thread.currentThread.getContextClassLoader)
       .getMethod("main", classOf[Array[String]])
     val t = new Thread {
       override def run() {
+        var successed = false
         try{
           // Copy
           var mainArgs: Array[String] = new Array[String](args.userArgs.size())
           args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size())
           mainMethod.invoke(null, mainArgs)
-          ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
-        } catch {
-          case th: Throwable =>
+          successed = true
+        } finally {
+          if(successed){
+            ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+          }else{
             ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED)
-            logError("Finish ApplicationMaster with ",th)
+          }
         }
       }
     }
@@ -248,14 +251,15 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
         return
       }
       isFinished = true
-
-      logInfo("finishApplicationMaster with "+status)
-      val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
-        .asInstanceOf[FinishApplicationMasterRequest]
-      finishReq.setAppAttemptId(appAttemptId)
-      finishReq.setFinishApplicationStatus(status)
-      resourceManager.finishApplicationMaster(finishReq)
     }
+
+    logInfo("finishApplicationMaster with " + status)
+    val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+      .asInstanceOf[FinishApplicationMasterRequest]
+    finishReq.setAppAttemptId(appAttemptId)
+    finishReq.setFinishApplicationStatus(status)
+    resourceManager.finishApplicationMaster(finishReq)
+
   }
  
 }
@@ -304,6 +308,9 @@ object ApplicationMaster {
           logInfo("Invoking sc stop from shutdown hook") 
           sc.stop() 
           // best case ...
+          // due to the sparkContext is stopped and ApplicationMaster is down,
+          // the status of registered masters should be set KILLED better than FAILED.
+          // need discussion
           for (master <- applicationMasters) {
             master.finishApplicationMaster(FinalApplicationStatus.KILLED)
           }