Skip to content
Snippets Groups Projects
Commit a0d79498 authored by Shixiong Zhu's avatar Shixiong Zhu Committed by gatorsmile
Browse files

[SPARK-23475][WEBUI] Skipped stages should be evicted before completed stages


## What changes were proposed in this pull request?

The root cause of missing completed stages is because `cleanupStages` will never remove skipped stages.

This PR changes the logic to always remove skipped stage first. This is safe since  the job itself contains enough information to render skipped stages in the UI.

## How was this patch tested?

The new unit tests.

Author: Shixiong Zhu <zsxwing@gmail.com>

Closes #20656 from zsxwing/SPARK-23475.

(cherry picked from commit 45cf714e)
Signed-off-by: default avatargatorsmile <gatorsmile@gmail.com>
parent 23ba4416
No related branches found
No related tags found
No related merge requests found
...@@ -863,7 +863,10 @@ private[spark] class AppStatusListener( ...@@ -863,7 +863,10 @@ private[spark] class AppStatusListener(
return return
} }
val view = kvstore.view(classOf[StageDataWrapper]).index("completionTime").first(0L) // As the completion time of a skipped stage is always -1, we will remove skipped stages first.
// This is safe since the job itself contains enough information to render skipped stages in the
// UI.
val view = kvstore.view(classOf[StageDataWrapper]).index("completionTime")
val stages = KVUtils.viewToSeq(view, countToDelete.toInt) { s => val stages = KVUtils.viewToSeq(view, countToDelete.toInt) { s =>
s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING
} }
......
...@@ -1025,6 +1025,42 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { ...@@ -1025,6 +1025,42 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
} }
} }
test("skipped stages should be evicted before completed stages") {
val testConf = conf.clone().set(MAX_RETAINED_STAGES, 2)
val listener = new AppStatusListener(store, testConf, true)
val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")
// Sart job 1
time += 1
listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage1, stage2), null))
// Start and stop stage 1
time += 1
stage1.submissionTime = Some(time)
listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties()))
time += 1
stage1.completionTime = Some(time)
listener.onStageCompleted(SparkListenerStageCompleted(stage1))
// Stop job 1 and stage 2 will become SKIPPED
time += 1
listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded))
// Submit stage 3 and verify stage 2 is evicted
val stage3 = new StageInfo(3, 0, "stage3", 4, Nil, Nil, "details3")
time += 1
stage3.submissionTime = Some(time)
listener.onStageSubmitted(SparkListenerStageSubmitted(stage3, new Properties()))
assert(store.count(classOf[StageDataWrapper]) === 2)
intercept[NoSuchElementException] {
store.read(classOf[StageDataWrapper], Array(2, 0))
}
}
test("eviction should respect task completion time") { test("eviction should respect task completion time") {
val testConf = conf.clone().set(MAX_RETAINED_TASKS_PER_STAGE, 2) val testConf = conf.clone().set(MAX_RETAINED_TASKS_PER_STAGE, 2)
val listener = new AppStatusListener(store, testConf, true) val listener = new AppStatusListener(store, testConf, true)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment