diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 1dd07159180427cb68bb89280d756bb03b24800d..7caaa91e1af97e029cd5ed52eb9ceaf6bf8c01c2 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -986,13 +986,15 @@ private[spark] class Client(
    * @param appId ID of the application to monitor.
    * @param returnOnRunning Whether to also return the application state when it is RUNNING.
    * @param logApplicationReport Whether to log details of the application report every iteration.
+   * @param interval How often to poll the YARN RM for application status (in ms).
    * @return A pair of the yarn application state and the final application state.
    */
   def monitorApplication(
       appId: ApplicationId,
       returnOnRunning: Boolean = false,
-      logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = {
-    val interval = sparkConf.get(REPORT_INTERVAL)
+      logApplicationReport: Boolean = true,
+      interval: Long = sparkConf.get(REPORT_INTERVAL)):
+      (YarnApplicationState, FinalApplicationStatus) = {
     var lastState: YarnApplicationState = null
     while (true) {
       Thread.sleep(interval)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index d4108caab28c18976330afe7a9694211899ed53b..187803cc6050b692e6ee366caa8fae2491a1d4a6 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -127,7 +127,7 @@ package object config {
     .stringConf
     .createOptional
 
-  /* Cluster-mode launcher configuration. */
+  /* Launcher configuration. */
 
   private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.yarn.submit.waitAppCompletion")
     .doc("In cluster mode, whether to wait for the application to finish before exiting the " +
@@ -136,10 +136,16 @@ package object config {
     .createWithDefault(true)
 
   private[spark] val REPORT_INTERVAL = ConfigBuilder("spark.yarn.report.interval")
-    .doc("Interval between reports of the current app status in cluster mode.")
+    .doc("Interval between reports of the current app status.")
     .timeConf(TimeUnit.MILLISECONDS)
     .createWithDefaultString("1s")
 
+  private[spark] val CLIENT_LAUNCH_MONITOR_INTERVAL =
+    ConfigBuilder("spark.yarn.clientLaunchMonitorInterval")
+      .doc("Interval between requests for status the client mode AM when starting the app.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("1s")
+
   /* Shared Client-mode AM / Driver configuration. */
 
   private[spark] val AM_MAX_WAIT_TIME = ConfigBuilder("spark.yarn.am.waitTime")
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 60da356ad14aadfe9bdcd7f8c226cee6551498a2..d482376d14dd76992b0f4673465225a3da50d71b 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState
 
 import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUtil}
+import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.launcher.SparkAppHandle
 import org.apache.spark.scheduler.TaskSchedulerImpl
@@ -77,8 +78,11 @@ private[spark] class YarnClientSchedulerBackend(
    * This assumes both `client` and `appId` have already been set.
    */
   private def waitForApplication(): Unit = {
+    val monitorInterval = conf.get(CLIENT_LAUNCH_MONITOR_INTERVAL)
+
     assert(client != null && appId.isDefined, "Application has not been submitted yet!")
-    val (state, _) = client.monitorApplication(appId.get, returnOnRunning = true) // blocking
+    val (state, _) = client.monitorApplication(appId.get, returnOnRunning = true,
+      interval = monitorInterval) // blocking
     if (state == YarnApplicationState.FINISHED ||
       state == YarnApplicationState.FAILED ||
       state == YarnApplicationState.KILLED) {