Skip to content
Snippets Groups Projects
Commit 44460ba5 authored by Patrick Wendell's avatar Patrick Wendell
Browse files

HOTFIX: Fix concurrency issue in FlumePollingStreamSuite.

This has been failing on master. One possible cause is that the port
gets contended if multiple test runs happen concurrently and they
hit this test at the same time. Since this test takes a long time
(60 seconds) that's very plausible. This patch randomizes the port
used in this test to avoid contention.
parent 25cad6ad
No related branches found
No related tags found
No related merge requests found
...@@ -20,6 +20,7 @@ package org.apache.spark.streaming.flume ...@@ -20,6 +20,7 @@ package org.apache.spark.streaming.flume
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.util.concurrent.{Callable, ExecutorCompletionService, Executors} import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}
import java.util.Random
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
...@@ -37,13 +38,16 @@ import org.apache.spark.streaming.flume.sink._ ...@@ -37,13 +38,16 @@ import org.apache.spark.streaming.flume.sink._
class FlumePollingStreamSuite extends TestSuiteBase { class FlumePollingStreamSuite extends TestSuiteBase {
val testPort = 9999 val random = new Random()
/** Return a port in the ephemeral range. */
def getTestPort = random.nextInt(16382) + 49152
val batchCount = 5 val batchCount = 5
val eventsPerBatch = 100 val eventsPerBatch = 100
val totalEventsPerChannel = batchCount * eventsPerBatch val totalEventsPerChannel = batchCount * eventsPerBatch
val channelCapacity = 5000 val channelCapacity = 5000
test("flume polling test") { test("flume polling test") {
val testPort = getTestPort
// Set up the streaming context and input streams // Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration) val ssc = new StreamingContext(conf, batchDuration)
val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
...@@ -77,6 +81,7 @@ class FlumePollingStreamSuite extends TestSuiteBase { ...@@ -77,6 +81,7 @@ class FlumePollingStreamSuite extends TestSuiteBase {
} }
test("flume polling test multiple hosts") { test("flume polling test multiple hosts") {
val testPort = getTestPort
// Set up the streaming context and input streams // Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration) val ssc = new StreamingContext(conf, batchDuration)
val addresses = Seq(testPort, testPort + 1).map(new InetSocketAddress("localhost", _)) val addresses = Seq(testPort, testPort + 1).map(new InetSocketAddress("localhost", _))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment