Commit 078c71c2 authored by Shixiong Zhu

[SPARK-18954][TESTS] Fix flaky test: o.a.s.streaming.BasicOperationsSuite rdd cleanup - map and window

## What changes were proposed in this pull request?

The issue in this test is that the cleanup of RDDs may not finish before the StreamingContext is stopped. This PR wraps the assertions in `eventually` and runs them before stopping the StreamingContext.
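
For context, the fix relies on ScalaTest's `eventually`, which retries a block of assertions until they pass or a timeout elapses, so a check that depends on asynchronous cleanup no longer fails just because it ran a moment too early. The sketch below is a minimal standalone illustration of that pattern; the object name, the 10-second timeout, and the simulated cleanup flag are assumptions for the example, not code from the suite.

```scala
import java.util.concurrent.atomic.AtomicBoolean

import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object EventuallyCleanupSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for the asynchronous work the real test waits on
    // (cleanup of old RDD metadata after a batch completes).
    val cleanupDone = new AtomicBoolean(false)
    new Thread(new Runnable {
      override def run(): Unit = {
        Thread.sleep(500)
        cleanupDone.set(true)
      }
    }).start()

    // Asserting once here would be flaky. `eventually` keeps retrying the
    // block until the assertion passes or the timeout elapses, then rethrows
    // the last failure, so the check tolerates cleanup finishing a bit late.
    eventually(timeout(10.seconds)) {
      assert(cleanupDone.get(), "cleanup has not completed yet")
    }
    println("cleanup observed before shutdown")
  }
}
```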

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16362 from zsxwing/SPARK-18954.
parent ccfe60a8

streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -19,13 +19,13 @@ package org.apache.spark.streaming
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.language.existentials
import scala.reflect.ClassTag
import org.scalatest.concurrent.Eventually.eventually
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, WindowedDStream}
@@ -657,48 +657,57 @@ class BasicOperationsSuite extends TestSuiteBase {
.window(Seconds(4), Seconds(2))
}
val operatedStream = runCleanupTest(conf, operation _,
numExpectedOutput = cleanupTestInput.size / 2, rememberDuration = Seconds(3))
val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[_]]
val windowedStream1 = windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[_]]
val mappedStream = windowedStream1.dependencies.head
// Checkpoint remember durations
assert(windowedStream2.rememberDuration === rememberDuration)
assert(windowedStream1.rememberDuration === rememberDuration + windowedStream2.windowDuration)
assert(mappedStream.rememberDuration ===
rememberDuration + windowedStream2.windowDuration + windowedStream1.windowDuration)
// WindowedStream2 should remember till 7 seconds: 10, 9, 8, 7
// WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5, 4
// MappedStream should remember till 2 seconds: 10, 9, 8, 7, 6, 5, 4, 3, 2
// WindowedStream2
assert(windowedStream2.generatedRDDs.contains(Time(10000)))
assert(windowedStream2.generatedRDDs.contains(Time(8000)))
assert(!windowedStream2.generatedRDDs.contains(Time(6000)))
// WindowedStream1
assert(windowedStream1.generatedRDDs.contains(Time(10000)))
assert(windowedStream1.generatedRDDs.contains(Time(4000)))
assert(!windowedStream1.generatedRDDs.contains(Time(3000)))
// MappedStream
assert(mappedStream.generatedRDDs.contains(Time(10000)))
assert(mappedStream.generatedRDDs.contains(Time(2000)))
assert(!mappedStream.generatedRDDs.contains(Time(1000)))
runCleanupTest(
conf,
operation _,
numExpectedOutput = cleanupTestInput.size / 2,
rememberDuration = Seconds(3)) { operatedStream =>
eventually(eventuallyTimeout) {
val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[_]]
val windowedStream1 = windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[_]]
val mappedStream = windowedStream1.dependencies.head
// Checkpoint remember durations
assert(windowedStream2.rememberDuration === rememberDuration)
assert(
windowedStream1.rememberDuration === rememberDuration + windowedStream2.windowDuration)
assert(mappedStream.rememberDuration ===
rememberDuration + windowedStream2.windowDuration + windowedStream1.windowDuration)
// WindowedStream2 should remember till 7 seconds: 10, 9, 8, 7
// WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5, 4
// MappedStream should remember till 2 seconds: 10, 9, 8, 7, 6, 5, 4, 3, 2
// WindowedStream2
assert(windowedStream2.generatedRDDs.contains(Time(10000)))
assert(windowedStream2.generatedRDDs.contains(Time(8000)))
assert(!windowedStream2.generatedRDDs.contains(Time(6000)))
// WindowedStream1
assert(windowedStream1.generatedRDDs.contains(Time(10000)))
assert(windowedStream1.generatedRDDs.contains(Time(4000)))
assert(!windowedStream1.generatedRDDs.contains(Time(3000)))
// MappedStream
assert(mappedStream.generatedRDDs.contains(Time(10000)))
assert(mappedStream.generatedRDDs.contains(Time(2000)))
assert(!mappedStream.generatedRDDs.contains(Time(1000)))
}
}
}
test("rdd cleanup - updateStateByKey") {
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
Some(values.sum + state.getOrElse(0))
}
val stateStream = runCleanupTest(
conf, _.map(_ -> 1).updateStateByKey(updateFunc).checkpoint(Seconds(3)))
assert(stateStream.rememberDuration === stateStream.checkpointDuration * 2)
assert(stateStream.generatedRDDs.contains(Time(10000)))
assert(!stateStream.generatedRDDs.contains(Time(4000)))
runCleanupTest(
conf, _.map(_ -> 1).updateStateByKey(updateFunc).checkpoint(Seconds(3))) { stateStream =>
eventually(eventuallyTimeout) {
assert(stateStream.rememberDuration === stateStream.checkpointDuration * 2)
assert(stateStream.generatedRDDs.contains(Time(10000)))
assert(!stateStream.generatedRDDs.contains(Time(4000)))
}
}
}
test("rdd cleanup - input blocks and persisted RDDs") {
@@ -779,13 +788,16 @@ class BasicOperationsSuite extends TestSuiteBase {
}
}
/** Test cleanup of RDDs in DStream metadata */
/**
* Test cleanup of RDDs in DStream metadata. `assertCleanup` is the function that asserts the
* cleanup of RDDs is successful.
*/
def runCleanupTest[T: ClassTag](
conf2: SparkConf,
operation: DStream[Int] => DStream[T],
numExpectedOutput: Int = cleanupTestInput.size,
rememberDuration: Duration = null
): DStream[T] = {
)(assertCleanup: (DStream[T]) => Unit): DStream[T] = {
// Setup the stream computation
assert(batchDuration === Seconds(1),
@@ -794,7 +806,11 @@ class BasicOperationsSuite extends TestSuiteBase {
val operatedStream =
ssc.graph.getOutputStreams().head.dependencies.head.asInstanceOf[DStream[T]]
if (rememberDuration != null) ssc.remember(rememberDuration)
val output = runStreams[(Int, Int)](ssc, cleanupTestInput.size, numExpectedOutput)
val output = runStreams[(Int, Int)](
ssc,
cleanupTestInput.size,
numExpectedOutput,
() => assertCleanup(operatedStream))
val clock = ssc.scheduler.clock.asInstanceOf[Clock]
assert(clock.getTimeMillis() === Seconds(10).milliseconds)
assert(output.size === numExpectedOutput)

streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -359,14 +359,20 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
* output data has been collected or timeout (set by `maxWaitTimeMillis`) is reached.
*
* Returns a sequence of items for each RDD.
*
* @param ssc The StreamingContext
* @param numBatches The number of batches should be run
* @param numExpectedOutput The number of expected output
* @param preStop The function to run before stopping StreamingContext
*/
def runStreams[V: ClassTag](
ssc: StreamingContext,
numBatches: Int,
numExpectedOutput: Int
numExpectedOutput: Int,
preStop: () => Unit = () => {}
): Seq[Seq[V]] = {
// Flatten each RDD into a single Seq
runStreamsWithPartitions(ssc, numBatches, numExpectedOutput).map(_.flatten.toSeq)
runStreamsWithPartitions(ssc, numBatches, numExpectedOutput, preStop).map(_.flatten.toSeq)
}
/**
@@ -376,11 +382,17 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
*
* Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each
* representing one partition.
*
* @param ssc The StreamingContext
* @param numBatches The number of batches should be run
* @param numExpectedOutput The number of expected output
* @param preStop The function to run before stopping StreamingContext
*/
def runStreamsWithPartitions[V: ClassTag](
ssc: StreamingContext,
numBatches: Int,
numExpectedOutput: Int
numExpectedOutput: Int,
preStop: () => Unit = () => {}
): Seq[Seq[Seq[V]]] = {
assert(numBatches > 0, "Number of batches to run stream computation is zero")
assert(numExpectedOutput > 0, "Number of expected outputs after " + numBatches + " is zero")
@@ -424,6 +436,7 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
assert(output.size === numExpectedOutput, "Unexpected number of outputs generated")
Thread.sleep(100) // Give some time for the forgetting old RDDs to complete
preStop()
} finally {
ssc.stop(stopSparkContext = true)
}