diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 79a17e26665fd89233ad97714173d95b17aa45e9..5ea161cd0d151dc18e6cf9a9bd1036364681f741 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -915,7 +915,10 @@ private[spark] class AppStatusListener(
       return
     }
 
-    val view = kvstore.view(classOf[StageDataWrapper]).index("completionTime").first(0L)
+    // As the completion time of a skipped stage is always -1, we will remove skipped stages first.
+    // This is safe since the job itself contains enough information to render skipped stages in the
+    // UI.
+    val view = kvstore.view(classOf[StageDataWrapper]).index("completionTime")
     val stages = KVUtils.viewToSeq(view, countToDelete.toInt) { s =>
       s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING
     }
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 673d191b5a4dbd62a2f6c9f78f996f0fec9fccf1..1cd71955ad4d9deb7f1fd97716e5d0787f6ae331 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -1089,6 +1089,42 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     }
   }
 
+  test("skipped stages should be evicted before completed stages") {
+    val testConf = conf.clone().set(MAX_RETAINED_STAGES, 2)
+    val listener = new AppStatusListener(store, testConf, true)
+
+    val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
+    val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")
+
+    // Start job 1
+    time += 1
+    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage1, stage2), null))
+
+    // Start and stop stage 1
+    time += 1
+    stage1.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties()))
+
+    time += 1
+    stage1.completionTime = Some(time)
+    listener.onStageCompleted(SparkListenerStageCompleted(stage1))
+
+    // Stop job 1; stage 2 will become SKIPPED
+    time += 1
+    listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded))
+
+    // Submit stage 3 and verify stage 2 is evicted
+    val stage3 = new StageInfo(3, 0, "stage3", 4, Nil, Nil, "details3")
+    time += 1
+    stage3.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage3, new Properties()))
+
+    assert(store.count(classOf[StageDataWrapper]) === 2)
+    intercept[NoSuchElementException] {
+      store.read(classOf[StageDataWrapper], Array(2, 0))
+    }
+  }
+
   test("eviction should respect task completion time") {
     val testConf = conf.clone().set(MAX_RETAINED_TASKS_PER_STAGE, 2)
     val listener = new AppStatusListener(store, testConf, true)
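For context, not part of the patch: the change works because the KVStore index on "completionTime" iterates in ascending order, and skipped stages are stored with a completion time of -1, so once the old `.first(0L)` lower bound is dropped they become the first eviction candidates. Below is a minimal, self-contained sketch of that ordering; `MiniStage`, `EvictionOrderSketch`, and `evictionOrder` are hypothetical names for illustration only and are not Spark APIs.

// Minimal sketch, not Spark code: shows why an ascending sort on completion
// time hands skipped stages to the eviction loop first.
case class MiniStage(id: Int, status: String, completionTime: Long)

object EvictionOrderSketch {
  // Skipped stages never finish, so they carry the sentinel value -1.
  val stages: Seq[MiniStage] = Seq(
    MiniStage(1, "COMPLETE", completionTime = 100L),
    MiniStage(2, "SKIPPED", completionTime = -1L),
    MiniStage(3, "COMPLETE", completionTime = 200L))

  // Sorting ascending by completionTime (analogous to iterating the
  // "completionTime" index) puts stage 2 first, so it is evicted first.
  // The removed `.first(0L)` bound used to skip entries below 0, which let
  // skipped stages outlive completed ones.
  def evictionOrder(countToDelete: Int): Seq[MiniStage] =
    stages.sortBy(_.completionTime).take(countToDelete)

  def main(args: Array[String]): Unit =
    println(evictionOrder(1)) // prints List(MiniStage(2,SKIPPED,-1))
}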