diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index cf3820fcb6a359829e95c8a92dd94017d7cab2b6..ad78bdfde2dfb6012ac1f875a1649a1d55022490 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1991,7 +1991,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli // Note: this code assumes that the task scheduler has been initialized and has contacted // the cluster manager to get an application ID (in case the cluster manager provides one). listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId), - startTime, sparkUser, applicationAttemptId)) + startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls)) } /** Post the application end event */ diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala index 646820520ea1b3f3c9d3943b2765436f832518a7..8801a761afae31f463a97d33cd7c647d509cf9af 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala @@ -49,4 +49,11 @@ private[spark] trait SchedulerBackend { */ def applicationAttemptId(): Option[String] = None + /** + * Get the URLs for the driver logs. These URLs are used to display the links in the UI + * Executors tab for the driver. + * @return Map containing the log names and their respective URLs + */ + def getDriverLogUrls: Option[Map[String, String]] = None + } diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index 169d4fd3a94f0fd6940c2f744fb142fcff79ac16..863d0befbc19e3199b2b444eb73d4741df2ce01c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -110,8 +110,13 @@ case class SparkListenerExecutorMetricsUpdate( extends SparkListenerEvent @DeveloperApi -case class SparkListenerApplicationStart(appName: String, appId: Option[String], - time: Long, sparkUser: String, appAttemptId: Option[String]) extends SparkListenerEvent +case class SparkListenerApplicationStart( + appName: String, + appId: Option[String], + time: Long, + sparkUser: String, + appAttemptId: Option[String], + driverLogs: Option[Map[String, String]] = None) extends SparkListenerEvent @DeveloperApi case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index 0a08b000e2d033f1808dc6f53c25ac97befd66e7..39583af14390d5c08fd4e38e7a007b4517724fa6 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -19,7 +19,7 @@ package org.apache.spark.ui.exec import scala.collection.mutable.HashMap -import org.apache.spark.ExceptionFailure +import org.apache.spark.{ExceptionFailure, SparkContext} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.scheduler._ import org.apache.spark.storage.{StorageStatus, StorageStatusListener} @@ -73,6 +73,16 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp uiData.finishReason = Some(executorRemoved.reason) } + override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { + applicationStart.driverLogs.foreach { logs => + val storageStatus = storageStatusList.find { s => + s.blockManagerId.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER || + s.blockManagerId.executorId == SparkContext.DRIVER_IDENTIFIER + } + storageStatus.foreach { s => executorToLogUrls(s.blockManagerId.executorId) = logs.toMap } + } + } + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized { val eid = taskStart.taskInfo.executorId executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1 diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 3f162d1f6c3ebb74bee0d0ccfc53c7048bdd5ddc..adf69a4e78e714d91750334eccd2f91d98c39268 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -196,7 +196,8 @@ private[spark] object JsonProtocol { ("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~ ("Timestamp" -> applicationStart.time) ~ ("User" -> applicationStart.sparkUser) ~ - ("App Attempt ID" -> applicationStart.appAttemptId.map(JString(_)).getOrElse(JNothing)) + ("App Attempt ID" -> applicationStart.appAttemptId.map(JString(_)).getOrElse(JNothing)) ~ + ("Driver Logs" -> applicationStart.driverLogs.map(mapToJson).getOrElse(JNothing)) } def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = { @@ -570,7 +571,8 @@ private[spark] object JsonProtocol { val time = (json \ "Timestamp").extract[Long] val sparkUser = (json \ "User").extract[String] val appAttemptId = Utils.jsonOption(json \ "App Attempt ID").map(_.extract[String]) - SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId) + val driverLogs = Utils.jsonOption(json \ "Driver Logs").map(mapFromJson) + SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId, driverLogs) } def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala index b134751366522d677de4b2ab6707eda11503b890..ffe71dfd7d2575424b0975f09262a786b09128d4 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala @@ -89,9 +89,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg /** Returns the attempt ID. */ def getAttemptId(): ApplicationAttemptId = { - val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()) - val containerId = ConverterUtils.toContainerId(containerIdString) - containerId.getApplicationAttemptId() + YarnSparkHadoopUtil.get.getContainerId.getApplicationAttemptId() } /** Returns the configuration for the AmIpFilter to add to the Spark UI. */ diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index ba91872107d0cc7682979643efdc39a63d9aa345..5e6531895c7baae3ca308dcfedc198e6739fb040 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -33,7 +33,8 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.api.ApplicationConstants import org.apache.hadoop.yarn.api.ApplicationConstants.Environment -import org.apache.hadoop.yarn.api.records.{Priority, ApplicationAccessType} +import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, Priority} +import org.apache.hadoop.yarn.util.ConverterUtils import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.{SecurityManager, SparkConf, SparkException} @@ -136,6 +137,10 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { tokenRenewer.foreach(_.stop()) } + private[spark] def getContainerId: ContainerId = { + val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()) + ConverterUtils.toContainerId(containerIdString) + } } object YarnSparkHadoopUtil { diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala index aeb218a57545534ce71abfb1bce283ee0e41e3df..1ace1a97d5156e96975304d255effb1c5c7eb3eb 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala @@ -17,10 +17,19 @@ package org.apache.spark.scheduler.cluster +import java.net.NetworkInterface + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.yarn.api.records.NodeState +import org.apache.hadoop.yarn.client.api.YarnClient +import org.apache.hadoop.yarn.conf.YarnConfiguration + import org.apache.spark.SparkContext +import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import org.apache.spark.scheduler.TaskSchedulerImpl -import org.apache.spark.util.IntParam +import org.apache.spark.util.{IntParam, Utils} private[spark] class YarnClusterSchedulerBackend( scheduler: TaskSchedulerImpl, @@ -53,4 +62,70 @@ private[spark] class YarnClusterSchedulerBackend( logError("Application attempt ID is not set.") super.applicationAttemptId } + + override def getDriverLogUrls: Option[Map[String, String]] = { + var yarnClientOpt: Option[YarnClient] = None + var driverLogs: Option[Map[String, String]] = None + try { + val yarnConf = new YarnConfiguration(sc.hadoopConfiguration) + val containerId = YarnSparkHadoopUtil.get.getContainerId + yarnClientOpt = Some(YarnClient.createYarnClient()) + yarnClientOpt.foreach { yarnClient => + yarnClient.init(yarnConf) + yarnClient.start() + + // For newer versions of YARN, we can find the HTTP address for a given node by getting a + // container report for a given container. But container reports came only in Hadoop 2.4, + // so we basically have to get the node reports for all nodes and find the one which runs + // this container. For that we have to compare the node's host against the current host. + // Since the host can have multiple addresses, we need to compare against all of them to + // find out if one matches. + + // Get all the addresses of this node. + val addresses = + NetworkInterface.getNetworkInterfaces.asScala + .flatMap(_.getInetAddresses.asScala) + .toSeq + + // Find a node report that matches one of the addresses + val nodeReport = + yarnClient.getNodeReports(NodeState.RUNNING).asScala.find { x => + val host = x.getNodeId.getHost + addresses.exists { address => + address.getHostAddress == host || + address.getHostName == host || + address.getCanonicalHostName == host + } + } + + // Now that we have found the report for the Node Manager that the AM is running on, we + // can get the base HTTP address for the Node manager from the report. + // The format used for the logs for each container is well-known and can be constructed + // using the NM's HTTP address and the container ID. + // The NM may be running several containers, but we can build the URL for the AM using + // the AM's container ID, which we already know. + nodeReport.foreach { report => + val httpAddress = report.getHttpAddress + // lookup appropriate http scheme for container log urls + val yarnHttpPolicy = yarnConf.get( + YarnConfiguration.YARN_HTTP_POLICY_KEY, + YarnConfiguration.YARN_HTTP_POLICY_DEFAULT + ) + val user = Utils.getCurrentUserName() + val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://" + val baseUrl = s"$httpScheme$httpAddress/node/containerlogs/$containerId/$user" + logDebug(s"Base URL for logs: $baseUrl") + driverLogs = Some( + Map("stderr" -> s"$baseUrl/stderr?start=0", "stdout" -> s"$baseUrl/stdout?start=0")) + } + } + } catch { + case e: Exception => + logInfo("Node Report API is not available in the version of YARN being used, so AM" + + " logs link will not appear in application UI", e) + } finally { + yarnClientOpt.foreach(_.close()) + } + driverLogs + } } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index d3c606e0ed99869bbecf40c13901858da6a09ab5..dcaeb2e43ff41294da59c8566831c45e7deccde0 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit import scala.collection.JavaConversions._ import scala.collection.mutable +import scala.io.Source import com.google.common.base.Charsets.UTF_8 import com.google.common.io.ByteStreams @@ -33,7 +34,8 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, TestUtils} import org.apache.spark.scheduler.cluster.ExecutorInfo -import org.apache.spark.scheduler.{SparkListenerJobStart, SparkListener, SparkListenerExecutorAdded} +import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart, + SparkListenerExecutorAdded} import org.apache.spark.util.Utils /** @@ -290,10 +292,15 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit private[spark] class SaveExecutorInfo extends SparkListener { val addedExecutorInfos = mutable.Map[String, ExecutorInfo]() + var driverLogs: Option[collection.Map[String, String]] = None override def onExecutorAdded(executor: SparkListenerExecutorAdded) { addedExecutorInfos(executor.executorId) = executor.executorInfo } + + override def onApplicationStart(appStart: SparkListenerApplicationStart): Unit = { + driverLogs = appStart.driverLogs + } } private object YarnClusterDriver extends Logging with Matchers { @@ -314,6 +321,7 @@ private object YarnClusterDriver extends Logging with Matchers { val sc = new SparkContext(new SparkConf() .set("spark.extraListeners", classOf[SaveExecutorInfo].getName) .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns")) + val conf = sc.getConf val status = new File(args(0)) var result = "failure" try { @@ -335,6 +343,20 @@ private object YarnClusterDriver extends Logging with Matchers { executorInfos.foreach { info => assert(info.logUrlMap.nonEmpty) } + + // If we are running in yarn-cluster mode, verify that driver logs are downloadable. + if (conf.get("spark.master") == "yarn-cluster") { + assert(listener.driverLogs.nonEmpty) + val driverLogs = listener.driverLogs.get + assert(driverLogs.size === 2) + assert(driverLogs.containsKey("stderr")) + assert(driverLogs.containsKey("stdout")) + val stderr = driverLogs("stderr") // YARN puts everything in stderr. + val lines = Source.fromURL(stderr).getLines() + // Look for a line that contains YarnClusterSchedulerBackend, since that is guaranteed in + // cluster mode. + assert(lines.exists(_.contains("YarnClusterSchedulerBackend"))) + } } }