diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 378cf1aaebe7b0f701629a2a048b07484663883f..82163eadd56e9dba1fcfa2efa774743aad836f83 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -75,9 +75,11 @@ case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId)
 @DeveloperApi
 case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent
 
+@DeveloperApi
 case class SparkListenerApplicationStart(appName: String, time: Long, sparkUser: String)
   extends SparkListenerEvent
 
+@DeveloperApi
 case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
 
 /** An event used in the listener to shutdown the listener daemon thread. */
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 9a4be43ee219f9724368cc13287a5ebd19a4900b..8ec482a6f6d9c6eac10a5f457c8eacc9a2a76869 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -106,6 +106,8 @@ private[spark] class Stage(
     id
   }
 
+  def attemptId: Int = nextAttemptId
+
   val name = callSite.short
   val details = callSite.long
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 4c62e4dc0bac83749c02efab80754d288b1a428f..6aecdfe8e66560842905ee681406e767a6e92edd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -27,10 +27,12 @@ import org.apache.spark.annotation.DeveloperApi
 class TaskInfo(
     val taskId: Long,
     val index: Int,
+    val attempt: Int,
     val launchTime: Long,
     val executorId: String,
     val host: String,
-    val taskLocality: TaskLocality.TaskLocality) {
+    val taskLocality: TaskLocality.TaskLocality,
+    val speculative: Boolean) {
 
   /**
    * The time when the task started remotely getting the result. Will not be set if the
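Taken together, the changes above give every task a 1-based attempt number and a flag marking speculative copies. A minimal sketch of how a listener could consume the new fields; the AttemptLoggingListener class below is illustrative, not part of this patch:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}

// Hypothetical listener that reports speculative attempts as they launch.
class AttemptLoggingListener extends SparkListener {
  override def onTaskStart(taskStart: SparkListenerTaskStart) {
    val info = taskStart.taskInfo
    // `attempt` is 1-based here; `speculative` marks duplicate copies launched
    // by the speculation mechanism rather than retries of failed attempts.
    if (info.speculative) {
      println(s"Task ${info.taskId} (index ${info.index}) started as attempt ${info.attempt}")
    }
  }
}

Such a listener would be registered with sc.addSparkListener(new AttemptLoggingListener).
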
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index c0898f64fb0c99cef7f632046d046d000830c029..83ff6b8550b4f57a1e3a3781515d5c875e50c979 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -335,17 +335,19 @@ private[spark] class TaskSetManager(
   /**
    * Dequeue a pending task for a given node and return its index and locality level.
    * Only search for tasks matching the given locality constraint.
+   *
+   * @return An option containing (task index within the task set, locality, is speculative?)
    */
   private def findTask(execId: String, host: String, locality: TaskLocality.Value)
-    : Option[(Int, TaskLocality.Value)] =
+    : Option[(Int, TaskLocality.Value, Boolean)] =
   {
     for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
-      return Some((index, TaskLocality.PROCESS_LOCAL))
+      return Some((index, TaskLocality.PROCESS_LOCAL, false))
     }
 
     if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
       for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
-        return Some((index, TaskLocality.NODE_LOCAL))
+        return Some((index, TaskLocality.NODE_LOCAL, false))
       }
     }
 
@@ -354,23 +356,25 @@ private[spark] class TaskSetManager(
         rack <- sched.getRackForHost(host)
         index <- findTaskFromList(execId, getPendingTasksForRack(rack))
       } {
-        return Some((index, TaskLocality.RACK_LOCAL))
+        return Some((index, TaskLocality.RACK_LOCAL, false))
       }
     }
 
     // Look for no-pref tasks after rack-local tasks since they can run anywhere.
     for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
-      return Some((index, TaskLocality.PROCESS_LOCAL))
+      return Some((index, TaskLocality.PROCESS_LOCAL, false))
     }
 
     if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
       for (index <- findTaskFromList(execId, allPendingTasks)) {
-        return Some((index, TaskLocality.ANY))
+        return Some((index, TaskLocality.ANY, false))
       }
     }
 
     // Finally, if all else has failed, find a speculative task
-    findSpeculativeTask(execId, host, locality)
+    findSpeculativeTask(execId, host, locality).map { case (taskIndex, allowedLocality) =>
+      (taskIndex, allowedLocality, true)
+    }
   }
 
   /**
@@ -391,7 +395,7 @@ private[spark] class TaskSetManager(
       }
 
       findTask(execId, host, allowedLocality) match {
-        case Some((index, taskLocality)) => {
+        case Some((index, taskLocality, speculative)) => {
           // Found a task; do some bookkeeping and return a task description
           val task = tasks(index)
           val taskId = sched.newTaskId()
@@ -400,7 +404,9 @@ private[spark] class TaskSetManager(
             taskSet.id, index, taskId, execId, host, taskLocality))
           // Do various bookkeeping
           copiesRunning(index) += 1
-          val info = new TaskInfo(taskId, index, curTime, execId, host, taskLocality)
+          val attemptNum = taskAttempts(index).size
+          val info = new TaskInfo(
+            taskId, index, attemptNum + 1, curTime, execId, host, taskLocality, speculative)
           taskInfos(taskId) = info
           taskAttempts(index) = info :: taskAttempts(index)
           // Update our locality level for delay scheduling
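Note how the new Boolean threads through findTask: every regular dequeue path returns false, and the speculative fallback at the end is the only source of true, so the flag later stored in TaskInfo is correct by construction. The closing Option.map is the usual idiom for widening a pair into a triple; a self-contained toy model of the same shape (all names here are made up):

// Primary lookups tag their result with false; the speculative fallback
// tags its result with true, mirroring findTask above.
def primary: Option[(Int, String)] = None
def speculativeFallback: Option[(Int, String)] = Some((3, "ANY"))

val chosen: Option[(Int, String, Boolean)] =
  primary.map { case (index, locality) => (index, locality, false) }
    .orElse(speculativeFallback.map { case (index, locality) => (index, locality, true) })
// chosen == Some((3, "ANY", true))
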
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 8b65f0671bdb963125d15ffcb63b268166cc0bc8..8e3d5d1cd4c6be964f34e82fdc6f7799be39fd7a 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -95,8 +95,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
       </div>
     // scalastyle:on
     val taskHeaders: Seq[String] =
-      Seq("Task Index", "Task ID", "Status", "Locality Level", "Executor", "Launch Time") ++
-      Seq("Duration", "GC Time", "Result Ser Time") ++
+      Seq(
+        "Index", "ID", "Attempt", "Status", "Locality Level", "Executor",
+        "Launch Time", "Duration", "GC Time") ++
       {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
       {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
       {if (hasBytesSpilled) Seq("Shuffle Spill (Memory)", "Shuffle Spill (Disk)") else Nil} ++
@@ -245,6 +246,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
         <tr>
           <td>{info.index}</td>
           <td>{info.taskId}</td>
+          <td sorttable_customkey={info.attempt.toString}>{
+            if (info.speculative) s"${info.attempt} (speculative)" else info.attempt.toString
+          }</td>
           <td>{info.status}</td>
           <td>{info.taskLocality}</td>
           <td>{info.host}</td>
@@ -255,9 +259,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
           <td sorttable_customkey={gcTime.toString}>
             {if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
           </td>
+          <!--
+          TODO: Add this back after we add support to hide certain columns.
           <td sorttable_customkey={serializationTime.toString}>
             {if (serializationTime > 0) UIUtils.formatDuration(serializationTime) else ""}
           </td>
+          -->
           {if (shuffleRead) {
             <td sorttable_customkey={shuffleReadSortable}>
               {shuffleReadReadable}
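The new Attempt cell relies on the sorttable_customkey attribute read by the web UI's table-sorting JavaScript: the attribute carries the raw numeric key, so the column still sorts by attempt number even when a cell renders as, say, "2 (speculative)". A standalone sketch of the idiom using a Scala XML literal (the values are made up):

val attempt = 2
val speculative = true

// The attribute holds the sort key; the element body holds the display text.
val cell =
  <td sorttable_customkey={attempt.toString}>{
    if (speculative) s"$attempt (speculative)" else attempt.toString
  }</td>
// cell.toString == """<td sorttable_customkey="2">2 (speculative)</td>"""
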
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 7cecbfe62a382c3c47043696b8d808b3c8cad0e1..6245b4b8023c2a6adb28b370a14399f101364877 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -32,6 +32,8 @@ import org.apache.spark.storage._
 import org.apache.spark._
 
 private[spark] object JsonProtocol {
+  // TODO: Remove this file and put JSON serialization into each individual class.
+
   private implicit val format = DefaultFormats
 
   /** ------------------------------------------------- *
@@ -194,10 +196,12 @@
   def taskInfoToJson(taskInfo: TaskInfo): JValue = {
     ("Task ID" -> taskInfo.taskId) ~
     ("Index" -> taskInfo.index) ~
+    ("Attempt" -> taskInfo.attempt) ~
     ("Launch Time" -> taskInfo.launchTime) ~
     ("Executor ID" -> taskInfo.executorId) ~
     ("Host" -> taskInfo.host) ~
     ("Locality" -> taskInfo.taskLocality.toString) ~
+    ("Speculative" -> taskInfo.speculative) ~
     ("Getting Result Time" -> taskInfo.gettingResultTime) ~
     ("Finish Time" -> taskInfo.finishTime) ~
     ("Failed" -> taskInfo.failed) ~
@@ -487,16 +491,19 @@
   def taskInfoFromJson(json: JValue): TaskInfo = {
     val taskId = (json \ "Task ID").extract[Long]
     val index = (json \ "Index").extract[Int]
+    val attempt = (json \ "Attempt").extractOpt[Int].getOrElse(1)
     val launchTime = (json \ "Launch Time").extract[Long]
     val executorId = (json \ "Executor ID").extract[String]
     val host = (json \ "Host").extract[String]
     val taskLocality = TaskLocality.withName((json \ "Locality").extract[String])
+    val speculative = (json \ "Speculative").extractOpt[Boolean].getOrElse(false)
     val gettingResultTime = (json \ "Getting Result Time").extract[Long]
     val finishTime = (json \ "Finish Time").extract[Long]
     val failed = (json \ "Failed").extract[Boolean]
     val serializedSize = (json \ "Serialized Size").extract[Int]
 
-    val taskInfo = new TaskInfo(taskId, index, launchTime, executorId, host, taskLocality)
+    val taskInfo =
+      new TaskInfo(taskId, index, attempt, launchTime, executorId, host, taskLocality, speculative)
     taskInfo.gettingResultTime = gettingResultTime
     taskInfo.finishTime = finishTime
     taskInfo.failed = failed
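The extractOpt(...).getOrElse(...) pairs are what keep old event logs readable: records written before this change have no "Attempt" or "Speculative" field, so deserialization falls back to attempt 1 and speculative = false instead of throwing. A minimal standalone json4s sketch of that behavior (not Spark code):

import org.json4s._
import org.json4s.jackson.JsonMethods._

implicit val formats = DefaultFormats

// An old-style record that predates the two new fields.
val oldJson = parse("""{"Task ID":222,"Index":333}""")

val attempt = (oldJson \ "Attempt").extractOpt[Int].getOrElse(1)                  // 1
val speculative = (oldJson \ "Speculative").extractOpt[Boolean].getOrElse(false)  // false
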
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index e0fec6a068bd14560c35d68cea6d21e6485af20c..fa43b66c6cb5a20d5993cee8ab1494b27069df69 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -66,7 +66,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     // finish this task, should get updated shuffleRead
     shuffleReadMetrics.remoteBytesRead = 1000
     taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
-    var taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+    var taskInfo = new TaskInfo(1234L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
     var task = new ShuffleMapTask(0, null, null, 0, null)
     val taskType = Utils.getFormattedClassName(task)
@@ -75,7 +75,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
       .shuffleRead == 1000)
 
     // finish a task with unknown executor-id, nothing should happen
-    taskInfo = new TaskInfo(1234L, 0, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL)
+    taskInfo =
+      new TaskInfo(1234L, 0, 1, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL, true)
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0, null, null, 0, null)
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
@@ -84,7 +85,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     // finish this task, should get updated duration
     shuffleReadMetrics.remoteBytesRead = 1000
     taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
-    taskInfo = new TaskInfo(1235L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+    taskInfo = new TaskInfo(1235L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0, null, null, 0, null)
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
@@ -94,7 +95,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     // finish this task, should get updated duration
     shuffleReadMetrics.remoteBytesRead = 1000
     taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
-    taskInfo = new TaskInfo(1236L, 0, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL)
+    taskInfo = new TaskInfo(1236L, 0, 2, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0, null, null, 0, null)
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
@@ -106,7 +107,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     val conf = new SparkConf()
     val listener = new JobProgressListener(conf)
     val metrics = new TaskMetrics()
-    val taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+    val taskInfo = new TaskInfo(1234L, 0, 3, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
     val task = new ShuffleMapTask(0, null, null, 0, null)
     val taskType = Utils.getFormattedClassName(task)
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 495e1b7a0a2146ea39f1898cf247b037ca5dd1b0..6c49870455873f0b7599f6e66ab0ed2a9a54d076 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -35,10 +35,11 @@ class JsonProtocolSuite extends FunSuite {
     val stageSubmitted =
       SparkListenerStageSubmitted(makeStageInfo(100, 200, 300, 400L, 500L), properties)
     val stageCompleted = SparkListenerStageCompleted(makeStageInfo(101, 201, 301, 401L, 501L))
-    val taskStart = SparkListenerTaskStart(111, makeTaskInfo(222L, 333, 444L))
-    val taskGettingResult = SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 3000L))
+    val taskStart = SparkListenerTaskStart(111, makeTaskInfo(222L, 333, 1, 444L, false))
+    val taskGettingResult =
+      SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 3000L, true))
     val taskEnd = SparkListenerTaskEnd(1, "ShuffleMapTask", Success,
-      makeTaskInfo(123L, 234, 345L), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800))
+      makeTaskInfo(123L, 234, 67, 345L, false), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800))
     val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties)
     val jobEnd = SparkListenerJobEnd(20, JobSucceeded)
     val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]](
@@ -73,7 +74,7 @@ class JsonProtocolSuite extends FunSuite {
   test("Dependent Classes") {
     testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
     testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
-    testTaskInfo(makeTaskInfo(999L, 888, 777L))
+    testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
     testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8))
 
     testBlockManagerId(BlockManagerId("Hong", "Kong", 500, 1000))
@@ -269,10 +270,12 @@
   private def assertEquals(info1: TaskInfo, info2: TaskInfo) {
     assert(info1.taskId === info2.taskId)
     assert(info1.index === info2.index)
+    assert(info1.attempt === info2.attempt)
     assert(info1.launchTime === info2.launchTime)
     assert(info1.executorId === info2.executorId)
     assert(info1.host === info2.host)
     assert(info1.taskLocality === info2.taskLocality)
+    assert(info1.speculative === info2.speculative)
     assert(info1.gettingResultTime === info2.gettingResultTime)
     assert(info1.finishTime === info2.finishTime)
     assert(info1.failed === info2.failed)
@@ -453,8 +456,8 @@ class JsonProtocolSuite extends FunSuite {
     new StageInfo(a, "greetings", b, rddInfos, "details")
   }
 
-  private def makeTaskInfo(a: Long, b: Int, c: Long) = {
-    new TaskInfo(a, b, c, "executor", "your kind sir", TaskLocality.NODE_LOCAL)
+  private def makeTaskInfo(a: Long, b: Int, c: Int, d: Long, speculative: Boolean) = {
+    new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL, speculative)
   }
 
   private def makeTaskMetrics(a: Long, b: Long, c: Long, d: Long, e: Int, f: Int) = {
Time":0,"Failed":false,"Serialized Size":0 + | } + |} + """.stripMargin private val taskEndJsonString = """ - {"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask", - "Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":123,"Index": - 234,"Launch Time":345,"Executor ID":"executor","Host":"your kind sir", - "Locality":"NODE_LOCAL","Getting Result Time":0,"Finish Time":0,"Failed": - false,"Serialized Size":0},"Task Metrics":{"Host Name":"localhost", - "Executor Deserialize Time":300,"Executor Run Time":400,"Result Size":500, - "JVM GC Time":600,"Result Serialization Time":700,"Memory Bytes Spilled": - 800,"Disk Bytes Spilled":0,"Shuffle Read Metrics":{"Shuffle Finish Time": - 900,"Total Blocks Fetched":1500,"Remote Blocks Fetched":800,"Local Blocks Fetched": - 700,"Fetch Wait Time":900,"Remote Bytes Read":1000},"Shuffle Write Metrics": - {"Shuffle Bytes Written":1200,"Shuffle Write Time":1500},"Updated Blocks": - [{"Block ID":"rdd_0_0","Status":{"Storage Level":{"Use Disk":true,"Use Memory":true, - "Use Tachyon":false,"Deserialized":false,"Replication":2},"Memory Size":0,"Tachyon Size":0, - "Disk Size":0}}]}} - """ + |{"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask", + |"Task End Reason":{"Reason":"Success"}, + |"Task Info":{ + | "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor", + | "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false, + | "Getting Result Time":0,"Finish Time":0,"Failed":false,"Serialized Size":0 + |}, + |"Task Metrics":{ + | "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400, + | "Result Size":500,"JVM GC Time":600,"Result Serialization Time":700, + | "Memory Bytes Spilled":800,"Disk Bytes Spilled":0, + | "Shuffle Read Metrics":{ + | "Shuffle Finish Time":900, + | "Total Blocks Fetched":1500, + | "Remote Blocks Fetched":800, + | "Local Blocks Fetched":700, + | "Fetch Wait Time":900, + | "Remote Bytes Read":1000 + | }, + | "Shuffle Write Metrics":{ + | "Shuffle Bytes Written":1200, + | "Shuffle Write Time":1500}, + | "Updated Blocks":[ + | {"Block ID":"rdd_0_0", + | "Status":{ + | "Storage Level":{ + | "Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false, + | "Replication":2 + | }, + | "Memory Size":0,"Tachyon Size":0,"Disk Size":0 + | } + | } + | ] + | } + |} + """.stripMargin private val jobStartJsonString = """