Commit 6518ef63 authored by Andrew Or

[SPARK-9948] Fix flaky AccumulatorSuite - internal accumulators

In these tests, we use a custom listener and assert on fields in the stage / task completion events. However, these events are posted on a separate thread, so they are not guaranteed to have been processed by the time the test makes its assertions. This commit fixes the flakiness by registering the assertions in a job-end callback on the listener.

Author: Andrew Or <andrew@databricks.com>

Closes #8176 from andrewor14/fix-accumulator-suite.
parent 33bae585
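A condensed sketch of the approach, for orientation only (the names such as SaveInfoListener and registerJobCompletionCallback come from the full diff below; this is a summary, not the complete change): the test listener stores a callback and invokes it from onJobEnd, so the assertions run only after the listener has already received the stage/task completion events for that job.

    // Listener-side hook (see the SaveInfoListener changes in the diff below).
    private class SaveInfoListener extends SparkListener {
      private var jobCompletionCallback: (Int => Unit) = null // parameter is job ID

      /** Register a callback to be called on job end. */
      def registerJobCompletionCallback(callback: (Int => Unit)): Unit = {
        jobCompletionCallback = callback
      }

      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
        if (jobCompletionCallback != null) {
          jobCompletionCallback(jobEnd.jobId)
        }
      }
    }

    // Test-side usage: register the asserts first, then trigger the job.
    listener.registerJobCompletionCallback { _ =>
      assert(listener.getCompletedStageInfos.size === 1)
    }
    rdd.count()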
@@ -182,26 +182,30 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContext
     sc = new SparkContext("local", "test")
     sc.addSparkListener(listener)
     // Have each task add 1 to the internal accumulator
-    sc.parallelize(1 to 100, numPartitions).mapPartitions { iter =>
+    val rdd = sc.parallelize(1 to 100, numPartitions).mapPartitions { iter =>
       TaskContext.get().internalMetricsToAccumulators(TEST_ACCUMULATOR) += 1
       iter
-    }.count()
-    val stageInfos = listener.getCompletedStageInfos
-    val taskInfos = listener.getCompletedTaskInfos
-    assert(stageInfos.size === 1)
-    assert(taskInfos.size === numPartitions)
-    // The accumulator values should be merged in the stage
-    val stageAccum = findAccumulableInfo(stageInfos.head.accumulables.values, TEST_ACCUMULATOR)
-    assert(stageAccum.value.toLong === numPartitions)
-    // The accumulator should be updated locally on each task
-    val taskAccumValues = taskInfos.map { taskInfo =>
-      val taskAccum = findAccumulableInfo(taskInfo.accumulables, TEST_ACCUMULATOR)
-      assert(taskAccum.update.isDefined)
-      assert(taskAccum.update.get.toLong === 1)
-      taskAccum.value.toLong
     }
-    // Each task should keep track of the partial value on the way, i.e. 1, 2, ... numPartitions
-    assert(taskAccumValues.sorted === (1L to numPartitions).toSeq)
+    // Register asserts in job completion callback to avoid flakiness
+    listener.registerJobCompletionCallback { _ =>
+      val stageInfos = listener.getCompletedStageInfos
+      val taskInfos = listener.getCompletedTaskInfos
+      assert(stageInfos.size === 1)
+      assert(taskInfos.size === numPartitions)
+      // The accumulator values should be merged in the stage
+      val stageAccum = findAccumulableInfo(stageInfos.head.accumulables.values, TEST_ACCUMULATOR)
+      assert(stageAccum.value.toLong === numPartitions)
+      // The accumulator should be updated locally on each task
+      val taskAccumValues = taskInfos.map { taskInfo =>
+        val taskAccum = findAccumulableInfo(taskInfo.accumulables, TEST_ACCUMULATOR)
+        assert(taskAccum.update.isDefined)
+        assert(taskAccum.update.get.toLong === 1)
+        taskAccum.value.toLong
+      }
+      // Each task should keep track of the partial value on the way, i.e. 1, 2, ... numPartitions
+      assert(taskAccumValues.sorted === (1L to numPartitions).toSeq)
+    }
+    rdd.count()
   }
 
   test("internal accumulators in multiple stages") {
@@ -211,7 +215,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContext
     sc.addSparkListener(listener)
     // Each stage creates its own set of internal accumulators so the
     // values for the same metric should not be mixed up across stages
-    sc.parallelize(1 to 100, numPartitions)
+    val rdd = sc.parallelize(1 to 100, numPartitions)
       .map { i => (i, i) }
       .mapPartitions { iter =>
         TaskContext.get().internalMetricsToAccumulators(TEST_ACCUMULATOR) += 1
@@ -227,16 +231,20 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContext
         TaskContext.get().internalMetricsToAccumulators(TEST_ACCUMULATOR) += 100
         iter
       }
-      .count()
-    // We ran 3 stages, and the accumulator values should be distinct
-    val stageInfos = listener.getCompletedStageInfos
-    assert(stageInfos.size === 3)
-    val firstStageAccum = findAccumulableInfo(stageInfos(0).accumulables.values, TEST_ACCUMULATOR)
-    val secondStageAccum = findAccumulableInfo(stageInfos(1).accumulables.values, TEST_ACCUMULATOR)
-    val thirdStageAccum = findAccumulableInfo(stageInfos(2).accumulables.values, TEST_ACCUMULATOR)
-    assert(firstStageAccum.value.toLong === numPartitions)
-    assert(secondStageAccum.value.toLong === numPartitions * 10)
-    assert(thirdStageAccum.value.toLong === numPartitions * 2 * 100)
+    // Register asserts in job completion callback to avoid flakiness
+    listener.registerJobCompletionCallback { _ =>
+      // We ran 3 stages, and the accumulator values should be distinct
+      val stageInfos = listener.getCompletedStageInfos
+      assert(stageInfos.size === 3)
+      val (firstStageAccum, secondStageAccum, thirdStageAccum) =
+        (findAccumulableInfo(stageInfos(0).accumulables.values, TEST_ACCUMULATOR),
+         findAccumulableInfo(stageInfos(1).accumulables.values, TEST_ACCUMULATOR),
+         findAccumulableInfo(stageInfos(2).accumulables.values, TEST_ACCUMULATOR))
+      assert(firstStageAccum.value.toLong === numPartitions)
+      assert(secondStageAccum.value.toLong === numPartitions * 10)
+      assert(thirdStageAccum.value.toLong === numPartitions * 2 * 100)
+    }
+    rdd.count()
   }
 
   test("internal accumulators in fully resubmitted stages") {
@@ -268,7 +276,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContext
     // This says use 1 core and retry tasks up to 2 times
     sc = new SparkContext("local[1, 2]", "test")
     sc.addSparkListener(listener)
-    sc.parallelize(1 to 100, numPartitions).mapPartitionsWithIndex { case (i, iter) =>
+    val rdd = sc.parallelize(1 to 100, numPartitions).mapPartitionsWithIndex { case (i, iter) =>
       val taskContext = TaskContext.get()
       taskContext.internalMetricsToAccumulators(TEST_ACCUMULATOR) += 1
       // Fail the first attempts of a subset of the tasks
@@ -276,28 +284,32 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContext
         throw new Exception("Failing a task intentionally.")
       }
       iter
-    }.count()
-    val stageInfos = listener.getCompletedStageInfos
-    val taskInfos = listener.getCompletedTaskInfos
-    assert(stageInfos.size === 1)
-    assert(taskInfos.size === numPartitions + numFailedPartitions)
-    val stageAccum = findAccumulableInfo(stageInfos.head.accumulables.values, TEST_ACCUMULATOR)
-    // We should not double count values in the merged accumulator
-    assert(stageAccum.value.toLong === numPartitions)
-    val taskAccumValues = taskInfos.flatMap { taskInfo =>
-      if (!taskInfo.failed) {
-        // If a task succeeded, its update value should always be 1
-        val taskAccum = findAccumulableInfo(taskInfo.accumulables, TEST_ACCUMULATOR)
-        assert(taskAccum.update.isDefined)
-        assert(taskAccum.update.get.toLong === 1)
-        Some(taskAccum.value.toLong)
-      } else {
-        // If a task failed, we should not get its accumulator values
-        assert(taskInfo.accumulables.isEmpty)
-        None
-      }
-    }
-    assert(taskAccumValues.sorted === (1L to numPartitions).toSeq)
+    }
+    // Register asserts in job completion callback to avoid flakiness
+    listener.registerJobCompletionCallback { _ =>
+      val stageInfos = listener.getCompletedStageInfos
+      val taskInfos = listener.getCompletedTaskInfos
+      assert(stageInfos.size === 1)
+      assert(taskInfos.size === numPartitions + numFailedPartitions)
+      val stageAccum = findAccumulableInfo(stageInfos.head.accumulables.values, TEST_ACCUMULATOR)
+      // We should not double count values in the merged accumulator
+      assert(stageAccum.value.toLong === numPartitions)
+      val taskAccumValues = taskInfos.flatMap { taskInfo =>
+        if (!taskInfo.failed) {
+          // If a task succeeded, its update value should always be 1
+          val taskAccum = findAccumulableInfo(taskInfo.accumulables, TEST_ACCUMULATOR)
+          assert(taskAccum.update.isDefined)
+          assert(taskAccum.update.get.toLong === 1)
+          Some(taskAccum.value.toLong)
+        } else {
+          // If a task failed, we should not get its accumulator values
+          assert(taskInfo.accumulables.isEmpty)
+          None
+        }
+      }
+      assert(taskAccumValues.sorted === (1L to numPartitions).toSeq)
+    }
+    rdd.count()
   }
 
 }
@@ -313,20 +325,27 @@ private[spark] object AccumulatorSuite {
       testName: String)(testBody: => Unit): Unit = {
     val listener = new SaveInfoListener
     sc.addSparkListener(listener)
-    // Verify that the accumulator does not already exist
+    // Register asserts in job completion callback to avoid flakiness
+    listener.registerJobCompletionCallback { jobId =>
+      if (jobId == 0) {
+        // The first job is a dummy one to verify that the accumulator does not already exist
+        val accums = listener.getCompletedStageInfos.flatMap(_.accumulables.values)
+        assert(!accums.exists(_.name == InternalAccumulator.PEAK_EXECUTION_MEMORY))
+      } else {
+        // In the subsequent jobs, verify that peak execution memory is updated
+        val accum = listener.getCompletedStageInfos
+          .flatMap(_.accumulables.values)
+          .find(_.name == InternalAccumulator.PEAK_EXECUTION_MEMORY)
+          .getOrElse {
+            throw new TestFailedException(
+              s"peak execution memory accumulator not set in '$testName'", 0)
+          }
+        assert(accum.value.toLong > 0)
+      }
+    }
+    // Run the jobs
     sc.parallelize(1 to 10).count()
-    val accums = listener.getCompletedStageInfos.flatMap(_.accumulables.values)
-    assert(!accums.exists(_.name == InternalAccumulator.PEAK_EXECUTION_MEMORY))
     testBody
-    // Verify that peak execution memory is updated
-    val accum = listener.getCompletedStageInfos
-      .flatMap(_.accumulables.values)
-      .find(_.name == InternalAccumulator.PEAK_EXECUTION_MEMORY)
-      .getOrElse {
-        throw new TestFailedException(
-          s"peak execution memory accumulator not set in '$testName'", 0)
-      }
-    assert(accum.value.toLong > 0)
   }
 
 }
@@ -336,10 +355,22 @@ private[spark] object AccumulatorSuite {
 private class SaveInfoListener extends SparkListener {
   private val completedStageInfos: ArrayBuffer[StageInfo] = new ArrayBuffer[StageInfo]
   private val completedTaskInfos: ArrayBuffer[TaskInfo] = new ArrayBuffer[TaskInfo]
+  private var jobCompletionCallback: (Int => Unit) = null // parameter is job ID
 
   def getCompletedStageInfos: Seq[StageInfo] = completedStageInfos.toArray.toSeq
   def getCompletedTaskInfos: Seq[TaskInfo] = completedTaskInfos.toArray.toSeq
 
+  /** Register a callback to be called on job end. */
+  def registerJobCompletionCallback(callback: (Int => Unit)): Unit = {
+    jobCompletionCallback = callback
+  }
+
+  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+    if (jobCompletionCallback != null) {
+      jobCompletionCallback(jobEnd.jobId)
+    }
+  }
+
   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
     completedStageInfos += stageCompleted.stageInfo
   }