Commit 8fdd4895 authored by WangTaoTheTonic, committed by Thomas Graves

[SPARK-2165][YARN] Add support for setting maxAppAttempts in the ApplicationSubmissionContext

https://issues.apache.org/jira/browse/SPARK-2165

I still have two questions:
* If this config is not set, should we use YARN's corresponding value, or a default value (like 2) on the Spark side?
* Is this the best name for the config, or would "spark.yarn.am.maxAttempts" be better?

Author: WangTaoTheTonic <barneystinson@aliyun.com>

Closes #3878 from WangTaoTheTonic/SPARK-2165 and squashes the following commits:

1416c83 [WangTaoTheTonic] use the name spark.yarn.maxAppAttempts
202ac85 [WangTaoTheTonic] rephrase some
afdfc99 [WangTaoTheTonic] more detailed description
91562c6 [WangTaoTheTonic] add support for setting maxAppAttempts in the ApplicationSubmissionContext
parent 5fde6616
@@ -149,6 +149,14 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
   In cluster mode, use spark.driver.extraJavaOptions instead.
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.maxAppAttempts</code></td>
+  <td>yarn.resourcemanager.am.max-attempts in YARN</td>
+  <td>
+  The maximum number of attempts that will be made to submit the application.
+  It should be no larger than the global number of max attempts in the YARN configuration.
+  </td>
+</tr>
 </table>

 # Launching Spark on YARN
...
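As a usage illustration (not part of the patch): the new key behaves like any other Spark property, so it can be set on the SparkConf in the driver or passed to spark-submit as --conf spark.yarn.maxAppAttempts=4. A minimal sketch, with a hypothetical app name and an illustrative attempt count:

import org.apache.spark.SparkConf

object SubmitWithMaxAttempts {
  def main(args: Array[String]): Unit = {
    // Allow up to 4 ApplicationMaster attempts for this application.
    // Per the doc text above, this should be no larger than the
    // cluster-wide yarn.resourcemanager.am.max-attempts.
    val conf = new SparkConf()
      .setAppName("max-attempts-example") // hypothetical name
      .set("spark.yarn.maxAppAttempts", "4")
    println(conf.get("spark.yarn.maxAppAttempts")) // prints "4"
  }
}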
@@ -102,7 +102,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       logInfo("Invoking sc stop from shutdown hook")
       sc.stop()
     }
-    val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
+    val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
     val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts

     if (!finished) {
...
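The AM uses the resolved limit to decide whether the current attempt is the final one. A minimal sketch of that comparison, assuming YARN's 1-based attempt ids (the helper object and values below are illustrative, not part of the patch):

object LastAttemptCheck {
  // Mirrors the comparison above: with 1-based attempt ids,
  // attempt N is the last chance once N reaches maxAppAttempts.
  def isLastAttempt(attemptId: Int, maxAppAttempts: Int): Boolean =
    attemptId >= maxAppAttempts

  def main(args: Array[String]): Unit = {
    assert(!isLastAttempt(attemptId = 2, maxAppAttempts = 3)) // YARN may still retry
    assert(isLastAttempt(attemptId = 3, maxAppAttempts = 3))  // final attempt
  }
}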
@@ -98,6 +98,11 @@ private[spark] class Client(
     appContext.setQueue(args.amQueue)
     appContext.setAMContainerSpec(containerContext)
     appContext.setApplicationType("SPARK")
+    sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt) match {
+      case Some(v) => appContext.setMaxAppAttempts(v)
+      case None => logDebug("spark.yarn.maxAppAttempts is not set. " +
+        "Cluster's default value will be used.")
+    }
     val capability = Records.newRecord(classOf[Resource])
     capability.setMemory(args.amMemory + amMemoryOverhead)
     appContext.setResource(capability)
...
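Note that the match above is deliberately one-sided: setMaxAppAttempts is only invoked when the user actually set spark.yarn.maxAppAttempts, so an unset config leaves the submission context untouched and the ResourceManager's cluster-wide setting governs, exactly as the logDebug message says.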
@@ -120,7 +120,10 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
   }

   /** Returns the maximum number of attempts to register the AM. */
-  def getMaxRegAttempts(conf: YarnConfiguration): Int =
-    conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
+  def getMaxRegAttempts(sparkConf: SparkConf, yarnConf: YarnConfiguration): Int = {
+    sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt).getOrElse(
+      yarnConf.getInt(
+        YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS))
+  }
 }
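Taken together, the effective limit resolves in this order: the Spark setting if present, otherwise the YARN setting, otherwise YARN's shipped default of 2. A self-contained sketch with the Hadoop classes stubbed out as plain values (the object and parameter names are illustrative):

object MaxAttemptsResolution {
  // Stub for YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, which is 2.
  val DefaultRmAmMaxAttempts = 2

  // Spark-side setting wins; otherwise fall back to the YARN setting,
  // and finally to YARN's default, the same chain as getMaxRegAttempts.
  def resolve(sparkSetting: Option[String], yarnSetting: Option[Int]): Int =
    sparkSetting.map(_.toInt)
      .getOrElse(yarnSetting.getOrElse(DefaultRmAmMaxAttempts))

  def main(args: Array[String]): Unit = {
    assert(resolve(Some("1"), Some(4)) == 1) // Spark config takes precedence
    assert(resolve(None, Some(4)) == 4)      // YARN setting next
    assert(resolve(None, None) == 2)         // YARN default last
  }
}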