From 2022193412e832393a29b94609841c3ffe8e3d66 Mon Sep 17 00:00:00 2001 From: Timothy Chen <tnachen@gmail.com> Date: Fri, 1 May 2015 18:36:42 -0700 Subject: [PATCH] [SPARK-7216] [MESOS] Add driver details page to Mesos cluster UI. Add a details page that displays Mesos driver in the Mesos cluster UI Author: Timothy Chen <tnachen@gmail.com> Closes #5763 from tnachen/mesos_cluster_page and squashes the following commits: 55f36eb [Timothy Chen] Add driver details page to Mesos cluster UI. --- .../spark/deploy/mesos/ui/DriverPage.scala | 180 ++++++++++++++++++ .../deploy/mesos/ui/MesosClusterPage.scala | 9 +- .../deploy/mesos/ui/MesosClusterUI.scala | 1 + .../deploy/rest/mesos/MesosRestServer.scala | 6 +- .../cluster/mesos/MesosClusterScheduler.scala | 33 +++- .../cluster/mesos/MesosSchedulerBackend.scala | 4 +- 6 files changed, 222 insertions(+), 11 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala new file mode 100644 index 0000000000..be8560d10f --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.mesos.ui + +import javax.servlet.http.HttpServletRequest + +import scala.xml.Node + +import org.apache.spark.deploy.Command +import org.apache.spark.deploy.mesos.MesosDriverDescription +import org.apache.spark.scheduler.cluster.mesos.{MesosClusterSubmissionState, MesosClusterRetryState} +import org.apache.spark.ui.{UIUtils, WebUIPage} + + +private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver") { + + override def render(request: HttpServletRequest): Seq[Node] = { + val driverId = request.getParameter("id") + require(driverId != null && driverId.nonEmpty, "Missing id parameter") + + val state = parent.scheduler.getDriverState(driverId) + if (state.isEmpty) { + val content = + <div> + <p>Cannot find driver {driverId}</p> + </div> + return UIUtils.basicSparkPage(content, s"Details for Job $driverId") + } + val driverState = state.get + val driverHeaders = Seq("Driver property", "Value") + val schedulerHeaders = Seq("Scheduler property", "Value") + val commandEnvHeaders = Seq("Command environment variable", "Value") + val launchedHeaders = Seq("Launched property", "Value") + val commandHeaders = Seq("Comamnd property", "Value") + val retryHeaders = Seq("Last failed status", "Next retry time", "Retry count") + val driverDescription = Iterable.apply(driverState.description) + val submissionState = Iterable.apply(driverState.submissionState) + val command = Iterable.apply(driverState.description.command) + val schedulerProperties = Iterable.apply(driverState.description.schedulerProperties) + val commandEnv = Iterable.apply(driverState.description.command.environment) + val driverTable = + UIUtils.listingTable(driverHeaders, driverRow, driverDescription) + val commandTable = + UIUtils.listingTable(commandHeaders, commandRow, command) + val commandEnvTable = + UIUtils.listingTable(commandEnvHeaders, propertiesRow, commandEnv) + val schedulerTable = + UIUtils.listingTable(schedulerHeaders, propertiesRow, schedulerProperties) + val launchedTable = + UIUtils.listingTable(launchedHeaders, launchedRow, submissionState) + val retryTable = + UIUtils.listingTable( + retryHeaders, retryRow, Iterable.apply(driverState.description.retryState)) + val content = + <p>Driver state information for driver id {driverId}</p> + <a href="/">Back to Drivers</a> + <div class="row-fluid"> + <div class="span12"> + <h4>Driver state: {driverState.state}</h4> + <h4>Driver properties</h4> + {driverTable} + <h4>Driver command</h4> + {commandTable} + <h4>Driver command environment</h4> + {commandEnvTable} + <h4>Scheduler properties</h4> + {schedulerTable} + <h4>Launched state</h4> + {launchedTable} + <h4>Retry state</h4> + {retryTable} + </div> + </div>; + + UIUtils.basicSparkPage(content, s"Details for Job $driverId") + } + + private def launchedRow(submissionState: Option[MesosClusterSubmissionState]): Seq[Node] = { + submissionState.map { state => + <tr> + <td>Mesos Slave ID</td> + <td>{state.slaveId.getValue}</td> + </tr> + <tr> + <td>Mesos Task ID</td> + <td>{state.taskId.getValue}</td> + </tr> + <tr> + <td>Launch Time</td> + <td>{state.startDate}</td> + </tr> + <tr> + <td>Finish Time</td> + <td>{state.finishDate.map(_.toString).getOrElse("")}</td> + </tr> + <tr> + <td>Last Task Status</td> + <td>{state.mesosTaskStatus.map(_.toString).getOrElse("")}</td> + </tr> + }.getOrElse(Seq[Node]()) + } + + private def propertiesRow(properties: collection.Map[String, String]): Seq[Node] = { + properties.map { case (k, v) => + <tr> + <td>{k}</td><td>{v}</td> + </tr> + }.toSeq + } + + private def commandRow(command: Command): Seq[Node] = { + <tr> + <td>Main class</td><td>{command.mainClass}</td> + </tr> + <tr> + <td>Arguments</td><td>{command.arguments.mkString(" ")}</td> + </tr> + <tr> + <td>Class path entries</td><td>{command.classPathEntries.mkString(" ")}</td> + </tr> + <tr> + <td>Java options</td><td>{command.javaOpts.mkString((" "))}</td> + </tr> + <tr> + <td>Library path entries</td><td>{command.libraryPathEntries.mkString((" "))}</td> + </tr> + } + + private def driverRow(driver: MesosDriverDescription): Seq[Node] = { + <tr> + <td>Name</td><td>{driver.name}</td> + </tr> + <tr> + <td>Id</td><td>{driver.submissionId}</td> + </tr> + <tr> + <td>Cores</td><td>{driver.cores}</td> + </tr> + <tr> + <td>Memory</td><td>{driver.mem}</td> + </tr> + <tr> + <td>Submitted</td><td>{driver.submissionDate}</td> + </tr> + <tr> + <td>Supervise</td><td>{driver.supervise}</td> + </tr> + } + + private def retryRow(retryState: Option[MesosClusterRetryState]): Seq[Node] = { + retryState.map { state => + <tr> + <td> + {state.lastFailureStatus} + </td> + <td> + {state.nextRetry} + </td> + <td> + {state.retries} + </td> + </tr> + }.getOrElse(Seq[Node]()) + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala index 7b2005e0f1..7419fa9699 100644 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala @@ -56,8 +56,9 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage( } private def queuedRow(submission: MesosDriverDescription): Seq[Node] = { + val id = submission.submissionId <tr> - <td>{submission.submissionId}</td> + <td><a href={s"driver?id=$id"}>{id}</a></td> <td>{submission.submissionDate}</td> <td>{submission.command.mainClass}</td> <td>cpus: {submission.cores}, mem: {submission.mem}</td> @@ -65,8 +66,9 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage( } private def driverRow(state: MesosClusterSubmissionState): Seq[Node] = { + val id = state.driverDescription.submissionId <tr> - <td>{state.driverDescription.submissionId}</td> + <td><a href={s"driver?id=$id"}>{id}</a></td> <td>{state.driverDescription.submissionDate}</td> <td>{state.driverDescription.command.mainClass}</td> <td>cpus: {state.driverDescription.cores}, mem: {state.driverDescription.mem}</td> @@ -77,8 +79,9 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage( } private def retryRow(submission: MesosDriverDescription): Seq[Node] = { + val id = submission.submissionId <tr> - <td>{submission.submissionId}</td> + <td><a href={s"driver?id=$id"}>{id}</a></td> <td>{submission.submissionDate}</td> <td>{submission.command.mainClass}</td> <td>{submission.retryState.get.lastFailureStatus}</td> diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala index 4865d46dbc..3f693545a0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala @@ -39,6 +39,7 @@ private[spark] class MesosClusterUI( override def initialize() { attachPage(new MesosClusterPage(this)) + attachPage(new DriverPage(this)) attachHandler(createStaticHandler(MesosClusterUI.STATIC_RESOURCE_DIR, "/static")) } } diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala index fd17a980c9..8198296eeb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala @@ -53,7 +53,7 @@ private[spark] class MesosRestServer( new MesosStatusRequestServlet(scheduler, masterConf) } -private[deploy] class MesosSubmitRequestServlet( +private[mesos] class MesosSubmitRequestServlet( scheduler: MesosClusterScheduler, conf: SparkConf) extends SubmitRequestServlet { @@ -139,7 +139,7 @@ private[deploy] class MesosSubmitRequestServlet( } } -private[deploy] class MesosKillRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf) +private[mesos] class MesosKillRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf) extends KillRequestServlet { protected override def handleKill(submissionId: String): KillSubmissionResponse = { val k = scheduler.killDriver(submissionId) @@ -148,7 +148,7 @@ private[deploy] class MesosKillRequestServlet(scheduler: MesosClusterScheduler, } } -private[deploy] class MesosStatusRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf) +private[mesos] class MesosStatusRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf) extends StatusRequestServlet { protected override def handleStatus(submissionId: String): SubmissionStatusResponse = { val d = scheduler.getDriverStatus(submissionId) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index 0396e62be5..06f0e2881c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -50,12 +50,13 @@ private[spark] class MesosClusterSubmissionState( val taskId: TaskID, val slaveId: SlaveID, var mesosTaskStatus: Option[TaskStatus], - var startDate: Date) + var startDate: Date, + var finishDate: Option[Date]) extends Serializable { def copy(): MesosClusterSubmissionState = { new MesosClusterSubmissionState( - driverDescription, taskId, slaveId, mesosTaskStatus, startDate) + driverDescription, taskId, slaveId, mesosTaskStatus, startDate, finishDate) } } @@ -95,6 +96,14 @@ private[spark] class MesosClusterSchedulerState( val finishedDrivers: Iterable[MesosClusterSubmissionState], val pendingRetryDrivers: Iterable[MesosDriverDescription]) +/** + * The full state of a Mesos driver, that is being used to display driver information on the UI. + */ +private[spark] class MesosDriverState( + val state: String, + val description: MesosDriverDescription, + val submissionState: Option[MesosClusterSubmissionState] = None) + /** * A Mesos scheduler that is responsible for launching submitted Spark drivers in cluster mode * as Mesos tasks in a Mesos cluster. @@ -233,6 +242,22 @@ private[spark] class MesosClusterScheduler( s } + /** + * Gets the driver state to be displayed on the Web UI. + */ + def getDriverState(submissionId: String): Option[MesosDriverState] = { + stateLock.synchronized { + queuedDrivers.find(_.submissionId.equals(submissionId)) + .map(d => new MesosDriverState("QUEUED", d)) + .orElse(launchedDrivers.get(submissionId) + .map(d => new MesosDriverState("RUNNING", d.driverDescription, Some(d)))) + .orElse(finishedDrivers.find(_.driverDescription.submissionId.equals(submissionId)) + .map(d => new MesosDriverState("FINISHED", d.driverDescription, Some(d)))) + .orElse(pendingRetryDrivers.find(_.submissionId.equals(submissionId)) + .map(d => new MesosDriverState("RETRYING", d))) + } + } + private def isQueueFull(): Boolean = launchedDrivers.size >= queuedCapacity /** @@ -439,7 +464,7 @@ private[spark] class MesosClusterScheduler( logTrace(s"Using offer ${offer.offer.getId.getValue} to launch driver " + submission.submissionId) val newState = new MesosClusterSubmissionState(submission, taskId, offer.offer.getSlaveId, - None, new Date()) + None, new Date(), None) launchedDrivers(submission.submissionId) = newState launchedDriversState.persist(submission.submissionId, newState) afterLaunchCallback(submission.submissionId) @@ -534,6 +559,7 @@ private[spark] class MesosClusterScheduler( // Check if the driver is supervise enabled and can be relaunched. if (state.driverDescription.supervise && shouldRelaunch(status.getState)) { removeFromLaunchedDrivers(taskId) + state.finishDate = Some(new Date()) val retryState: Option[MesosClusterRetryState] = state.driverDescription.retryState val (retries, waitTimeSec) = retryState .map { rs => (rs.retries + 1, Math.min(maxRetryWaitTime, rs.waitTime * 2)) } @@ -546,6 +572,7 @@ private[spark] class MesosClusterScheduler( pendingRetryDriversState.persist(taskId, newDriverDescription) } else if (TaskState.isFinished(TaskState.fromMesos(status.getState))) { removeFromLaunchedDrivers(taskId) + state.finishDate = Some(new Date()) if (finishedDrivers.size >= retainedDrivers) { val toRemove = math.max(retainedDrivers / 10, 1) finishedDrivers.trimStart(toRemove) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 8346a24074..86a7d0fb58 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -23,7 +23,7 @@ import java.util.{ArrayList => JArrayList, Collections, List => JList} import scala.collection.JavaConversions._ import scala.collection.mutable.{HashMap, HashSet} -import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _} +import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, _} import org.apache.mesos.protobuf.ByteString import org.apache.mesos.{Scheduler => MScheduler, _} import org.apache.spark.executor.MesosExecutorBackend @@ -56,7 +56,7 @@ private[spark] class MesosSchedulerBackend( // The listener bus to publish executor added/removed events. val listenerBus = sc.listenerBus - + private[mesos] val mesosExecutorCores = sc.conf.getDouble("spark.mesos.mesosExecutor.cores", 1) @volatile var appId: String = _ -- GitLab