Skip to content
Snippets Groups Projects
Commit 85c4d7bf authored by Karen Feng's avatar Karen Feng
Browse files

Shows number of complete/total/failed tasks (bug: failed tasks assigned to null executor)

parent 8901f379
No related branches found
No related tags found
No related merge requests found
......@@ -67,6 +67,7 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
// This server must register all handlers, including JobProgressUI, before binding
// JobProgressUI registers a listener with SparkContext, which requires sc to initialize
jobs.start()
exec.start()
}
def stop() {
......
......@@ -5,15 +5,18 @@ import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.Handler
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.util.Properties
import spark.scheduler.cluster.TaskInfo
import spark.{ExceptionFailure, Logging, SparkContext, Success, Utils}
import spark.executor.TaskMetrics
import spark.scheduler.cluster.TaskInfo
import spark.scheduler._
import spark.SparkContext
import spark.storage.{StorageStatus, StorageUtils}
import spark.ui.JettyUtils._
import spark.ui.UIUtils.headerSparkPage
import spark.ui.Page.Executors
import spark.storage.{StorageStatus, StorageUtils}
import spark.SparkContext
import spark.ui.UIUtils.headerSparkPage
import spark.ui.UIUtils
import spark.Utils
......@@ -21,6 +24,14 @@ import scala.xml.{Node, XML}
private[spark] class ExecutorsUI(val sc: SparkContext) {
// Listener is created lazily by start(); remains None until then.
private var _listener: Option[ExecutorsListener] = None
// NOTE(review): `.get` throws NoSuchElementException if start() has not been
// called yet — callers must ensure start() runs before using this accessor.
def listener = _listener.get
/** Creates and registers the executors listener with the SparkContext so task
 *  events start flowing into this UI. Must run before any page is rendered. */
def start() {
  val executorsListener = new ExecutorsListener
  _listener = Some(executorsListener)
  sc.addSparkListener(executorsListener)
}
// Route table for this UI: serves the executors page at "/executors".
// The request-handler lambda is presumably adapted to a Jetty Handler by an
// implicit conversion from spark.ui.JettyUtils._ — confirm against JettyUtils.
def getHandlers = Seq[(String, Handler)](
  ("/executors", (request: HttpServletRequest) => render(request))
)
......@@ -29,13 +40,12 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val storageStatusList = sc.getExecutorStorageStatus
val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
val memUsed = storageStatusList.map(_.memUsed()).reduce(_+_)
val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
.reduceOption(_+_).getOrElse(0L)
val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used/Memory total",
"Disk used")
val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used",
"Tasks: Complete/Total")
def execRow(kv: Seq[String]) =
<tr>
<td>{kv(0)}</td>
......@@ -43,6 +53,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
<td>{kv(2)}</td>
<td>{kv(3)}</td>
<td>{kv(4)}</td>
<td>{kv(5)}</td>
</tr>
val execInfo =
for (b <- 0 until storageStatusList.size)
......@@ -76,12 +87,46 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val maxMem = Utils.memoryBytesToString(sc.getExecutorStorageStatus(a).maxMem)
val diskUsed = Utils.memoryBytesToString(sc.getExecutorStorageStatus(a).diskUsed())
val rddBlocks = sc.getExecutorStorageStatus(a).blocks.size.toString
val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0)
val totalTasks = listener.executorToTaskInfos(a.toString).size
val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0) match {
case f if f > 0 => " (%s failed)".format(f)
case _ => ""
}
Seq(
execId,
hostPort,
rddBlocks,
"%s/%s".format(memUsed, maxMem),
diskUsed
"%s / %s".format(memUsed, maxMem),
diskUsed,
"%s / %s".format(completedTasks, totalTasks) + failedTasks
)
}
/** SparkListener that accumulates per-executor task statistics for the UI.
 *  All maps are keyed by executor ID. */
private[spark] class ExecutorsListener extends SparkListener with Logging {
  // Counts of successfully completed and failed tasks per executor.
  val executorToTasksComplete = HashMap[String, Int]()
  val executorToTasksFailed = HashMap[String, Int]()
  // Full task history per executor: (task info, metrics if any, failure if any).
  val executorToTaskInfos =
    HashMap[String, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    // Bug noted in the commit message: failed tasks can arrive with null task
    // metrics (or a null executor ID inside them), which attributed them to a
    // "null" executor. Guard both nulls and fall back to the metrics carried by
    // the failure itself before giving up.
    val eid = Option(taskEnd.taskMetrics).flatMap(m => Option(m.executorId))
      .orElse(taskEnd.reason match {
        case e: ExceptionFailure => e.metrics.flatMap(m => Option(m.executorId))
        case _ => None
      })
      .getOrElse("unknown")
    val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
      taskEnd.reason match {
        case e: ExceptionFailure =>
          executorToTasksFailed(eid) = executorToTasksFailed.getOrElse(eid, 0) + 1
          logInfo("Executor %s has %s failed tasks.".format(eid, executorToTasksFailed(eid)))
          (Some(e), e.metrics)
        case _ =>
          executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
          logInfo("Executor %s has %s completed tasks.".format(eid, executorToTasksComplete(eid)))
          // Option(...) instead of Some(...) so a null metrics object maps to None.
          (None, Option(taskEnd.taskMetrics))
      }
    // Append this task's record to the executor's history.
    val taskList = executorToTaskInfos.getOrElse(
      eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
    taskList += ((taskEnd.taskInfo, metrics, failureInfo))
    executorToTaskInfos(eid) = taskList
  }
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment