Skip to content
Snippets Groups Projects
Commit f55d0b93 authored by Mark Hamstra's avatar Mark Hamstra
Browse files

Synchronous, inline cleanup after runLocally

parent c9fcd909
No related branches found
No related tags found
No related merge requests found
...@@ -597,14 +597,6 @@ class DAGScheduler( ...@@ -597,14 +597,6 @@ class DAGScheduler(
listenerBus.post(SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics)) listenerBus.post(SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics))
handleTaskCompletion(completion) handleTaskCompletion(completion)
case LocalJobCompleted(job, result) =>
val stage = job.finalStage
stageIdToJobIds -= stage.id // clean up data structures that were populated for a local job,
stageIdToStage -= stage.id // but that won't get cleaned up via the normal paths through
stageToInfos -= stage // completion events or stage abort
jobIdToStageIds -= job.jobId
listenerBus.post(SparkListenerJobEnd(job, result))
case TaskSetFailed(taskSet, reason) => case TaskSetFailed(taskSet, reason) =>
stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason) } stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason) }
...@@ -691,7 +683,12 @@ class DAGScheduler( ...@@ -691,7 +683,12 @@ class DAGScheduler(
jobResult = JobFailed(e, Some(job.finalStage)) jobResult = JobFailed(e, Some(job.finalStage))
job.listener.jobFailed(e) job.listener.jobFailed(e)
} finally { } finally {
eventProcessActor ! LocalJobCompleted(job, jobResult) val s = job.finalStage
stageIdToJobIds -= s.id // clean up data structures that were populated for a local job,
stageIdToStage -= s.id // but that won't get cleaned up via the normal paths through
stageToInfos -= s // completion events or stage abort
jobIdToStageIds -= job.jobId
listenerBus.post(SparkListenerJobEnd(job, jobResult))
} }
} }
......
...@@ -65,8 +65,6 @@ private[scheduler] case class CompletionEvent( ...@@ -65,8 +65,6 @@ private[scheduler] case class CompletionEvent(
taskMetrics: TaskMetrics) taskMetrics: TaskMetrics)
extends DAGSchedulerEvent extends DAGSchedulerEvent
private[scheduler] case class LocalJobCompleted(job: ActiveJob, result: JobResult) extends DAGSchedulerEvent
private[scheduler] case class ExecutorGained(execId: String, host: String) extends DAGSchedulerEvent private[scheduler] case class ExecutorGained(execId: String, host: String) extends DAGSchedulerEvent
private[scheduler] case class ExecutorLost(execId: String) extends DAGSchedulerEvent private[scheduler] case class ExecutorLost(execId: String) extends DAGSchedulerEvent
......
...@@ -219,8 +219,6 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont ...@@ -219,8 +219,6 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
} }
val jobId = scheduler.nextJobId.getAndIncrement() val jobId = scheduler.nextJobId.getAndIncrement()
runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, null, listener)) runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, null, listener))
assert(scheduler.stageToInfos.size === 1)
runEvent(LocalJobCompleted(scheduler.stageToInfos.keys.head))
assert(results === Map(0 -> 42)) assert(results === Map(0 -> 42))
assertDataStructuresEmpty assertDataStructuresEmpty
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment