Commit 4a346e24 authored by Reynold Xin

[SPARK-2284][UI] Mark all failed tasks as failures.

Previously, only tasks that failed with an ExceptionFailure reason were marked as failures.

Author: Reynold Xin <rxin@apache.org>

Closes #1224 from rxin/SPARK-2284 and squashes the following commits:

be79dbd [Reynold Xin] [SPARK-2284][UI] Mark all failed tasks as failures.
parent b88a59a6
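
For context, TaskEndReason has several failure subtypes besides ExceptionFailure: the new test below exercises Resubmitted, FetchFailed, TaskResultLost, TaskKilled, ExecutorLostFailure, and UnknownReason. A minimal sketch (not part of this commit) of the classification the patch moves to, assuming Spark's org.apache.spark.TaskEndReason hierarchy:

    import org.apache.spark._

    // Sketch only: after this patch, Success is the sole reason counted as a
    // completed task; every other TaskEndReason subtype counts as a failure.
    def countsAsFailure(reason: TaskEndReason): Boolean = reason match {
      case Success => false
      case _       => true // ExceptionFailure, FetchFailed, TaskKilled, ...
    }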
@@ -185,12 +185,15 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
     val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
       taskEnd.reason match {
+        case org.apache.spark.Success =>
+          stageIdToTasksComplete(sid) = stageIdToTasksComplete.getOrElse(sid, 0) + 1
+          (None, Option(taskEnd.taskMetrics))
         case e: ExceptionFailure =>
           stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
           (Some(e), e.metrics)
-        case _ =>
-          stageIdToTasksComplete(sid) = stageIdToTasksComplete.getOrElse(sid, 0) + 1
-          (None, Option(taskEnd.taskMetrics))
+        case e: org.apache.spark.TaskEndReason =>
+          stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
+          (None, None)
       }
     stageIdToTime.getOrElseUpdate(sid, 0L)
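
Note the effect of the hunk above: Success is now matched explicitly and remains the only reason that increments stageIdToTasksComplete, while any other TaskEndReason increments stageIdToTasksFailed. Non-exception failures record (None, None), since there is no ExceptionFailure to report and their task metrics are not propagated here.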
@@ -20,7 +20,7 @@ package org.apache.spark.ui.jobs
 import org.scalatest.FunSuite
 import org.scalatest.Matchers
-import org.apache.spark.{LocalSparkContext, SparkConf, Success}
+import org.apache.spark._
 import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics}
 import org.apache.spark.scheduler._
 import org.apache.spark.util.Utils
@@ -101,4 +101,32 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-2", fail())
       .shuffleRead == 1000)
   }
+
+  test("test task success vs failure counting for different task end reasons") {
+    val conf = new SparkConf()
+    val listener = new JobProgressListener(conf)
+    val metrics = new TaskMetrics()
+    val taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+    taskInfo.finishTime = 1
+    val task = new ShuffleMapTask(0, null, null, 0, null)
+    val taskType = Utils.getFormattedClassName(task)
+
+    // Go through all the failure cases to make sure we are counting them as failures.
+    val taskFailedReasons = Seq(
+      Resubmitted,
+      new FetchFailed(null, 0, 0, 0),
+      new ExceptionFailure("Exception", "description", null, None),
+      TaskResultLost,
+      TaskKilled,
+      ExecutorLostFailure,
+      UnknownReason)
+    for (reason <- taskFailedReasons) {
+      listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, reason, taskInfo, metrics))
+      assert(listener.stageIdToTasksComplete.get(task.stageId) === None)
+    }
+
+    // Make sure we count success as success.
+    listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, metrics))
+    assert(listener.stageIdToTasksComplete.get(task.stageId) === Some(1))
+  }
 }
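
A hypothetical follow-up assertion (not in the commit): with the fix in place, each of the seven reasons in taskFailedReasons should also have incremented the failure counter, so after the loop one could additionally check:

    // Hypothetical extra check, assuming all seven failure reasons above
    // ran against stage 0:
    assert(listener.stageIdToTasksFailed.get(task.stageId) === Some(7))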