Skip to content
Snippets Groups Projects
Commit d0463470 authored by zsxwing's avatar zsxwing Committed by Tathagata Das
Browse files

[SPARK-9504] [STREAMING] [TESTS] Use eventually to fix the flaky test

The previous code uses `ssc.awaitTerminationOrTimeout(500)`. Since nobody will stop it during `awaitTerminationOrTimeout`, it's just like `sleep(500)`. In a super overloaded Jenkins worker, the receiver may be not able to start in 500 milliseconds. Verified this in the log of https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39149/ There is no log about starting the receiver before this failure. That's why `assert(runningCount > 0)` failed.

This PR replaces `awaitTerminationOrTimeout` with `eventually` which should be more reliable.

Author: zsxwing <zsxwing@gmail.com>

Closes #7823 from zsxwing/SPARK-9504 and squashes the following commits:

7af66a6 [zsxwing] Remove wrong assertion
5ba2c99 [zsxwing] Use eventually to fix the flaky test
parent 3afc1de8
No related branches found
No related tags found
No related merge requests found
...@@ -261,7 +261,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo ...@@ -261,7 +261,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
for (i <- 1 to 4) { for (i <- 1 to 4) {
logInfo("==================================\n\n\n") logInfo("==================================\n\n\n")
ssc = new StreamingContext(sc, Milliseconds(100)) ssc = new StreamingContext(sc, Milliseconds(100))
var runningCount = 0 @volatile var runningCount = 0
TestReceiver.counter.set(1) TestReceiver.counter.set(1)
val input = ssc.receiverStream(new TestReceiver) val input = ssc.receiverStream(new TestReceiver)
input.count().foreachRDD { rdd => input.count().foreachRDD { rdd =>
...@@ -270,14 +270,14 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo ...@@ -270,14 +270,14 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
logInfo("Count = " + count + ", Running count = " + runningCount) logInfo("Count = " + count + ", Running count = " + runningCount)
} }
ssc.start() ssc.start()
ssc.awaitTerminationOrTimeout(500) eventually(timeout(10.seconds), interval(10.millis)) {
assert(runningCount > 0)
}
ssc.stop(stopSparkContext = false, stopGracefully = true) ssc.stop(stopSparkContext = false, stopGracefully = true)
logInfo("Running count = " + runningCount) logInfo("Running count = " + runningCount)
logInfo("TestReceiver.counter = " + TestReceiver.counter.get()) logInfo("TestReceiver.counter = " + TestReceiver.counter.get())
assert(runningCount > 0)
assert( assert(
(TestReceiver.counter.get() == runningCount + 1) || TestReceiver.counter.get() == runningCount + 1,
(TestReceiver.counter.get() == runningCount + 2),
"Received records = " + TestReceiver.counter.get() + ", " + "Received records = " + TestReceiver.counter.get() + ", " +
"processed records = " + runningCount "processed records = " + runningCount
) )
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment