Commit 0bd00ff2 authored by Marcelo Vanzin

[SPARK-15891][YARN] Clean up some logging in the YARN AM.

To make the log file more readable, rework some of the logging done
by the AM:

- log the executor command / env just once, since they are almost all the
  same; the information that changes, such as the executor ID, is already
  available in other log messages.
- avoid printing logs when nothing happens, especially when updating the
  container requests in the allocator (see the sketch after this list).
- print fewer log messages when requesting many unlocalized executors,
  instead of repeating the same message multiple times.
- remove some logs that seemed unnecessary.
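
For illustration only (this sketch is not part of the commit): the first two
bullets boil down to two small patterns, building one log message with
placeholders instead of one per executor, and guarding a log statement so that
nothing is printed when the count is zero. A minimal, self-contained Scala
sketch, where the Logging trait is a hypothetical stand-in for Spark's
internal one:

// Hypothetical stand-in for Spark's Logging trait; the by-name `msg`
// parameter means the string is only built when INFO is enabled.
trait Logging {
  protected def infoEnabled: Boolean = true
  protected def logInfo(msg: => String): Unit = {
    if (infoEnabled) println(s"INFO $msg")
  }
}

object LoggingPatterns extends Logging {
  def main(args: Array[String]): Unit = {
    // Pattern 1: log the launch context once, with placeholders for the
    // values that differ per executor.
    logInfo {
      s"""YARN executor launch context:
         |  executor id: <executorId>
         |  hostname:    <hostname>""".stripMargin
    }

    // Pattern 2: stay silent when nothing happened.
    val canceled = 0
    if (canceled > 0) {
      logInfo(s"Canceled $canceled container request(s)")
    }
  }
}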

In the process, I slightly fixed up the wording of a few log messages and did
some minor cleanup of redundant method arguments.

Tested by running the existing unit tests, and by analyzing the logs of an
application that exercises dynamic allocation by forcing executors to be
allocated and killed in waves.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #14943 from vanzin/SPARK-15891.
parent 4f769b90
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -173,7 +173,6 @@ private[spark] class ApplicationMaster(
       sys.props.remove(e.key)
     }
 
-    logInfo("Prepared Local resources " + resources)
     resources.toMap
   }
@@ -329,7 +328,7 @@ private[spark] class ApplicationMaster(
     val appId = client.getAttemptId().getApplicationId().toString()
     val attemptId = client.getAttemptId().getAttemptId().toString()
     val historyAddress =
-      sparkConf.get(HISTORY_SERVER_ADDRESS)
+      _sparkConf.get(HISTORY_SERVER_ADDRESS)
         .map { text => SparkHadoopUtil.get.substituteHadoopVariables(text, yarnConf) }
         .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}/${attemptId}" }
         .getOrElse("")
@@ -338,6 +337,18 @@ private[spark] class ApplicationMaster(
       _sparkConf.get("spark.driver.host"),
       _sparkConf.get("spark.driver.port").toInt,
       CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+    // Before we initialize the allocator, let's log the information about how executors will
+    // be run up front, to avoid printing this out for every single executor being launched.
+    // Use placeholders for information that changes such as executor IDs.
+    logInfo {
+      val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
+      val executorCores = sparkConf.get(EXECUTOR_CORES)
+      val dummyRunner = new ExecutorRunnable(None, yarnConf, sparkConf, driverUrl, "<executorId>",
+        "<hostname>", executorMemory, executorCores, appId, securityMgr, localResources)
+      dummyRunner.launchContextDebugInfo()
+    }
+
     allocator = client.register(driverUrl,
       driverRef,
       yarnConf,
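
A side note on the logInfo { ... } block in this hunk: it works because
Spark's logging helpers take the message as a by-name parameter, so the dummy
ExecutorRunnable and the launch-context string are only built when INFO
logging is actually enabled. A minimal sketch of that mechanism, using
illustrative names rather than Spark's own:

// By-name parameters (`msg: => String`) defer evaluation, so a disabled
// log level never pays the cost of building the message.
object ByNameLogging {
  var infoEnabled = false

  def logInfo(msg: => String): Unit = if (infoEnabled) println(s"INFO $msg")

  def expensiveMessage(): String = {
    println("building expensive message...") // visible side effect
    "executor launch context"
  }

  def main(args: Array[String]): Unit = {
    logInfo(expensiveMessage()) // prints nothing; the message is never built
    infoEnabled = true
    logInfo(expensiveMessage()) // now both lines print
  }
}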

--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -24,7 +24,6 @@ import java.util.Collections
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{HashMap, ListBuffer}
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.DataOutputBuffer
 import org.apache.hadoop.security.UserGroupInformation
@@ -45,11 +44,11 @@ import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.util.Utils
 
 private[yarn] class ExecutorRunnable(
-    container: Container,
-    conf: Configuration,
+    container: Option[Container],
+    conf: YarnConfiguration,
     sparkConf: SparkConf,
     masterAddress: String,
-    slaveId: String,
+    executorId: String,
     hostname: String,
     executorMemory: Int,
     executorCores: Int,
@@ -59,43 +58,46 @@ private[yarn] class ExecutorRunnable(
   var rpc: YarnRPC = YarnRPC.create(conf)
   var nmClient: NMClient = _
-  val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-  lazy val env = prepareEnvironment(container)
 
   def run(): Unit = {
-    logInfo("Starting Executor Container")
+    logDebug("Starting Executor Container")
     nmClient = NMClient.createNMClient()
-    nmClient.init(yarnConf)
+    nmClient.init(conf)
     nmClient.start()
     startContainer()
   }
 
-  def startContainer(): java.util.Map[String, ByteBuffer] = {
-    logInfo("Setting up ContainerLaunchContext")
+  def launchContextDebugInfo(): String = {
+    val commands = prepareCommand()
+    val env = prepareEnvironment()
 
+    s"""
+    |===============================================================================
+    |YARN executor launch context:
+    |  env:
+    |${env.map { case (k, v) => s"    $k -> $v\n" }.mkString}
+    |  command:
+    |    ${commands.mkString(" \\ \n      ")}
+    |
+    |  resources:
+    |${localResources.map { case (k, v) => s"    $k -> $v\n" }.mkString}
+    |===============================================================================""".stripMargin
+  }
+
+  def startContainer(): java.util.Map[String, ByteBuffer] = {
     val ctx = Records.newRecord(classOf[ContainerLaunchContext])
       .asInstanceOf[ContainerLaunchContext]
+    val env = prepareEnvironment().asJava
 
     ctx.setLocalResources(localResources.asJava)
-    ctx.setEnvironment(env.asJava)
+    ctx.setEnvironment(env)
 
     val credentials = UserGroupInformation.getCurrentUser().getCredentials()
     val dob = new DataOutputBuffer()
     credentials.writeTokenStorageToStream(dob)
     ctx.setTokens(ByteBuffer.wrap(dob.getData()))
 
-    val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
-      appId)
-
-    logInfo(s"""
-      |===============================================================================
-      |YARN executor launch context:
-      |  env:
-      |${env.map { case (k, v) => s"    $k -> $v\n" }.mkString}
-      |  command:
-      |    ${commands.mkString(" ")}
-      |===============================================================================
-      """.stripMargin)
-
+    val commands = prepareCommand()
     ctx.setCommands(commands.asJava)
     ctx.setApplicationACLs(
@@ -119,21 +121,15 @@ private[yarn] class ExecutorRunnable(
     // Send the start request to the ContainerManager
     try {
-      nmClient.startContainer(container, ctx)
+      nmClient.startContainer(container.get, ctx)
     } catch {
       case ex: Exception =>
-        throw new SparkException(s"Exception while starting container ${container.getId}" +
+        throw new SparkException(s"Exception while starting container ${container.get.getId}" +
           s" on host $hostname", ex)
     }
   }
 
-  private def prepareCommand(
-      masterAddress: String,
-      slaveId: String,
-      hostname: String,
-      executorMemory: Int,
-      executorCores: Int,
-      appId: String): List[String] = {
+  private def prepareCommand(): List[String] = {
     // Extra options for the JVM
     val javaOpts = ListBuffer[String]()
@@ -216,23 +212,23 @@ private[yarn] class ExecutorRunnable(
         "-server") ++
       javaOpts ++
       Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
-        "--driver-url", masterAddress.toString,
-        "--executor-id", slaveId.toString,
-        "--hostname", hostname.toString,
+        "--driver-url", masterAddress,
+        "--executor-id", executorId,
+        "--hostname", hostname,
         "--cores", executorCores.toString,
         "--app-id", appId) ++
       userClassPath ++
       Seq(
-        "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
-        "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+        s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
+        s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")
 
     // TODO: it would be nicer to just make sure there are no null commands here
     commands.map(s => if (s == null) "null" else s).toList
   }
 
-  private def prepareEnvironment(container: Container): HashMap[String, String] = {
+  private def prepareEnvironment(): HashMap[String, String] = {
     val env = new HashMap[String, String]()
-    Client.populateClasspath(null, yarnConf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH))
+    Client.populateClasspath(null, conf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH))
 
     sparkConf.getExecutorEnv.foreach { case (key, value) =>
       // This assumes each executor environment variable set here is a path
@@ -246,20 +242,22 @@ private[yarn] class ExecutorRunnable(
     }
 
     // lookup appropriate http scheme for container log urls
-    val yarnHttpPolicy = yarnConf.get(
+    val yarnHttpPolicy = conf.get(
       YarnConfiguration.YARN_HTTP_POLICY_KEY,
       YarnConfiguration.YARN_HTTP_POLICY_DEFAULT
     )
     val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://"
 
     // Add log urls
-    sys.env.get("SPARK_USER").foreach { user =>
-      val containerId = ConverterUtils.toString(container.getId)
-      val address = container.getNodeHttpAddress
-      val baseUrl = s"$httpScheme$address/node/containerlogs/$containerId/$user"
-      env("SPARK_LOG_URL_STDERR") = s"$baseUrl/stderr?start=-4096"
-      env("SPARK_LOG_URL_STDOUT") = s"$baseUrl/stdout?start=-4096"
+    container.foreach { c =>
+      sys.env.get("SPARK_USER").foreach { user =>
+        val containerId = ConverterUtils.toString(c.getId)
+        val address = c.getNodeHttpAddress
+        val baseUrl = s"$httpScheme$address/node/containerlogs/$containerId/$user"
+        env("SPARK_LOG_URL_STDERR") = s"$baseUrl/stderr?start=-4096"
+        env("SPARK_LOG_URL_STDOUT") = s"$baseUrl/stdout?start=-4096"
+      }
     }
 
     System.getenv().asScala.filterKeys(_.startsWith("SPARK"))

--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -26,10 +26,10 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.RackResolver
 import org.apache.log4j.{Level, Logger}
@@ -60,7 +60,7 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
 private[yarn] class YarnAllocator(
     driverUrl: String,
     driverRef: RpcEndpointRef,
-    conf: Configuration,
+    conf: YarnConfiguration,
     sparkConf: SparkConf,
     amClient: AMRMClient[ContainerRequest],
     appAttemptId: ApplicationAttemptId,
@@ -297,8 +297,9 @@ private[yarn] class YarnAllocator(
     val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
 
     if (missing > 0) {
-      logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " +
-        s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead")
+      logInfo(s"Will request $missing executor container(s), each with " +
+        s"${resource.getVirtualCores} core(s) and " +
+        s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)")
 
       // Split the pending container request into three groups: locality matched list, locality
       // unmatched list and non-locality list. Take the locality matched container request into
@@ -314,7 +315,9 @@ private[yarn] class YarnAllocator(
         amClient.removeContainerRequest(stale)
       }
       val cancelledContainers = staleRequests.size
-      logInfo(s"Canceled $cancelledContainers container requests (locality no longer needed)")
+      if (cancelledContainers > 0) {
+        logInfo(s"Canceled $cancelledContainers container request(s) (locality no longer needed)")
+      }
 
       // consider the number of new containers and cancelled stale containers available
       val availableContainers = missing + cancelledContainers
@@ -344,14 +347,24 @@ private[yarn] class YarnAllocator(
         anyHostRequests.slice(0, numToCancel).foreach { nonLocal =>
           amClient.removeContainerRequest(nonLocal)
         }
-        logInfo(s"Canceled $numToCancel container requests for any host to resubmit with locality")
+        if (numToCancel > 0) {
+          logInfo(s"Canceled $numToCancel unlocalized container requests to resubmit with locality")
+        }
       }
 
       newLocalityRequests.foreach { request =>
         amClient.addContainerRequest(request)
-        logInfo(s"Submitted container request (host: ${hostStr(request)}, capability: $resource)")
       }
 
+      if (log.isInfoEnabled()) {
+        val (localized, anyHost) = newLocalityRequests.partition(_.getNodes() != null)
+        if (anyHost.nonEmpty) {
+          logInfo(s"Submitted ${anyHost.size} unlocalized container requests.")
+        }
+        localized.foreach { request =>
+          logInfo(s"Submitted container request for host ${hostStr(request)}.")
+        }
+      }
     } else if (numPendingAllocate > 0 && missing < 0) {
       val numToCancel = math.min(numPendingAllocate, -missing)
       logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new desired " +
@@ -479,7 +492,7 @@ private[yarn] class YarnAllocator(
     val containerId = container.getId
     val executorId = executorIdCounter.toString
     assert(container.getResource.getMemory >= resource.getMemory)
-    logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
+    logInfo(s"Launching container $containerId on host $executorHostname")
 
     def updateInternalState(): Unit = synchronized {
       numExecutorsRunning += 1
@@ -494,14 +507,11 @@ private[yarn] class YarnAllocator(
     }
 
     if (launchContainers) {
-      logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(
-        driverUrl, executorHostname))
       launcherPool.execute(new Runnable {
         override def run(): Unit = {
           try {
             new ExecutorRunnable(
-              container,
+              Some(container),
               conf,
               sparkConf,
               driverUrl,

--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -19,12 +19,12 @@ package org.apache.spark.deploy.yarn
 import java.util.{Arrays, List => JList}
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic
 import org.apache.hadoop.net.DNSToSwitchMapping
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.mockito.Mockito._
 import org.scalatest.{BeforeAndAfterEach, Matchers}
@@ -49,7 +49,7 @@ class MockResolver extends DNSToSwitchMapping {
 }
 
 class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
-  val conf = new Configuration()
+  val conf = new YarnConfiguration()
   conf.setClass(
     CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
     classOf[MockResolver], classOf[DNSToSwitchMapping])