diff --git a/core/src/main/scala/spark/ui/SparkUI.scala b/core/src/main/scala/spark/ui/SparkUI.scala index 1534705bc5ac7311a649833d040831715cb1c0fd..7599f82a94414558f182c775a5a191a67c177ca7 100644 --- a/core/src/main/scala/spark/ui/SparkUI.scala +++ b/core/src/main/scala/spark/ui/SparkUI.scala @@ -67,6 +67,7 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging { // This server must register all handlers, including JobProgressUI, before binding // JobProgressUI registers a listener with SparkContext, which requires sc to initialize jobs.start() + exec.start() } def stop() { diff --git a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala index 5fb75c60fd3a915c7b18d3a8be5e828c0a683706..a981c680d2201f0a2e8956e4a2ae883a30a6b0ae 100644 --- a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala +++ b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala @@ -5,15 +5,18 @@ import javax.servlet.http.HttpServletRequest import org.eclipse.jetty.server.Handler +import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.util.Properties -import spark.scheduler.cluster.TaskInfo +import spark.{ExceptionFailure, Logging, SparkContext, Success, Utils} import spark.executor.TaskMetrics +import spark.scheduler.cluster.TaskInfo +import spark.scheduler._ +import spark.SparkContext +import spark.storage.{StorageStatus, StorageUtils} import spark.ui.JettyUtils._ -import spark.ui.UIUtils.headerSparkPage import spark.ui.Page.Executors -import spark.storage.{StorageStatus, StorageUtils} -import spark.SparkContext +import spark.ui.UIUtils.headerSparkPage import spark.ui.UIUtils import spark.Utils @@ -21,6 +24,14 @@ import scala.xml.{Node, XML} private[spark] class ExecutorsUI(val sc: SparkContext) { + private var _listener: Option[ExecutorsListener] = None + def listener = _listener.get + + def start() { + _listener = Some(new ExecutorsListener) + sc.addSparkListener(listener) + } + def getHandlers = Seq[(String, Handler)]( ("/executors", (request: HttpServletRequest) => render(request)) ) @@ -29,13 +40,12 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { val storageStatusList = sc.getExecutorStorageStatus val maxMem = storageStatusList.map(_.maxMem).reduce(_+_) - val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_) val memUsed = storageStatusList.map(_.memUsed()).reduce(_+_) val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)) .reduceOption(_+_).getOrElse(0L) - val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used/Memory total", - "Disk used") + val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used", + "Tasks: Complete/Total") def execRow(kv: Seq[String]) = <tr> <td>{kv(0)}</td> @@ -43,6 +53,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { <td>{kv(2)}</td> <td>{kv(3)}</td> <td>{kv(4)}</td> + <td>{kv(5)}</td> </tr> val execInfo = for (b <- 0 until storageStatusList.size) @@ -76,12 +87,46 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { val maxMem = Utils.memoryBytesToString(sc.getExecutorStorageStatus(a).maxMem) val diskUsed = Utils.memoryBytesToString(sc.getExecutorStorageStatus(a).diskUsed()) val rddBlocks = sc.getExecutorStorageStatus(a).blocks.size.toString + val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0) + val totalTasks = listener.executorToTaskInfos(a.toString).size + val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0) match { + case f if f > 0 => " (%s failed)".format(f) + case _ => "" + } + Seq( execId, hostPort, rddBlocks, - "%s/%s".format(memUsed, maxMem), - diskUsed + "%s / %s".format(memUsed, maxMem), + diskUsed, + "%s / %s".format(completedTasks, totalTasks) + failedTasks ) } + + private[spark] class ExecutorsListener extends SparkListener with Logging { + val executorToTasksComplete = HashMap[String, Int]() + val executorToTasksFailed = HashMap[String, Int]() + val executorToTaskInfos = + HashMap[String, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]() + + override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { + val eid = taskEnd.taskMetrics.executorId + val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) = + taskEnd.reason match { + case e: ExceptionFailure => + executorToTasksFailed(eid) = executorToTasksFailed.getOrElse(eid, 0) + 1 + logInfo("Executor %s has %s failed tasks.".format(eid, executorToTasksFailed(eid))) + (Some(e), e.metrics) + case _ => + executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1 + logInfo("Executor %s has %s completed tasks.".format(eid, executorToTasksComplete(eid))) + (None, Some(taskEnd.taskMetrics)) + } + val taskList = executorToTaskInfos.getOrElse( + eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]()) + taskList += ((taskEnd.taskInfo, metrics, failureInfo)) + executorToTaskInfos(eid) = taskList + } + } } \ No newline at end of file