Skip to content
Snippets Groups Projects
Commit e8774158 authored by Sean Owen's avatar Sean Owen
Browse files

[SPARK-16193][TESTS] Address flaky ExternalAppendOnlyMapSuite spilling tests

## What changes were proposed in this pull request?

Make spill tests wait until job has completed before returning the number of stages that spilled

## How was this patch tested?

Existing Jenkins tests.

Author: Sean Owen <sowen@cloudera.com>

Closes #13896 from srowen/SPARK-16193.
parent 3ee9695d
No related branches found
No related tags found
No related merge requests found
...@@ -22,6 +22,7 @@ import java.net.{URI, URL} ...@@ -22,6 +22,7 @@ import java.net.{URI, URL}
import java.nio.charset.StandardCharsets import java.nio.charset.StandardCharsets
import java.nio.file.Paths import java.nio.file.Paths
import java.util.Arrays import java.util.Arrays
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.jar.{JarEntry, JarOutputStream} import java.util.jar.{JarEntry, JarOutputStream}
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
...@@ -190,8 +191,14 @@ private[spark] object TestUtils { ...@@ -190,8 +191,14 @@ private[spark] object TestUtils {
private class SpillListener extends SparkListener { private class SpillListener extends SparkListener {
private val stageIdToTaskMetrics = new mutable.HashMap[Int, ArrayBuffer[TaskMetrics]] private val stageIdToTaskMetrics = new mutable.HashMap[Int, ArrayBuffer[TaskMetrics]]
private val spilledStageIds = new mutable.HashSet[Int] private val spilledStageIds = new mutable.HashSet[Int]
private val stagesDone = new CountDownLatch(1)
def numSpilledStages: Int = spilledStageIds.size def numSpilledStages: Int = {
// Long timeout, just in case somehow the job end isn't notified.
// Fails if a timeout occurs
assert(stagesDone.await(10, TimeUnit.SECONDS))
spilledStageIds.size
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
stageIdToTaskMetrics.getOrElseUpdate( stageIdToTaskMetrics.getOrElseUpdate(
...@@ -206,4 +213,8 @@ private class SpillListener extends SparkListener { ...@@ -206,4 +213,8 @@ private class SpillListener extends SparkListener {
spilledStageIds += stageId spilledStageIds += stageId
} }
} }
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
stagesDone.countDown()
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment