Skip to content
Snippets Groups Projects
Commit 64d2c01f authored by Tathagata Das's avatar Tathagata Das Committed by Andrew Or
Browse files

[Spark-5967] [UI] Correctly clean JobProgressListener.stageIdToActiveJobIds

Patch should be self-explanatory
pwendell JoshRosen

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #4741 from tdas/SPARK-5967 and squashes the following commits:

653b5bb [Tathagata Das] Fixed the fix and added test
e2de972 [Tathagata Das] Clear stages which have no corresponding active jobs.
parent 20123662
No related branches found
No related tags found
No related merge requests found
...@@ -203,6 +203,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { ...@@ -203,6 +203,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
for (stageId <- jobData.stageIds) { for (stageId <- jobData.stageIds) {
stageIdToActiveJobIds.get(stageId).foreach { jobsUsingStage => stageIdToActiveJobIds.get(stageId).foreach { jobsUsingStage =>
jobsUsingStage.remove(jobEnd.jobId) jobsUsingStage.remove(jobEnd.jobId)
if (jobsUsingStage.isEmpty) {
stageIdToActiveJobIds.remove(stageId)
}
stageIdToInfo.get(stageId).foreach { stageInfo => stageIdToInfo.get(stageId).foreach { stageInfo =>
if (stageInfo.submissionTime.isEmpty) { if (stageInfo.submissionTime.isEmpty) {
// if this stage is pending, it won't complete, so mark it as "skipped": // if this stage is pending, it won't complete, so mark it as "skipped":
......
...@@ -88,6 +88,28 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc ...@@ -88,6 +88,28 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
listener.completedStages.map(_.stageId).toSet should be (Set(50, 49, 48, 47, 46)) listener.completedStages.map(_.stageId).toSet should be (Set(50, 49, 48, 47, 46))
} }
test("test clearing of stageIdToActiveJobs") {
val conf = new SparkConf()
conf.set("spark.ui.retainedStages", 5.toString)
val listener = new JobProgressListener(conf)
val jobId = 0
val stageIds = 1 to 50
// Start a job with 50 stages
listener.onJobStart(createJobStartEvent(jobId, stageIds))
for (stageId <- stageIds) {
listener.onStageSubmitted(createStageStartEvent(stageId))
}
listener.stageIdToActiveJobIds.size should be > 0
// Complete the stages and job
for (stageId <- stageIds) {
listener.onStageCompleted(createStageEndEvent(stageId, failed = false))
}
listener.onJobEnd(createJobEndEvent(jobId, false))
assertActiveJobsStateIsEmpty(listener)
listener.stageIdToActiveJobIds.size should be (0)
}
test("test LRU eviction of jobs") { test("test LRU eviction of jobs") {
val conf = new SparkConf() val conf = new SparkConf()
conf.set("spark.ui.retainedStages", 5.toString) conf.set("spark.ui.retainedStages", 5.toString)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment