diff --git a/docs/plugin-custom-receiver.md b/docs/plugin-custom-receiver.md
new file mode 100644
index 0000000000000000000000000000000000000000..41e6a17e2c31eb10b23b7d76a30b75acf0a27912
--- /dev/null
+++ b/docs/plugin-custom-receiver.md
@@ -0,0 +1,101 @@
+---
+layout: global
+title: Tutorial - Spark streaming, Plugging in a custom receiver.
+---
+
+A "Spark streaming" receiver can be a simple network stream, streams of messages from a message queue, files etc. A receiver can also assume roles more than just receiving data like filtering, preprocessing, to name a few of the possibilities. The api to plug-in any user defined custom receiver is thus provided to encourage development of receivers which may be well suited to ones specific need.
+
+This guide shows the programming model and features by walking through a simple sample receiver and corresponding Spark Streaming application.
+
+
+## A quick and naive walk-through
+
+### Write a simple receiver
+
+This starts with implementing [Actor](#References)
+
+Following is a simple socket text-stream receiver, which is appearently overly simplified using Akka's socket.io api.
+
+{% highlight scala %}
+
+       class SocketTextStreamReceiver (host:String,
+         port:Int,
+         bytesToString: ByteString => String) extends Actor {
+
+          override def preStart = IOManager(context.system).connect(host, port)
+
+          def receive = {
+           case IO.Read(socket, bytes) => context.parent ! Data(bytesToString(bytes))
+         }
+       }
+
+
+{% endhighlight %}
+
+
+_Please see implementations of NetworkReceiver for more generic NetworkReceivers._
+
+### A sample spark application
+
+* First create a Spark streaming context with master url and batchduration.
+
+{% highlight scala %}
+
+    val ssc = new StreamingContext(master, "WordCountCustomStreamSource",
+      Seconds(batchDuration))
+
+{% endhighlight %}
+
+* Plug-in the actor configuration into the spark streaming context and create a DStream.
+
+{% highlight scala %}
+
+    val lines = ssc.pluggableActorStream[String](Props(new SocketTextStreamReceiver(
+      "localhost",8445, z => z.utf8String)),"SocketReceiver")
+
+{% endhighlight %}
+
+* Process it.
+
+{% highlight scala %}
+
+    val words = lines.flatMap(_.split(" "))
+    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+
+    wordCounts.print()
+    ssc.start()
+
+
+{% endhighlight %}
+
+* After processing it, stream can be tested using the netcat utility.
+
+     $ nc -l localhost 8445
+     hello world
+     hello hello
+
+
+## Multiple homogeneous/heterogeneous receivers.
+
+A DStream union operation is provided for taking union on multiple input streams.
+
+{% highlight scala %}
+
+    val lines = ssc.pluggableActorStream[String](Props(new SocketTextStreamReceiver(
+      "localhost",8445, z => z.utf8String)),"SocketReceiver")
+
+    // Another socket stream receiver
+    val lines2 = ssc.pluggableActorStream[String](Props(new SocketTextStreamReceiver(
+      "localhost",8446, z => z.utf8String)),"SocketReceiver")
+
+    val union = lines.union(lines2)
+
+{% endhighlight %}
+
+Above stream can be easily process as described earlier.
+
+_A more comprehensive example is provided in the spark streaming examples_
+
+## References
+
+1.[Akka Actor documentation](http://doc.akka.io/docs/akka/2.0.5/scala/actors.html)
diff --git a/examples/src/main/scala/spark/streaming/examples/AkkaActorWordCount.scala b/examples/src/main/scala/spark/streaming/examples/AkkaActorWordCount.scala
new file mode 100644
index 0000000000000000000000000000000000000000..ff05842c71f0d16082be66e72313afb88e2e414e
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/AkkaActorWordCount.scala
@@ -0,0 +1,80 @@
+package spark.streaming.examples
+
+import akka.actor.Actor
+import akka.actor.ActorRef
+import akka.actor.Props
+import akka.actor.actorRef2Scala
+
+import spark.streaming.Seconds
+import spark.streaming.StreamingContext
+import spark.streaming.StreamingContext.toPairDStreamFunctions
+import spark.streaming.receivers.Data
+
+case class SubscribeReceiver(receiverActor: ActorRef)
+case class UnsubscribeReceiver(receiverActor: ActorRef)
+
+/**
+ * A sample actor as receiver is also simplest. This receiver actor
+ * goes and subscribe to a typical publisher/feeder actor and receives
+ * data, thus it is important to have feeder running before this example
+ * can be run. Please see FileTextStreamFeeder(sample) for feeder of this 
+ * receiver.
+ */
+class SampleActorReceiver[T: ClassManifest](urlOfPublisher: String)
+  extends Actor {
+
+  lazy private val remotePublisher = context.actorFor(urlOfPublisher)
+
+  override def preStart = remotePublisher ! SubscribeReceiver(context.self)
+
+  def receive = {
+    case msg => context.parent ! Data(msg.asInstanceOf[T])
+  }
+
+  override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)
+
+}
+
+/**
+ * A sample word count program demonstrating the use of plugging in
+ * AkkaActor as Receiver
+ */
+object AkkaActorWordCount {
+  def main(args: Array[String]) {
+    if (args.length < 4) {
+      System.err.println(
+        "Usage: AkkaActorWordCount <master> <batch-duration in seconds>" +
+          " <remoteAkkaHost> <remoteAkkaPort>" +
+          "In local mode, <master> should be 'local[n]' with n > 1")
+      System.exit(1)
+    }
+
+    val Seq(master, batchDuration, remoteAkkaHost, remoteAkkaPort) = args.toSeq
+
+    // Create the context and set the batch size
+    val ssc = new StreamingContext(master, "AkkaActorWordCount",
+      Seconds(batchDuration.toLong))
+
+    /* 
+     * Following is the use of pluggableActorStream to plug in custom actor as receiver
+     * 
+     * An important point to note:
+     * Since Actor may exist outside the spark framework, It is thus user's responsibility 
+     * to ensure the type safety, i.e type of data received and PluggableInputDstream 
+     * should be same.
+     * 
+     * For example: Both pluggableActorStream and SampleActorReceiver are parameterized
+     * to same type to ensure type safety.
+     */
+
+    val lines = ssc.pluggableActorStream[String](
+      Props(new SampleActorReceiver[String]("akka://spark@%s:%s/user/FeederActor".format(
+        remoteAkkaHost, remoteAkkaPort.toInt))), "SampleReceiver")
+
+    //compute wordcount 
+    lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()
+
+    ssc.start()
+
+  }
+}
diff --git a/examples/src/main/scala/spark/streaming/examples/FileTextStreamFeeder.scala b/examples/src/main/scala/spark/streaming/examples/FileTextStreamFeeder.scala
new file mode 100644
index 0000000000000000000000000000000000000000..f4c1b87f0e6a388fc8fa620e7dea7b7781357344
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/FileTextStreamFeeder.scala
@@ -0,0 +1,63 @@
+package spark.streaming.examples
+
+import java.util.concurrent.CountDownLatch
+
+import scala.collection.mutable.LinkedList
+import scala.io.Source
+
+import akka.actor.{ Actor, ActorRef, actorRef2Scala }
+import akka.actor.Props
+
+import spark.util.AkkaUtils
+
+/**
+ * A feeder to which multiple message receiver (specified by "noOfReceivers")actors 
+ * subscribe and receive file(s)'s text as stream of messages. This is provided
+ * as a demonstration application for trying out Actor as receiver feature. Please see 
+ * SampleActorReceiver or AkkaActorWordCount example for details about the 
+ * receiver of this feeder.
+ */
+
+object FileTextStreamFeeder {
+
+  var receivers: LinkedList[ActorRef] = new LinkedList[ActorRef]()
+  var countdownLatch: CountDownLatch = _
+  def main(args: Array[String]) = args.toList match {
+
+    case host :: port :: noOfReceivers :: fileNames =>
+      val acs = AkkaUtils.createActorSystem("spark", host, port.toInt)._1
+      countdownLatch = new CountDownLatch(noOfReceivers.toInt)
+      val actor = acs.actorOf(Props(new FeederActor), "FeederActor")
+      countdownLatch.await() //wait for all the receivers to subscribe
+      for (fileName <- fileNames;line <- Source.fromFile(fileName).getLines) {
+        actor ! line
+      }
+      acs.awaitTermination();
+
+    case _ =>
+      System.err.println("Usage: FileTextStreamFeeder <hostname> <port> <no_of_receivers> <filenames>")
+      System.exit(1)
+  }
+
+  /**
+   * Sends the content to every receiver subscribed
+   */
+  class FeederActor extends Actor {
+
+    def receive: Receive = {
+
+      case SubscribeReceiver(receiverActor: ActorRef) =>
+        println("received subscribe from %s".format(receiverActor.toString))
+        receivers = LinkedList(receiverActor) ++ receivers
+        countdownLatch.countDown()
+
+      case UnsubscribeReceiver(receiverActor: ActorRef) =>
+        println("received unsubscribe from %s".format(receiverActor.toString))
+        receivers = receivers.dropWhile(x => x eq receiverActor)
+
+      case textMessage: String =>
+        receivers.foreach(_ ! textMessage)
+
+    }
+  }
+}
\ No newline at end of file
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 14500bdcb17a07ccf36e7cdf7f91f48606583a6e..cd7379da14c356a52636ec886eb5394e02179508 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -1,10 +1,15 @@
 package spark.streaming
 
+import akka.actor.Props
+
 import spark.streaming.dstream._
 
 import spark.{RDD, Logging, SparkEnv, SparkContext}
 import spark.storage.StorageLevel
 import spark.util.MetadataCleaner
+import spark.streaming.receivers.ActorReceiver
+import spark.streaming.receivers.Settings
+
 
 import scala.collection.mutable.Queue
 
@@ -134,6 +139,30 @@ class StreamingContext private (
 
   protected[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement()
 
+  /**
+   * Create an input stream with any arbitrary user implemented network receiver.
+   * @param receiver Custom implementation of NetworkReceiver
+   */
+  def pluggableNetworkStream[T: ClassManifest](
+    receiver: NetworkReceiver[T]): DStream[T] = {
+    val inputStream = new PluggableInputDStream[T](this,
+      receiver)
+    graph.addInputStream(inputStream)
+    inputStream
+  }
+
+  /**
+   * Create an input stream with any arbitrary user implemented akka actor receiver.
+   * @param props Props object defining creation of the actor
+   * @param name Name of the actor
+   * @param storageLevel RDD storage level. Defaults to memory-only.
+   */
+  def pluggableActorStream[T: ClassManifest](
+    props: Props, name: String, 
+    storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[T] = {
+    pluggableNetworkStream(new ActorReceiver(Settings(props, name, storageLevel)))
+  }                 	                                                                  
+
   /**
    * Create an input stream that pulls messages form a Kafka Broker.
    * @param hostname Zookeper hostname.
diff --git a/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala
new file mode 100644
index 0000000000000000000000000000000000000000..674f1059fe0b75cdb05c5354d0303b6c4810e02a
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala
@@ -0,0 +1,12 @@
+package spark.streaming.dstream
+
+import spark.streaming.StreamingContext
+
+class PluggableInputDStream[T: ClassManifest](
+  @transient ssc_ : StreamingContext,
+  receiver: NetworkReceiver[T]) extends NetworkInputDStream[T](ssc_) {
+
+  def getReceiver(): NetworkReceiver[T] = {
+    receiver
+  }
+}
diff --git a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
new file mode 100644
index 0000000000000000000000000000000000000000..f24c99ad705d67a03a9ede599d0c4f98de69e437
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
@@ -0,0 +1,111 @@
+package spark.streaming.receivers
+
+import akka.actor.{ Actor, PoisonPill, Props, SupervisorStrategy }
+import akka.actor.{ actorRef2Scala, ActorRef }
+import akka.actor.{ PossiblyHarmful, OneForOneStrategy }
+
+import spark.storage.StorageLevel
+import spark.streaming.dstream.NetworkReceiver
+
+import java.util.concurrent.atomic.AtomicInteger
+
+/** A helper with set of defaults for supervisor strategy **/
+object ReceiverSupervisorStrategy {
+
+  import akka.util.duration._
+  import akka.actor.SupervisorStrategy._
+
+  val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
+    15 millis) {
+    case _: RuntimeException ⇒ Restart
+    case _: Exception ⇒ Escalate
+  }
+}
+
+/**
+ * Settings for configuring the actor creation or defining supervisor strategy
+ */
+case class Settings(props: Props,
+  name: String,
+  storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
+  supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy)
+
+/**
+ * Statistcs for querying the supervisor about state of workers
+ */
+case class Statistcs(numberOfMsgs: Int,
+  numberOfWorkers: Int,
+  numberOfHiccups: Int,
+  otherInfo: String)
+
+/** Case class to receive data sent by child actors **/
+case class Data[T: ClassManifest](data: T)
+
+/**
+ * Provides Actors as receivers for receiving stream.
+ *
+ * As Actors can also be used to receive data from almost any stream source.
+ * A nice set of abstraction(s) for actors as receivers is already provided for
+ * a few general cases. It is thus exposed as an API where user may come with
+ * his own Actor to run as receiver for Spark Streaming input source.
+ */
+class ActorReceiver[T: ClassManifest](settings: Settings)
+  extends NetworkReceiver[T] {
+
+  protected lazy val blocksGenerator: BlockGenerator =
+    new BlockGenerator(settings.storageLevel)
+
+  protected lazy val supervisor = env.actorSystem.actorOf(Props(new Supervisor),
+    "Supervisor" + streamId)
+
+  private class Supervisor extends Actor {
+
+    override val supervisorStrategy = settings.supervisorStrategy
+    val worker = context.actorOf(settings.props, settings.name)
+    logInfo("Started receiver worker at:" + worker.path)
+
+    val n: AtomicInteger = new AtomicInteger(0)
+    val hiccups: AtomicInteger = new AtomicInteger(0)
+
+    def receive = {
+
+      case props: Props =>
+        val worker = context.actorOf(props)
+        logInfo("Started receiver worker at:" + worker.path)
+        sender ! worker
+
+      case (props: Props, name: String) =>
+        val worker = context.actorOf(props, name)
+        logInfo("Started receiver worker at:" + worker.path)
+        sender ! worker
+
+      case _: PossiblyHarmful => hiccups.incrementAndGet()
+
+      case _: Statistcs =>
+        val workers = context.children
+        sender ! Statistcs(n.get, workers.size, hiccups.get, workers.mkString("\n"))
+
+      case Data(iter: Iterator[_]) => push(iter.asInstanceOf[Iterator[T]])
+
+      case Data(msg) =>
+        blocksGenerator += msg.asInstanceOf[T]
+        n.incrementAndGet
+    }
+  }
+
+  protected def push(iter: Iterator[T]) {
+    pushBlock("block-" + streamId + "-" + System.nanoTime(),
+      iter, null, settings.storageLevel)
+  }
+
+  protected def onStart() = {
+    blocksGenerator.start()
+    supervisor
+    logInfo("Supervision tree for receivers initialized at:" + supervisor.path)
+  }
+
+  protected def onStop() = {
+    supervisor ! PoisonPill
+  }
+
+}