diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index da1c8e8aa8667590df7b2e06dfbcd4f8bfe6de63..183698ffe9304861bba0f047552bc2294d21bb30 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -149,6 +149,14 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
   In cluster mode, use spark.driver.extraJavaOptions instead.
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.maxAppAttempts</code></td>
+  <td><code>yarn.resourcemanager.am.max-attempts</code> in YARN</td>
+  <td>
+  The maximum number of attempts that will be made to submit the application.
+  It should be no larger than the global maximum set by <code>yarn.resourcemanager.am.max-attempts</code> in the YARN configuration.
+  </td>
+</tr>
 </table>
 
 # Launching Spark on YARN
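
A hedged usage sketch for the property documented above: the application name and the value 2 are illustrative, and whatever is set here should stay within the cluster's global `yarn.resourcemanager.am.max-attempts`.

```scala
import org.apache.spark.SparkConf

// Cap this application's YARN attempts at 2 (illustrative value); YARN's
// global yarn.resourcemanager.am.max-attempts still acts as a ceiling.
val conf = new SparkConf()
  .setAppName("example-app") // hypothetical app name
  .set("spark.yarn.maxAppAttempts", "2")
```

Equivalently, `--conf spark.yarn.maxAppAttempts=2` can be passed to spark-submit.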
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 618db7f9085c23b1d9a17d9830176726d539157d..902bdda59860ed1b12dfbe72f39f3608be3647a8 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -102,7 +102,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
             logInfo("Invoking sc stop from shutdown hook")
             sc.stop()
           }
-          val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
+          val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
           val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
 
           if (!finished) {
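
The last-attempt check above relies on YARN attempt ids being 1-based. A minimal standalone sketch of the same comparison, with plain Ints standing in for `client.getAttemptId().getAttemptId()` and the resolved maximum:

```scala
// An AM on attempt N, where N is the resolved maximum, is on its last
// attempt, because YARN numbers attempts starting from 1.
def isLastAttempt(attemptId: Int, maxAppAttempts: Int): Boolean =
  attemptId >= maxAppAttempts

assert(isLastAttempt(attemptId = 2, maxAppAttempts = 2))  // final attempt
assert(!isLastAttempt(attemptId = 1, maxAppAttempts = 2)) // retries remain
```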
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index addaddb711d3ca222e75dad98008321c2af9a9e3..a2c3f918a1ab2cf3ed90a38f253bf48eaea98449 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -98,6 +98,11 @@ private[spark] class Client(
     appContext.setQueue(args.amQueue)
     appContext.setAMContainerSpec(containerContext)
     appContext.setApplicationType("SPARK")
+    sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt) match {
+      case Some(v) => appContext.setMaxAppAttempts(v)
+      case None => logDebug("spark.yarn.maxAppAttempts is not set. " +
+          "Cluster's default value will be used.")
+    }
     val capability = Records.newRecord(classOf[Resource])
     capability.setMemory(args.amMemory + amMemoryOverhead)
     appContext.setResource(capability)
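
A self-contained sketch of the pattern introduced above: read an optional integer property from a SparkConf and either act on it or note that the cluster default applies. The println calls stand in for the submission-context call and logDebug, and the conf value is hypothetical.

```scala
import org.apache.spark.SparkConf

val sparkConf = new SparkConf().set("spark.yarn.maxAppAttempts", "3")

sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt) match {
  case Some(v) => println(s"would call appContext.setMaxAppAttempts($v)")
  case None    => println("spark.yarn.maxAppAttempts not set; cluster default applies")
}
```

Note that a non-numeric value fails fast with a NumberFormatException at submission time, which matches the patch's behavior.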
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index bf4e15908bb46f9fe8dc956174e9f95f4089ef92..e183efccbb6f7288b09501a8a8937c5fa90fe307 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -120,7 +120,10 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
   }
 
   /** Returns the maximum number of attempts to register the AM. */
-  def getMaxRegAttempts(conf: YarnConfiguration): Int =
-    conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
+  def getMaxRegAttempts(sparkConf: SparkConf, yarnConf: YarnConfiguration): Int = {
+    sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt).getOrElse(
+      yarnConf.getInt(
+        YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS))
+  }
 
 }
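
For completeness, a sketch of the precedence getMaxRegAttempts now implements, using hypothetical confs: the Spark property, when set, overrides the YARN setting, which itself falls back to YARN's compiled-in default (2 in stock Hadoop).

```scala
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.SparkConf

// Spark property set to 1, so it wins over any YARN-side value.
val sparkConf = new SparkConf().set("spark.yarn.maxAppAttempts", "1")
val yarnConf = new YarnConfiguration()

val resolved = sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt).getOrElse(
  yarnConf.getInt(
    YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS))
assert(resolved == 1)
```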