Commit 6cb06e87, authored by Xin Ren and committed by Imran Rashid

[SPARK-11155][WEB UI] Stage summary json should include stage duration

The JSON endpoint for stages doesn't include the stage duration information that is present in the UI. This looks like a simple oversight; these metrics should be included, e.g., at api/v1/applications/<appId>/stages.

The metrics I've added are submissionTime, firstTaskLaunchedTime, and completionTime.

Author: Xin Ren <iamshrek@126.com>

Closes #10107 from keypointt/SPARK-11155.
parent e3735ce1
Showing changed files with 124 additions and 9 deletions
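For context, an illustrative sketch that is not part of this commit: the endpoint being extended here is the REST stage summary. Assuming a running application's UI at localhost:4040 and a hypothetical application id, the new fields can be observed like this:

import scala.io.Source

object FetchStages {
  def main(args: Array[String]): Unit = {
    // Hypothetical application id; real ids are listed at /api/v1/applications.
    val appId = "app-20150203164300-0000"
    val url = s"http://localhost:4040/api/v1/applications/$appId/stages"
    // With this change, each stage object should also carry submissionTime,
    // firstTaskLaunchedTime and completionTime.
    println(Source.fromURL(url).mkString)
  }
}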
@@ -17,8 +17,8 @@
package org.apache.spark.status.api.v1
import java.util.{Arrays, Date, List => JList}
-import javax.ws.rs.{GET, PathParam, Produces, QueryParam}
import javax.ws.rs.core.MediaType
+import javax.ws.rs.{GET, Produces, QueryParam}
import org.apache.spark.executor.{InputMetrics => InternalInputMetrics, OutputMetrics => InternalOutputMetrics, ShuffleReadMetrics => InternalShuffleReadMetrics, ShuffleWriteMetrics => InternalShuffleWriteMetrics, TaskMetrics => InternalTaskMetrics}
import org.apache.spark.scheduler.{AccumulableInfo => InternalAccumulableInfo, StageInfo}
@@ -59,6 +59,15 @@ private[v1] object AllStagesResource {
stageUiData: StageUIData,
includeDetails: Boolean): StageData = {
+    val taskLaunchTimes = stageUiData.taskData.values.map(_.taskInfo.launchTime).filter(_ > 0)
+
+    val firstTaskLaunchedTime: Option[Date] =
+      if (taskLaunchTimes.nonEmpty) {
+        Some(new Date(taskLaunchTimes.min))
+      } else {
+        None
+      }
+
val taskData = if (includeDetails) {
Some(stageUiData.taskData.map { case (k, v) => k -> convertTaskData(v) } )
} else {
@@ -92,6 +101,9 @@ private[v1] object AllStagesResource {
numCompleteTasks = stageUiData.numCompleteTasks,
numFailedTasks = stageUiData.numFailedTasks,
executorRunTime = stageUiData.executorRunTime,
+      submissionTime = stageInfo.submissionTime.map(new Date(_)),
+      firstTaskLaunchedTime,
+      completionTime = stageInfo.completionTime.map(new Date(_)),
inputBytes = stageUiData.inputBytes,
inputRecords = stageUiData.inputRecords,
outputBytes = stageUiData.outputBytes,
......
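Restated outside of Spark, as a REPL-style sketch with a hypothetical helper name (not the commit's API): a launch time that is not strictly positive means the task has not launched, so the first launch time is the minimum of the remaining values, or None when nothing has launched.

import java.util.Date

def firstLaunched(launchTimes: Seq[Long]): Option[Date] = {
  // Launch times <= 0 indicate "not launched yet" and are ignored.
  val launched = launchTimes.filter(_ > 0)
  if (launched.nonEmpty) Some(new Date(launched.min)) else None
}

// The same three cases the new AllStagesResourceSuite below exercises.
assert(firstLaunched(Seq.empty).isEmpty)
assert(firstLaunched(Seq(-100L, -200L, -300L)).isEmpty)
assert(firstLaunched(Seq(-100L, 1449255596000L, 1449255597000L))
  .contains(new Date(1449255596000L)))

An equivalent one-liner would be launchTimes.filter(_ > 0).reduceOption(_ min _).map(new Date(_)).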
@@ -120,6 +120,9 @@ class StageData private[spark](
val numFailedTasks: Int,
val executorRunTime: Long,
+    val submissionTime: Option[Date],
+    val firstTaskLaunchedTime: Option[Date],
+    val completionTime: Option[Date],
val inputBytes: Long,
val inputRecords: Long,
......
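The three new fields are Option[Date] because a stage may not yet have been submitted, launched a task, or completed. A consumer-side sketch (hypothetical helper, not part of this commit) that derives a duration only when both endpoints are present:

import java.util.Date

// Yields a duration in milliseconds only when both timestamps exist.
def durationMs(submission: Option[Date], completion: Option[Date]): Option[Long] =
  for (s <- submission; c <- completion) yield c.getTime - s.getTime

assert(durationMs(None, Some(new Date())).isEmpty) // still running: no duration
assert(durationMs(Some(new Date(1000L)), Some(new Date(1250L))).contains(250L))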
@@ -6,6 +6,9 @@
"numCompleteTasks" : 8,
"numFailedTasks" : 0,
"executorRunTime" : 162,
"submissionTime" : "2015-02-03T16:43:07.191GMT",
"firstTaskLaunchedTime" : "2015-02-03T16:43:07.191GMT",
"completionTime" : "2015-02-03T16:43:07.226GMT",
"inputBytes" : 160,
"inputRecords" : 0,
"outputBytes" : 0,
@@ -28,6 +31,9 @@
"numCompleteTasks" : 8,
"numFailedTasks" : 0,
"executorRunTime" : 3476,
"submissionTime" : "2015-02-03T16:43:05.829GMT",
"firstTaskLaunchedTime" : "2015-02-03T16:43:05.829GMT",
"completionTime" : "2015-02-03T16:43:06.286GMT",
"inputBytes" : 28000128,
"inputRecords" : 0,
"outputBytes" : 0,
@@ -50,6 +56,9 @@
"numCompleteTasks" : 8,
"numFailedTasks" : 0,
"executorRunTime" : 4338,
"submissionTime" : "2015-02-03T16:43:04.228GMT",
"firstTaskLaunchedTime" : "2015-02-03T16:43:04.234GMT",
"completionTime" : "2015-02-03T16:43:04.819GMT",
"inputBytes" : 0,
"inputRecords" : 0,
"outputBytes" : 0,
@@ -64,4 +73,4 @@
"details" : "org.apache.spark.rdd.RDD.count(RDD.scala:910)\n$line9.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:15)\n$line9.$read$$iwC$$iwC$$iwC.<init>(<console>:20)\n$line9.$read$$iwC$$iwC.<init>(<console>:22)\n$line9.$read$$iwC.<init>(<console>:24)\n$line9.$read.<init>(<console>:26)\n$line9.$read$.<init>(<console>:30)\n$line9.$read$.<clinit>(<console>)\n$line9.$eval$.<init>(<console>:7)\n$line9.$eval$.<clinit>(<console>)\n$line9.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:606)\norg.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)\norg.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)\norg.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)",
"schedulingPool" : "default",
"accumulatorUpdates" : [ ]
-} ]
\ No newline at end of file
+} ]
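The timestamps in these expectation files follow the api/v1 date rendering, which from the values shown appears to be the pattern yyyy-MM-dd'T'HH:mm:ss.SSS'GMT'. A REPL-style sketch (format string inferred from these values, not taken from the source) of recovering a stage's wall-clock duration from the two new fields:

import java.text.SimpleDateFormat
import java.util.TimeZone

// Pattern inferred from the expectation values above.
val fmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'GMT'")
fmt.setTimeZone(TimeZone.getTimeZone("GMT"))

val submitted = fmt.parse("2015-02-03T16:43:07.191GMT")
val completed = fmt.parse("2015-02-03T16:43:07.226GMT")
println(completed.getTime - submitted.getTime) // 35 ms for the first stage above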
@@ -6,6 +6,9 @@
"numCompleteTasks" : 7,
"numFailedTasks" : 1,
"executorRunTime" : 278,
"submissionTime" : "2015-02-03T16:43:06.296GMT",
"firstTaskLaunchedTime" : "2015-02-03T16:43:06.296GMT",
"completionTime" : "2015-02-03T16:43:06.347GMT",
"inputBytes" : 0,
"inputRecords" : 0,
"outputBytes" : 0,
@@ -20,4 +23,4 @@
"details" : "org.apache.spark.rdd.RDD.count(RDD.scala:910)\n$line11.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:20)\n$line11.$read$$iwC$$iwC$$iwC.<init>(<console>:25)\n$line11.$read$$iwC$$iwC.<init>(<console>:27)\n$line11.$read$$iwC.<init>(<console>:29)\n$line11.$read.<init>(<console>:31)\n$line11.$read$.<init>(<console>:35)\n$line11.$read$.<clinit>(<console>)\n$line11.$eval$.<init>(<console>:7)\n$line11.$eval$.<clinit>(<console>)\n$line11.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:606)\norg.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)\norg.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)\norg.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)",
"schedulingPool" : "default",
"accumulatorUpdates" : [ ]
-} ]
\ No newline at end of file
+} ]
@@ -6,6 +6,9 @@
"numCompleteTasks" : 8,
"numFailedTasks" : 0,
"executorRunTime" : 3476,
"submissionTime" : "2015-02-03T16:43:05.829GMT",
"firstTaskLaunchedTime" : "2015-02-03T16:43:05.829GMT",
"completionTime" : "2015-02-03T16:43:06.286GMT",
"inputBytes" : 28000128,
"inputRecords" : 0,
"outputBytes" : 0,
@@ -267,4 +270,4 @@
"diskBytesSpilled" : 0
}
}
-}
\ No newline at end of file
+}
@@ -6,6 +6,9 @@
"numCompleteTasks" : 8,
"numFailedTasks" : 0,
"executorRunTime" : 3476,
"submissionTime" : "2015-02-03T16:43:05.829GMT",
"firstTaskLaunchedTime" : "2015-02-03T16:43:05.829GMT",
"completionTime" : "2015-02-03T16:43:06.286GMT",
"inputBytes" : 28000128,
"inputRecords" : 0,
"outputBytes" : 0,
@@ -267,4 +270,4 @@
"diskBytesSpilled" : 0
}
}
-} ]
\ No newline at end of file
+} ]
@@ -6,6 +6,9 @@
"numCompleteTasks" : 8,
"numFailedTasks" : 0,
"executorRunTime" : 162,
"submissionTime" : "2015-02-03T16:43:07.191GMT",
"firstTaskLaunchedTime" : "2015-02-03T16:43:07.191GMT",
"completionTime" : "2015-02-03T16:43:07.226GMT",
"inputBytes" : 160,
"inputRecords" : 0,
"outputBytes" : 0,
@@ -28,6 +31,9 @@
"numCompleteTasks" : 8,
"numFailedTasks" : 0,
"executorRunTime" : 3476,
"submissionTime" : "2015-02-03T16:43:05.829GMT",
"firstTaskLaunchedTime" : "2015-02-03T16:43:05.829GMT",
"completionTime" : "2015-02-03T16:43:06.286GMT",
"inputBytes" : 28000128,
"inputRecords" : 0,
"outputBytes" : 0,
@@ -50,6 +56,9 @@
"numCompleteTasks" : 8,
"numFailedTasks" : 0,
"executorRunTime" : 4338,
"submissionTime" : "2015-02-03T16:43:04.228GMT",
"firstTaskLaunchedTime" : "2015-02-03T16:43:04.234GMT",
"completionTime" : "2015-02-03T16:43:04.819GMT",
"inputBytes" : 0,
"inputRecords" : 0,
"outputBytes" : 0,
@@ -72,6 +81,9 @@
"numCompleteTasks" : 7,
"numFailedTasks" : 1,
"executorRunTime" : 278,
"submissionTime" : "2015-02-03T16:43:06.296GMT",
"firstTaskLaunchedTime" : "2015-02-03T16:43:06.296GMT",
"completionTime" : "2015-02-03T16:43:06.347GMT",
"inputBytes" : 0,
"inputRecords" : 0,
"outputBytes" : 0,
@@ -86,4 +98,4 @@
"details" : "org.apache.spark.rdd.RDD.count(RDD.scala:910)\n$line11.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:20)\n$line11.$read$$iwC$$iwC$$iwC.<init>(<console>:25)\n$line11.$read$$iwC$$iwC.<init>(<console>:27)\n$line11.$read$$iwC.<init>(<console>:29)\n$line11.$read.<init>(<console>:31)\n$line11.$read$.<init>(<console>:35)\n$line11.$read$.<clinit>(<console>)\n$line11.$eval$.<init>(<console>:7)\n$line11.$eval$.<clinit>(<console>)\n$line11.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:606)\norg.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)\norg.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)\norg.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)",
"schedulingPool" : "default",
"accumulatorUpdates" : [ ]
-} ]
\ No newline at end of file
+} ]
@@ -6,6 +6,9 @@
"numCompleteTasks" : 8,
"numFailedTasks" : 0,
"executorRunTime" : 120,
"submissionTime" : "2015-03-16T19:25:36.103GMT",
"firstTaskLaunchedTime" : "2015-03-16T19:25:36.515GMT",
"completionTime" : "2015-03-16T19:25:36.579GMT",
"inputBytes" : 0,
"inputRecords" : 0,
"outputBytes" : 0,
@@ -24,4 +27,4 @@
"name" : "my counter",
"value" : "5050"
} ]
-} ]
\ No newline at end of file
+} ]
@@ -6,6 +6,9 @@
"numCompleteTasks" : 8,
"numFailedTasks" : 0,
"executorRunTime" : 120,
"submissionTime" : "2015-03-16T19:25:36.103GMT",
"firstTaskLaunchedTime" : "2015-03-16T19:25:36.515GMT",
"completionTime" : "2015-03-16T19:25:36.579GMT",
"inputBytes" : 0,
"inputRecords" : 0,
"outputBytes" : 0,
@@ -239,4 +242,4 @@
"diskBytesSpilled" : 0
}
}
-}
\ No newline at end of file
+}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.status.api.v1
import java.util.Date
import scala.collection.mutable.HashMap
import org.apache.spark.SparkFunSuite
import org.apache.spark.scheduler.{StageInfo, TaskInfo, TaskLocality}
import org.apache.spark.ui.jobs.UIData.{StageUIData, TaskUIData}
class AllStagesResourceSuite extends SparkFunSuite {
def getFirstTaskLaunchTime(taskLaunchTimes: Seq[Long]): Option[Date] = {
val tasks = new HashMap[Long, TaskUIData]
taskLaunchTimes.zipWithIndex.foreach { case (time, idx) =>
tasks(idx.toLong) = new TaskUIData(
new TaskInfo(idx, idx, 1, time, "", "", TaskLocality.ANY, false), None, None)
}
val stageUiData = new StageUIData()
stageUiData.taskData = tasks
val status = StageStatus.ACTIVE
val stageInfo = new StageInfo(
1, 1, "stage 1", 10, Seq.empty, Seq.empty, "details abc", Seq.empty)
val stageData = AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData, false)
stageData.firstTaskLaunchedTime
}
test("firstTaskLaunchedTime when there are no tasks") {
val result = getFirstTaskLaunchTime(Seq())
assert(result == None)
}
test("firstTaskLaunchedTime when there are tasks but none launched") {
val result = getFirstTaskLaunchTime(Seq(-100L, -200L, -300L))
assert(result == None)
}
test("firstTaskLaunchedTime when there are tasks and some launched") {
val result = getFirstTaskLaunchTime(Seq(-100L, 1449255596000L, 1449255597000L))
assert(result == Some(new Date(1449255596000L)))
}
}
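As a usage note, assuming the standard Spark sbt build of the time: a command along the lines of build/sbt "core/test-only *AllStagesResourceSuite" (testOnly on newer sbt) runs just this suite. The three cases cover a stage with no tasks, a stage whose tasks have not launched, and a mixed case in which only strictly positive launch times count toward the minimum.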
@@ -132,7 +132,9 @@ object MimaExcludes {
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.jdbc.NoopDialect$")
) ++ Seq (
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.status.api.v1.ApplicationInfo.this")
"org.apache.spark.status.api.v1.ApplicationInfo.this"),
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.status.api.v1.StageData.this")
) ++ Seq(
// SPARK-11766 add toJson to Vector
ProblemFilters.exclude[MissingMethodProblem](
......
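For reference: adding the three constructor parameters changes the binary signature of StageData's constructor, which is why the MiMa compatibility check above gains a StageData.this exclusion alongside the existing ApplicationInfo one.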