diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b371a2412fd4a70d3d252e51727f10bd03e4df4f..b84986751928a50e9a24ef1e980b86d169e62f81 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -597,14 +597,6 @@ class DAGScheduler(
         listenerBus.post(SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics))
         handleTaskCompletion(completion)
 
-      case LocalJobCompleted(job, result) =>
-        val stage = job.finalStage
-        stageIdToJobIds -= stage.id    // clean up data structures that were populated for a local job,
-        stageIdToStage -= stage.id     // but that won't get cleaned up via the normal paths through
-        stageToInfos -= stage          // completion events or stage abort
-        jobIdToStageIds -= job.jobId
-        listenerBus.post(SparkListenerJobEnd(job, result))
-
       case TaskSetFailed(taskSet, reason) =>
         stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason) }
 
@@ -691,7 +683,12 @@ class DAGScheduler(
         jobResult = JobFailed(e, Some(job.finalStage))
         job.listener.jobFailed(e)
     } finally {
-      eventProcessActor ! LocalJobCompleted(job, jobResult)
+      val s = job.finalStage
+      stageIdToJobIds -= s.id    // clean up data structures that were populated for a local job,
+      stageIdToStage -= s.id     // but that won't get cleaned up via the normal paths through
+      stageToInfos -= s          // completion events or stage abort
+      jobIdToStageIds -= job.jobId
+      listenerBus.post(SparkListenerJobEnd(job, jobResult))
     }
   }
 
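The two hunks above are the heart of the change: instead of firing `eventProcessActor ! LocalJobCompleted(...)` and letting the event loop tear down the local job's bookkeeping later, `runLocallyWithinThread` now does the cleanup synchronously in its `finally` block, so the maps are already empty and `SparkListenerJobEnd` is already posted by the time the method returns. A minimal, self-contained sketch of that pattern, using simplified stand-in types (`Stage`, `Job`, two of the maps) rather than the real Spark ones:

    import scala.collection.mutable

    object InlineCleanupSketch {
      case class Stage(id: Int)
      case class Job(jobId: Int, finalStage: Stage)

      // Stand-ins for bookkeeping maps like stageIdToStage and jobIdToStageIds.
      val stageIdToStage = mutable.Map[Int, Stage]()
      val jobIdToStageIds = mutable.Map[Int, Set[Int]]()

      def runLocally(job: Job)(body: => Unit): Unit = {
        try {
          body
        } finally {
          // Synchronous cleanup: when runLocally returns, no stale entries for
          // this job remain, with no extra round-trip through an event loop.
          stageIdToStage -= job.finalStage.id
          jobIdToStageIds -= job.jobId
        }
      }

      def main(args: Array[String]): Unit = {
        val job = Job(0, Stage(0))
        stageIdToStage(job.finalStage.id) = job.finalStage
        jobIdToStageIds(job.jobId) = Set(job.finalStage.id)
        runLocally(job) { println("computing locally") }
        assert(stageIdToStage.isEmpty && jobIdToStageIds.isEmpty)
      }
    }

One trade-off worth noting: the cleanup now runs on the local-execution thread rather than on the event-processing actor, which is safe only to the extent that these maps tolerate access from outside the actor; the patch evidently assumes they do.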
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index aa496b7ac6801eed11a5e84d580af8c436938fce..add11876130b18eabf6cc8404f223cef31bafa9b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -65,8 +65,6 @@ private[scheduler] case class CompletionEvent(
     taskMetrics: TaskMetrics)
   extends DAGSchedulerEvent
 
-private[scheduler] case class LocalJobCompleted(job: ActiveJob, result: JobResult) extends DAGSchedulerEvent
-
 private[scheduler] case class ExecutorGained(execId: String, host: String) extends DAGSchedulerEvent
 
 private[scheduler] case class ExecutorLost(execId: String) extends DAGSchedulerEvent
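
With the inline cleanup in place, `runLocallyWithinThread` was the only producer of `LocalJobCompleted`, so the event class can be deleted outright and the corresponding case disappears from the handler (first hunk of this patch). The scheduler events are a small set of variants consumed by a single receive loop; a toy sketch of that shape, written here as a sealed trait with simplified hypothetical variants, not the real definitions:

    object EventSketch {
      sealed trait SchedulerEvent
      case class TaskSetFailed(stageId: Int, reason: String) extends SchedulerEvent
      case class ExecutorLost(execId: String) extends SchedulerEvent

      // The single consumer: one match over the remaining variants. Dropping a
      // variant is safe once no producer constructs it anywhere.
      def process(event: SchedulerEvent): Unit = event match {
        case TaskSetFailed(stageId, reason) => println(s"aborting stage $stageId: $reason")
        case ExecutorLost(execId) => println(s"executor lost: $execId")
      }

      def main(args: Array[String]): Unit = process(ExecutorLost("exec-1"))
    }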
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 8ce8c68af3b68392d2441fa6a6a2fd5b60be1efa..706d84a58b5630a80ea2bf23ea01e197ffa3f43a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -219,8 +219,6 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     }
     val jobId = scheduler.nextJobId.getAndIncrement()
     runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, null, listener))
-    assert(scheduler.stageToInfos.size === 1)
-    runEvent(LocalJobCompleted(scheduler.stageToInfos.keys.head))
     assert(results === Map(0 -> 42))
     assertDataStructuresEmpty
   }
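
The deleted assertions are the payoff of the synchronous cleanup: `JobSubmitted` with `allowLocal = true` now runs the job and tears down its bookkeeping before `runEvent` returns, so the test can check `results` and call `assertDataStructuresEmpty` immediately, with no hand-fed `LocalJobCompleted` event in between. A hypothetical miniature of what the test now relies on (stand-in names, not the real suite):

    import scala.collection.mutable

    object LocalJobTestSketch {
      val stageToInfos = mutable.Map[Int, String]() // stand-in bookkeeping map

      // Stand-in for the synchronous local path: populate, run, clean up inline.
      def submitLocalJob(stageId: Int)(body: => Int): Int = {
        stageToInfos(stageId) = s"info for stage $stageId"
        try body finally stageToInfos -= stageId
      }

      def main(args: Array[String]): Unit = {
        val result = submitLocalJob(0) { 42 }
        assert(result == 42)
        assert(stageToInfos.isEmpty) // no manual completion event needed
      }
    }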