From 8fdd48959c93b9cf809f03549e2ae6c4687d1fcd Mon Sep 17 00:00:00 2001
From: WangTaoTheTonic <barneystinson@aliyun.com>
Date: Wed, 7 Jan 2015 08:14:39 -0600
Subject: [PATCH] [SPARK-2165][YARN]add support for setting maxAppAttempts in
 the ApplicationSubmissionContext

...add support for setting maxAppAttempts in the ApplicationSubmissionContext

https://issues.apache.org/jira/browse/SPARK-2165

I still have 2 questions:
* If this config is not set, should we use YARN's corresponding value, or a default value (like 2) on the Spark side?
* Is this the best name for the config? Or should it be "spark.yarn.am.maxAttempts"?

Author: WangTaoTheTonic <barneystinson@aliyun.com>

Closes #3878 from WangTaoTheTonic/SPARK-2165 and squashes the following commits:

1416c83 [WangTaoTheTonic] use the name spark.yarn.maxAppAttempts
202ac85 [WangTaoTheTonic] rephrase some
afdfc99 [WangTaoTheTonic] more detailed description
91562c6 [WangTaoTheTonic] add support for setting maxAppAttempts in the ApplicationSubmissionContext
---
 docs/running-on-yarn.md                                   | 8 ++++++++
 .../org/apache/spark/deploy/yarn/ApplicationMaster.scala  | 2 +-
 .../main/scala/org/apache/spark/deploy/yarn/Client.scala  | 5 +++++
 .../scala/org/apache/spark/deploy/yarn/YarnRMClient.scala | 7 +++++--
 4 files changed, 19 insertions(+), 3 deletions(-)

diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index da1c8e8aa8..183698ffe9 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -149,6 +149,14 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
   In cluster mode, use spark.driver.extraJavaOptions instead.
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.maxAppAttempts</code></td>
+  <td>yarn.resourcemanager.am.max-attempts in YARN</td>
+  <td>
+  The maximum number of attempts that will be made to submit the application.
+  It should be no larger than the global number of max attempts in the YARN configuration.
+  </td>
+</tr>
 </table>
 
 # Launching Spark on YARN
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 618db7f908..902bdda598 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -102,7 +102,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
             logInfo("Invoking sc stop from shutdown hook")
             sc.stop()
           }
-          val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
+          val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
           val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
 
           if (!finished) {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index addaddb711..a2c3f918a1 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -98,6 +98,11 @@ private[spark] class Client(
     appContext.setQueue(args.amQueue)
     appContext.setAMContainerSpec(containerContext)
     appContext.setApplicationType("SPARK")
+    sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt) match {
+      case Some(v) => appContext.setMaxAppAttempts(v)
+      case None => logDebug("spark.yarn.maxAppAttempts is not set. " +
+          "Cluster's default value will be used.")
+    }
     val capability = Records.newRecord(classOf[Resource])
     capability.setMemory(args.amMemory + amMemoryOverhead)
     appContext.setResource(capability)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index bf4e15908b..e183efccbb 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -120,7 +120,10 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
   }
 
   /** Returns the maximum number of attempts to register the AM. */
-  def getMaxRegAttempts(conf: YarnConfiguration): Int =
-    conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
+  def getMaxRegAttempts(sparkConf: SparkConf, yarnConf: YarnConfiguration): Int = {
+    sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt).getOrElse(
+      yarnConf.getInt(
+        YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS))
+  }
 
 }
-- 
GitLab