Commit 40a8fef4 authored by tmalaska, committed by Tathagata Das

[SPARK-1478].3: Upgrade FlumeInputDStream's FlumeReceiver to support FLUME-1915

This is a modified version of this PR https://github.com/apache/spark/pull/1168 done by @tmalaska
Adds MIMA binary check exclusions.

Author: tmalaska <ted.malaska@cloudera.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #1347 from tdas/FLUME-1915 and squashes the following commits:

96065df [Tathagata Das] Added Mima exclusion for FlumeReceiver.
41d5338 [tmalaska] Address line 57 that was too long
12617e5 [tmalaska] SPARK-1478: Upgrade FlumeInputDStream's Flume...
parent 369aa84e

FlumeInputDStream.scala

@@ -20,6 +20,7 @@ package org.apache.spark.streaming.flume
 import java.net.InetSocketAddress
 import java.io.{ObjectInput, ObjectOutput, Externalizable}
 import java.nio.ByteBuffer
+import java.util.concurrent.Executors
 
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
@@ -29,24 +30,32 @@ import org.apache.flume.source.avro.AvroFlumeEvent
 import org.apache.flume.source.avro.Status
 import org.apache.avro.ipc.specific.SpecificResponder
 import org.apache.avro.ipc.NettyServer
+import org.apache.spark.Logging
 import org.apache.spark.util.Utils
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._
-import org.apache.spark.Logging
+import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.receiver.Receiver
+import org.jboss.netty.channel.ChannelPipelineFactory
+import org.jboss.netty.channel.Channels
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.ChannelFactory
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
+import org.jboss.netty.handler.codec.compression._
+import org.jboss.netty.handler.execution.ExecutionHandler
 
 private[streaming]
 class FlumeInputDStream[T: ClassTag](
   @transient ssc_ : StreamingContext,
   host: String,
   port: Int,
-  storageLevel: StorageLevel
+  storageLevel: StorageLevel,
+  enableDecompression: Boolean
 ) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {
 
   override def getReceiver(): Receiver[SparkFlumeEvent] = {
-    new FlumeReceiver(host, port, storageLevel)
+    new FlumeReceiver(host, port, storageLevel, enableDecompression)
   }
 }
@@ -134,22 +143,71 @@ private[streaming]
 class FlumeReceiver(
   host: String,
   port: Int,
-  storageLevel: StorageLevel
+  storageLevel: StorageLevel,
+  enableDecompression: Boolean
 ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
 
   lazy val responder = new SpecificResponder(
     classOf[AvroSourceProtocol], new FlumeEventServer(this))
-  lazy val server = new NettyServer(responder, new InetSocketAddress(host, port))
+  var server: NettyServer = null
+
+  private def initServer() = {
+    if (enableDecompression) {
+      val channelFactory = new NioServerSocketChannelFactory
+        (Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+      val channelPipelieFactory = new CompressionChannelPipelineFactory()
+
+      new NettyServer(
+        responder,
+        new InetSocketAddress(host, port),
+        channelFactory,
+        channelPipelieFactory,
+        null)
+    } else {
+      new NettyServer(responder, new InetSocketAddress(host, port))
+    }
+  }
 
   def onStart() {
-    server.start()
+    synchronized {
+      if (server == null) {
+        server = initServer()
+        server.start()
+      } else {
+        logWarning("Flume receiver being asked to start more then once with out close")
+      }
+    }
     logInfo("Flume receiver started")
   }
 
   def onStop() {
-    server.close()
+    synchronized {
+      if (server != null) {
+        server.close()
+        server = null
+      }
+    }
     logInfo("Flume receiver stopped")
   }
 
   override def preferredLocation = Some(host)
+
+  /** A Netty Pipeline factory that will decompress incoming data from
+    * and the Netty client and compress data going back to the client.
+    *
+    * The compression on the return is required because Flume requires
+    * a successful response to indicate it can remove the event/batch
+    * from the configured channel
+    */
+  private[streaming]
+  class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
+
+    def getPipeline() = {
+      val pipeline = Channels.pipeline()
+      val encoder = new ZlibEncoder(6)
+      pipeline.addFirst("deflater", encoder)
+      pipeline.addFirst("inflater", new ZlibDecoder())
+      pipeline
+    }
+  }
 }
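
The comment on CompressionChannelPipelineFactory implies that a sender talking to a decompression-enabled receiver must apply the same zlib framing on its side of the connection. For reference, here is a minimal, hypothetical Scala client sketch, mirroring the CompressionChannelFactory that the FlumeStreamSuite changes later in this commit add for testing; the class names, host, and port below are illustrative assumptions, not part of this commit.

    import java.net.InetSocketAddress

    import org.apache.avro.ipc.NettyTransceiver
    import org.apache.avro.ipc.specific.SpecificRequestor
    import org.apache.flume.source.avro.AvroSourceProtocol
    import org.jboss.netty.channel.ChannelPipeline
    import org.jboss.netty.channel.socket.SocketChannel
    import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder}

    // Client-side counterpart of the server pipeline above: deflate outgoing
    // event batches and inflate the (compressed) responses coming back.
    class CompressedClientChannelFactory(compressionLevel: Int)
      extends NioClientSocketChannelFactory {
      override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
        pipeline.addFirst("deflater", new ZlibEncoder(compressionLevel))
        pipeline.addFirst("inflater", new ZlibDecoder())
        super.newChannel(pipeline)
      }
    }

    object CompressedFlumeClient {
      def main(args: Array[String]) {
        // Hypothetical endpoint; must point at a FlumeReceiver created with
        // enableDecompression = true.
        val transceiver = new NettyTransceiver(
          new InetSocketAddress("localhost", 9997),
          new CompressedClientChannelFactory(6))
        val client = SpecificRequestor.getClient(classOf[AvroSourceProtocol], transceiver)
        // client.append(...) / client.appendBatch(...) can now send events that
        // the decompression-enabled receiver will inflate.
      }
    }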

FlumeUtils.scala

@@ -36,7 +36,27 @@ object FlumeUtils {
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
     ): ReceiverInputDStream[SparkFlumeEvent] = {
-    val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
+    createStream(ssc, hostname, port, storageLevel, false)
+  }
+
+  /**
+   * Create a input stream from a Flume source.
+   * @param ssc      StreamingContext object
+   * @param hostname Hostname of the slave machine to which the flume data will be sent
+   * @param port     Port of the slave machine to which the flume data will be sent
+   * @param storageLevel  Storage level to use for storing the received objects
+   * @param enableDecompression  should netty server decompress input stream
+   */
+  def createStream (
+      ssc: StreamingContext,
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel,
+      enableDecompression: Boolean
+    ): ReceiverInputDStream[SparkFlumeEvent] = {
+    val inputStream = new FlumeInputDStream[SparkFlumeEvent](
+        ssc, hostname, port, storageLevel, enableDecompression)
+
     inputStream
   }
@@ -66,6 +86,23 @@ object FlumeUtils {
       port: Int,
       storageLevel: StorageLevel
     ): JavaReceiverInputDStream[SparkFlumeEvent] = {
-    createStream(jssc.ssc, hostname, port, storageLevel)
+    createStream(jssc.ssc, hostname, port, storageLevel, false)
+  }
+
+  /**
+   * Creates a input stream from a Flume source.
+   * @param hostname Hostname of the slave machine to which the flume data will be sent
+   * @param port     Port of the slave machine to which the flume data will be sent
+   * @param storageLevel  Storage level to use for storing the received objects
+   * @param enableDecompression  should netty server decompress input stream
+   */
+  def createStream(
+      jssc: JavaStreamingContext,
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel,
+      enableDecompression: Boolean
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+    createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression)
   }
 }
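
Taken together, the new overloads let an application opt into server-side decompression with a single extra argument. A minimal usage sketch follows, assuming an illustrative app name, batch interval, host, and port; none of these values come from this commit.

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    object FlumeDecompressionExample {
      def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("FlumeDecompressionExample")
        val ssc = new StreamingContext(sparkConf, Seconds(2))

        // The trailing boolean is the enableDecompression flag added by this
        // commit: when true, the receiver's Netty server inflates
        // zlib-compressed event batches and compresses its responses back to
        // the sender.
        val stream = FlumeUtils.createStream(
          ssc, "localhost", 9997, StorageLevel.MEMORY_AND_DISK_SER_2, true)

        stream.count().map(cnt => "Received " + cnt + " flume events.").print()
        ssc.start()
        ssc.awaitTermination()
      }
    }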

JavaFlumeStreamSuite.java

@@ -30,5 +30,7 @@ public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
     JavaReceiverInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
     JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
       StorageLevel.MEMORY_AND_DISK_SER_2());
+    JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createStream(ssc, "localhost", 12345,
+      StorageLevel.MEMORY_AND_DISK_SER_2(), false);
   }
 }

FlumeStreamSuite.scala

@@ -33,15 +33,26 @@ import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuite
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
 
-class FlumeStreamSuite extends TestSuiteBase {
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+import org.jboss.netty.channel.socket.SocketChannel
+import org.jboss.netty.handler.codec.compression._
 
-  val testPort = 9999
+class FlumeStreamSuite extends TestSuiteBase {
 
   test("flume input stream") {
+    runFlumeStreamTest(false, 9998)
+  }
+
+  test("flume input compressed stream") {
+    runFlumeStreamTest(true, 9997)
+  }
+
+  def runFlumeStreamTest(enableDecompression: Boolean, testPort: Int) {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
     val flumeStream: JavaReceiverInputDStream[SparkFlumeEvent] =
-      FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+      FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
     val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
       with SynchronizedBuffer[Seq[SparkFlumeEvent]]
     val outputStream = new TestOutputStream(flumeStream.receiverInputDStream, outputBuffer)
@@ -52,8 +63,17 @@ class FlumeStreamSuite extends TestSuiteBase {
     val input = Seq(1, 2, 3, 4, 5)
     Thread.sleep(1000)
     val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
-    val client = SpecificRequestor.getClient(
-      classOf[AvroSourceProtocol], transceiver)
+    var client: AvroSourceProtocol = null;
+
+    if (enableDecompression) {
+      client = SpecificRequestor.getClient(
+          classOf[AvroSourceProtocol],
+          new NettyTransceiver(new InetSocketAddress("localhost", testPort),
+          new CompressionChannelFactory(6)));
+    } else {
+      client = SpecificRequestor.getClient(
+        classOf[AvroSourceProtocol], transceiver)
+    }
 
     for (i <- 0 until input.size) {
       val event = new AvroFlumeEvent
@@ -64,6 +84,8 @@ class FlumeStreamSuite extends TestSuiteBase {
       clock.addToTime(batchDuration.milliseconds)
     }
 
+    Thread.sleep(1000)
+
     val startTime = System.currentTimeMillis()
     while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
       logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
@@ -85,4 +107,13 @@ class FlumeStreamSuite extends TestSuiteBase {
       assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
     }
   }
+
+  class CompressionChannelFactory(compressionLevel: Int) extends NioClientSocketChannelFactory {
+    override def newChannel(pipeline:ChannelPipeline) : SocketChannel = {
+      var encoder : ZlibEncoder = new ZlibEncoder(compressionLevel);
+      pipeline.addFirst("deflater", encoder);
+      pipeline.addFirst("inflater", new ZlibDecoder());
+      super.newChannel(pipeline);
+    }
+  }
 }

MimaExcludes.scala

@@ -64,6 +64,9 @@ object MimaExcludes {
             "org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$"
               + "createZero$1")
           ) ++
+          Seq(
+            ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.FlumeReceiver.this")
+          ) ++
           Seq( // Ignore some private methods in ALS.
             ProblemFilters.exclude[MissingMethodProblem](
               "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$updateFeatures"),
...