@@ -7,10 +7,45 @@ A "Spark Streaming" receiver can be a simple network stream, streams of messages
This guide shows the programming model and features by walking through a simple sample receiver and corresponding Spark Streaming application.
### Write a simple receiver
## A quick and naive walk-through
This starts with implementing [NetworkReceiver](#References)
### Write a simple receiver
Following is a simple socket text-stream receiver.
{% highlight scala %}
class SocketTextStreamReceiver(host: String,
port: Int
) extends NetworkReceiver[String] {
protected lazy val blocksGenerator: BlockGenerator =
new BlockGenerator(StorageLevel.MEMORY_ONLY_SER_2)
protected def onStart() = {
blocksGenerator.start()
val socket = new Socket(host, port)
val dataInputStream = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
var data: String = dataInputStream.readLine()
while (data != null) {
blocksGenerator += data
data = dataInputStream.readLine()
}
}
protected def onStop() {
blocksGenerator.stop()
}
}
{% endhighlight %}
All we did here is extended NetworkReceiver and called blockGenerator's API method (i.e. +=) to push our blocks of data. Please refer to scala-docs of NetworkReceiver for more details.
### An Actor as Receiver.
This starts with implementing [Actor](#References)
...
...
@@ -46,7 +81,16 @@ All we did here is mixed in trait Receiver and called pushBlock api method to pu
{% endhighlight %}
* Plug-in the actor configuration into the spark streaming context and create a DStream.
* Plug-in the custom receiver into the spark streaming context and create a DStream.
{% highlight scala %}
val lines = ssc.networkStream[String](new SocketTextStreamReceiver(
"localhost", 8445))
{% endhighlight %}
* OR Plug-in the actor as receiver into the spark streaming context and create a DStream.
{% highlight scala %}
...
...
@@ -99,3 +143,4 @@ _A more comprehensive example is provided in the spark streaming examples_
## References
1.[Akka Actor documentation](http://doc.akka.io/docs/akka/2.0.5/scala/actors.html)