From 1cad31f00644d899d8e74d58c6eb4e9f72065473 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin <vanzin@cloudera.com>
Date: Tue, 11 Jul 2017 11:25:40 -0700
Subject: [PATCH] [SPARK-16019][YARN] Use separate RM poll interval when
 starting client AM.

Currently the code monitoring the launch of the client AM uses the value of
spark.yarn.report.interval as the interval for polling the RM; if someone
sets that value to a really long interval, it takes that long to detect
that the client AM has started, which is not the expected behavior.

Instead, add a separate config for the interval to use when the client AM is
starting. The existing config is still used in cluster mode, and to monitor
the status of the client AM after it is already running.
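
As a sketch of the resulting behavior (the interval values below are
illustrative, not recommendations), an app can now keep a coarse report
interval without delaying detection of the client AM launch:

    import org.apache.spark.SparkConf

    // The report interval governs status polling once the app is running;
    // the new launch monitor interval governs how quickly the starting
    // client AM is detected.
    val conf = new SparkConf()
      .set("spark.yarn.report.interval", "30s")
      .set("spark.yarn.clientLaunchMonitorInterval", "1s")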

Tested by running client- and cluster-mode apps with a modified value of
spark.yarn.report.interval, verifying that the client AM launch is detected
before that interval elapses.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #18380 from vanzin/SPARK-16019.
---
 .../scala/org/apache/spark/deploy/yarn/Client.scala    |  6 ++++--
 .../scala/org/apache/spark/deploy/yarn/config.scala    | 10 ++++++++--
 .../scheduler/cluster/YarnClientSchedulerBackend.scala |  6 +++++-
 3 files changed, 17 insertions(+), 5 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 1dd0715918..7caaa91e1a 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -986,13 +986,15 @@ private[spark] class Client(
    * @param appId ID of the application to monitor.
    * @param returnOnRunning Whether to also return the application state when it is RUNNING.
    * @param logApplicationReport Whether to log details of the application report every iteration.
+   * @param interval How often to poll the YARN RM for application status (in ms).
    * @return A pair of the yarn application state and the final application state.
    */
   def monitorApplication(
       appId: ApplicationId,
       returnOnRunning: Boolean = false,
-      logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = {
-    val interval = sparkConf.get(REPORT_INTERVAL)
+      logApplicationReport: Boolean = true,
+      interval: Long = sparkConf.get(REPORT_INTERVAL)):
+      (YarnApplicationState, FinalApplicationStatus) = {
     var lastState: YarnApplicationState = null
     while (true) {
       Thread.sleep(interval)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index d4108caab2..187803cc60 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -127,7 +127,7 @@ package object config {
     .stringConf
     .createOptional
 
-  /* Cluster-mode launcher configuration. */
+  /* Launcher configuration. */
 
   private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.yarn.submit.waitAppCompletion")
     .doc("In cluster mode, whether to wait for the application to finish before exiting the " +
@@ -136,10 +136,16 @@ package object config {
     .createWithDefault(true)
 
   private[spark] val REPORT_INTERVAL = ConfigBuilder("spark.yarn.report.interval")
-    .doc("Interval between reports of the current app status in cluster mode.")
+    .doc("Interval between reports of the current app status.")
     .timeConf(TimeUnit.MILLISECONDS)
     .createWithDefaultString("1s")
 
+  private[spark] val CLIENT_LAUNCH_MONITOR_INTERVAL =
+    ConfigBuilder("spark.yarn.clientLaunchMonitorInterval")
+      .doc("Interval between requests for status the client mode AM when starting the app.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("1s")
+
   /* Shared Client-mode AM / Driver configuration. */
 
   private[spark] val AM_MAX_WAIT_TIME = ConfigBuilder("spark.yarn.am.waitTime")
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 60da356ad1..d482376d14 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState
 
 import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUtil}
+import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.launcher.SparkAppHandle
 import org.apache.spark.scheduler.TaskSchedulerImpl
@@ -77,8 +78,11 @@ private[spark] class YarnClientSchedulerBackend(
    * This assumes both `client` and `appId` have already been set.
    */
   private def waitForApplication(): Unit = {
+    val monitorInterval = conf.get(CLIENT_LAUNCH_MONITOR_INTERVAL)
+
     assert(client != null && appId.isDefined, "Application has not been submitted yet!")
-    val (state, _) = client.monitorApplication(appId.get, returnOnRunning = true) // blocking
+    val (state, _) = client.monitorApplication(appId.get, returnOnRunning = true,
+      interval = monitorInterval) // blocking
     if (state == YarnApplicationState.FINISHED ||
       state == YarnApplicationState.FAILED ||
       state == YarnApplicationState.KILLED) {
-- 
GitLab