diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md index 5476c00d020cb2cfece90853d7e010d08055c00d..dfa343bf94f90684eb2edf91c16ef3d7fdeb9803 100644 --- a/docs/streaming-custom-receivers.md +++ b/docs/streaming-custom-receivers.md @@ -7,10 +7,45 @@ A "Spark Streaming" receiver can be a simple network stream, streams of messages This guide shows the programming model and features by walking through a simple sample receiver and corresponding Spark Streaming application. +### Write a simple receiver -## A quick and naive walk-through +This starts with implementing [NetworkReceiver](#References) -### Write a simple receiver +Following is a simple socket text-stream receiver. + +{% highlight scala %} + + class SocketTextStreamReceiver(host: String, + port: Int + ) extends NetworkReceiver[String] { + + protected lazy val blocksGenerator: BlockGenerator = + new BlockGenerator(StorageLevel.MEMORY_ONLY_SER_2) + + protected def onStart() = { + blocksGenerator.start() + val socket = new Socket(host, port) + val dataInputStream = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8")) + var data: String = dataInputStream.readLine() + while (data != null) { + blocksGenerator += data + data = dataInputStream.readLine() + } + } + + protected def onStop() { + blocksGenerator.stop() + } + + } + +{% endhighlight %} + + +All we did here is extended NetworkReceiver and called blockGenerator's API method (i.e. +=) to push our blocks of data. Please refer to scala-docs of NetworkReceiver for more details. + + +### An Actor as Receiver. This starts with implementing [Actor](#References) @@ -46,7 +81,16 @@ All we did here is mixed in trait Receiver and called pushBlock api method to pu {% endhighlight %} -* Plug-in the actor configuration into the spark streaming context and create a DStream. +* Plug-in the custom receiver into the spark streaming context and create a DStream. + +{% highlight scala %} + + val lines = ssc.networkStream[String](new SocketTextStreamReceiver( + "localhost", 8445)) + +{% endhighlight %} + +* OR Plug-in the actor as receiver into the spark streaming context and create a DStream. {% highlight scala %} @@ -99,3 +143,4 @@ _A more comprehensive example is provided in the spark streaming examples_ ## References 1.[Akka Actor documentation](http://doc.akka.io/docs/akka/2.0.5/scala/actors.html) +2.[NetworkReceiver](http://spark-project.org/docs/latest/api/streaming/index.html#spark.streaming.dstream.NetworkReceiver) diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 8cd1b0cd66043cfc7ed29bf850ab482f471560b7..a74c17bdb7a0ae20965e4d24bd2727fe6710d84a 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -301,6 +301,9 @@ dstream.checkpoint(checkpointInterval) // checkpointInterval must be a multiple For DStreams that must be checkpointed (that is, DStreams created by `updateStateByKey` and `reduceByKeyAndWindow` with inverse function), the checkpoint interval of the DStream is by default set to a multiple of the DStream's sliding interval such that its at least 10 seconds. +## Customizing Receiver +Spark comes with a built in support for most common usage scenarios where input stream source can be either a network socket stream to support for a few message queues. Apart from that it is also possible to supply your own custom receiver via a convenient API. Find more details at [Custom Receiver Guide](streaming-custom-receivers.html) + # Performance Tuning Getting the best performance of a Spark Streaming application on a cluster requires a bit of tuning. This section explains a number of the parameters and configurations that can tuned to improve the performance of you application. At a high level, you need to consider two things: <ol> diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index ffd656227ddce65e5065260bc936eeebf65167b6..62c95b573aaa3d04f75bb966680a58f43d02b9e0 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -183,6 +183,7 @@ class StreamingContext private ( /** * Create an input stream with any arbitrary user implemented network receiver. + * Find more details at: http://spark-project.org/docs/latest/streaming-custom-receivers.html * @param receiver Custom implementation of NetworkReceiver */ def networkStream[T: ClassManifest]( @@ -195,6 +196,7 @@ class StreamingContext private ( /** * Create an input stream with any arbitrary user implemented actor receiver. + * Find more details at: http://spark-project.org/docs/latest/streaming-custom-receivers.html * @param props Props object defining creation of the actor * @param name Name of the actor * @param storageLevel RDD storage level. Defaults to memory-only. diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala index 344b41c4d0f02c7442f07e243e784a30c742e8dd..1db0a69a2f109869a6a70b0d474565ef6fb66a50 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala @@ -145,8 +145,8 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log } /** - * Stops the receiver and reports to exception to the tracker. - * This should be called whenever an exception has happened on any thread + * Stops the receiver and reports exception to the tracker. + * This should be called whenever an exception is to be handled on any thread * of the receiver. */ protected def stopOnError(e: Exception) { @@ -202,7 +202,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log } /** - * Batches objects created by a [[spark.streaming.NetworkReceiver]] and puts them into + * Batches objects created by a [[spark.streaming.dstream.NetworkReceiver]] and puts them into * appropriately named blocks at regular intervals. This class starts two threads, * one to periodically start a new batch and prepare the previous batch of as a block, * the other to push the blocks into the block manager. diff --git a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala index 2d9937eab81d650616b598ae9757db541e7e79e5..abeeff11b93b52b6d5fdc185a342eb7020687a9d 100644 --- a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala +++ b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala @@ -45,6 +45,8 @@ object ReceiverSupervisorStrategy { * A receiver trait to be mixed in with your Actor to gain access to * pushBlock API. * + * Find more details at: http://spark-project.org/docs/latest/streaming-custom-receivers.html + * * @example {{{ * class MyActor extends Actor with Receiver{ * def receive {