From 85cf0636825d1997d64d0bdc04618f29b7222da1 Mon Sep 17 00:00:00 2001
From: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Date: Tue, 24 Mar 2015 16:13:25 +0000
Subject: [PATCH] [SPARK-5559] [Streaming] [Test] Remove an opportunity for
 the flakiness we met when running FlumeStreamSuite

When we run FlumeStreamSuite on Jenkins, we sometimes get an error like the following.

    sbt.ForkMain$ForkError: The code passed to eventually never returned normally. Attempted 52 times over 10.094849836 seconds. Last failure message: Error connecting to localhost/127.0.0.1:23456.
        at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:420)
        at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:438)
        at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
        at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:307)
        at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
        at org.apache.spark.streaming.flume.FlumeStreamSuite.writeAndVerify(FlumeStreamSuite.scala:116)
        at org.apache.spark.streaming.flume.FlumeStreamSuite.org$apache$spark$streaming$flume$FlumeStreamSuite$$testFlumeStream(FlumeStreamSuite.scala:74)
        at org.apache.spark.streaming.flume.FlumeStreamSuite$$anonfun$3.apply$mcV$sp(FlumeStreamSuite.scala:66)
        at org.apache.spark.streaming.flume.FlumeStreamSuite$$anonfun$3.apply(FlumeStreamSuite.scala:66)
        at org.apache.spark.streaming.flume.FlumeStreamSuite$$anonfun$3.apply(FlumeStreamSuite.scala:66)
        at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
        at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
        at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
        at org.scalatest.Transformer.apply(Transformer.scala:22)
        at org.scalatest.Transformer.apply(Transformer.scala:20)
        at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
        at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
        at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555)
        at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
        at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
        at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
        at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
        at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)

This error is caused by the check-then-act logic used to find a free port.

      /** Find a free port */
      private def findFreePort(): Int = {
        Utils.startServiceOnPort(23456, (trialPort: Int) => {
          val socket = new ServerSocket(trialPort)
          socket.close()
          (null, trialPort)
        }, conf)._2
      }
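
The race window is between socket.close() and the moment the port is actually used: with a fixed start port of 23456, every concurrently running build probes the same port, so another process can grab it in that gap. A simplified sketch of one plausible sequence (the interleaving described in the comments is illustrative, not taken from the actual failure):

    import java.net.ServerSocket

    // "Check": probe that port 23456 is free, then release it immediately.
    val probe = new ServerSocket(23456)
    probe.close()

    // ... a concurrent build on the same Jenkins worker runs the same probe
    // here and also concludes that 23456 is free ...

    // "Act": use the port for real. If the concurrent build bound it first,
    // this throws java.net.BindException and the suite turns flaky.
    val server = new ServerSocket(23456)
    server.close()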

Removing the check-then-act pattern is not easy, but we can greatly reduce the chance of hitting this error by choosing a random value for the initial port instead of the fixed 23456.
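
Concretely, the patch (see the diff below) seeds startServiceOnPort with a random candidate from commons-lang3 instead of the fixed 23456, so concurrent builds rarely probe the same port:

    import java.net.ServerSocket

    import org.apache.commons.lang3.RandomUtils

    /** Find a free port, starting from a random candidate instead of 23456. */
    private def findFreePort(): Int = {
      val candidatePort = RandomUtils.nextInt(1024, 65536)
      Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
        val socket = new ServerSocket(trialPort)
        socket.close()
        (null, trialPort)
      }, conf)._2
    }

The require added to Utils#startServiceOnPort then rejects any start port outside 1024-65535, while 0 still means a random ephemeral port.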

Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>

Closes #4337 from sarutak/SPARK-5559 and squashes the following commits:

16f109f [Kousuke Saruta] Added `require` to Utils#startServiceOnPort
c39d8b6 [Kousuke Saruta] Merge branch 'SPARK-5559' of github.com:sarutak/spark into SPARK-5559
1610ba2 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-5559
33357e3 [Kousuke Saruta] Changed "findFreePort" method in MQTTStreamSuite and FlumeStreamSuite so that it can choose valid random port
a9029fe [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-5559
9489ef9 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-5559
8212e42 [Kousuke Saruta] Modified default port used in FlumeStreamSuite from 23456 to random value
---
 core/src/main/scala/org/apache/spark/util/Utils.scala        | 4 ++++
 .../org/apache/spark/streaming/flume/FlumeStreamSuite.scala  | 5 +++--
 .../org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala    | 4 +++-
 3 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index d9a671687a..0b5a914e7d 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1876,6 +1876,10 @@ private[spark] object Utils extends Logging {
       startService: Int => (T, Int),
       conf: SparkConf,
       serviceName: String = ""): (T, Int) = {
+
+    require(startPort == 0 || (1024 <= startPort && startPort < 65536),
+      "startPort should be between 1024 and 65535 (inclusive), or 0 for a random free port.")
+
     val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
     val maxRetries = portMaxRetries(conf)
     for (offset <- 0 to maxRetries) {
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 322de7bf2f..51d273af8d 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -28,6 +28,7 @@ import scala.language.postfixOps
 import com.google.common.base.Charsets
 import org.apache.avro.ipc.NettyTransceiver
 import org.apache.avro.ipc.specific.SpecificRequestor
+import org.apache.commons.lang3.RandomUtils
 import org.apache.flume.source.avro
 import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
 import org.jboss.netty.channel.ChannelPipeline
@@ -40,7 +41,6 @@ import org.scalatest.concurrent.Eventually._
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream}
-import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverStarted}
 import org.apache.spark.util.Utils
 
 class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with Logging {
@@ -76,7 +76,8 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L
 
   /** Find a free port */
   private def findFreePort(): Int = {
-    Utils.startServiceOnPort(23456, (trialPort: Int) => {
+    val candidatePort = RandomUtils.nextInt(1024, 65536)
+    Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
       val socket = new ServerSocket(trialPort)
       socket.close()
       (null, trialPort)
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index 0f3298af62..24d78ecb3a 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -25,6 +25,7 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import org.apache.activemq.broker.{TransportConnector, BrokerService}
+import org.apache.commons.lang3.RandomUtils
 import org.eclipse.paho.client.mqttv3._
 import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
 
@@ -113,7 +114,8 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
   }
 
   private def findFreePort(): Int = {
-    Utils.startServiceOnPort(23456, (trialPort: Int) => {
+    val candidatePort = RandomUtils.nextInt(1024, 65536)
+    Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
       val socket = new ServerSocket(trialPort)
       socket.close()
       (null, trialPort)
-- 
GitLab