From c6889d2cb9cd99f7e3e0ee14a4fdf301f1f9810e Mon Sep 17 00:00:00 2001
From: Andrew Or <andrewor14@gmail.com>
Date: Wed, 6 Aug 2014 16:34:53 -0700
Subject: [PATCH] [HOTFIX][Streaming] Handle port collisions in flume polling
 test

This is failing my tests in #1777. @tdas

Author: Andrew Or <andrewor14@gmail.com>

Closes #1803 from andrewor14/fix-flaky-streaming-test and squashes the following commits:

ea11a03 [Andrew Or] Catch all exceptions caused by BindExceptions
54a0ca0 [Andrew Or] Merge branch 'master' of github.com:apache/spark into fix-flaky-streaming-test
664095c [Andrew Or] Tone down bind exception message
af3ddc9 [Andrew Or] Handle port collisions in flume polling test
---
 .../flume/FlumePollingStreamSuite.scala       | 32 ++++++++++++++++++-
 1 file changed, 31 insertions(+), 1 deletion(-)

diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index 27bf2ac962..a69baa1698 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -35,6 +35,7 @@ import org.apache.spark.streaming.dstream.ReceiverInputDStream
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingContext}
 import org.apache.spark.streaming.flume.sink._
+import org.apache.spark.util.Utils
 
 class FlumePollingStreamSuite extends TestSuiteBase {
 
@@ -45,8 +46,37 @@ class FlumePollingStreamSuite extends TestSuiteBase {
   val eventsPerBatch = 100
   val totalEventsPerChannel = batchCount * eventsPerBatch
   val channelCapacity = 5000
+  val maxAttempts = 5
 
   test("flume polling test") {
+    testMultipleTimes(testFlumePolling)
+  }
+
+  test("flume polling test multiple hosts") {
+    testMultipleTimes(testFlumePollingMultipleHost)
+  }
+
+  /**
+   * Run the given test until no more java.net.BindException's are thrown.
+   * Do this only up to a certain attempt limit.
+   */
+  private def testMultipleTimes(test: () => Unit): Unit = {
+    var testPassed = false
+    var attempt = 0
+    while (!testPassed && attempt < maxAttempts) {
+      try {
+        test()
+        testPassed = true
+      } catch {
+        case e: Exception if Utils.isBindCollision(e) =>
+          logWarning("Exception when running flume polling test: " + e)
+          attempt += 1
+      }
+    }
+    assert(testPassed, s"Test failed after $attempt attempts!")
+  }
+
+  private def testFlumePolling(): Unit = {
     val testPort = getTestPort
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
@@ -80,7 +110,7 @@ class FlumePollingStreamSuite extends TestSuiteBase {
     channel.stop()
   }
 
-  test("flume polling test multiple hosts") {
+  private def testFlumePollingMultipleHost(): Unit = {
     val testPort = getTestPort
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
-- 
GitLab