Commit 291dd47c authored by Prashant Sharma

Taking FeederActor out as separate program

parent 4496bf19
@@ -49,25 +49,24 @@ class FeederActor extends Actor {
     case SubscribeReceiver(receiverActor: ActorRef) =>
       println("received subscribe from %s".format(receiverActor.toString))
       receivers = LinkedList(receiverActor) ++ receivers
 
     case UnsubscribeReceiver(receiverActor: ActorRef) =>
       println("received unsubscribe from %s".format(receiverActor.toString))
       receivers = receivers.dropWhile(x => x eq receiverActor)
   }
 }
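
For context, the SubscribeReceiver and UnsubscribeReceiver messages handled above are defined elsewhere in this file; the diff elides their definitions. A minimal sketch of what such a message protocol looks like (an assumption, not shown in this commit):

    import akka.actor.ActorRef

    // Simple case classes carrying the subscriber's reference between
    // the receiver actor and the feeder (sketch; definitions elided by the diff).
    case class SubscribeReceiver(receiverActor: ActorRef)
    case class UnsubscribeReceiver(receiverActor: ActorRef)
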
 /**
- * A sample actor as receiver is also simplest. This receiver actor
+ * A sample actor as receiver, is also simplest. This receiver actor
  * goes and subscribe to a typical publisher/feeder actor and receives
- * data, thus it is important to have feeder running before this example
- * can be run.
+ * data.
  *
  * @see [[spark.streaming.examples.FeederActor]]
  */
 class SampleActorReceiver[T: ClassManifest](urlOfPublisher: String)
 extends Actor with Receiver {
 
   lazy private val remotePublisher = context.actorFor(urlOfPublisher)
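
The rest of SampleActorReceiver falls between the two hunks shown here. A minimal sketch of the elided lifecycle, assuming the Receiver trait supplies a pushBlock helper (the hook and helper names are assumptions, not confirmed by this diff): subscribe to the remote publisher on start, forward every incoming message into Spark, and unsubscribe on stop.

    // Sketch of the elided receiver body (names assumed):
    override def preStart = remotePublisher ! SubscribeReceiver(context.self)

    def receive = {
      // Forward anything the feeder sends into Spark as data of type T.
      case msg => pushBlock(msg.asInstanceOf[T])
    }

    override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)
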
@@ -81,13 +80,42 @@ class SampleActorReceiver[T: ClassManifest](urlOfPublisher: String)
 }
 
+/**
+ * A sample feeder actor
+ *
+ * Usage: FeederActor <hostname> <port>
+ * <hostname> and <port> describe the AkkaSystem that Spark Sample feeder would start on.
+ */
+object FeederActor {
+
+  def main(args: Array[String]) {
+    if(args.length < 2){
+      System.err.println(
+        "Usage: FeederActor <hostname> <port>\n"
+      )
+      System.exit(1)
+    }
+    val Seq(host, port) = args.toSeq
+
+    val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt)._1
+    val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")
+
+    println("Feeder started as:" + feeder)
+
+    actorSystem.awaitTermination();
+  }
+}
+
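
The new FeederActor object only bootstraps the actor system; the FeederActor class itself (whose subscription handling appears in the first hunk) is what generates the data. The generator half is elided by the diff; a plausible self-contained sketch, assuming a background thread that sends a random two-word message to every subscriber:

    import scala.collection.mutable.LinkedList
    import scala.util.Random
    import akka.actor.{Actor, ActorRef}

    // Message protocol as sketched earlier (assumed, elided by the diff).
    case class SubscribeReceiver(receiverActor: ActorRef)
    case class UnsubscribeReceiver(receiverActor: ActorRef)

    // Sketch of the feeder: a background thread pushes a random message
    // to all current subscribers every half second.
    class FeederActorSketch extends Actor {
      val rand = new Random()
      var receivers: LinkedList[ActorRef] = new LinkedList[ActorRef]()
      val strings: Array[String] = Array("words ", "may ", "count ")

      def makeMessage(): String = {
        val x = rand.nextInt(3)
        strings(x) + strings(2 - x)
      }

      new Thread() {
        override def run() {
          while (true) {
            Thread.sleep(500)
            receivers.foreach(_ ! makeMessage)
          }
        }
      }.start()

      // Subscription handling mirrors the first hunk of this diff.
      def receive = {
        case SubscribeReceiver(receiverActor) =>
          receivers = LinkedList(receiverActor) ++ receivers
        case UnsubscribeReceiver(receiverActor) =>
          receivers = receivers.dropWhile(x => x eq receiverActor)
      }
    }
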
 /**
  * A sample word count program demonstrating the use of plugging in
  * Actor as Receiver
  * Usage: ActorWordCount <master> <hostname> <port>
  * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
- * <hostname> and <port> describe the AkkaSystem that Spark Sample feeder would work on.
+ * <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on.
  *
+ * To run this example locally, you may run Feeder Actor as
+ *    `$ ./run spark.streaming.examples.FeederActor 127.0.1.1 9999`
  * and then run the example
  *    `$ ./run spark.streaming.examples.ActorWordCount local[2] 127.0.1.1 9999`
  */
@@ -96,7 +124,7 @@ object ActorWordCount {
     if (args.length < 3) {
       System.err.println(
         "Usage: ActorWordCount <master> <hostname> <port>" +
         "In local mode, <master> should be 'local[n]' with n > 1")
       System.exit(1)
     }
@@ -104,21 +132,16 @@ object ActorWordCount {
     // Create the context and set the batch size
     val ssc = new StreamingContext(master, "ActorWordCount",
       Seconds(10))
 
-    //Start feeder actor on this actor system.
-    val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt)._1
-    val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")
-
     /*
      * Following is the use of actorStream to plug in custom actor as receiver
      *
      * An important point to note:
      * Since Actor may exist outside the spark framework, It is thus user's responsibility
      * to ensure the type safety, i.e type of data received and InputDstream
      * should be same.
      *
      * For example: Both actorStream and SampleActorReceiver are parameterized
      * to same type to ensure type safety.
      */
@@ -127,10 +150,9 @@ object ActorWordCount {
       Props(new SampleActorReceiver[String]("akka://spark@%s:%s/user/FeederActor".format(
         host, port.toInt))), "SampleReceiver")
 
     //compute wordcount
     lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()
 
     ssc.start()
   }
 }
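
Putting the pieces together: the last hunk truncates the call that creates the input stream, which presumably begins with ssc.actorStream. A hedged reconstruction of the plug-in section (the opening line is an assumption; the Props argument and the word count match the diff), with both the stream and the receiver parameterized to String as the type-safety comment above requires:

    // Opening line assumed; everything from "Props(" onward matches the diff.
    val lines = ssc.actorStream[String](
      Props(new SampleActorReceiver[String]("akka://spark@%s:%s/user/FeederActor".format(
        host, port.toInt))), "SampleReceiver")

    // compute wordcount: split lines into words, pair each with 1, sum per word
    lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()

    ssc.start()
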