diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index a4b575c85d5fbaa1628821a189a0dad4456f360a..ad50ea789a913fc2555181efe1af6a27bbb63529 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -173,7 +173,6 @@ private[spark] class ApplicationMaster(
       sys.props.remove(e.key)
     }
 
-    logInfo("Prepared Local resources " + resources)
     resources.toMap
   }
 
@@ -329,7 +328,7 @@ private[spark] class ApplicationMaster(
     val appId = client.getAttemptId().getApplicationId().toString()
     val attemptId = client.getAttemptId().getAttemptId().toString()
     val historyAddress =
-      sparkConf.get(HISTORY_SERVER_ADDRESS)
+      _sparkConf.get(HISTORY_SERVER_ADDRESS)
         .map { text => SparkHadoopUtil.get.substituteHadoopVariables(text, yarnConf) }
         .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}/${attemptId}" }
         .getOrElse("")
@@ -338,6 +337,18 @@ private[spark] class ApplicationMaster(
       _sparkConf.get("spark.driver.host"),
       _sparkConf.get("spark.driver.port").toInt,
       CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+    // Before we initialize the allocator, let's log the information about how executors will
+    // be run up front, to avoid printing this out for every single executor being launched.
+    // Use placeholders for information that changes such as executor IDs.
+    logInfo {
+      val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
+      val executorCores = sparkConf.get(EXECUTOR_CORES)
+      val dummyRunner = new ExecutorRunnable(None, yarnConf, sparkConf, driverUrl, "<executorId>",
+        "<hostname>", executorMemory, executorCores, appId, securityMgr, localResources)
+      dummyRunner.launchContextDebugInfo()
+    }
+
     allocator = client.register(driverUrl,
       driverRef,
       yarnConf,
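Side note: the `logInfo { ... }` block above only pays for constructing the dummy ExecutorRunnable when INFO is actually enabled, because `logInfo` in Spark's Logging trait takes its message by name. Below is a minimal, self-contained sketch of that pattern; `LazyLogDemo` and its members are illustrative names, not Spark's API.

object LazyLogDemo {
  var infoEnabled: Boolean = false

  // `msg` is a by-name parameter (`=> String`), so the block that builds
  // it is only evaluated when the level check passes.
  def logInfo(msg: => String): Unit = {
    if (infoEnabled) println(s"INFO: $msg")
  }

  // Stand-in for building the dummy ExecutorRunnable and calling
  // launchContextDebugInfo().
  private def expensiveLaunchContext(): String = {
    println("(building launch context debug info)")
    "YARN executor launch context: ..."
  }

  def main(args: Array[String]): Unit = {
    logInfo(expensiveLaunchContext()) // infoEnabled = false: never built
    infoEnabled = true
    logInfo(expensiveLaunchContext()) // built and printed exactly once
  }
}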
logInfo("Setting up ContainerLaunchContext") + def launchContextDebugInfo(): String = { + val commands = prepareCommand() + val env = prepareEnvironment() + + s""" + |=============================================================================== + |YARN executor launch context: + | env: + |${env.map { case (k, v) => s" $k -> $v\n" }.mkString} + | command: + | ${commands.mkString(" \\ \n ")} + | + | resources: + |${localResources.map { case (k, v) => s" $k -> $v\n" }.mkString} + |===============================================================================""".stripMargin + } + def startContainer(): java.util.Map[String, ByteBuffer] = { val ctx = Records.newRecord(classOf[ContainerLaunchContext]) .asInstanceOf[ContainerLaunchContext] + val env = prepareEnvironment().asJava ctx.setLocalResources(localResources.asJava) - ctx.setEnvironment(env.asJava) + ctx.setEnvironment(env) val credentials = UserGroupInformation.getCurrentUser().getCredentials() val dob = new DataOutputBuffer() credentials.writeTokenStorageToStream(dob) ctx.setTokens(ByteBuffer.wrap(dob.getData())) - val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores, - appId) - - logInfo(s""" - |=============================================================================== - |YARN executor launch context: - | env: - |${env.map { case (k, v) => s" $k -> $v\n" }.mkString} - | command: - | ${commands.mkString(" ")} - |=============================================================================== - """.stripMargin) + val commands = prepareCommand() ctx.setCommands(commands.asJava) ctx.setApplicationACLs( @@ -119,21 +121,15 @@ private[yarn] class ExecutorRunnable( // Send the start request to the ContainerManager try { - nmClient.startContainer(container, ctx) + nmClient.startContainer(container.get, ctx) } catch { case ex: Exception => - throw new SparkException(s"Exception while starting container ${container.getId}" + + throw new SparkException(s"Exception while starting container ${container.get.getId}" + s" on host $hostname", ex) } } - private def prepareCommand( - masterAddress: String, - slaveId: String, - hostname: String, - executorMemory: Int, - executorCores: Int, - appId: String): List[String] = { + private def prepareCommand(): List[String] = { // Extra options for the JVM val javaOpts = ListBuffer[String]() @@ -216,23 +212,23 @@ private[yarn] class ExecutorRunnable( "-server") ++ javaOpts ++ Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend", - "--driver-url", masterAddress.toString, - "--executor-id", slaveId.toString, - "--hostname", hostname.toString, + "--driver-url", masterAddress, + "--executor-id", executorId, + "--hostname", hostname, "--cores", executorCores.toString, "--app-id", appId) ++ userClassPath ++ Seq( - "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout", - "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr") + s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout", + s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr") // TODO: it would be nicer to just make sure there are no null commands here commands.map(s => if (s == null) "null" else s).toList } - private def prepareEnvironment(container: Container): HashMap[String, String] = { + private def prepareEnvironment(): HashMap[String, String] = { val env = new HashMap[String, String]() - Client.populateClasspath(null, yarnConf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH)) + Client.populateClasspath(null, conf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH)) 
     sparkConf.getExecutorEnv.foreach { case (key, value) =>
       // This assumes each executor environment variable set here is a path
@@ -246,20 +242,22 @@ private[yarn] class ExecutorRunnable(
     }
 
     // lookup appropriate http scheme for container log urls
-    val yarnHttpPolicy = yarnConf.get(
+    val yarnHttpPolicy = conf.get(
       YarnConfiguration.YARN_HTTP_POLICY_KEY,
       YarnConfiguration.YARN_HTTP_POLICY_DEFAULT
     )
     val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://"
 
     // Add log urls
-    sys.env.get("SPARK_USER").foreach { user =>
-      val containerId = ConverterUtils.toString(container.getId)
-      val address = container.getNodeHttpAddress
-      val baseUrl = s"$httpScheme$address/node/containerlogs/$containerId/$user"
-
-      env("SPARK_LOG_URL_STDERR") = s"$baseUrl/stderr?start=-4096"
-      env("SPARK_LOG_URL_STDOUT") = s"$baseUrl/stdout?start=-4096"
+    container.foreach { c =>
+      sys.env.get("SPARK_USER").foreach { user =>
+        val containerId = ConverterUtils.toString(c.getId)
+        val address = c.getNodeHttpAddress
+        val baseUrl = s"$httpScheme$address/node/containerlogs/$containerId/$user"
+
+        env("SPARK_LOG_URL_STDERR") = s"$baseUrl/stderr?start=-4096"
+        env("SPARK_LOG_URL_STDOUT") = s"$baseUrl/stdout?start=-4096"
+      }
     }
 
     System.getenv().asScala.filterKeys(_.startsWith("SPARK"))
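Side note: the core refactoring in ExecutorRunnable.scala is making `container` an `Option[Container]`, so the same class can back both a real launch (`Some(container)`) and the logging-only placeholder built with `None` in ApplicationMaster. A rough, runnable sketch of that shape follows; `FakeContainer` and `Runner` are hypothetical stand-ins for the YARN and Spark types.

// Stand-in for org.apache.hadoop.yarn.api.records.Container.
final case class FakeContainer(id: String, nodeHttpAddress: String)

class Runner(container: Option[FakeContainer], hostname: String) {

  // Needs no container, so the dummy instance built with None can call it.
  def launchContextDebugInfo(): String =
    s"would launch an executor on $hostname"

  def prepareEnvironment(): Map[String, String] = {
    var env = Map("SPARK_YARN_MODE" -> "true")
    // Container-specific entries (the log URLs in the diff) are only added
    // when a real container is present, mirroring container.foreach above.
    container.foreach { c =>
      env += ("SPARK_LOG_URL_STDOUT" ->
        s"http://${c.nodeHttpAddress}/node/containerlogs/${c.id}/stdout")
    }
    env
  }

  // A real launch requires the container, mirroring
  // nmClient.startContainer(container.get, ctx) in the diff.
  def start(): Unit = {
    val c = container.getOrElse(sys.error("cannot start without a container"))
    println(s"starting container ${c.id} on $hostname")
  }
}

object RunnerDemo {
  def main(args: Array[String]): Unit = {
    val dummy = new Runner(None, "<hostname>")
    println(dummy.launchContextDebugInfo()) // safe: no container needed
    val real = new Runner(Some(FakeContainer("c1", "nm-host:8042")), "host1")
    println(real.prepareEnvironment())
    real.start()
  }
}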
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index dbdac3369b9050721f2aabb132726da86b831eb5..0daf1ea0bc8b42584fa94be16d63890bf044ea42 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -26,10 +26,10 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.RackResolver
 
 import org.apache.log4j.{Level, Logger}
@@ -60,7 +60,7 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
 private[yarn] class YarnAllocator(
     driverUrl: String,
     driverRef: RpcEndpointRef,
-    conf: Configuration,
+    conf: YarnConfiguration,
     sparkConf: SparkConf,
     amClient: AMRMClient[ContainerRequest],
     appAttemptId: ApplicationAttemptId,
@@ -297,8 +297,9 @@ private[yarn] class YarnAllocator(
     val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
 
     if (missing > 0) {
-      logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " +
-        s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead")
+      logInfo(s"Will request $missing executor container(s), each with " +
+        s"${resource.getVirtualCores} core(s) and " +
+        s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)")
 
       // Split the pending container request into three groups: locality matched list, locality
       // unmatched list and non-locality list. Take the locality matched container request into
@@ -314,7 +315,9 @@ private[yarn] class YarnAllocator(
         amClient.removeContainerRequest(stale)
       }
       val cancelledContainers = staleRequests.size
-      logInfo(s"Canceled $cancelledContainers container requests (locality no longer needed)")
+      if (cancelledContainers > 0) {
+        logInfo(s"Canceled $cancelledContainers container request(s) (locality no longer needed)")
+      }
 
       // consider the number of new containers and cancelled stale containers available
       val availableContainers = missing + cancelledContainers
@@ -344,14 +347,24 @@ private[yarn] class YarnAllocator(
         anyHostRequests.slice(0, numToCancel).foreach { nonLocal =>
           amClient.removeContainerRequest(nonLocal)
         }
-        logInfo(s"Canceled $numToCancel container requests for any host to resubmit with locality")
+        if (numToCancel > 0) {
+          logInfo(s"Canceled $numToCancel unlocalized container requests to resubmit with locality")
+        }
       }
 
       newLocalityRequests.foreach { request =>
         amClient.addContainerRequest(request)
-        logInfo(s"Submitted container request (host: ${hostStr(request)}, capability: $resource)")
       }
 
+      if (log.isInfoEnabled()) {
+        val (localized, anyHost) = newLocalityRequests.partition(_.getNodes() != null)
+        if (anyHost.nonEmpty) {
+          logInfo(s"Submitted ${anyHost.size} unlocalized container requests.")
+        }
+        localized.foreach { request =>
+          logInfo(s"Submitted container request for host ${hostStr(request)}.")
+        }
+      }
     } else if (numPendingAllocate > 0 && missing < 0) {
       val numToCancel = math.min(numPendingAllocate, -missing)
       logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new desired " +
@@ -479,7 +492,7 @@ private[yarn] class YarnAllocator(
     val containerId = container.getId
     val executorId = executorIdCounter.toString
     assert(container.getResource.getMemory >= resource.getMemory)
-    logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
+    logInfo(s"Launching container $containerId on host $executorHostname")
 
     def updateInternalState(): Unit = synchronized {
       numExecutorsRunning += 1
@@ -494,14 +507,11 @@ private[yarn] class YarnAllocator(
     }
 
     if (launchContainers) {
-      logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(
-        driverUrl, executorHostname))
-
       launcherPool.execute(new Runnable {
         override def run(): Unit = {
           try {
             new ExecutorRunnable(
-              container,
+              Some(container),
              conf,
              sparkConf,
              driverUrl,
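Side note: the new `log.isInfoEnabled()` block in YarnAllocator.scala partitions freshly submitted requests so that any-host requests collapse into a single summary line while locality-constrained ones are still logged individually. A standalone sketch of that partition-then-summarize pattern follows, with a made-up `Request` type in place of `AMRMClient.ContainerRequest` (whose `getNodes()` returns null for unlocalized requests).

// Stand-in for AMRMClient.ContainerRequest; Option replaces the null check.
final case class Request(nodes: Option[Seq[String]])

object PartitionLogDemo {
  def main(args: Array[String]): Unit = {
    val newRequests = Seq(
      Request(Some(Seq("host1", "host2"))),
      Request(None),
      Request(None))

    // Locality-constrained requests are logged one by one; any-host
    // requests collapse into one count, the same shape as the change above.
    val (localized, anyHost) = newRequests.partition(_.nodes.isDefined)
    if (anyHost.nonEmpty) {
      println(s"Submitted ${anyHost.size} unlocalized container requests.")
    }
    localized.foreach { r =>
      println(s"Submitted container request for host ${r.nodes.get.mkString(",")}.")
    }
  }
}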
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 207dbf56d3606e0528e2543b2afc86c283865c88..696e552c35d12eb27c8e9f5b5f3c2ab74948b28a 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -19,12 +19,12 @@ package org.apache.spark.deploy.yarn
 
 import java.util.{Arrays, List => JList}
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic
 import org.apache.hadoop.net.DNSToSwitchMapping
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.mockito.Mockito._
 import org.scalatest.{BeforeAndAfterEach, Matchers}
 
@@ -49,7 +49,7 @@ class MockResolver extends DNSToSwitchMapping {
 }
 
 class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
-  val conf = new Configuration()
+  val conf = new YarnConfiguration()
   conf.setClass(
     CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
     classOf[MockResolver], classOf[DNSToSwitchMapping])
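Side note: the `launchContextDebugInfo()` template interleaves interpolated `map`/`mkString` output with `stripMargin`; lines emitted by `mkString` carry their own trailing newlines and no margin `|`, so `stripMargin` leaves them untouched. A toy, runnable reproduction of that formatting technique, with invented env and command values:

object LaunchContextFormatDemo {
  def main(args: Array[String]): Unit = {
    val env = Map("SPARK_USER" -> "alice", "SPARK_YARN_MODE" -> "true")
    val commands = Seq("{{JAVA_HOME}}/bin/java", "-server", "-Xmx1024m",
      "org.apache.spark.executor.CoarseGrainedExecutorBackend")
    val banner = "=" * 79

    // Each env entry renders as its own line via the "\n" inside the
    // interpolation; the command list is joined with escaped line breaks,
    // matching the template in the diff.
    val text =
      s"""
      |$banner
      |YARN executor launch context:
      |  env:
      |${env.map { case (k, v) => s"    $k -> $v\n" }.mkString}
      |  command:
      |    ${commands.mkString(" \\ \n      ")}
      |$banner""".stripMargin

    println(text)
  }
}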