From 0bd00ff2454c5046e4cb084ee64d432c4d3dcbc3 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin <vanzin@cloudera.com>
Date: Tue, 6 Sep 2016 15:54:54 -0700
Subject: [PATCH] [SPARK-15891][YARN] Clean up some logging in the YARN AM.

To make the log file more readable, rework some of the logging done
by the AM:

- log the executor command / env just once, since they're almost all the same;
  the information that changes, such as the executor ID, is already available
  in other log messages (see the sketch after this list).
- avoid printing logs when nothing happens, especially when updating the
  container requests in the allocator.
- print fewer log messages when requesting many unlocalized executors,
  instead of repeating the same message multiple times.
- remove some logs that seemed unnecessary.
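
For reference, a minimal, self-contained sketch of the first point (the object
name and the env/command values below are illustrative, not the actual Spark
code): the launch-context description is rendered once, with placeholders for
the per-executor values, and in the real AM the resulting string is handed to a
by-name logInfo { ... } block so it is only built when INFO logging is enabled.

    object LaunchContextLoggingSketch {
      // Build the multi-line launch-context description a single time, using
      // placeholders for values that differ per executor (ID, hostname), so the
      // AM can log it once instead of once per launched container.
      def launchContextDebugInfo(env: Map[String, String], commands: Seq[String]): String = {
        s"""
        |===============================================================================
        |YARN executor launch context:
        |  env:
        |${env.map { case (k, v) => s"    $k -> $v\n" }.mkString}
        |  command:
        |    ${commands.mkString(" \\ \n      ")}
        |===============================================================================""".stripMargin
      }

      def main(args: Array[String]): Unit = {
        // Illustrative values only; the real env/command come from the
        // ExecutorRunnable's prepareEnvironment() and prepareCommand().
        val env = Map("SPARK_USER" -> "someuser")
        val commands = Seq(
          "{{JAVA_HOME}}/bin/java", "-server",
          "org.apache.spark.executor.CoarseGrainedExecutorBackend",
          "--executor-id", "<executorId>",
          "--hostname", "<hostname>")
        println(launchContextDebugInfo(env, commands))
      }
    }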

In the process, I slightly fixed up the wording of a few log messages and did
some minor cleanup of redundant method arguments.

Tested by running the existing unit tests and by analyzing the logs of an
application that exercises dynamic allocation, forcing executors to be
allocated and killed in waves.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #14943 from vanzin/SPARK-15891.
---
 .../spark/deploy/yarn/ApplicationMaster.scala | 15 ++-
 .../spark/deploy/yarn/ExecutorRunnable.scala  | 92 +++++++++----------
 .../spark/deploy/yarn/YarnAllocator.scala     | 34 ++++---
 .../deploy/yarn/YarnAllocatorSuite.scala      |  4 +-
 4 files changed, 82 insertions(+), 63 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index a4b575c85d..ad50ea789a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -173,7 +173,6 @@ private[spark] class ApplicationMaster(
       sys.props.remove(e.key)
     }
 
-    logInfo("Prepared Local resources " + resources)
     resources.toMap
   }
 
@@ -329,7 +328,7 @@ private[spark] class ApplicationMaster(
     val appId = client.getAttemptId().getApplicationId().toString()
     val attemptId = client.getAttemptId().getAttemptId().toString()
     val historyAddress =
-      sparkConf.get(HISTORY_SERVER_ADDRESS)
+      _sparkConf.get(HISTORY_SERVER_ADDRESS)
         .map { text => SparkHadoopUtil.get.substituteHadoopVariables(text, yarnConf) }
         .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}/${attemptId}" }
         .getOrElse("")
@@ -338,6 +337,18 @@ private[spark] class ApplicationMaster(
       _sparkConf.get("spark.driver.host"),
       _sparkConf.get("spark.driver.port").toInt,
       CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+    // Before we initialize the allocator, let's log the information about how executors will
+    // be run up front, to avoid printing this out for every single executor being launched.
+    // Use placeholders for information that changes such as executor IDs.
+    logInfo {
+      val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
+      val executorCores = sparkConf.get(EXECUTOR_CORES)
+      val dummyRunner = new ExecutorRunnable(None, yarnConf, sparkConf, driverUrl, "<executorId>",
+        "<hostname>", executorMemory, executorCores, appId, securityMgr, localResources)
+      dummyRunner.launchContextDebugInfo()
+    }
+
     allocator = client.register(driverUrl,
       driverRef,
       yarnConf,
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 3d0e996b18..8e0533f39a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -24,7 +24,6 @@ import java.util.Collections
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{HashMap, ListBuffer}
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.DataOutputBuffer
 import org.apache.hadoop.security.UserGroupInformation
@@ -45,11 +44,11 @@ import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.util.Utils
 
 private[yarn] class ExecutorRunnable(
-    container: Container,
-    conf: Configuration,
+    container: Option[Container],
+    conf: YarnConfiguration,
     sparkConf: SparkConf,
     masterAddress: String,
-    slaveId: String,
+    executorId: String,
     hostname: String,
     executorMemory: Int,
     executorCores: Int,
@@ -59,43 +58,46 @@ private[yarn] class ExecutorRunnable(
 
   var rpc: YarnRPC = YarnRPC.create(conf)
   var nmClient: NMClient = _
-  val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-  lazy val env = prepareEnvironment(container)
 
   def run(): Unit = {
-    logInfo("Starting Executor Container")
+    logDebug("Starting Executor Container")
     nmClient = NMClient.createNMClient()
-    nmClient.init(yarnConf)
+    nmClient.init(conf)
     nmClient.start()
     startContainer()
   }
 
-  def startContainer(): java.util.Map[String, ByteBuffer] = {
-    logInfo("Setting up ContainerLaunchContext")
+  def launchContextDebugInfo(): String = {
+    val commands = prepareCommand()
+    val env = prepareEnvironment()
+
+    s"""
+    |===============================================================================
+    |YARN executor launch context:
+    |  env:
+    |${env.map { case (k, v) => s"    $k -> $v\n" }.mkString}
+    |  command:
+    |    ${commands.mkString(" \\ \n      ")}
+    |
+    |  resources:
+    |${localResources.map { case (k, v) => s"    $k -> $v\n" }.mkString}
+    |===============================================================================""".stripMargin
+  }
 
+  def startContainer(): java.util.Map[String, ByteBuffer] = {
     val ctx = Records.newRecord(classOf[ContainerLaunchContext])
       .asInstanceOf[ContainerLaunchContext]
+    val env = prepareEnvironment().asJava
 
     ctx.setLocalResources(localResources.asJava)
-    ctx.setEnvironment(env.asJava)
+    ctx.setEnvironment(env)
 
     val credentials = UserGroupInformation.getCurrentUser().getCredentials()
     val dob = new DataOutputBuffer()
     credentials.writeTokenStorageToStream(dob)
     ctx.setTokens(ByteBuffer.wrap(dob.getData()))
 
-    val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
-      appId)
-
-    logInfo(s"""
-      |===============================================================================
-      |YARN executor launch context:
-      |  env:
-      |${env.map { case (k, v) => s"    $k -> $v\n" }.mkString}
-      |  command:
-      |    ${commands.mkString(" ")}
-      |===============================================================================
-      """.stripMargin)
+    val commands = prepareCommand()
 
     ctx.setCommands(commands.asJava)
     ctx.setApplicationACLs(
@@ -119,21 +121,15 @@ private[yarn] class ExecutorRunnable(
 
     // Send the start request to the ContainerManager
     try {
-      nmClient.startContainer(container, ctx)
+      nmClient.startContainer(container.get, ctx)
     } catch {
       case ex: Exception =>
-        throw new SparkException(s"Exception while starting container ${container.getId}" +
+        throw new SparkException(s"Exception while starting container ${container.get.getId}" +
           s" on host $hostname", ex)
     }
   }
 
-  private def prepareCommand(
-      masterAddress: String,
-      slaveId: String,
-      hostname: String,
-      executorMemory: Int,
-      executorCores: Int,
-      appId: String): List[String] = {
+  private def prepareCommand(): List[String] = {
     // Extra options for the JVM
     val javaOpts = ListBuffer[String]()
 
@@ -216,23 +212,23 @@ private[yarn] class ExecutorRunnable(
       "-server") ++
       javaOpts ++
       Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
-        "--driver-url", masterAddress.toString,
-        "--executor-id", slaveId.toString,
-        "--hostname", hostname.toString,
+        "--driver-url", masterAddress,
+        "--executor-id", executorId,
+        "--hostname", hostname,
         "--cores", executorCores.toString,
         "--app-id", appId) ++
       userClassPath ++
       Seq(
-        "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
-        "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+        s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
+        s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")
 
     // TODO: it would be nicer to just make sure there are no null commands here
     commands.map(s => if (s == null) "null" else s).toList
   }
 
-  private def prepareEnvironment(container: Container): HashMap[String, String] = {
+  private def prepareEnvironment(): HashMap[String, String] = {
     val env = new HashMap[String, String]()
-    Client.populateClasspath(null, yarnConf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH))
+    Client.populateClasspath(null, conf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH))
 
     sparkConf.getExecutorEnv.foreach { case (key, value) =>
       // This assumes each executor environment variable set here is a path
@@ -246,20 +242,22 @@ private[yarn] class ExecutorRunnable(
     }
 
     // lookup appropriate http scheme for container log urls
-    val yarnHttpPolicy = yarnConf.get(
+    val yarnHttpPolicy = conf.get(
       YarnConfiguration.YARN_HTTP_POLICY_KEY,
       YarnConfiguration.YARN_HTTP_POLICY_DEFAULT
     )
     val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://"
 
     // Add log urls
-    sys.env.get("SPARK_USER").foreach { user =>
-      val containerId = ConverterUtils.toString(container.getId)
-      val address = container.getNodeHttpAddress
-      val baseUrl = s"$httpScheme$address/node/containerlogs/$containerId/$user"
-
-      env("SPARK_LOG_URL_STDERR") = s"$baseUrl/stderr?start=-4096"
-      env("SPARK_LOG_URL_STDOUT") = s"$baseUrl/stdout?start=-4096"
+    container.foreach { c =>
+      sys.env.get("SPARK_USER").foreach { user =>
+        val containerId = ConverterUtils.toString(c.getId)
+        val address = c.getNodeHttpAddress
+        val baseUrl = s"$httpScheme$address/node/containerlogs/$containerId/$user"
+
+        env("SPARK_LOG_URL_STDERR") = s"$baseUrl/stderr?start=-4096"
+        env("SPARK_LOG_URL_STDOUT") = s"$baseUrl/stdout?start=-4096"
+      }
     }
 
     System.getenv().asScala.filterKeys(_.startsWith("SPARK"))
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index dbdac3369b..0daf1ea0bc 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -26,10 +26,10 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.RackResolver
 import org.apache.log4j.{Level, Logger}
 
@@ -60,7 +60,7 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
 private[yarn] class YarnAllocator(
     driverUrl: String,
     driverRef: RpcEndpointRef,
-    conf: Configuration,
+    conf: YarnConfiguration,
     sparkConf: SparkConf,
     amClient: AMRMClient[ContainerRequest],
     appAttemptId: ApplicationAttemptId,
@@ -297,8 +297,9 @@ private[yarn] class YarnAllocator(
     val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
 
     if (missing > 0) {
-      logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " +
-        s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead")
+      logInfo(s"Will request $missing executor container(s), each with " +
+        s"${resource.getVirtualCores} core(s) and " +
+        s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)")
 
       // Split the pending container request into three groups: locality matched list, locality
       // unmatched list and non-locality list. Take the locality matched container request into
@@ -314,7 +315,9 @@ private[yarn] class YarnAllocator(
         amClient.removeContainerRequest(stale)
       }
       val cancelledContainers = staleRequests.size
-      logInfo(s"Canceled $cancelledContainers container requests (locality no longer needed)")
+      if (cancelledContainers > 0) {
+        logInfo(s"Canceled $cancelledContainers container request(s) (locality no longer needed)")
+      }
 
       // consider the number of new containers and cancelled stale containers available
       val availableContainers = missing + cancelledContainers
@@ -344,14 +347,24 @@ private[yarn] class YarnAllocator(
         anyHostRequests.slice(0, numToCancel).foreach { nonLocal =>
           amClient.removeContainerRequest(nonLocal)
         }
-        logInfo(s"Canceled $numToCancel container requests for any host to resubmit with locality")
+        if (numToCancel > 0) {
+          logInfo(s"Canceled $numToCancel unlocalized container requests to resubmit with locality")
+        }
       }
 
       newLocalityRequests.foreach { request =>
         amClient.addContainerRequest(request)
-        logInfo(s"Submitted container request (host: ${hostStr(request)}, capability: $resource)")
       }
 
+      if (log.isInfoEnabled()) {
+        val (localized, anyHost) = newLocalityRequests.partition(_.getNodes() != null)
+        if (anyHost.nonEmpty) {
+          logInfo(s"Submitted ${anyHost.size} unlocalized container requests.")
+        }
+        localized.foreach { request =>
+          logInfo(s"Submitted container request for host ${hostStr(request)}.")
+        }
+      }
     } else if (numPendingAllocate > 0 && missing < 0) {
       val numToCancel = math.min(numPendingAllocate, -missing)
       logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new desired " +
@@ -479,7 +492,7 @@ private[yarn] class YarnAllocator(
       val containerId = container.getId
       val executorId = executorIdCounter.toString
       assert(container.getResource.getMemory >= resource.getMemory)
-      logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
+      logInfo(s"Launching container $containerId on host $executorHostname")
 
       def updateInternalState(): Unit = synchronized {
         numExecutorsRunning += 1
@@ -494,14 +507,11 @@ private[yarn] class YarnAllocator(
       }
 
       if (launchContainers) {
-        logInfo("Launching ExecutorRunnable. driverUrl: %s,  executorHostname: %s".format(
-          driverUrl, executorHostname))
-
         launcherPool.execute(new Runnable {
           override def run(): Unit = {
             try {
               new ExecutorRunnable(
-                container,
+                Some(container),
                 conf,
                 sparkConf,
                 driverUrl,
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 207dbf56d3..696e552c35 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -19,12 +19,12 @@ package org.apache.spark.deploy.yarn
 
 import java.util.{Arrays, List => JList}
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic
 import org.apache.hadoop.net.DNSToSwitchMapping
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.mockito.Mockito._
 import org.scalatest.{BeforeAndAfterEach, Matchers}
 
@@ -49,7 +49,7 @@ class MockResolver extends DNSToSwitchMapping {
 }
 
 class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
-  val conf = new Configuration()
+  val conf = new YarnConfiguration()
   conf.setClass(
     CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
     classOf[MockResolver], classOf[DNSToSwitchMapping])
-- 
GitLab