Commit 95470a03 authored by Hari Shreedharan, committed by Tathagata Das

[HOTFIX][STREAMING] Allow the JVM/Netty to decide which port to bind to in Flume Polling Tests.

Author: Hari Shreedharan <harishreedharan@gmail.com>

Closes #1820 from harishreedharan/use-free-ports and squashes the following commits:

b939067 [Hari Shreedharan] Remove unused import.
67856a8 [Hari Shreedharan] Remove findFreePort.
0ea51d1 [Hari Shreedharan] Make some changes to getPort to use map on the serverOpt.
1fb0283 [Hari Shreedharan] Merge branch 'master' of https://github.com/apache/spark into use-free-ports
b351651 [Hari Shreedharan] Allow Netty to choose port, and query it to decide the port to bind to. Leaving findFreePort as is, if other tests want to use it at some point.
e6c9620 [Hari Shreedharan] Making sure the second sink uses the correct port.
11c340d [Hari Shreedharan] Add info about race condition to scaladoc.
e89d135 [Hari Shreedharan] Adding Scaladoc.
6013bb0 [Hari Shreedharan] [STREAMING] Find free ports to use before attempting to create Flume Sink in Flume Polling Suite
parent 99243288
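The change itself is small. The old tests guessed a random port in the ephemeral range and hoped it was still free when the sink bound to it; the fix uses the standard bind-to-port-0 idiom instead: bind to port 0 so the OS (via the JVM/Netty) assigns any currently free port, then query the socket for the port it actually received. That removes the window between "pick a port" and "bind to it" in which another test or process can grab the port. A minimal sketch of the idiom with a plain java.net.ServerSocket (illustrative only, not code from this patch):

    import java.net.ServerSocket

    // Port 0 asks the OS to assign any free ephemeral port at bind time,
    // so there is no find-then-bind race with other processes.
    val server = new ServerSocket(0)
    val boundPort = server.getLocalPort  // the port the OS actually assigned
    println(s"bound to $boundPort")
    server.close()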
@@ -131,6 +131,14 @@ class SparkSink extends AbstractSink with Logging with Configurable {
     blockingLatch.await()
     Status.BACKOFF
   }
 
+  private[flume] def getPort(): Int = {
+    serverOpt
+      .map(_.getPort)
+      .getOrElse(
+        throw new RuntimeException("Server was not started!")
+      )
+  }
+
 }
 
 /**
...
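SparkSink only creates its Netty-backed Avro server when the sink is started, so the server handle lives in an Option (serverOpt) and getPort() is meaningful only after start(); calling it earlier fails loudly instead of returning a bogus port. A condensed, standalone sketch of the same Option-accessor shape (hypothetical Listener class, not the patch's code):

    import java.net.ServerSocket

    class Listener {
      private var serverOpt: Option[ServerSocket] = None

      def start(): Unit = {
        serverOpt = Some(new ServerSocket(0))  // bind happens here; the OS picks the port
      }

      // Mirrors the patch's getPort(): map over the Option and fail
      // with a clear error if start() was never called.
      def getPort(): Int =
        serverOpt
          .map(_.getLocalPort)
          .getOrElse(throw new RuntimeException("Server was not started!"))
    }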
@@ -22,6 +22,8 @@ import java.net.InetSocketAddress
 import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}
 import java.util.Random
 
+import org.apache.spark.TestUtils
+
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
@@ -39,9 +41,6 @@ import org.apache.spark.util.Utils
 class FlumePollingStreamSuite extends TestSuiteBase {
 
-  val random = new Random()
-  /** Return a port in the ephemeral range. */
-  def getTestPort = random.nextInt(16382) + 49152
 
   val batchCount = 5
   val eventsPerBatch = 100
   val totalEventsPerChannel = batchCount * eventsPerBatch
@@ -77,17 +76,6 @@ class FlumePollingStreamSuite extends TestSuiteBase {
   }
 
   private def testFlumePolling(): Unit = {
-    val testPort = getTestPort
-    // Set up the streaming context and input streams
-    val ssc = new StreamingContext(conf, batchDuration)
-    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
-      FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort)),
-        StorageLevel.MEMORY_AND_DISK, eventsPerBatch, 1)
-    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
-      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
-    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
-    outputStream.register()
-
     // Start the channel and sink.
     val context = new Context()
     context.put("capacity", channelCapacity.toString)
@@ -98,10 +86,19 @@
     val sink = new SparkSink()
     context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
-    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort))
+    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
     Configurables.configure(sink, context)
     sink.setChannel(channel)
     sink.start()
 
+    // Set up the streaming context and input streams
+    val ssc = new StreamingContext(conf, batchDuration)
+    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
+      FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", sink.getPort())),
+        StorageLevel.MEMORY_AND_DISK, eventsPerBatch, 1)
+    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
+      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
+    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+    outputStream.register()
     ssc.start()
 
     writeAndVerify(Seq(channel), ssc, outputBuffer)
@@ -111,18 +108,6 @@
   }
 
   private def testFlumePollingMultipleHost(): Unit = {
-    val testPort = getTestPort
-    // Set up the streaming context and input streams
-    val ssc = new StreamingContext(conf, batchDuration)
-    val addresses = Seq(testPort, testPort + 1).map(new InetSocketAddress("localhost", _))
-    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
-      FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
-        eventsPerBatch, 5)
-    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
-      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
-    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
-    outputStream.register()
-
     // Start the channel and sink.
     val context = new Context()
     context.put("capacity", channelCapacity.toString)
@@ -136,17 +121,29 @@
     val sink = new SparkSink()
     context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
-    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort))
+    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
     Configurables.configure(sink, context)
     sink.setChannel(channel)
     sink.start()
 
     val sink2 = new SparkSink()
     context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
-    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort + 1))
+    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
     Configurables.configure(sink2, context)
     sink2.setChannel(channel2)
     sink2.start()
 
+    // Set up the streaming context and input streams
+    val ssc = new StreamingContext(conf, batchDuration)
+    val addresses = Seq(sink.getPort(), sink2.getPort()).map(new InetSocketAddress("localhost", _))
+    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
+      FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
+        eventsPerBatch, 5)
+    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
+      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
+    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+    outputStream.register()
     ssc.start()
 
     writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
     assertChannelIsEmpty(channel)
...
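Both tests now follow the same order: configure and start the sink(s) with port 0 first, and only then build the streaming context against the port(s) Netty actually bound. Condensed to its essentials for a single sink (a sketch assuming an existing StreamingContext ssc and the suite's eventsPerBatch; identifiers otherwise as in the diff above):

    import java.net.InetSocketAddress

    import org.apache.flume.Context
    import org.apache.flume.channel.MemoryChannel
    import org.apache.flume.conf.Configurables
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.flume.FlumeUtils
    import org.apache.spark.streaming.flume.sink.{SparkSink, SparkSinkConfig}

    // 1. Start the sink on port 0 so Netty/the JVM picks a free port.
    val channel = new MemoryChannel()
    Configurables.configure(channel, new Context())

    val context = new Context()
    context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
    val sink = new SparkSink()
    Configurables.configure(sink, context)
    sink.setChannel(channel)
    sink.start()

    // 2. Only after start() is the real port known; point the receiver at it.
    val flumeStream = FlumeUtils.createPollingStream(
      ssc, Seq(new InetSocketAddress("localhost", sink.getPort())),
      StorageLevel.MEMORY_AND_DISK, eventsPerBatch, 1)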