diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b576d4c5f3c451bb393912a1fb638a778c7b604e..8a36af27bdd27090c4808226ec16995e9132379d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1148,13 +1148,13 @@ class DAGScheduler(
         null
       }
 
-    // The success case is dealt with separately below.
-    // TODO: Why post it only for failed tasks in cancelled stages? Clarify semantics here.
-    if (event.reason != Success) {
-      val attemptId = task.stageAttemptId
-      listenerBus.post(SparkListenerTaskEnd(
-        stageId, attemptId, taskType, event.reason, event.taskInfo, taskMetrics))
-    }
+    // The stage may have already finished when we get this event -- e.g. maybe it was a
+    // speculative task. It is important that we send the TaskEnd event in any case, so listeners
+    // are properly notified and can choose to handle it. For instance, some listeners do
+    // their own accounting, and if they don't get the task end event they think
+    // tasks are still running when they really aren't.
+    listenerBus.post(SparkListenerTaskEnd(
+      stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics))
 
     if (!stageIdToStage.contains(task.stageId)) {
       // Skip all the actions if the stage has been cancelled.
@@ -1164,8 +1164,6 @@ class DAGScheduler(
     val stage = stageIdToStage(task.stageId)
     event.reason match {
       case Success =>
-        listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
-          event.reason, event.taskInfo, taskMetrics))
         stage.pendingPartitions -= task.partitionId
         task match {
           case rt: ResultTask[_, _] =>
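
For context, and not part of the patch: the comment in the hunk above refers to listeners
that do their own accounting of in-flight tasks. Below is a minimal sketch of such a
listener, using only the public SparkListener callbacks; the class name and counter are
illustrative, not anything in this codebase.

```scala
import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}

// Tracks in-flight tasks by task ID. If TaskEnd were posted only while the
// stage is still active, completions that arrive late (e.g. from speculative
// attempts or cancelled stages) would leave entries stranded in `running`.
class RunningTaskAccountant extends SparkListener {
  private val running = mutable.HashSet[Long]()

  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    running += taskStart.taskInfo.taskId
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    running -= taskEnd.taskInfo.taskId
  }

  def runningTaskCount: Int = running.size
}
```

Note that the accounting keys on taskId, which is unique per attempt, so the duplicate
completions simulated by the test below (task IDs 5 and 6 for an already-finished
partition) drain correctly instead of leaving phantom running tasks.
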
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index d1c7143abf834f5102646e054e4904e722602d6a..55f4190680dd5b9fd361e92e691bcf3bd9ba02c1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -134,6 +134,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     val successfulStages = new HashSet[Int]
     val failedStages = new ArrayBuffer[Int]
     val stageByOrderOfExecution = new ArrayBuffer[Int]
+    val endedTasks = new HashSet[Long]
 
     override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
       submittedStageInfos += stageSubmitted.stageInfo
@@ -148,6 +149,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
         failedStages += stageInfo.stageId
       }
     }
+
+    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+      endedTasks += taskEnd.taskInfo.taskId
+    }
   }
 
   var mapOutputTracker: MapOutputTrackerMaster = null
@@ -195,6 +200,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     sparkListener.submittedStageInfos.clear()
     sparkListener.successfulStages.clear()
     sparkListener.failedStages.clear()
+    sparkListener.endedTasks.clear()
     failure = null
     sc.addSparkListener(sparkListener)
     taskSets.clear()
@@ -982,6 +988,52 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     assert(countSubmittedMapStageAttempts() === 2)
   }
 
+  test("task events always posted in speculation / when stage is killed") {
+    val baseRdd = new MyRDD(sc, 4, Nil)
+    val finalRdd = new MyRDD(sc, 4, List(new OneToOneDependency(baseRdd)))
+    submit(finalRdd, Array(0, 1, 2, 3))
+
+    // complete two tasks
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(0), Success, 42,
+      Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(0)))
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(1), Success, 42,
+      Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(1)))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    // verify stage exists
+    assert(scheduler.stageIdToStage.contains(0))
+    assert(sparkListener.endedTasks.size === 2)
+
+    // finish the other 2 tasks
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(2), Success, 42,
+      Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(2)))
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(3), Success, 42,
+      Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(3)))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    assert(sparkListener.endedTasks.size === 4)
+
+    // verify the stage is done
+    assert(!scheduler.stageIdToStage.contains(0))
+
+    // Stage should be complete. Finish one more successful completion of an already-finished
+    // task, as a speculative copy would, and make sure the event is still sent out
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(3), Success, 42,
+      Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(5)))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    assert(sparkListener.endedTasks.size === 5)
+
+    // make sure non-successful tasks also send out an event
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(3), UnknownReason, 42,
+      Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(6)))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    assert(sparkListener.endedTasks.size === 6)
+  }
+
   test("ignore late map task completions") {
     val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
@@ -1944,6 +1996,13 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     info
   }
 
+  private def createFakeTaskInfoWithId(taskId: Long): TaskInfo = {
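+    // args: taskId, index, attempt, launchTime, executorId, host, locality, speculative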
+    val info = new TaskInfo(taskId, 0, 0, 0L, "", "", TaskLocality.ANY, false)
+    info.finishTime = 1  // to prevent spurious errors in JobProgressListener
+    info
+  }
+
   private def makeCompletionEvent(
       task: Task[_],
       reason: TaskEndReason,