Skip to content
Snippets Groups Projects
Commit afe36516 authored by Burak Yavuz's avatar Burak Yavuz Committed by Tathagata Das
Browse files

[FLAKY-TEST] InputStreamsSuite.socket input stream

## What changes were proposed in this pull request?

https://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.streaming.InputStreamsSuite&test_name=socket+input+stream

## How was this patch tested?

Tested 2,000 times.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #16343 from brkyvz/sock.
parent 7e8994ff
No related branches found
No related tags found
No related merge requests found
......@@ -67,42 +67,33 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val expectedOutput = input.map(_.toString)
for (i <- input.indices) {
testServer.send(input(i).toString + "\n")
Thread.sleep(500)
clock.advance(batchDuration.milliseconds)
}
// Make sure we finish all batches before "stop"
if (!batchCounter.waitUntilBatchesCompleted(input.size, 30000)) {
fail("Timeout: cannot finish all batches in 30 seconds")
eventually(eventuallyTimeout) {
clock.advance(batchDuration.milliseconds)
// Verify whether data received was as expected
logInfo("--------------------------------")
logInfo("output.size = " + outputQueue.size)
logInfo("output")
outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("expected output.size = " + expectedOutput.size)
logInfo("expected output")
expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("--------------------------------")
// Verify whether all the elements received are as expected
// (whether the elements were received one in each interval is not verified)
val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray
assert(output.length === expectedOutput.size)
for (i <- output.indices) {
assert(output(i) === expectedOutput(i))
}
}
// Ensure progress listener has been notified of all events
ssc.sparkContext.listenerBus.waitUntilEmpty(500)
// Verify all "InputInfo"s have been reported
assert(ssc.progressListener.numTotalReceivedRecords === input.size)
assert(ssc.progressListener.numTotalProcessedRecords === input.size)
logInfo("Stopping server")
testServer.stop()
logInfo("Stopping context")
ssc.stop()
// Verify whether data received was as expected
logInfo("--------------------------------")
logInfo("output.size = " + outputQueue.size)
logInfo("output")
outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("expected output.size = " + expectedOutput.size)
logInfo("expected output")
expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("--------------------------------")
// Verify whether all the elements received are as expected
// (whether the elements were received one in each interval is not verified)
val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray
assert(output.length === expectedOutput.size)
for (i <- output.indices) {
assert(output(i) === expectedOutput(i))
eventually(eventuallyTimeout) {
assert(ssc.progressListener.numTotalReceivedRecords === input.length)
assert(ssc.progressListener.numTotalProcessedRecords === input.length)
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment