Skip to content
Snippets Groups Projects
Commit 8764fe36 authored by Sean Owen's avatar Sean Owen Committed by Tathagata Das
Browse files

SPARK-3744 [STREAMING] FlumeStreamSuite will fail during port contention

Since it looked quite easy, I took the liberty of making a quick PR that just uses `Utils.startServiceOnPort` to fix this. It works locally for me.

Author: Sean Owen <sowen@cloudera.com>

Closes #2601 from srowen/SPARK-3744 and squashes the following commits:

ddc9319 [Sean Owen] Avoid port contention in tests by retrying several ports for Flume stream
parent d3a3840e
No related branches found
No related tags found
No related merge requests found
......@@ -31,7 +31,7 @@ import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
import org.apache.spark.util.Utils
import org.jboss.netty.channel.ChannelPipeline
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
......@@ -41,21 +41,26 @@ import org.jboss.netty.handler.codec.compression._
class FlumeStreamSuite extends TestSuiteBase {
test("flume input stream") {
runFlumeStreamTest(false, 9998)
runFlumeStreamTest(false)
}
test("flume input compressed stream") {
runFlumeStreamTest(true, 9997)
runFlumeStreamTest(true)
}
def runFlumeStreamTest(enableDecompression: Boolean, testPort: Int) {
def runFlumeStreamTest(enableDecompression: Boolean) {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
val flumeStream: JavaReceiverInputDStream[SparkFlumeEvent] =
FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
val (flumeStream, testPort) =
Utils.startServiceOnPort(9997, (trialPort: Int) => {
val dstream = FlumeUtils.createStream(
ssc, "localhost", trialPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
(dstream, trialPort)
})
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
val outputStream = new TestOutputStream(flumeStream.receiverInputDStream, outputBuffer)
val outputStream = new TestOutputStream(flumeStream, outputBuffer)
outputStream.register()
ssc.start()
......@@ -63,13 +68,13 @@ class FlumeStreamSuite extends TestSuiteBase {
val input = Seq(1, 2, 3, 4, 5)
Thread.sleep(1000)
val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
var client: AvroSourceProtocol = null;
var client: AvroSourceProtocol = null
if (enableDecompression) {
client = SpecificRequestor.getClient(
classOf[AvroSourceProtocol],
new NettyTransceiver(new InetSocketAddress("localhost", testPort),
new CompressionChannelFactory(6)));
new CompressionChannelFactory(6)))
} else {
client = SpecificRequestor.getClient(
classOf[AvroSourceProtocol], transceiver)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment