Commit 4be1cdc8 authored by Tathagata Das

Merge pull request #5 from radlab/flume-integration

Flume integration
parents e4272160 3e796bdd
Showing 280 additions and 10 deletions
@@ -2,6 +2,7 @@ package spark

import scala.collection.immutable.NumericRange
import scala.collection.mutable.ArrayBuffer
+import scala.collection.Map

private[spark] class ParallelCollectionSplit[T: ClassManifest](
    val rddId: Long,
@@ -24,7 +25,8 @@ private[spark] class ParallelCollectionSplit[T: ClassManifest](
private[spark] class ParallelCollection[T: ClassManifest](
    @transient sc : SparkContext,
    @transient data: Seq[T],
-    numSlices: Int)
+    numSlices: Int,
+    locationPrefs : Map[Int,Seq[String]])
  extends RDD[T](sc, Nil) {
  // TODO: Right now, each split sends along its full data, even if later down the RDD chain it gets
  // cached. It might be worthwhile to write the data to a file in the DFS and read it in the split
@@ -40,7 +42,12 @@ private[spark] class ParallelCollection[T: ClassManifest](
  override def compute(s: Split) = s.asInstanceOf[ParallelCollectionSplit[T]].iterator

-  override def preferredLocations(s: Split): Seq[String] = Nil
+  override def preferredLocations(s: Split): Seq[String] = {
+    locationPrefs.get(splits_.indexOf(s)) match {
+      case Some(s) => s
+      case _ => Nil
+    }
+  }
}

private object ParallelCollection {
...
...@@ -194,7 +194,7 @@ class SparkContext( ...@@ -194,7 +194,7 @@ class SparkContext(
/** Distribute a local Scala collection to form an RDD. */ /** Distribute a local Scala collection to form an RDD. */
def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = { def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
new ParallelCollection[T](this, seq, numSlices) new ParallelCollection[T](this, seq, numSlices, Map[Int, Seq[String]]())
} }
/** Distribute a local Scala collection to form an RDD. */ /** Distribute a local Scala collection to form an RDD. */
...@@ -202,6 +202,14 @@ class SparkContext( ...@@ -202,6 +202,14 @@ class SparkContext(
parallelize(seq, numSlices) parallelize(seq, numSlices)
} }
/** Distribute a local Scala collection to form an RDD, with one or more
* location preferences (hostnames of Spark nodes) for each object.
* Create a new partition for each collection item. */
def makeRDD[T: ClassManifest](seq: Seq[(T, Seq[String])]): RDD[T] = {
val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
new ParallelCollection[T](this, seq.map(_._1), seq.size, indexToPrefs)
}
/** /**
* Read a text file from HDFS, a local file system (available on all nodes), or any * Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings. * Hadoop-supported file system URI, and return it as an RDD of Strings.
......
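As a quick illustration of the new overload (not part of the patch; host names are placeholders), each element becomes its own one-item partition, and its paired hostnames become that partition's preferred locations:

```scala
// Hypothetical usage sketch; "node1"/"node2" are placeholder hostnames.
val sc = new SparkContext("local", "makeRDD example")
val rdd = sc.makeRDD(Seq(
  ("alpha", Seq("node1")),
  ("beta",  Seq("node2"))
))
// Two partitions, one per element; the scheduler will try to run the task
// computing "alpha" on node1 and the task computing "beta" on node2.
rdd.collect()
```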
@@ -91,7 +91,8 @@ object SparkBuild extends Build {
      "org.eclipse.jetty" % "jetty-server" % "7.5.3.v20111011",
      "org.scalatest" %% "scalatest" % "1.6.1" % "test",
      "org.scalacheck" %% "scalacheck" % "1.9" % "test",
-      "com.novocode" % "junit-interface" % "0.8" % "test"
+      "com.novocode" % "junit-interface" % "0.8" % "test",
+      "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile"
    ),
    parallelExecution := false,
    /* Workaround for issue #206 (fixed after SBT 0.11.0) */
...
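Downstream projects that build against spark-streaming and use the Flume stream will likewise need the SDK on their classpath; a hedged sbt sketch, assuming the same version Spark compiles against:

```scala
// build.sbt of a downstream project (version assumed to match Spark's build)
libraryDependencies += "org.apache.flume" % "flume-ng-sdk" % "1.2.0"
```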
package spark.streaming

import java.io.{ObjectInput, ObjectOutput, Externalizable}
import spark.storage.StorageLevel
import org.apache.flume.source.avro.AvroSourceProtocol
import org.apache.flume.source.avro.AvroFlumeEvent
import org.apache.flume.source.avro.Status
import org.apache.avro.ipc.specific.SpecificResponder
import org.apache.avro.ipc.NettyServer
import java.net.InetSocketAddress
import collection.JavaConversions._
import spark.Utils
import java.nio.ByteBuffer

class FlumeInputDStream[T: ClassManifest](
  @transient ssc_ : StreamingContext,
  host: String,
  port: Int,
  storageLevel: StorageLevel
) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {

  override def createReceiver(): NetworkReceiver[SparkFlumeEvent] = {
    new FlumeReceiver(id, host, port, storageLevel)
  }
}

/**
 * A wrapper class for AvroFlumeEvents with a custom serialization format.
 *
 * This is necessary because AvroFlumeEvent uses inner data structures
 * which are not serializable.
 */
class SparkFlumeEvent() extends Externalizable {
  var event : AvroFlumeEvent = new AvroFlumeEvent()

  /* De-serialize from bytes. */
  def readExternal(in: ObjectInput) {
    val bodyLength = in.readInt()
    val bodyBuff = new Array[Byte](bodyLength)
    in.read(bodyBuff)

    val numHeaders = in.readInt()
    val headers = new java.util.HashMap[CharSequence, CharSequence]

    for (i <- 0 until numHeaders) {
      val keyLength = in.readInt()
      val keyBuff = new Array[Byte](keyLength)
      in.read(keyBuff)
      val key : String = Utils.deserialize(keyBuff)

      val valLength = in.readInt()
      val valBuff = new Array[Byte](valLength)
      in.read(valBuff)
      val value : String = Utils.deserialize(valBuff)

      headers.put(key, value)
    }

    event.setBody(ByteBuffer.wrap(bodyBuff))
    event.setHeaders(headers)
  }

  /* Serialize to bytes. */
  def writeExternal(out: ObjectOutput) {
    val body = event.getBody.array()
    out.writeInt(body.length)
    out.write(body)

    val numHeaders = event.getHeaders.size()
    out.writeInt(numHeaders)
    for ((k, v) <- event.getHeaders) {
      val keyBuff = Utils.serialize(k.toString)
      out.writeInt(keyBuff.length)
      out.write(keyBuff)
      val valBuff = Utils.serialize(v.toString)
      out.writeInt(valBuff.length)
      out.write(valBuff)
    }
  }
}

object SparkFlumeEvent {
  def fromAvroFlumeEvent(in : AvroFlumeEvent) : SparkFlumeEvent = {
    val event = new SparkFlumeEvent
    event.event = in
    event
  }
}

/** A simple server that implements Flume's Avro protocol. */
class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
  override def append(event : AvroFlumeEvent) : Status = {
    receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event)
    Status.OK
  }

  override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = {
    events.foreach(event =>
      receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event))
    Status.OK
  }
}

/** A NetworkReceiver which listens for events using the
  * Flume Avro interface. */
class FlumeReceiver(
  streamId: Int,
  host: String,
  port: Int,
  storageLevel: StorageLevel
) extends NetworkReceiver[SparkFlumeEvent](streamId) {

  lazy val dataHandler = new DataHandler(this, storageLevel)

  protected override def onStart() {
    val responder = new SpecificResponder(
      classOf[AvroSourceProtocol], new FlumeEventServer(this))
    val server = new NettyServer(responder, new InetSocketAddress(host, port))
    dataHandler.start()
    server.start()
    logInfo("Flume receiver started")
  }

  protected override def onStop() {
    dataHandler.stop()
    logInfo("Flume receiver stopped")
  }

  override def getLocationPreference = Some(host)
}
\ No newline at end of file
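A standalone round-trip sketch (not part of the patch) of why the Externalizable wrapper matters: the wrapped event survives plain Java object serialization even though AvroFlumeEvent itself does not. It assumes the classes above and the Flume SDK are on the classpath:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer
import org.apache.flume.source.avro.AvroFlumeEvent
import spark.streaming.SparkFlumeEvent

// Build an Avro event and wrap it.
val avroEvent = new AvroFlumeEvent()
avroEvent.setBody(ByteBuffer.wrap("payload".getBytes("UTF-8")))
avroEvent.setHeaders(new java.util.HashMap[CharSequence, CharSequence]())
val wrapped = SparkFlumeEvent.fromAvroFlumeEvent(avroEvent)

// Serialize and deserialize through the custom writeExternal/readExternal path.
val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos)
oos.writeObject(wrapped)   // invokes SparkFlumeEvent.writeExternal
oos.close()

val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
val restored = ois.readObject().asInstanceOf[SparkFlumeEvent]  // invokes readExternal
// restored.event.getBody now holds the same bytes as avroEvent.getBody.
```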
...@@ -8,7 +8,6 @@ import spark.streaming.util.{RecurringTimer, SystemClock} ...@@ -8,7 +8,6 @@ import spark.streaming.util.{RecurringTimer, SystemClock}
import spark.storage.StorageLevel import spark.storage.StorageLevel
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.util.concurrent.ArrayBlockingQueue
import akka.actor.{Props, Actor} import akka.actor.{Props, Actor}
import akka.pattern.ask import akka.pattern.ask
...@@ -63,6 +62,9 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri ...@@ -63,6 +62,9 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri
/** This method will be called to stop receiving data. */ /** This method will be called to stop receiving data. */
protected def onStop() protected def onStop()
/** This method conveys a placement preference (hostname) for this receiver. */
def getLocationPreference() : Option[String] = None
/** /**
* This method starts the receiver. First is accesses all the lazy members to * This method starts the receiver. First is accesses all the lazy members to
* materialize them. Then it calls the user-defined onStart() method to start * materialize them. Then it calls the user-defined onStart() method to start
...@@ -151,6 +153,4 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri ...@@ -151,6 +153,4 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri
tracker ! DeregisterReceiver(streamId, msg) tracker ! DeregisterReceiver(streamId, msg)
} }
} }
} }
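Any receiver can opt into placement through this hook; a hypothetical subclass (class and member names are illustrative, not from the patch) that pins itself to the host it reads from, as FlumeReceiver does above:

```scala
// Sketch of a receiver that asks to be launched on a specific host.
class PinnedReceiver(streamId: Int, host: String, port: Int)
  extends NetworkReceiver[String](streamId) {

  protected def onStart() {
    // connect to host:port and push received records into Spark
  }

  protected def onStop() {
    // tear down the connection
  }

  // Ask the tracker to launch this receiver on `host`;
  // the default (None) means "run anywhere".
  override def getLocationPreference(): Option[String] = Some(host)
}
```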
@@ -98,7 +98,18 @@ class NetworkInputTracker(

  def startReceivers() {
    val receivers = networkInputStreams.map(_.createReceiver())
-    val tempRDD = ssc.sc.makeRDD(receivers, receivers.size)
+
+    // Right now, we only honor preferences if all receivers have them
+    val hasLocationPreferences = receivers.map(_.getLocationPreference().isDefined).reduce(_ && _)
+
+    val tempRDD =
+      if (hasLocationPreferences) {
+        val receiversWithPreferences =
+          receivers.map(r => (r, Seq(r.getLocationPreference().get)))
+        ssc.sc.makeRDD[NetworkReceiver[_]](receiversWithPreferences)
+      } else {
+        ssc.sc.makeRDD(receivers, receivers.size)
+      }

    val startReceiver = (iterator: Iterator[NetworkReceiver[_]]) => {
      if (!iterator.hasNext) {
...
@@ -31,6 +31,8 @@ class RawNetworkReceiver(streamId: Int, host: String, port: Int, storageLevel: S

  var blockPushingThread: Thread = null

+  override def getLocationPreference = None
+
  def onStart() {
    // Open a socket to the target address and keep reading from it
    logInfo("Connecting to " + host + ":" + port)
...
@@ -34,6 +34,8 @@ class SocketReceiver[T: ClassManifest](

  lazy protected val dataHandler = new DataHandler(this, storageLevel)

+  override def getLocationPreference = None
+
  protected def onStart() {
    logInfo("Connecting to " + host + ":" + port)
    val socket = new Socket(host, port)
...
@@ -15,6 +15,7 @@ import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
+import org.apache.flume.source.avro.AvroFlumeEvent
import org.apache.hadoop.fs.Path
import java.util.UUID
import spark.util.MetadataCleaner
@@ -166,6 +167,16 @@ class StreamingContext private (
    inputStream
  }

+  def flumeStream (
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): DStream[SparkFlumeEvent] = {
+    val inputStream = new FlumeInputDStream(this, hostname, port, storageLevel)
+    graph.addInputStream(inputStream)
+    inputStream
+  }
+
  def rawNetworkStream[T: ClassManifest](
      hostname: String,
      port: Int,
...
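A minimal usage sketch of the new method (master URL, host, and port are placeholders); omitting the third argument takes the MEMORY_AND_DISK_SER_2 default:

```scala
import spark.streaming.{StreamingContext, Milliseconds}

val ssc = new StreamingContext("local[2]", "FlumeBodies", Milliseconds(2000))
// Listen for Flume Avro events on localhost:41414 and print the event bodies.
val events = ssc.flumeStream("localhost", 41414)
events.map(e => new String(e.event.getBody.array(), "UTF-8")).print()
ssc.start()
```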
package spark.streaming.examples

import spark.util.IntParam
import spark.storage.StorageLevel
import spark.streaming._

/**
 * Produce a streaming count of events received from Flume.
 *
 * This should be used in conjunction with an AvroSink in Flume. It will start
 * an Avro server at the requested host:port address and listen for requests.
 * Your Flume AvroSink should be pointed to this address.
 *
 * Usage: FlumeEventCount <master> <host> <port>
 *
 *   <master> is a Spark master URL
 *   <host> is the host on which the Flume receiver will be started - the receiver
 *          creates a server and listens for Flume events.
 *   <port> is the port the Flume receiver will listen on.
 */
object FlumeEventCount {
  def main(args: Array[String]) {
    if (args.length != 3) {
      System.err.println(
        "Usage: FlumeEventCount <master> <host> <port>")
      System.exit(1)
    }

    val Array(master, host, IntParam(port)) = args

    val batchInterval = Milliseconds(2000)

    // Create the context and set the batch size
    val ssc = new StreamingContext(master, "FlumeEventCount", batchInterval)

    // Create a Flume stream
    val stream = ssc.flumeStream(host, port, StorageLevel.MEMORY_ONLY)

    // Print out the count of events received from this server in each batch
    stream.count().map(cnt => "Received " + cnt + " flume events.").print()

    ssc.start()
  }
}
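For reference, a hedged sketch of the Flume side of this example: an agent whose AvroSink points at the host:port that FlumeEventCount listens on. The agent, channel, and netcat source names shown here are illustrative and not part of this patch:

```
# Flume agent configuration (sketch)
agent.sources = src
agent.channels = ch
agent.sinks = avroSink

agent.sources.src.type = netcat
agent.sources.src.bind = localhost
agent.sources.src.port = 44444
agent.sources.src.channels = ch

agent.channels.ch.type = memory

agent.sinks.avroSink.type = avro
agent.sinks.avroSink.hostname = <host passed to FlumeEventCount>
agent.sinks.avroSink.port = <port passed to FlumeEventCount>
agent.sinks.avroSink.channel = ch
```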
package spark.streaming

-import java.net.{SocketException, Socket, ServerSocket}
+import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket}
import java.io.{File, BufferedWriter, OutputStreamWriter}
import java.util.concurrent.{TimeUnit, ArrayBlockingQueue}
import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
@@ -10,7 +10,14 @@ import spark.Logging
import scala.util.Random
import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter
+import org.apache.flume.source.avro.AvroSourceProtocol
+import org.apache.flume.source.avro.AvroFlumeEvent
+import org.apache.flume.source.avro.Status
+import org.apache.avro.ipc.{specific, NettyTransceiver}
+import org.apache.avro.ipc.specific.SpecificRequestor
+import java.nio.ByteBuffer
+import collection.JavaConversions._
+import java.nio.charset.Charset

class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
@@ -123,6 +130,54 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
    ssc.stop()
  }

+  test("flume input stream") {
+    // Set up the streaming context and input streams
+    val ssc = new StreamingContext(master, framework, batchDuration)
+    val flumeStream = ssc.flumeStream("localhost", 33333, StorageLevel.MEMORY_AND_DISK)
+    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
+      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
+    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+    ssc.registerOutputStream(outputStream)
+    ssc.start()
+
+    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+    val input = Seq(1, 2, 3, 4, 5)
+
+    val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", 33333))
+    val client = SpecificRequestor.getClient(
+      classOf[AvroSourceProtocol], transceiver)
+
+    for (i <- 0 until input.size) {
+      val event = new AvroFlumeEvent
+      event.setBody(ByteBuffer.wrap(input(i).toString.getBytes()))
+      event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
+      client.append(event)
+      Thread.sleep(500)
+      clock.addToTime(batchDuration.milliseconds)
+    }
+
+    val startTime = System.currentTimeMillis()
+    while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
+      logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
+      Thread.sleep(100)
+    }
+    Thread.sleep(1000)
+    val timeTaken = System.currentTimeMillis() - startTime
+    assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
+    logInfo("Stopping context")
+    ssc.stop()
+
+    val decoder = Charset.forName("UTF-8").newDecoder()
+
+    assert(outputBuffer.size === input.length)
+    for (i <- 0 until outputBuffer.size) {
+      assert(outputBuffer(i).size === 1)
+      val str = decoder.decode(outputBuffer(i).head.event.getBody)
+      assert(str.toString === input(i).toString)
+      assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
+    }
+  }
+
  test("file input stream") {
    // Create a temporary directory
...