Commit d1636dd7 authored by Reynold Xin

[SPARK-2297][UI] Make task attempt and speculation more explicit in UI.

New UI:

![screen shot 2014-06-26 at 1 43 52 pm](https://cloud.githubusercontent.com/assets/323388/3404643/82b9ddc6-fd73-11e3-96f9-f7592a7aee79.png)

Author: Reynold Xin <rxin@apache.org>

Closes #1236 from rxin/ui-task-attempt and squashes the following commits:

3b645dd [Reynold Xin] Expose attemptId in Stage.
c0474b1 [Reynold Xin] Beefed up unit test.
c404bdd [Reynold Xin] Fix ReplayListenerSuite.
f56be4b [Reynold Xin] Fixed JsonProtocolSuite.
e29e0f7 [Reynold Xin] Minor update.
5e4354a [Reynold Xin] [SPARK-2297][UI] Make task attempt and speculation more explicit in UI.
parent bf578dea
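For orientation before the diff: the heart of the change is two new `TaskInfo` fields, `attempt` and `speculative`, which the stage page renders as a single "Attempt" column. The sketch below is a simplified stand-in distilled from the hunks that follow, not the actual `org.apache.spark.scheduler.TaskInfo` source:

```scala
// Illustrative only: a simplified stand-in for the revised TaskInfo,
// showing the two fields this commit introduces.
object TaskInfoSketch {
  case class TaskInfo(
      taskId: Long,
      index: Int,
      attempt: Int,         // new in this commit: per-index attempt number
      launchTime: Long,
      executorId: String,
      host: String,
      locality: String,
      speculative: Boolean) // new in this commit: launched as a speculative copy?

  def main(args: Array[String]): Unit = {
    val info = TaskInfo(1234L, 0, 2, 0L, "exe-1", "host1", "NODE_LOCAL", speculative = true)
    // The new Attempt column renders speculative attempts with a suffix:
    val cell = if (info.speculative) s"${info.attempt} (speculative)" else info.attempt.toString
    println(cell) // prints: 2 (speculative)
  }
}
```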
@@ -75,9 +75,11 @@ case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId)
 @DeveloperApi
 case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent
 
+@DeveloperApi
 case class SparkListenerApplicationStart(appName: String, time: Long, sparkUser: String)
   extends SparkListenerEvent
 
+@DeveloperApi
 case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
 
 /** An event used in the listener to shutdown the listener daemon thread. */
......
@@ -106,6 +106,8 @@ private[spark] class Stage(
     id
   }
 
+  def attemptId: Int = nextAttemptId
+
   val name = callSite.short
   val details = callSite.long
......
@@ -27,10 +27,12 @@ import org.apache.spark.annotation.DeveloperApi
 class TaskInfo(
     val taskId: Long,
     val index: Int,
+    val attempt: Int,
     val launchTime: Long,
     val executorId: String,
     val host: String,
-    val taskLocality: TaskLocality.TaskLocality) {
+    val taskLocality: TaskLocality.TaskLocality,
+    val speculative: Boolean) {
 
   /**
    * The time when the task started remotely getting the result. Will not be set if the
......
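With the constructor change above, listener implementations can read the new fields off every task event. A minimal sketch, assuming a Spark build that includes this commit (`SparkListener`, `SparkListenerTaskEnd`, and `SparkContext.addSparkListener` are existing APIs; the listener class itself is invented for the example):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical listener that surfaces the new TaskInfo fields.
class SpeculationLoggingListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val info = taskEnd.taskInfo
    if (info.speculative) {
      println(s"task ${info.index} (attempt ${info.attempt}) ran speculatively on ${info.host}")
    }
  }
}
```

Register it with `sc.addSparkListener(new SpeculationLoggingListener)`.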
@@ -335,17 +335,19 @@ private[spark] class TaskSetManager(
   /**
    * Dequeue a pending task for a given node and return its index and locality level.
    * Only search for tasks matching the given locality constraint.
+   *
+   * @return An option containing (task index within the task set, locality, is speculative?)
    */
   private def findTask(execId: String, host: String, locality: TaskLocality.Value)
-    : Option[(Int, TaskLocality.Value)] =
+    : Option[(Int, TaskLocality.Value, Boolean)] =
   {
     for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
-      return Some((index, TaskLocality.PROCESS_LOCAL))
+      return Some((index, TaskLocality.PROCESS_LOCAL, false))
     }
 
     if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
       for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
-        return Some((index, TaskLocality.NODE_LOCAL))
+        return Some((index, TaskLocality.NODE_LOCAL, false))
       }
     }
@@ -354,23 +356,25 @@ private[spark] class TaskSetManager(
         rack <- sched.getRackForHost(host)
         index <- findTaskFromList(execId, getPendingTasksForRack(rack))
       } {
-        return Some((index, TaskLocality.RACK_LOCAL))
+        return Some((index, TaskLocality.RACK_LOCAL, false))
       }
     }
 
     // Look for no-pref tasks after rack-local tasks since they can run anywhere.
     for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
-      return Some((index, TaskLocality.PROCESS_LOCAL))
+      return Some((index, TaskLocality.PROCESS_LOCAL, false))
     }
 
     if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
       for (index <- findTaskFromList(execId, allPendingTasks)) {
-        return Some((index, TaskLocality.ANY))
+        return Some((index, TaskLocality.ANY, false))
       }
     }
 
     // Finally, if all else has failed, find a speculative task
-    findSpeculativeTask(execId, host, locality)
+    findSpeculativeTask(execId, host, locality).map { case (taskIndex, allowedLocality) =>
+      (taskIndex, allowedLocality, true)
+    }
   }
 
   /**
@@ -391,7 +395,7 @@ private[spark] class TaskSetManager(
       }
 
       findTask(execId, host, allowedLocality) match {
-        case Some((index, taskLocality)) => {
+        case Some((index, taskLocality, speculative)) => {
           // Found a task; do some bookkeeping and return a task description
           val task = tasks(index)
           val taskId = sched.newTaskId()
@@ -400,7 +404,9 @@ private[spark] class TaskSetManager(
             taskSet.id, index, taskId, execId, host, taskLocality))
           // Do various bookkeeping
           copiesRunning(index) += 1
-          val info = new TaskInfo(taskId, index, curTime, execId, host, taskLocality)
+          val attemptNum = taskAttempts(index).size
+          val info = new TaskInfo(
+            taskId, index, attemptNum + 1, curTime, execId, host, taskLocality, speculative)
           taskInfos(taskId) = info
           taskAttempts(index) = info :: taskAttempts(index)
           // Update our locality level for delay scheduling
......
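Two small idioms carry the scheduler side of this change: non-speculative code paths return a widened `(index, locality, false)` tuple while the speculative fallback is tagged by mapping over the `Option`, and the attempt number is derived from the `taskAttempts` history so the first launch is attempt 1. A self-contained sketch of both (names invented; not TaskSetManager itself):

```scala
import scala.collection.mutable

object SchedulerIdioms {
  // Stand-in for findSpeculativeTask: yields (task index, locality) when a
  // speculatable task exists.
  def findSpeculativeTask(): Option[(Int, String)] = Some((7, "ANY"))

  def main(args: Array[String]): Unit = {
    // Idiom 1: widen the 2-tuple, tagging the speculative branch with `true`.
    val tagged: Option[(Int, String, Boolean)] =
      findSpeculativeTask().map { case (index, locality) => (index, locality, true) }
    println(tagged) // Some((7,ANY,true))

    // Idiom 2: attempt number = number of prior launches + 1.
    val taskAttempts = mutable.Map[Int, List[String]]()
    val index = 0
    for (_ <- 1 to 3) {
      val prior = taskAttempts.getOrElse(index, Nil)
      val attemptNum = prior.size + 1
      taskAttempts(index) = s"attempt-$attemptNum" :: prior
      println(s"task $index launched as attempt $attemptNum")
    }
  }
}
```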
@@ -95,8 +95,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
       </div>
     // scalastyle:on
 
     val taskHeaders: Seq[String] =
-      Seq("Task Index", "Task ID", "Status", "Locality Level", "Executor", "Launch Time") ++
-      Seq("Duration", "GC Time", "Result Ser Time") ++
+      Seq(
+        "Index", "ID", "Attempt", "Status", "Locality Level", "Executor",
+        "Launch Time", "Duration", "GC Time") ++
       {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
       {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
       {if (hasBytesSpilled) Seq("Shuffle Spill (Memory)", "Shuffle Spill (Disk)") else Nil} ++
@@ -245,6 +246,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
       <tr>
         <td>{info.index}</td>
         <td>{info.taskId}</td>
+        <td sorttable_customkey={info.attempt.toString}>{
+          if (info.speculative) s"${info.attempt} (speculative)" else info.attempt.toString
+        }</td>
         <td>{info.status}</td>
         <td>{info.taskLocality}</td>
         <td>{info.host}</td>
@@ -255,9 +259,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
       <td sorttable_customkey={gcTime.toString}>
         {if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
       </td>
+      <!--
+       TODO: Add this back after we add support to hide certain columns.
       <td sorttable_customkey={serializationTime.toString}>
         {if (serializationTime > 0) UIUtils.formatDuration(serializationTime) else ""}
       </td>
+      -->
       {if (shuffleRead) {
         <td sorttable_customkey={shuffleReadSortable}>
           {shuffleReadReadable}
......
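The Attempt cell shows a decorated string but still needs to sort numerically; `sorttable_customkey` is the attribute the UI's sorttable JavaScript reads for ordering, the same pattern the surrounding columns (GC time, shuffle sizes) already use. A standalone sketch of the trick with Scala XML literals (assumes the scala-xml dependency; not the StagePage code itself):

```scala
object AttemptCellSketch {
  // Render the attempt cell: human-readable text, machine-sortable key.
  def attemptCell(attempt: Int, speculative: Boolean): scala.xml.Elem =
    <td sorttable_customkey={attempt.toString}>
      {if (speculative) s"$attempt (speculative)" else attempt.toString}
    </td>

  def main(args: Array[String]): Unit = {
    // Sorts as 2 even though it displays "2 (speculative)".
    println(attemptCell(2, speculative = true))
  }
}
```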
@@ -32,6 +32,8 @@ import org.apache.spark.storage._
 import org.apache.spark._
 
 private[spark] object JsonProtocol {
+  // TODO: Remove this file and put JSON serialization into each individual class.
+
   private implicit val format = DefaultFormats
 
   /** ------------------------------------------------- *
@@ -194,10 +196,12 @@ private[spark] object JsonProtocol {
   def taskInfoToJson(taskInfo: TaskInfo): JValue = {
     ("Task ID" -> taskInfo.taskId) ~
     ("Index" -> taskInfo.index) ~
+    ("Attempt" -> taskInfo.attempt) ~
     ("Launch Time" -> taskInfo.launchTime) ~
     ("Executor ID" -> taskInfo.executorId) ~
     ("Host" -> taskInfo.host) ~
     ("Locality" -> taskInfo.taskLocality.toString) ~
+    ("Speculative" -> taskInfo.speculative) ~
     ("Getting Result Time" -> taskInfo.gettingResultTime) ~
     ("Finish Time" -> taskInfo.finishTime) ~
     ("Failed" -> taskInfo.failed) ~
@@ -487,16 +491,19 @@ private[spark] object JsonProtocol {
   def taskInfoFromJson(json: JValue): TaskInfo = {
     val taskId = (json \ "Task ID").extract[Long]
     val index = (json \ "Index").extract[Int]
+    val attempt = (json \ "Attempt").extractOpt[Int].getOrElse(1)
     val launchTime = (json \ "Launch Time").extract[Long]
     val executorId = (json \ "Executor ID").extract[String]
     val host = (json \ "Host").extract[String]
     val taskLocality = TaskLocality.withName((json \ "Locality").extract[String])
+    val speculative = (json \ "Speculative").extractOpt[Boolean].getOrElse(false)
     val gettingResultTime = (json \ "Getting Result Time").extract[Long]
     val finishTime = (json \ "Finish Time").extract[Long]
     val failed = (json \ "Failed").extract[Boolean]
     val serializedSize = (json \ "Serialized Size").extract[Int]
-    val taskInfo = new TaskInfo(taskId, index, launchTime, executorId, host, taskLocality)
+    val taskInfo =
+      new TaskInfo(taskId, index, attempt, launchTime, executorId, host, taskLocality, speculative)
     taskInfo.gettingResultTime = gettingResultTime
     taskInfo.finishTime = finishTime
     taskInfo.failed = failed
......
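The `extractOpt[...].getOrElse(...)` pattern above is what keeps old event logs replayable: logs written before this commit have no "Attempt" or "Speculative" keys, so parsing falls back to defaults instead of throwing. A minimal json4s sketch of the same behavior (assumes json4s on the classpath, as JsonProtocol itself requires):

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._

object OldLogCompatSketch {
  implicit val formats: Formats = DefaultFormats

  def main(args: Array[String]): Unit = {
    // A pre-SPARK-2297 record: no "Attempt", no "Speculative".
    val json = parse("""{"Task ID":222,"Index":333}""")
    val attempt = (json \ "Attempt").extractOpt[Int].getOrElse(1)
    val speculative = (json \ "Speculative").extractOpt[Boolean].getOrElse(false)
    println((attempt, speculative)) // (1,false)
  }
}
```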
@@ -66,7 +66,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers {
     // finish this task, should get updated shuffleRead
     shuffleReadMetrics.remoteBytesRead = 1000
     taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
-    var taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+    var taskInfo = new TaskInfo(1234L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
     var task = new ShuffleMapTask(0, null, null, 0, null)
     val taskType = Utils.getFormattedClassName(task)
@@ -75,7 +75,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers {
       .shuffleRead == 1000)
 
     // finish a task with unknown executor-id, nothing should happen
-    taskInfo = new TaskInfo(1234L, 0, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL)
+    taskInfo =
+      new TaskInfo(1234L, 0, 1, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL, true)
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0, null, null, 0, null)
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
@@ -84,7 +85,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers {
     // finish this task, should get updated duration
     shuffleReadMetrics.remoteBytesRead = 1000
     taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
-    taskInfo = new TaskInfo(1235L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+    taskInfo = new TaskInfo(1235L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0, null, null, 0, null)
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
@@ -94,7 +95,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers {
     // finish this task, should get updated duration
     shuffleReadMetrics.remoteBytesRead = 1000
     taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
-    taskInfo = new TaskInfo(1236L, 0, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL)
+    taskInfo = new TaskInfo(1236L, 0, 2, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0, null, null, 0, null)
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
@@ -106,7 +107,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers {
     val conf = new SparkConf()
     val listener = new JobProgressListener(conf)
     val metrics = new TaskMetrics()
-    val taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+    val taskInfo = new TaskInfo(1234L, 0, 3, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
     val task = new ShuffleMapTask(0, null, null, 0, null)
     val taskType = Utils.getFormattedClassName(task)
......
@@ -35,10 +35,11 @@ class JsonProtocolSuite extends FunSuite {
     val stageSubmitted =
       SparkListenerStageSubmitted(makeStageInfo(100, 200, 300, 400L, 500L), properties)
     val stageCompleted = SparkListenerStageCompleted(makeStageInfo(101, 201, 301, 401L, 501L))
-    val taskStart = SparkListenerTaskStart(111, makeTaskInfo(222L, 333, 444L))
-    val taskGettingResult = SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 3000L))
+    val taskStart = SparkListenerTaskStart(111, makeTaskInfo(222L, 333, 1, 444L, false))
+    val taskGettingResult =
+      SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 3000L, true))
     val taskEnd = SparkListenerTaskEnd(1, "ShuffleMapTask", Success,
-      makeTaskInfo(123L, 234, 345L), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800))
+      makeTaskInfo(123L, 234, 67, 345L, false), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800))
     val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties)
     val jobEnd = SparkListenerJobEnd(20, JobSucceeded)
     val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]](
@@ -73,7 +74,7 @@ class JsonProtocolSuite extends FunSuite {
   test("Dependent Classes") {
     testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
     testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
-    testTaskInfo(makeTaskInfo(999L, 888, 777L))
+    testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
     testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8))
     testBlockManagerId(BlockManagerId("Hong", "Kong", 500, 1000))
@@ -269,10 +270,12 @@ class JsonProtocolSuite extends FunSuite {
   private def assertEquals(info1: TaskInfo, info2: TaskInfo) {
     assert(info1.taskId === info2.taskId)
     assert(info1.index === info2.index)
+    assert(info1.attempt === info2.attempt)
     assert(info1.launchTime === info2.launchTime)
     assert(info1.executorId === info2.executorId)
     assert(info1.host === info2.host)
     assert(info1.taskLocality === info2.taskLocality)
+    assert(info1.speculative === info2.speculative)
     assert(info1.gettingResultTime === info2.gettingResultTime)
     assert(info1.finishTime === info2.finishTime)
     assert(info1.failed === info2.failed)
@@ -453,8 +456,8 @@ class JsonProtocolSuite extends FunSuite {
     new StageInfo(a, "greetings", b, rddInfos, "details")
   }
 
-  private def makeTaskInfo(a: Long, b: Int, c: Long) = {
-    new TaskInfo(a, b, c, "executor", "your kind sir", TaskLocality.NODE_LOCAL)
+  private def makeTaskInfo(a: Long, b: Int, c: Int, d: Long, speculative: Boolean) = {
+    new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL, speculative)
   }
 
   private def makeTaskMetrics(a: Long, b: Long, c: Long, d: Long, e: Int, f: Int) = {
@@ -510,37 +513,60 @@ class JsonProtocolSuite extends FunSuite {
   private val taskStartJsonString =
     """
-      {"Event":"SparkListenerTaskStart","Stage ID":111,"Task Info":{"Task ID":222,
-      "Index":333,"Launch Time":444,"Executor ID":"executor","Host":"your kind sir",
-      "Locality":"NODE_LOCAL","Getting Result Time":0,"Finish Time":0,"Failed":false,
-      "Serialized Size":0}}
-    """
+      |{"Event":"SparkListenerTaskStart","Stage ID":111,"Task Info":{"Task ID":222,
+      |"Index":333,"Attempt":1,"Launch Time":444,"Executor ID":"executor","Host":"your kind sir",
+      |"Locality":"NODE_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,
+      |"Failed":false,"Serialized Size":0}}
+    """.stripMargin
 
   private val taskGettingResultJsonString =
     """
-      {"Event":"SparkListenerTaskGettingResult","Task Info":{"Task ID":1000,"Index":
-      2000,"Launch Time":3000,"Executor ID":"executor","Host":"your kind sir",
-      "Locality":"NODE_LOCAL","Getting Result Time":0,"Finish Time":0,"Failed":false,
-      "Serialized Size":0}}
-    """
+      |{"Event":"SparkListenerTaskGettingResult","Task Info":
+      |  {"Task ID":1000,"Index":2000,"Attempt":5,"Launch Time":3000,"Executor ID":"executor",
+      |  "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":true,"Getting Result Time":0,
+      |  "Finish Time":0,"Failed":false,"Serialized Size":0
+      |  }
+      |}
+    """.stripMargin
 
   private val taskEndJsonString =
     """
-      {"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask",
-      "Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":123,"Index":
-      234,"Launch Time":345,"Executor ID":"executor","Host":"your kind sir",
-      "Locality":"NODE_LOCAL","Getting Result Time":0,"Finish Time":0,"Failed":
-      false,"Serialized Size":0},"Task Metrics":{"Host Name":"localhost",
-      "Executor Deserialize Time":300,"Executor Run Time":400,"Result Size":500,
-      "JVM GC Time":600,"Result Serialization Time":700,"Memory Bytes Spilled":
-      800,"Disk Bytes Spilled":0,"Shuffle Read Metrics":{"Shuffle Finish Time":
-      900,"Total Blocks Fetched":1500,"Remote Blocks Fetched":800,"Local Blocks Fetched":
-      700,"Fetch Wait Time":900,"Remote Bytes Read":1000},"Shuffle Write Metrics":
-      {"Shuffle Bytes Written":1200,"Shuffle Write Time":1500},"Updated Blocks":
-      [{"Block ID":"rdd_0_0","Status":{"Storage Level":{"Use Disk":true,"Use Memory":true,
-      "Use Tachyon":false,"Deserialized":false,"Replication":2},"Memory Size":0,"Tachyon Size":0,
-      "Disk Size":0}}]}}
-    """
+      |{"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask",
+      |"Task End Reason":{"Reason":"Success"},
+      |"Task Info":{
+      |  "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
+      |  "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
+      |  "Getting Result Time":0,"Finish Time":0,"Failed":false,"Serialized Size":0
+      |},
+      |"Task Metrics":{
+      |  "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,
+      |  "Result Size":500,"JVM GC Time":600,"Result Serialization Time":700,
+      |  "Memory Bytes Spilled":800,"Disk Bytes Spilled":0,
+      |  "Shuffle Read Metrics":{
+      |    "Shuffle Finish Time":900,
+      |    "Total Blocks Fetched":1500,
+      |    "Remote Blocks Fetched":800,
+      |    "Local Blocks Fetched":700,
+      |    "Fetch Wait Time":900,
+      |    "Remote Bytes Read":1000
+      |  },
+      |  "Shuffle Write Metrics":{
+      |    "Shuffle Bytes Written":1200,
+      |    "Shuffle Write Time":1500},
+      |  "Updated Blocks":[
+      |    {"Block ID":"rdd_0_0",
+      |     "Status":{
+      |       "Storage Level":{
+      |         "Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false,
+      |         "Replication":2
+      |       },
+      |       "Memory Size":0,"Tachyon Size":0,"Disk Size":0
+      |     }
+      |    }
+      |  ]
+      |  }
+      |}
+    """.stripMargin
 
   private val jobStartJsonString =
     """
......