diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 65238af2caa24cd8a2660d304251558fe8bfd770..8d13b2a2cd4f3422701a17439505a588a79dc64f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -89,7 +89,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
 
   /* Find out driver status then exit the JVM */
   def pollAndReportStatus(driverId: String) {
-    println(s"... waiting before polling master for driver state")
+    println("... waiting before polling master for driver state")
     Thread.sleep(5000)
     println("... polling master for driver state")
     val statusFuture = (masterActor ? RequestDriverStatus(driverId))(timeout)
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala
index a3539e44bd2f9db63b07df9934b6bf8dbe080d00..b8fd406fb6f9aec5fbef064f315d9d39b3bc773a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala
@@ -245,7 +245,7 @@ private[deploy] class StandaloneRestClient extends Logging {
       }
     } else {
       val failMessage = Option(submitResponse.message).map { ": " + _ }.getOrElse("")
-      logError("Application submission failed" + failMessage)
+      logError(s"Application submission failed$failMessage")
     }
   }
 
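The two hunks above are symmetric string-literal cleanups: Client.scala drops an `s` interpolator that had no placeholders, while StandaloneRestClient.scala replaces concatenation with interpolation. A minimal sketch of both patterns, with a hypothetical `failMessage` value standing in for the real one:

```scala
object InterpolationSketch extends App {
  // Hypothetical stand-in for the failMessage built in StandaloneRestClient.
  val failMessage = ": submission rejected"

  // Before: an `s` interpolator with nothing to interpolate, and plain
  // concatenation where interpolation would be clearer.
  println(s"... waiting before polling master for driver state")
  println("Application submission failed" + failMessage)

  // After: a plain literal where nothing is spliced in, and an interpolator
  // where a value actually is.
  println("... waiting before polling master for driver state")
  println(s"Application submission failed$failMessage")
}
```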
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index d9f3eb2b74b18223a249759f3423e152ab100473..b7e68d4f7171486090e4a70c87908143faa3d0a0 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -196,6 +196,15 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
   It should be no larger than the global number of max attempts in the YARN configuration.
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.submit.waitAppCompletion</code></td>
+  <td>true</td>
+  <td>
+  In YARN cluster mode, controls whether the client waits to exit until the application completes.
+  If set to true, the client process will stay alive, reporting the application's status.
+  Otherwise, the client process will exit after submission.
+  </td>
+</tr>
 </table>
 
 # Launching Spark on YARN
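For the new documentation row above, a user would typically flip the setting either on the spark-submit command line (`--conf spark.yarn.submit.waitAppCompletion=false`) or programmatically. A minimal sketch of the programmatic form; the object and app name are illustrative:

```scala
import org.apache.spark.SparkConf

object ConfSketch extends App {
  // Opt into fire-and-forget submission in YARN cluster mode; the key and its
  // default of true come from the table above, everything else is illustrative.
  val conf = new SparkConf()
    .setAppName("example-app")                           // hypothetical name
    .set("spark.yarn.submit.waitAppCompletion", "false") // exit after submission
}
```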
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 61f8fc3f5a01466e904e4d926b499f3a554008f1..79d55a09eb67126170d74fb766676af62f610da0 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -66,6 +66,8 @@ private[spark] class Client(
   private val executorMemoryOverhead = args.executorMemoryOverhead // MB
   private val distCacheMgr = new ClientDistributedCacheManager()
   private val isClusterMode = args.isClusterMode
+  private val fireAndForget = isClusterMode &&
+    !sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true)
 
 
   def stop(): Unit = yarnClient.stop()
@@ -564,31 +566,13 @@ private[spark] class Client(
 
       if (logApplicationReport) {
         logInfo(s"Application report for $appId (state: $state)")
-        val details = Seq[(String, String)](
-          ("client token", getClientToken(report)),
-          ("diagnostics", report.getDiagnostics),
-          ("ApplicationMaster host", report.getHost),
-          ("ApplicationMaster RPC port", report.getRpcPort.toString),
-          ("queue", report.getQueue),
-          ("start time", report.getStartTime.toString),
-          ("final status", report.getFinalApplicationStatus.toString),
-          ("tracking URL", report.getTrackingUrl),
-          ("user", report.getUser)
-        )
-
-        // Use more loggable format if value is null or empty
-        val formattedDetails = details
-          .map { case (k, v) =>
-          val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A")
-          s"\n\t $k: $newValue" }
-          .mkString("")
 
         // If DEBUG is enabled, log report details every iteration
         // Otherwise, log them every time the application changes state
         if (log.isDebugEnabled) {
-          logDebug(formattedDetails)
+          logDebug(formatReportDetails(report))
         } else if (lastState != state) {
-          logInfo(formattedDetails)
+          logInfo(formatReportDetails(report))
         }
       }
 
@@ -609,24 +593,57 @@ private[spark] class Client(
     throw new SparkException("While loop is depleted! This should never happen...")
   }
 
+  private def formatReportDetails(report: ApplicationReport): String = {
+    val details = Seq[(String, String)](
+      ("client token", getClientToken(report)),
+      ("diagnostics", report.getDiagnostics),
+      ("ApplicationMaster host", report.getHost),
+      ("ApplicationMaster RPC port", report.getRpcPort.toString),
+      ("queue", report.getQueue),
+      ("start time", report.getStartTime.toString),
+      ("final status", report.getFinalApplicationStatus.toString),
+      ("tracking URL", report.getTrackingUrl),
+      ("user", report.getUser)
+    )
+
+    // Replace null or empty values with "N/A" for a more readable report
+    details.map { case (k, v) =>
+      val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A")
+      s"\n\t $k: $newValue"
+    }.mkString("")
+  }
+
   /**
-   * Submit an application to the ResourceManager and monitor its state.
-   * This continues until the application has exited for any reason.
+   * Submit an application to the ResourceManager.
+   * If spark.yarn.submit.waitAppCompletion is set to true, the client will stay alive,
+   * reporting the application's status until the application has exited for any reason.
+   * Otherwise, the client process will exit after submission.
    * If the application finishes with a failed, killed, or undefined status,
    * throw an appropriate SparkException.
    */
   def run(): Unit = {
-    val (yarnApplicationState, finalApplicationStatus) = monitorApplication(submitApplication())
-    if (yarnApplicationState == YarnApplicationState.FAILED ||
-      finalApplicationStatus == FinalApplicationStatus.FAILED) {
-      throw new SparkException("Application finished with failed status")
-    }
-    if (yarnApplicationState == YarnApplicationState.KILLED ||
-      finalApplicationStatus == FinalApplicationStatus.KILLED) {
-      throw new SparkException("Application is killed")
-    }
-    if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) {
-      throw new SparkException("The final status of application is undefined")
+    val appId = submitApplication()
+    if (fireAndForget) {
+      val report = getApplicationReport(appId)
+      val state = report.getYarnApplicationState
+      logInfo(s"Application report for $appId (state: $state)")
+      logInfo(formatReportDetails(report))
+      if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
+        throw new SparkException(s"Application $appId finished with status: $state")
+      }
+    } else {
+      val (yarnApplicationState, finalApplicationStatus) = monitorApplication(appId)
+      if (yarnApplicationState == YarnApplicationState.FAILED ||
+        finalApplicationStatus == FinalApplicationStatus.FAILED) {
+        throw new SparkException(s"Application $appId finished with failed status")
+      }
+      if (yarnApplicationState == YarnApplicationState.KILLED ||
+        finalApplicationStatus == FinalApplicationStatus.KILLED) {
+        throw new SparkException(s"Application $appId is killed")
+      }
+      if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) {
+        throw new SparkException(s"The final status of application $appId is undefined")
+      }
     }
   }
 }
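Taken together, the gate added at the top of the YARN Client decides between the two branches of run(). A sketch of that predicate and its resulting truth table, assuming `isClusterMode` comes from ClientArguments as in the patch:

```scala
import org.apache.spark.SparkConf

object FireAndForgetSketch {
  // Sketch of the fireAndForget predicate introduced at the top of Client.scala.
  def fireAndForget(isClusterMode: Boolean, sparkConf: SparkConf): Boolean =
    isClusterMode && !sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true)

  // The client detaches only when both conditions hold:
  //   cluster mode, waitAppCompletion=false -> true  (log one report, then exit)
  //   cluster mode, waitAppCompletion=true  -> false (monitor until the app exits)
  //   client  mode, any setting             -> false (the driver runs inside the
  //                                                   client, so it must stay alive)
}
```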