Skip to content
Snippets Groups Projects
Commit 8113dbda authored by Jacek Laskowski's avatar Jacek Laskowski Committed by Reynold Xin
Browse files

[STREAMING][DOCS][EXAMPLES] Minor fixes

Author: Jacek Laskowski <jacek@japila.pl>

Closes #10603 from jaceklaskowski/streaming-actor-custom-receiver.
parent fd1dcfaf
No related branches found
No related tags found
No related merge requests found
......@@ -257,9 +257,9 @@ The following table summarizes the characteristics of both types of receivers
## Implementing and Using a Custom Actor-based Receiver
Custom [Akka Actors](http://doc.akka.io/docs/akka/2.2.4/scala/actors.html) can also be used to
Custom [Akka Actors](http://doc.akka.io/docs/akka/2.3.11/scala/actors.html) can also be used to
receive data. The [`ActorHelper`](api/scala/index.html#org.apache.spark.streaming.receiver.ActorHelper)
trait can be applied on any Akka actor, which allows received data to be stored in Spark using
trait can be mixed in to any Akka actor, which allows received data to be stored in Spark using
`store(...)` methods. The supervisor strategy of this actor can be configured to handle failures, etc.
{% highlight scala %}
......@@ -273,8 +273,8 @@ class CustomActor extends Actor with ActorHelper {
And a new input stream can be created with this custom actor as
{% highlight scala %}
// Assuming ssc is the StreamingContext
val lines = ssc.actorStream[String](Props(new CustomActor()), "CustomReceiver")
val ssc: StreamingContext = ...
val lines = ssc.actorStream[String](Props[CustomActor], "CustomReceiver")
{% endhighlight %}
See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala)
......
......@@ -62,15 +62,13 @@ class FeederActor extends Actor {
}.start()
def receive: Receive = {
case SubscribeReceiver(receiverActor: ActorRef) =>
println("received subscribe from %s".format(receiverActor.toString))
receivers = LinkedList(receiverActor) ++ receivers
receivers = LinkedList(receiverActor) ++ receivers
case UnsubscribeReceiver(receiverActor: ActorRef) =>
println("received unsubscribe from %s".format(receiverActor.toString))
receivers = receivers.dropWhile(x => x eq receiverActor)
receivers = receivers.dropWhile(x => x eq receiverActor)
}
}
......@@ -129,9 +127,9 @@ object FeederActor {
* <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on.
*
* To run this example locally, you may run Feeder Actor as
* `$ bin/run-example org.apache.spark.examples.streaming.FeederActor 127.0.1.1 9999`
* `$ bin/run-example org.apache.spark.examples.streaming.FeederActor 127.0.0.1 9999`
* and then run the example
* `$ bin/run-example org.apache.spark.examples.streaming.ActorWordCount 127.0.1.1 9999`
* `$ bin/run-example org.apache.spark.examples.streaming.ActorWordCount 127.0.0.1 9999`
*/
object ActorWordCount {
def main(args: Array[String]) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment