diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
index 9bd1719cb18089e5c486fdcb6ca52a8f3801e51a..7faf55bc633725891166f8c7636b882fde057c69 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
@@ -40,6 +40,7 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
   private var rpc: YarnRPC = null
   private var resourceManager: AMRMProtocol = _
   private var uiHistoryAddress: String = _
+  private var registered: Boolean = false
 
   override def register(
       conf: YarnConfiguration,
@@ -51,8 +52,11 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
     this.rpc = YarnRPC.create(conf)
     this.uiHistoryAddress = uiHistoryAddress
 
-    resourceManager = registerWithResourceManager(conf)
-    registerApplicationMaster(uiAddress)
+    synchronized {
+      resourceManager = registerWithResourceManager(conf)
+      registerApplicationMaster(uiAddress)
+      registered = true
+    }
 
     new YarnAllocationHandler(conf, sparkConf, resourceManager, getAttemptId(), args,
       preferredNodeLocations, securityMgr)
@@ -66,14 +70,16 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
     appAttemptId
   }
 
-  override def shutdown(status: FinalApplicationStatus, diagnostics: String = "") = {
-    val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
-      .asInstanceOf[FinishApplicationMasterRequest]
-    finishReq.setAppAttemptId(getAttemptId())
-    finishReq.setFinishApplicationStatus(status)
-    finishReq.setDiagnostics(diagnostics)
-    finishReq.setTrackingUrl(uiHistoryAddress)
-    resourceManager.finishApplicationMaster(finishReq)
+  override def unregister(status: FinalApplicationStatus, diagnostics: String = "") = synchronized {
+    if (registered) {
+      val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+        .asInstanceOf[FinishApplicationMasterRequest]
+      finishReq.setAppAttemptId(getAttemptId())
+      finishReq.setFinishApplicationStatus(status)
+      finishReq.setDiagnostics(diagnostics)
+      finishReq.setTrackingUrl(uiHistoryAddress)
+      resourceManager.finishApplicationMaster(finishReq)
+    }
   }
 
   override def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String) = {
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index caceef5d4b5b0415c74316c117451e8c44161844..a3c43b43848d2bcf9b9997449be977265d94e0d5 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
+import org.apache.spark.SparkException
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -56,8 +57,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
   private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
     sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
 
+  @volatile private var exitCode = 0
+  @volatile private var unregistered = false
   @volatile private var finished = false
   @volatile private var finalStatus = FinalApplicationStatus.UNDEFINED
+  @volatile private var finalMsg: String = ""
   @volatile private var userClassThread: Thread = _
 
   private var reporterThread: Thread = _
@@ -71,80 +75,107 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
   private val sparkContextRef = new AtomicReference[SparkContext](null)
 
   final def run(): Int = {
-    val appAttemptId = client.getAttemptId()
+    try {
+      val appAttemptId = client.getAttemptId()
 
-    if (isDriver) {
-      // Set the web ui port to be ephemeral for yarn so we don't conflict with
-      // other spark processes running on the same box
-      System.setProperty("spark.ui.port", "0")
+      if (isDriver) {
+        // Set the web ui port to be ephemeral for yarn so we don't conflict with
+        // other spark processes running on the same box
+        System.setProperty("spark.ui.port", "0")
 
-      // Set the master property to match the requested mode.
-      System.setProperty("spark.master", "yarn-cluster")
+        // Set the master property to match the requested mode.
+        System.setProperty("spark.master", "yarn-cluster")
 
-      // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
-      System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
-    }
+        // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
+        System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
+      }
 
-    logInfo("ApplicationAttemptId: " + appAttemptId)
+      logInfo("ApplicationAttemptId: " + appAttemptId)
 
-    val cleanupHook = new Runnable {
-      override def run() {
-        // If the SparkContext is still registered, shut it down as a best case effort in case
-        // users do not call sc.stop or do System.exit().
-        val sc = sparkContextRef.get()
-        if (sc != null) {
-          logInfo("Invoking sc stop from shutdown hook")
-          sc.stop()
-          finish(FinalApplicationStatus.SUCCEEDED)
-        }
+      val cleanupHook = new Runnable {
+        override def run() {
+          // If the SparkContext is still registered, shut it down as a best case effort in case
+          // users do not call sc.stop or do System.exit().
+          val sc = sparkContextRef.get()
+          if (sc != null) {
+            logInfo("Invoking sc stop from shutdown hook")
+            sc.stop()
+          }
+          val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
+          val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
+
+          if (!finished) {
+            // this shouldn't ever happen, but if it does assume weird failure
+            finish(FinalApplicationStatus.FAILED,
+              ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
+              "shutdown hook called without cleanly finishing")
+          }
 
-        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
-        // running the AM.
-        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
-        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
-        if (finished || isLastAttempt) {
-          cleanupStagingDir()
+          if (!unregistered) {
+            // we only want to unregister if we don't want the RM to retry
+            if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
+              unregister(finalStatus, finalMsg)
+              cleanupStagingDir()
+            }
+          }
         }
       }
-    }
 
-    // Use higher priority than FileSystem.
-    assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
-    ShutdownHookManager
-      .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
+      // Use higher priority than FileSystem.
+      assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
+      ShutdownHookManager
+        .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
 
-    // Call this to force generation of secret so it gets populated into the
-    // Hadoop UGI. This has to happen before the startUserClass which does a
-    // doAs in order for the credentials to be passed on to the executor containers.
-    val securityMgr = new SecurityManager(sparkConf)
+      // Call this to force generation of secret so it gets populated into the
+      // Hadoop UGI. This has to happen before the startUserClass which does a
+      // doAs in order for the credentials to be passed on to the executor containers.
+      val securityMgr = new SecurityManager(sparkConf)
 
-    if (isDriver) {
-      runDriver(securityMgr)
-    } else {
-      runExecutorLauncher(securityMgr)
+      if (isDriver) {
+        runDriver(securityMgr)
+      } else {
+        runExecutorLauncher(securityMgr)
+      }
+    } catch {
+      case e: Exception =>
+        // catch everything else if not specifically handled
+        logError("Uncaught exception: ", e)
+        finish(FinalApplicationStatus.FAILED,
+          ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
+          "Uncaught exception: " + e.getMessage())
     }
+    exitCode
+  }
 
-    if (finalStatus != FinalApplicationStatus.UNDEFINED) {
-      finish(finalStatus)
-      0
-    } else {
-      1
+  /**
+   * unregister is used to completely unregister the application from the ResourceManager.
+   * This means the ResourceManager will not retry the application attempt on your behalf if
+   * a failure occurred.
+   */
+  final def unregister(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
+    if (!unregistered) {
+      logInfo(s"Unregistering ApplicationMaster with $status" +
+        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
+      unregistered = true
+      client.unregister(status, Option(diagnostics).getOrElse(""))
     }
   }
 
-  final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
+  final def finish(status: FinalApplicationStatus, code: Int, msg: String = null) = synchronized {
     if (!finished) {
-      logInfo(s"Finishing ApplicationMaster with $status"  +
-        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
-      finished = true
+      logInfo(s"Final app status: ${status}, exitCode: ${code}" +
+        Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
+      exitCode = code
       finalStatus = status
-      try {
-        if (Thread.currentThread() != reporterThread) {
-          reporterThread.interrupt()
-          reporterThread.join()
-        }
-      } finally {
-        client.shutdown(status, Option(diagnostics).getOrElse(""))
+      finalMsg = msg
+      finished = true
+      if (Thread.currentThread() != reporterThread && reporterThread != null) {
+        logDebug("shutting down reporter thread")
+        reporterThread.interrupt()
+      }
+      if (Thread.currentThread() != userClassThread && userClassThread != null) {
+        logDebug("shutting down user thread")
+        userClassThread.interrupt()
       }
     }
   }
@@ -182,7 +213,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
 
   private def runDriver(securityMgr: SecurityManager): Unit = {
     addAmIpFilter()
-    val userThread = startUserClass()
+    setupSystemSecurityManager()
+    userClassThread = startUserClass()
 
     // This a bit hacky, but we need to wait until the spark.driver.port property has
     // been set by the Thread executing the user class.
@@ -190,15 +222,12 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
 
     // If there is no SparkContext at this point, just fail the app.
     if (sc == null) {
-      finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
+      finish(FinalApplicationStatus.FAILED,
+        ApplicationMaster.EXIT_SC_NOT_INITED,
+        "Timed out waiting for SparkContext.")
     } else {
       registerAM(sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
-      try {
-        userThread.join()
-      } finally {
-        // In cluster mode, ask the reporter thread to stop since the user app is finished.
-        reporterThread.interrupt()
-      }
+      userClassThread.join()
     }
   }
 
@@ -211,7 +240,6 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
 
     // In client mode the actor will stop the reporter thread.
     reporterThread.join()
-    finalStatus = FinalApplicationStatus.SUCCEEDED
   }
 
   private def launchReporterThread(): Thread = {
@@ -231,33 +259,26 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     val t = new Thread {
       override def run() {
         var failureCount = 0
-
         while (!finished) {
           try {
-            checkNumExecutorsFailed()
-            if (!finished) {
+            if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+              finish(FinalApplicationStatus.FAILED,
+                ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
+                "Max number of executor failures reached")
+            } else {
               logDebug("Sending progress")
               allocator.allocateResources()
             }
             failureCount = 0
           } catch {
+            case i: InterruptedException =>
             case e: Throwable => {
               failureCount += 1
               if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
-                logError("Exception was thrown from Reporter thread.", e)
-                finish(FinalApplicationStatus.FAILED, "Exception was thrown" +
-                  s"${failureCount} time(s) from Reporter thread.")
-
-                /**
-                 * If exception is thrown from ReporterThread,
-                 * interrupt user class to stop.
-                 * Without this interrupting, if exception is
-                 * thrown before allocating enough executors,
-                 * YarnClusterScheduler waits until timeout even though
-                 * we cannot allocate executors.
-                 */
-                logInfo("Interrupting user class to stop.")
-                userClassThread.interrupt
+                finish(FinalApplicationStatus.FAILED,
+                  ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
+                    s"${failureCount} time(s) from Reporter thread.")
+
               } else {
                 logWarning(s"Reporter thread fails ${failureCount} time(s) in a row.", e)
               }
@@ -308,7 +329,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       sparkContextRef.synchronized {
         var count = 0
         val waitTime = 10000L
-        val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10)
+        val numTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10)
         while (sparkContextRef.get() == null && count < numTries && !finished) {
           logInfo("Waiting for spark context initialization ... " + count)
           count = count + 1
@@ -328,10 +349,19 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
   private def waitForSparkDriver(): ActorRef = {
     logInfo("Waiting for Spark driver to be reachable.")
     var driverUp = false
+    var count = 0
     val hostport = args.userArgs(0)
     val (driverHost, driverPort) = Utils.parseHostPort(hostport)
-    while (!driverUp) {
+
+    // spark driver should already be up since it launched us, but we don't want to
+    // wait forever, so wait 100 seconds max to match the cluster mode setting.
+    // Leave this config unpublished for now. SPARK-3779 to investigating changing
+    // this config to be time based.
+    val numTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 1000)
+
+    while (!driverUp && !finished && count < numTries) {
       try {
+        count = count + 1
         val socket = new Socket(driverHost, driverPort)
         socket.close()
         logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
@@ -343,6 +373,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
           Thread.sleep(100)
       }
     }
+
+    if (!driverUp) {
+      throw new SparkException("Failed to connect to driver!")
+    }
+
     sparkConf.set("spark.driver.host", driverHost)
     sparkConf.set("spark.driver.port", driverPort.toString)
 
@@ -354,18 +389,6 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
   }
 
-  private def checkNumExecutorsFailed() = {
-    if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
-      finish(FinalApplicationStatus.FAILED, "Max number of executor failures reached.")
-
-      val sc = sparkContextRef.get()
-      if (sc != null) {
-        logInfo("Invoking sc stop from checkNumExecutorsFailed")
-        sc.stop()
-      }
-    }
-  }
-
   /** Add the Yarn IP filter that is required for properly securing the UI. */
   private def addAmIpFilter() = {
     val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
@@ -379,40 +402,81 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     }
   }
 
+  /**
+   * This system security manager applies to the entire process.
+   * It's main purpose is to handle the case if the user code does a System.exit.
+   * This allows us to catch that and properly set the YARN application status and
+   * cleanup if needed.
+   */
+  private def setupSystemSecurityManager(): Unit = {
+    try {
+      var stopped = false
+      System.setSecurityManager(new java.lang.SecurityManager() {
+        override def checkExit(paramInt: Int) {
+          if (!stopped) {
+            logInfo("In securityManager checkExit, exit code: " + paramInt)
+            if (paramInt == 0) {
+              finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
+            } else {
+              finish(FinalApplicationStatus.FAILED,
+                paramInt,
+                "User class exited with non-zero exit code")
+            }
+            stopped = true
+          }
+        }
+        // required for the checkExit to work properly
+        override def checkPermission(perm: java.security.Permission): Unit = {}
+      })
+    }
+    catch {
+      case e: SecurityException =>
+        finish(FinalApplicationStatus.FAILED,
+          ApplicationMaster.EXIT_SECURITY,
+          "Error in setSecurityManager")
+        logError("Error in setSecurityManager:", e)
+    }
+  }
+
+  /**
+   * Start the user class, which contains the spark driver, in a separate Thread.
+   * If the main routine exits cleanly or exits with System.exit(0) we
+   * assume it was successful, for all other cases we assume failure.
+   *
+   * Returns the user thread that was started.
+   */
   private def startUserClass(): Thread = {
     logInfo("Starting the user JAR in a separate Thread")
     System.setProperty("spark.executor.instances", args.numExecutors.toString)
     val mainMethod = Class.forName(args.userClass, false,
       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
 
-    userClassThread = new Thread {
+    val userThread = new Thread {
       override def run() {
-        var status = FinalApplicationStatus.FAILED
         try {
-          // Copy
           val mainArgs = new Array[String](args.userArgs.size)
           args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
           mainMethod.invoke(null, mainArgs)
-          // Some apps have "System.exit(0)" at the end.  The user thread will stop here unless
-          // it has an uncaught exception thrown out.  It needs a shutdown hook to set SUCCEEDED.
-          status = FinalApplicationStatus.SUCCEEDED
+          finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
+          logDebug("Done running users class")
         } catch {
           case e: InvocationTargetException =>
             e.getCause match {
               case _: InterruptedException =>
                 // Reporter thread can interrupt to stop user class
-
-              case e => throw e
+              case e: Exception =>
+                finish(FinalApplicationStatus.FAILED,
+                  ApplicationMaster.EXIT_EXCEPTION_USER_CLASS,
+                  "User class threw exception: " + e.getMessage)
+                // re-throw to get it logged
+                throw e
             }
-        } finally {
-          logDebug("Finishing main")
-          finalStatus = status
         }
       }
     }
-    userClassThread.setName("Driver")
-    userClassThread.start()
-    userClassThread
+    userThread.setName("Driver")
+    userThread.start()
+    userThread
   }
 
   // Actor used to monitor the driver when running in client deploy mode.
@@ -432,7 +496,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     override def receive = {
       case x: DisassociatedEvent =>
         logInfo(s"Driver terminated or disconnected! Shutting down. $x")
-        finish(FinalApplicationStatus.SUCCEEDED)
+        finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
       case x: AddWebUIFilter =>
         logInfo(s"Add WebUI Filter. $x")
         driver ! x
@@ -446,6 +510,15 @@ object ApplicationMaster extends Logging {
 
   val SHUTDOWN_HOOK_PRIORITY: Int = 30
 
+  // exit codes for different causes, no reason behind the values
+  private val EXIT_SUCCESS = 0
+  private val EXIT_UNCAUGHT_EXCEPTION = 10
+  private val EXIT_MAX_EXECUTOR_FAILURES = 11
+  private val EXIT_REPORTER_FAILURE = 12
+  private val EXIT_SC_NOT_INITED = 13
+  private val EXIT_SECURITY = 14
+  private val EXIT_EXCEPTION_USER_CLASS = 15
+
   private var master: ApplicationMaster = _
 
   def main(args: Array[String]) = {
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index 943dc56202a37b8f6b9485241764860ee7a4f312..2510b9c9cef68938ac4843ccdd7ca22aa520e698 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -49,12 +49,12 @@ trait YarnRMClient {
       securityMgr: SecurityManager): YarnAllocator
 
   /**
-   * Shuts down the AM. Guaranteed to only be called once.
+   * Unregister the AM. Guaranteed to only be called once.
    *
    * @param status The final status of the AM.
    * @param diagnostics Diagnostics message to include in the final status.
    */
-  def shutdown(status: FinalApplicationStatus, diagnostics: String = ""): Unit
+  def unregister(status: FinalApplicationStatus, diagnostics: String = ""): Unit
 
   /** Returns the attempt ID. */
   def getAttemptId(): ApplicationAttemptId
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
index b581790e158acdf6094050abbfe5ef4486d3b207..8d4b96ed79933a1f284f9d7e77889dc3871ac224 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
@@ -45,6 +45,7 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
 
   private var amClient: AMRMClient[ContainerRequest] = _
   private var uiHistoryAddress: String = _
+  private var registered: Boolean = false
 
   override def register(
       conf: YarnConfiguration,
@@ -59,13 +60,19 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
     this.uiHistoryAddress = uiHistoryAddress
 
     logInfo("Registering the ApplicationMaster")
-    amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
+    synchronized {
+      amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
+      registered = true
+    }
     new YarnAllocationHandler(conf, sparkConf, amClient, getAttemptId(), args,
       preferredNodeLocations, securityMgr)
   }
 
-  override def shutdown(status: FinalApplicationStatus, diagnostics: String = "") =
-    amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress)
+  override def unregister(status: FinalApplicationStatus, diagnostics: String = "") = synchronized {
+    if (registered) {
+      amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress)
+    }
+  }
 
   override def getAttemptId() = {
     val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())