From 9a3c5bd7082474cfb01f021aef103e44d12e2ff1 Mon Sep 17 00:00:00 2001
From: Burak Yavuz <brkyvz@gmail.com>
Date: Wed, 21 Dec 2016 17:23:48 -0800
Subject: [PATCH] [FLAKY-TEST] InputStreamsSuite.socket input stream

## What changes were proposed in this pull request?

https://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.streaming.InputStreamsSuite&test_name=socket+input+stream

## How was this patch tested?

Tested 2,000 times.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #16343 from brkyvz/sock.

(cherry picked from commit afe36516e4b4031196ee2e0a04980ac49208ea6b)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
---
 .../spark/streaming/InputStreamsSuite.scala   | 55 ++++++++-----------
 1 file changed, 23 insertions(+), 32 deletions(-)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 9ecfa48091..6fb50a4052 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -67,42 +67,33 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
         val expectedOutput = input.map(_.toString)
         for (i <- input.indices) {
           testServer.send(input(i).toString + "\n")
-          Thread.sleep(500)
           clock.advance(batchDuration.milliseconds)
         }
-        // Make sure we finish all batches before "stop"
-        if (!batchCounter.waitUntilBatchesCompleted(input.size, 30000)) {
-          fail("Timeout: cannot finish all batches in 30 seconds")
+
+        eventually(eventuallyTimeout) {
+          clock.advance(batchDuration.milliseconds)
+          // Verify whether data received was as expected
+          logInfo("--------------------------------")
+          logInfo("output.size = " + outputQueue.size)
+          logInfo("output")
+          outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+          logInfo("expected output.size = " + expectedOutput.size)
+          logInfo("expected output")
+          expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+          logInfo("--------------------------------")
+
+          // Verify whether all the elements received are as expected
+          // (whether the elements were received one in each interval is not verified)
+          val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray
+          assert(output.length === expectedOutput.size)
+          for (i <- output.indices) {
+            assert(output(i) === expectedOutput(i))
+          }
         }
 
-        // Ensure progress listener has been notified of all events
-        ssc.sparkContext.listenerBus.waitUntilEmpty(500)
-
-        // Verify all "InputInfo"s have been reported
-        assert(ssc.progressListener.numTotalReceivedRecords === input.size)
-        assert(ssc.progressListener.numTotalProcessedRecords === input.size)
-
-        logInfo("Stopping server")
-        testServer.stop()
-        logInfo("Stopping context")
-        ssc.stop()
-
-        // Verify whether data received was as expected
-        logInfo("--------------------------------")
-        logInfo("output.size = " + outputQueue.size)
-        logInfo("output")
-        outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]"))
-        logInfo("expected output.size = " + expectedOutput.size)
-        logInfo("expected output")
-        expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
-        logInfo("--------------------------------")
-
-        // Verify whether all the elements received are as expected
-        // (whether the elements were received one in each interval is not verified)
-        val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray
-        assert(output.length === expectedOutput.size)
-        for (i <- output.indices) {
-          assert(output(i) === expectedOutput(i))
+        eventually(eventuallyTimeout) {
+          assert(ssc.progressListener.numTotalReceivedRecords === input.length)
+          assert(ssc.progressListener.numTotalProcessedRecords === input.length)
         }
       }
     }
-- 
GitLab