diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 396cbcbc8d268f70687163dd206296d791f8cfd6..bfefe4dbc408927f0b12c7fad70f1d1556fd3d2c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -185,12 +185,18 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
 
       val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
         taskEnd.reason match {
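+          // Only Success marks a task as complete; every other reason is a failure.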
+          case org.apache.spark.Success =>
+            stageIdToTasksComplete(sid) = stageIdToTasksComplete.getOrElse(sid, 0) + 1
+            (None, Option(taskEnd.taskMetrics))
           case e: ExceptionFailure =>
             stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
             (Some(e), e.metrics)
-          case _ =>
-            stageIdToTasksComplete(sid) = stageIdToTasksComplete.getOrElse(sid, 0) + 1
-            (None, Option(taskEnd.taskMetrics))
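+          // Non-exception failures (Resubmitted, TaskKilled, TaskResultLost, etc.)
+          // also count as failed tasks, but carry no exception info or metrics.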
+          case _: org.apache.spark.TaskEndReason =>
+            stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
+            (None, None)
         }
 
       stageIdToTime.getOrElseUpdate(sid, 0L)
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index c3a14f48de38ea15a44532f9bd3d778fa71e6140..e0fec6a068bd14560c35d68cea6d21e6485af20c 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.ui.jobs
 import org.scalatest.FunSuite
 import org.scalatest.Matchers
 
-import org.apache.spark.{LocalSparkContext, SparkConf, Success}
+import org.apache.spark._
 import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics}
 import org.apache.spark.scheduler._
 import org.apache.spark.util.Utils
@@ -101,4 +101,36 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-2", fail())
       .shuffleRead == 1000)
   }
+
+  test("test task success vs failure counting for different task end reasons") {
+    val conf = new SparkConf()
+    val listener = new JobProgressListener(conf)
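+    // Dummy fixtures; the non-zero finishTime marks the TaskInfo as finished.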
+    val metrics = new TaskMetrics()
+    val taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+    taskInfo.finishTime = 1
+    val task = new ShuffleMapTask(0, null, null, 0, null)
+    val taskType = Utils.getFormattedClassName(task)
+
+    // Go through all the failure cases to make sure we are counting them as failures.
+    val taskFailedReasons = Seq(
+      Resubmitted,
+      new FetchFailed(null, 0, 0, 0),
+      new ExceptionFailure("Exception", "description", null, None),
+      TaskResultLost,
+      TaskKilled,
+      ExecutorLostFailure,
+      UnknownReason)
+    var failCount = 0
+    for (reason <- taskFailedReasons) {
+      listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, reason, taskInfo, metrics))
+      failCount += 1
+      assert(listener.stageIdToTasksComplete.get(task.stageId) === None)
+      assert(listener.stageIdToTasksFailed.get(task.stageId) === Some(failCount))
+    }
+
+    // Make sure we count success as success.
+    listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, metrics))
+    assert(listener.stageIdToTasksComplete.get(task.stageId) === Some(1))
+  }
 }