diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala index 33235d150b4a565da6e2eac5b0c4d8a39fb89f27..13943ed5442b9e14fe3d918b073bfa42feea202b 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala @@ -17,103 +17,141 @@ package org.apache.spark.streaming.flume -import scala.collection.JavaConversions._ -import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} - -import java.net.InetSocketAddress +import java.net.{InetSocketAddress, ServerSocket} import java.nio.ByteBuffer import java.nio.charset.Charset +import scala.collection.JavaConversions._ +import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} +import scala.concurrent.duration._ +import scala.language.postfixOps + import org.apache.avro.ipc.NettyTransceiver import org.apache.avro.ipc.specific.SpecificRequestor +import org.apache.flume.source.avro import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol} +import org.jboss.netty.channel.ChannelPipeline +import org.jboss.netty.channel.socket.SocketChannel +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory +import org.jboss.netty.handler.codec.compression._ +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} +import org.scalatest.concurrent.Eventually._ +import org.apache.spark.{Logging, SparkConf} import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase} -import org.apache.spark.streaming.util.ManualClock +import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream} +import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverStarted} import org.apache.spark.util.Utils -import org.jboss.netty.channel.ChannelPipeline -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory -import org.jboss.netty.channel.socket.SocketChannel -import org.jboss.netty.handler.codec.compression._ +class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with Logging { + val conf = new SparkConf().setMaster("local[4]").setAppName("FlumeStreamSuite") + + var ssc: StreamingContext = null + var transceiver: NettyTransceiver = null -class FlumeStreamSuite extends TestSuiteBase { + after { + if (ssc != null) { + ssc.stop() + } + if (transceiver != null) { + transceiver.close() + } + } test("flume input stream") { - runFlumeStreamTest(false) + testFlumeStream(testCompression = false) } test("flume input compressed stream") { - runFlumeStreamTest(true) + testFlumeStream(testCompression = true) + } + + /** Run test on flume stream */ + private def testFlumeStream(testCompression: Boolean): Unit = { + val input = (1 to 100).map { _.toString } + val testPort = findFreePort() + val outputBuffer = startContext(testPort, testCompression) + writeAndVerify(input, testPort, outputBuffer, testCompression) + } + + /** Find a free port */ + private def findFreePort(): Int = { + Utils.startServiceOnPort(23456, (trialPort: Int) => { + val socket = new ServerSocket(trialPort) + socket.close() + (null, trialPort) + })._2 } - - def runFlumeStreamTest(enableDecompression: Boolean) { - // Set up the streaming context and input streams - val ssc = new StreamingContext(conf, batchDuration) - val (flumeStream, testPort) = - Utils.startServiceOnPort(9997, (trialPort: Int) => { - val dstream = FlumeUtils.createStream( - ssc, "localhost", trialPort, StorageLevel.MEMORY_AND_DISK, enableDecompression) - (dstream, trialPort) - }) + /** Setup and start the streaming context */ + private def startContext( + testPort: Int, testCompression: Boolean): (ArrayBuffer[Seq[SparkFlumeEvent]]) = { + ssc = new StreamingContext(conf, Milliseconds(200)) + val flumeStream = FlumeUtils.createStream( + ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, testCompression) val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]] with SynchronizedBuffer[Seq[SparkFlumeEvent]] val outputStream = new TestOutputStream(flumeStream, outputBuffer) outputStream.register() ssc.start() + outputBuffer + } - val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - val input = Seq(1, 2, 3, 4, 5) - Thread.sleep(1000) - val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort)) - var client: AvroSourceProtocol = null - - if (enableDecompression) { - client = SpecificRequestor.getClient( - classOf[AvroSourceProtocol], - new NettyTransceiver(new InetSocketAddress("localhost", testPort), - new CompressionChannelFactory(6))) - } else { - client = SpecificRequestor.getClient( - classOf[AvroSourceProtocol], transceiver) - } + /** Send data to the flume receiver and verify whether the data was received */ + private def writeAndVerify( + input: Seq[String], + testPort: Int, + outputBuffer: ArrayBuffer[Seq[SparkFlumeEvent]], + enableCompression: Boolean + ) { + val testAddress = new InetSocketAddress("localhost", testPort) - for (i <- 0 until input.size) { + val inputEvents = input.map { item => val event = new AvroFlumeEvent - event.setBody(ByteBuffer.wrap(input(i).toString.getBytes("utf-8"))) + event.setBody(ByteBuffer.wrap(item.getBytes("UTF-8"))) event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header")) - client.append(event) - Thread.sleep(500) - clock.addToTime(batchDuration.milliseconds) + event } - Thread.sleep(1000) - - val startTime = System.currentTimeMillis() - while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { - logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size) - Thread.sleep(100) + eventually(timeout(10 seconds), interval(100 milliseconds)) { + // if last attempted transceiver had succeeded, close it + if (transceiver != null) { + transceiver.close() + transceiver = null + } + + // Create transceiver + transceiver = { + if (enableCompression) { + new NettyTransceiver(testAddress, new CompressionChannelFactory(6)) + } else { + new NettyTransceiver(testAddress) + } + } + + // Create Avro client with the transceiver + val client = SpecificRequestor.getClient(classOf[AvroSourceProtocol], transceiver) + client should not be null + + // Send data + val status = client.appendBatch(inputEvents.toList) + status should be (avro.Status.OK) } - Thread.sleep(1000) - val timeTaken = System.currentTimeMillis() - startTime - assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") - logInfo("Stopping context") - ssc.stop() - - val decoder = Charset.forName("UTF-8").newDecoder() - - assert(outputBuffer.size === input.length) - for (i <- 0 until outputBuffer.size) { - assert(outputBuffer(i).size === 1) - val str = decoder.decode(outputBuffer(i).head.event.getBody) - assert(str.toString === input(i).toString) - assert(outputBuffer(i).head.event.getHeaders.get("test") === "header") + + val decoder = Charset.forName("UTF-8").newDecoder() + eventually(timeout(10 seconds), interval(100 milliseconds)) { + val outputEvents = outputBuffer.flatten.map { _.event } + outputEvents.foreach { + event => + event.getHeaders.get("test") should be("header") + } + val output = outputEvents.map(event => decoder.decode(event.getBody()).toString) + output should be (input) } } - class CompressionChannelFactory(compressionLevel: Int) extends NioClientSocketChannelFactory { + /** Class to create socket channel with compression */ + private class CompressionChannelFactory(compressionLevel: Int) extends NioClientSocketChannelFactory { override def newChannel(pipeline: ChannelPipeline): SocketChannel = { val encoder = new ZlibEncoder(compressionLevel) pipeline.addFirst("deflater", encoder)