Commit 2f1736c3 authored by Matei Zaharia

Merge pull request #725 from karenfeng/task-start

Creates task start events
Parents: 5364f645, abc78cd3
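This commit threads a task-start notification through the scheduler: each TaskSetManager invokes the new TaskSchedulerListener.taskStarted callback when it launches a task, the DAGScheduler converts that into a BeginEvent on its event queue and fans it out to registered SparkListeners as a SparkListenerTaskStart, and the JobLogger and the executors web UI consume the new event (the UI gains an "Active tasks" column).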
@@ -52,6 +52,11 @@ class DAGScheduler(
}
taskSched.setListener(this)
// Called by the TaskScheduler to report that a task has started.
override def taskStarted(task: Task[_], taskInfo: TaskInfo) {
eventQueue.put(BeginEvent(task, taskInfo))
}
// Called by TaskScheduler to report task completions or failures.
override def taskEnded(
task: Task[_],
@@ -343,6 +348,9 @@ class DAGScheduler(
case ExecutorLost(execId) =>
handleExecutorLost(execId)
case begin: BeginEvent =>
sparkListeners.foreach(_.onTaskStart(SparkListenerTaskStart(begin.task, begin.taskInfo)))
case completion: CompletionEvent =>
sparkListeners.foreach(_.onTaskEnd(SparkListenerTaskEnd(completion.task,
completion.reason, completion.taskInfo, completion.taskMetrics)))
...
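For context, the DAGScheduler hunks above follow a producer/consumer pattern: scheduler callbacks enqueue events, and a single event loop drains the queue and notifies every listener. A minimal, self-contained sketch of that pattern (the class and event names here are illustrative stand-ins, not the actual Spark types):

import java.util.concurrent.LinkedBlockingQueue

sealed trait SchedulerEvent
case class TaskStartEvent(taskId: Long) extends SchedulerEvent
case class TaskEndEvent(taskId: Long) extends SchedulerEvent

trait Listener {
  def onTaskStart(e: TaskStartEvent) { }
  def onTaskEnd(e: TaskEndEvent) { }
}

class EventLoop(listeners: Seq[Listener]) {
  private val eventQueue = new LinkedBlockingQueue[SchedulerEvent]()

  // Producers (e.g. scheduler callbacks) enqueue without blocking on listeners.
  def post(event: SchedulerEvent) { eventQueue.put(event) }

  // A single consumer thread dequeues each event and notifies every listener,
  // mirroring the sparkListeners.foreach(...) dispatch above.
  def run() {
    while (true) {
      eventQueue.take() match {
        case e: TaskStartEvent => listeners.foreach(_.onTaskStart(e))
        case e: TaskEndEvent => listeners.foreach(_.onTaskEnd(e))
      }
    }
  }
}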
@@ -43,6 +43,8 @@ private[spark] case class JobSubmitted(
properties: Properties = null)
extends DAGSchedulerEvent
private[spark] case class BeginEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent
private[spark] case class CompletionEvent(
task: Task[_],
reason: TaskEndReason,
...
@@ -68,6 +68,8 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
processStageCompletedEvent(stageInfo)
case SparkListenerJobEnd(job, result) =>
processJobEndEvent(job, result)
case SparkListenerTaskStart(task, taskInfo) =>
processTaskStartEvent(task, taskInfo)
case SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics) =>
processTaskEndEvent(task, reason, taskInfo, taskMetrics)
case _ =>
@@ -252,7 +254,19 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
stageInfo.stage.id + " STATUS=COMPLETED")
}
override def onTaskStart(taskStart: SparkListenerTaskStart) {
eventQueue.put(taskStart)
}
protected def processTaskStartEvent(task: Task[_], taskInfo: TaskInfo) {
// Record the type of task that is starting; the value is computed but not yet logged here.
val taskStatus = task match {
case resultTask: ResultTask[_, _] => "TASK_TYPE=RESULT_TASK"
case shuffleMapTask: ShuffleMapTask => "TASK_TYPE=SHUFFLE_MAP_TASK"
}
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
eventQueue.put(taskEnd)
}
...
@@ -29,6 +29,8 @@ case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int) extends SparkListenerEvents
case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents
case class SparkListenerTaskEnd(task: Task[_], reason: TaskEndReason, taskInfo: TaskInfo,
taskMetrics: TaskMetrics) extends SparkListenerEvents
@@ -48,7 +50,12 @@ trait SparkListener {
* Called when a stage is submitted
*/
def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { }
/**
* Called when a task starts
*/
def onTaskStart(taskStart: SparkListenerTaskStart) { }
/**
* Called when a task ends
*/
...
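With the new hook in place, user code can observe task starts by overriding onTaskStart. A minimal sketch, assuming listeners are registered with the SparkContext (the addSparkListener call is an assumption about the surrounding API, not part of this diff):

import spark.scheduler.{SparkListener, SparkListenerTaskStart, SparkListenerTaskEnd}

// Logs the lifecycle of every task via the two callbacks shown above.
class TaskLifecycleLogger extends SparkListener {
  override def onTaskStart(taskStart: SparkListenerTaskStart) {
    println("Task started: " + taskStart.taskInfo.taskId)
  }
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    println("Task ended: " + taskEnd.taskInfo.taskId + " (" + taskEnd.reason + ")")
  }
}

// Hypothetical registration against an existing SparkContext `sc`:
// sc.addSparkListener(new TaskLifecycleLogger)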
@@ -27,6 +27,9 @@ import spark.executor.TaskMetrics
* Interface for getting events back from the TaskScheduler.
*/
private[spark] trait TaskSchedulerListener {
// A task has started.
def taskStarted(task: Task[_], taskInfo: TaskInfo)
// A task has finished or failed.
def taskEnded(task: Task[_], reason: TaskEndReason, result: Any, accumUpdates: Map[Long, Any],
taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit
...
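The taskStarted callback mirrors the existing taskEnded one. The DAGScheduler (first hunk above) implements this trait, so the TaskSetManager changes below can report launches back to it through sched.listener.taskStarted.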
@@ -496,6 +496,8 @@ private[spark] class ClusterTaskSetManager(
logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
taskSet.id, index, serializedTask.limit, timeTaken))
val taskName = "task %s:%d".format(taskSet.id, index)
if (taskAttempts(index).size == 1) {
taskStarted(task, info)
}
return Some(new TaskDescription(taskId, execId, taskName, serializedTask))
}
case _ =>
@@ -518,6 +520,10 @@
}
}
def taskStarted(task: Task[_], info: TaskInfo) {
sched.listener.taskStarted(task, info)
}
def taskFinished(tid: Long, state: TaskState, serializedData: ByteBuffer) {
val info = taskInfos(tid)
if (info.failed) {
...
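Note the guard in the cluster manager above: taskStarted is invoked only when taskAttempts(index).size == 1, i.e. for the first attempt launched at a given task index, so re-submitted or speculative attempts do not emit duplicate start events. The local scheduler's manager, next, reports each launch unconditionally.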
@@ -117,6 +117,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: TaskSet
val taskName = "task %s:%d".format(taskSet.id, index)
copiesRunning(index) += 1
increaseRunningTasks(1)
taskStarted(task, info)
return Some(new TaskDescription(taskId, null, taskName, bytes))
case None => {}
}
@@ -146,6 +147,10 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: TaskSet
}
}
def taskStarted(task: Task[_], info: TaskInfo) {
sched.listener.taskStarted(task, info)
}
def taskEnded(tid: Long, state: TaskState, serializedData: ByteBuffer) {
val info = taskInfos(tid)
val index = info.index
...
@@ -5,7 +5,7 @@ import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.Handler
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.util.Properties
import spark.{ExceptionFailure, Logging, SparkContext, Success, Utils}
@@ -18,7 +18,6 @@ import spark.ui.JettyUtils._
import spark.ui.Page.Executors
import spark.ui.UIUtils.headerSparkPage
import spark.ui.UIUtils
-import spark.Utils
import scala.xml.{Node, XML}
@@ -45,7 +44,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
.reduceOption(_+_).getOrElse(0L)
val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used",
"Failed tasks", "Complete tasks", "Total tasks")
"Active tasks", "Failed tasks", "Complete tasks", "Total tasks")
def execRow(kv: Seq[String]) =
<tr>
<td>{kv(0)}</td>
@@ -60,6 +59,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
<td>{kv(6)}</td>
<td>{kv(7)}</td>
<td>{kv(8)}</td>
<td>{kv(9)}</td>
</tr>
val execInfo =
for (b <- 0 until storageStatusList.size)
@@ -93,6 +93,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val memUsed = sc.getExecutorStorageStatus(a).memUsed().toString
val maxMem = sc.getExecutorStorageStatus(a).maxMem.toString
val diskUsed = sc.getExecutorStorageStatus(a).diskUsed().toString
val activeTasks = listener.executorToTasksActive.getOrElse(a.toString, HashSet[Long]()).size.toString
val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0).toString
val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0).toString
val totalTasks = listener.executorToTaskInfos(a.toString).size.toString
@@ -104,6 +105,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
memUsed,
maxMem,
diskUsed,
activeTasks,
failedTasks,
completedTasks,
totalTasks
@@ -111,13 +113,24 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
}
private[spark] class ExecutorsListener extends SparkListener with Logging {
val executorToTasksActive = HashMap[String, HashSet[Long]]()
val executorToTasksComplete = HashMap[String, Int]()
val executorToTasksFailed = HashMap[String, Int]()
val executorToTaskInfos =
HashMap[String, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
override def onTaskStart(taskStart: SparkListenerTaskStart) {
val eid = taskStart.taskInfo.executorId
// Create the executor's active-task set on first use, then record the new task.
executorToTasksActive.getOrElseUpdate(eid, HashSet[Long]()) += taskStart.taskInfo.taskId
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val eid = taskEnd.taskInfo.executorId
// Remove the finished task from the executor's active set.
executorToTasksActive.getOrElseUpdate(eid, HashSet[Long]()) -= taskEnd.taskInfo.taskId
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
case e: ExceptionFailure =>
...
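The per-executor bookkeeping in ExecutorsListener reduces to a map from executor ID to the set of in-flight task IDs; the new "Active tasks" column is just the size of that set. A self-contained sketch of the same pattern (illustrative names, not the Spark classes):

import scala.collection.mutable.{HashMap, HashSet}

// Tracks which task IDs are currently running on each executor.
class ActiveTaskTracker {
  private val active = HashMap[String, HashSet[Long]]()

  def taskStarted(execId: String, taskId: Long) {
    active.getOrElseUpdate(execId, HashSet[Long]()) += taskId
  }

  def taskEnded(execId: String, taskId: Long) {
    active.getOrElseUpdate(execId, HashSet[Long]()) -= taskId
  }

  // The UI column reads the current size of the executor's set.
  def activeCount(execId: String): Int =
    active.get(execId).map(_.size).getOrElse(0)
}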