diff --git a/examples/pom.xml b/examples/pom.xml
index f43af670c613fb6fa3dcaa09717b3e16e83af6da..f6125444e2a46e326482a6e91e860f37b37f2a8a 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -20,11 +20,10 @@
       <artifactId>jetty-server</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.twitter4j</groupId>
-      <artifactId>twitter4j-stream</artifactId>
-      <version>3.0.3</version>
+      <groupId>com.twitter</groupId>
+      <artifactId>algebird-core_2.9.2</artifactId>
+      <version>0.1.9</version>
     </dependency>
-
     <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.version}</artifactId>
diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala
new file mode 100644
index 0000000000000000000000000000000000000000..39a1a702eeae925f278c0b692e2c929bf5e7b74e
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -0,0 +1,93 @@
+package spark.streaming.examples
+
+import spark.streaming.{Seconds, StreamingContext}
+import spark.storage.StorageLevel
+import com.twitter.algebird._
+import spark.streaming.StreamingContext._
+import spark.SparkContext._
+
+/**
+ * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute
+ * windowed and global Top-K estimates of user IDs occurring in a Twitter stream.
+ * <br>
+ *   <strong>Note</strong> that since Algebird's implementation currently only supports Long inputs,
+ *   the example operates on Long IDs. Once the implementation supports other inputs (such as String),
+ *   the same approach could be used for computing popular topics for example.
+ * <p>
+ * <p>
+ *   <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
+ *   This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a datastructure
+ *   for approximate frequency estimation in data streams (e.g. Top-K elements, frequency of any given element, etc),
+ *   that uses space sub-linear in the number of elements in the stream. Once elements are added to the CMS, the
+ *   estimated count of an element can be computed, as well as "heavy-hitters" that occur more than a threshold
+ *   percentage of the overall total count.
+ * <p><p>
+ *   Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the reduce operation.
+ */
+object TwitterAlgebirdCMS {
+  def main(args: Array[String]) {
+    if (args.length < 3) {
+      System.err.println("Usage: TwitterAlgebirdCMS <master> <twitter_username> <twitter_password>" +
+        " [filter1] [filter2] ... [filter n]")
+      System.exit(1)
+    }
+
+    // CMS parameters
+    val DELTA = 1E-3
+    val EPS = 0.01
+    val SEED = 1
+    val PERC = 0.001
+    // K highest frequency elements to take
+    val TOPK = 10
+
+    val Array(master, username, password) = args.slice(0, 3)
+    val filters = args.slice(3, args.length)
+
+    val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10))
+    val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER)
+
+    val users = stream.map(status => status.getUser.getId)
+
+    val cms = new CountMinSketchMonoid(DELTA, EPS, SEED, PERC)
+    var globalCMS = cms.zero
+    val mm = new MapMonoid[Long, Int]()
+    var globalExact = Map[Long, Int]()
+
+    val approxTopUsers = users.mapPartitions(ids => {
+      ids.map(id => cms.create(id))
+    }).reduce(_ ++ _)
+
+    val exactTopUsers = users.map(id => (id, 1))
+      .reduceByKey((a, b) => a + b)
+
+    approxTopUsers.foreach(rdd => {
+      if (rdd.count() != 0) {
+        val partial = rdd.first()
+        val partialTopK = partial.heavyHitters.map(id =>
+          (id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
+        globalCMS ++= partial
+        val globalTopK = globalCMS.heavyHitters.map(id =>
+          (id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
+        println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC,
+          partialTopK.mkString("[", ",", "]")))
+        println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC,
+          globalTopK.mkString("[", ",", "]")))
+      }
+    })
+
+    exactTopUsers.foreach(rdd => {
+      if (rdd.count() != 0) {
+        val partialMap = rdd.collect().toMap
+        val partialTopK = rdd.map(
+          {case (id, count) => (count, id)})
+          .sortByKey(ascending = false).take(TOPK)
+        globalExact = mm.plus(globalExact.toMap, partialMap)
+        val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK)
+        println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]")))
+        println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]")))
+      }
+    })
+
+    ssc.start()
+  }
+}
diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala
new file mode 100644
index 0000000000000000000000000000000000000000..914fba4ca22c54a2678ce2c14b2db1d1eb976265
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala
@@ -0,0 +1,71 @@
+package spark.streaming.examples
+
+import spark.streaming.{Seconds, StreamingContext}
+import spark.storage.StorageLevel
+import com.twitter.algebird.HyperLogLog._
+import com.twitter.algebird.HyperLogLogMonoid
+import spark.streaming.dstream.TwitterInputDStream
+
+/**
+ * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute
+ * a windowed and global estimate of the unique user IDs occurring in a Twitter stream.
+ * <p>
+ * <p>
+ *   This <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
+ *   blog post</a> and this
+ *   <a href="http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html">blog post</a>
+ *   have good overviews of HyperLogLog (HLL). HLL is a memory-efficient datastructure for estimating
+ *   the cardinality of a data stream, i.e. the number of unique elements.
+ * <p><p>
+ *   Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the reduce operation.
+ */
+object TwitterAlgebirdHLL {
+  def main(args: Array[String]) {
+    if (args.length < 3) {
+      System.err.println("Usage: TwitterAlgebirdHLL <master> <twitter_username> <twitter_password>" +
+        " [filter1] [filter2] ... [filter n]")
+      System.exit(1)
+    }
+
+    /** Bit size parameter for HyperLogLog, trades off accuracy vs size */
+    val BIT_SIZE = 12
+    val Array(master, username, password) = args.slice(0, 3)
+    val filters = args.slice(3, args.length)
+
+    val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5))
+    val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER)
+
+    val users = stream.map(status => status.getUser.getId)
+
+    val hll = new HyperLogLogMonoid(BIT_SIZE)
+    var globalHll = hll.zero
+    var userSet: Set[Long] = Set()
+
+    val approxUsers = users.mapPartitions(ids => {
+      ids.map(id => hll(id))
+    }).reduce(_ + _)
+
+    val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)
+
+    approxUsers.foreach(rdd => {
+      if (rdd.count() != 0) {
+        val partial = rdd.first()
+        globalHll += partial
+        println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt))
+        println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt))
+      }
+    })
+
+    exactUsers.foreach(rdd => {
+      if (rdd.count() != 0) {
+        val partial = rdd.first()
+        userSet ++= partial
+        println("Exact distinct users this batch: %d".format(partial.size))
+        println("Exact distinct users overall: %d".format(userSet.size))
+        println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1) * 100))
+      }
+    })
+
+    ssc.start()
+  }
+}
diff --git a/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala
new file mode 100644
index 0000000000000000000000000000000000000000..5ed9b7cb768750e5e140802bdeca782fae134847
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala
@@ -0,0 +1,73 @@
+package spark.streaming.examples
+
+import akka.actor.ActorSystem
+import akka.actor.actorRef2Scala
+import akka.zeromq._
+import spark.streaming.{ Seconds, StreamingContext }
+import spark.streaming.StreamingContext._
+import akka.zeromq.Subscribe
+
+/**
+ * A simple publisher for demonstration purposes, repeatedly publishes random Messages
+ * every one second.
+ */
+object SimpleZeroMQPublisher {
+
+  def main(args: Array[String]) = {
+    if (args.length < 2) {
+      System.err.println("Usage: SimpleZeroMQPublisher <zeroMQUrl> <topic> ")
+      System.exit(1)
+    }
+
+    val Seq(url, topic) = args.toSeq
+    val acs: ActorSystem = ActorSystem()
+
+    val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, Bind(url))
+    val messages: Array[String] = Array("words ", "may ", "count ")
+    while (true) {
+      Thread.sleep(1000)
+      pubSocket ! ZMQMessage(Frame(topic) :: messages.map(x => Frame(x.getBytes)).toList)
+    }
+    acs.awaitTermination()
+  }
+}
+
+/**
+ * A sample wordcount with ZeroMQStream stream
+ *
+ * To work with zeroMQ, some native libraries have to be installed.
+ * Install zeroMQ (release 2.1) core libraries. [ZeroMQ Install guide](http://www.zeromq.org/intro:get-the-software)
+ * 
+ * Usage: ZeroMQWordCount <master> <zeroMQurl> <topic>
+ * In local mode, <master> should be 'local[n]' with n > 1
+ *   <zeroMQurl> and <topic> describe where zeroMq publisher is running.
+ *
+ * To run this example locally, you may run publisher as
+ *    `$ ./run spark.streaming.examples.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar`
+ * and run the example as
+ *    `$ ./run spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo`
+ */
+object ZeroMQWordCount {
+  def main(args: Array[String]) {
+    if (args.length < 3) {
+      System.err.println(
+        "Usage: ZeroMQWordCount <master> <zeroMQurl> <topic>" +
+          "In local mode, <master> should be 'local[n]' with n > 1")
+      System.exit(1)
+    }
+    val Seq(master, url, topic) = args.toSeq
+
+    // Create the context and set the batch size
+    val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2))
+
+    def bytesToStringIterator(x: Seq[Seq[Byte]]) = (x.map(x => new String(x.toArray))).iterator
+
+    //For this stream, a zeroMQ publisher should be running.
+    val lines = ssc.zeroMQStream(url, Subscribe(topic), bytesToStringIterator)
+    val words = lines.flatMap(_.split(" "))
+    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+    wordCounts.print()
+    ssc.start()
+  }
+
+}
\ No newline at end of file
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index c6d3cc8b1540f51effb85b9fccbe423d27bedca0..5e7c3b5e3aca2b145748c956ac4b3cf5a87a8b33 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -154,17 +154,22 @@ object SparkBuild extends Build {
   )
 
   def examplesSettings = sharedSettings ++ Seq(
-    name := "spark-examples"
+    name := "spark-examples",
+    libraryDependencies ++= Seq("com.twitter" % "algebird-core_2.9.2" % "0.1.9")
   )
 
   def bagelSettings = sharedSettings ++ Seq(name := "spark-bagel")
 
   def streamingSettings = sharedSettings ++ Seq(
     name := "spark-streaming",
+    resolvers ++= Seq(
+      "Akka Repository" at "http://repo.akka.io/releases"
+    ),
     libraryDependencies ++= Seq(
       "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile",
       "com.github.sgroschupf" % "zkclient" % "0.1",
-      "org.twitter4j" % "twitter4j-stream" % "3.0.3"
+      "org.twitter4j" % "twitter4j-stream" % "3.0.3",
+      "com.typesafe.akka" % "akka-zeromq" % "2.0.3"
     )
   ) ++ assemblySettings ++ extraAssemblySettings
 
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 6ee7e59df39d16b1bf47ad23604bfe7f80f40f53..d78c39da0d1a60a1bdbf928c3981392f1bbc11c1 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -47,6 +47,16 @@
       <artifactId>zkclient</artifactId>
       <version>0.1</version>
     </dependency>
+    <dependency>
+      <groupId>org.twitter4j</groupId>
+      <artifactId>twitter4j-stream</artifactId>
+      <version>3.0.3</version>
+    </dependency>
+    <dependency>
+      <groupId>org.twitter4j</groupId>
+      <artifactId>twitter4j-core</artifactId>
+      <version>3.0.3</version>
+    </dependency>
 
     <dependency>
       <groupId>org.scalatest</groupId>
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index d76ccfca4f946f36f85f792851027669d491a059..d0430b3f3eef8867c33ea9862df694958224dce8 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -2,12 +2,14 @@ package spark.streaming
 
 import akka.actor.Props
 import akka.actor.SupervisorStrategy
+import akka.zeromq.Subscribe
 
 import spark.streaming.dstream._
 
 import spark.{RDD, Logging, SparkEnv, SparkContext}
 import spark.streaming.receivers.ActorReceiver
 import spark.streaming.receivers.ReceiverSupervisorStrategy
+import spark.streaming.receivers.ZeroMQReceiver
 import spark.storage.StorageLevel
 import spark.util.MetadataCleaner
 import spark.streaming.receivers.ActorReceiver
@@ -161,7 +163,7 @@ class StreamingContext private (
    * @param props Props object defining creation of the actor
    * @param name Name of the actor
    * @param storageLevel RDD storage level. Defaults to memory-only.
-   * 
+   *
    * @note An important point to note:
    *       Since Actor may exist outside the spark framework, It is thus user's responsibility
    *       to ensure the type safety, i.e parametrized type of data received and actorStream
@@ -174,6 +176,26 @@ class StreamingContext private (
     networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
   }
 
+  /**
+   * Create an input stream that receives messages pushed by a zeromq publisher.
+   * @param publisherUrl Url of remote zeromq publisher
+   * @param zeroMQ topic to subscribe to
+   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
+   *                       of byte thus it needs the converter(which might be deserializer of bytes)
+   *                       to translate from sequence of sequence of bytes, where sequence refer to a frame
+   *                       and sub sequence refer to its payload.
+   * @param storageLevel RDD storage level. Defaults to memory-only.
+   */
+  def zeroMQStream[T: ClassManifest](publisherUrl:String,
+      subscribe: Subscribe,
+      bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T],
+      storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
+      supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = {
+
+    actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)),
+        "ZeroMQReceiver", storageLevel, supervisorStrategy)
+  }
+
   /**
    * Create an input stream that pulls messages form a Kafka Broker.
    * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
@@ -478,4 +500,3 @@ object StreamingContext {
     new Path(sscCheckpointDir, UUID.randomUUID.toString).toString
   }
 }
-
diff --git a/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala
new file mode 100644
index 0000000000000000000000000000000000000000..5533c3cf1ef8b5a316aea8b8fa2b9c88f60872fe
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala
@@ -0,0 +1,33 @@
+package spark.streaming.receivers
+
+import akka.actor.Actor
+import akka.zeromq._
+
+import spark.Logging
+
+/**
+ * A receiver to subscribe to ZeroMQ stream.
+ */
+private[streaming] class ZeroMQReceiver[T: ClassManifest](publisherUrl: String,
+  subscribe: Subscribe,
+  bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T])
+  extends Actor with Receiver with Logging {
+
+  override def preStart() = context.system.newSocket(SocketType.Sub, Listener(self),
+    Connect(publisherUrl), subscribe)
+
+  def receive: Receive = {
+
+    case Connecting ⇒ logInfo("connecting ...")
+
+    case m: ZMQMessage ⇒
+      logDebug("Received message for:" + m.firstFrameAsString)
+
+      //We ignore first frame for processing as it is the topic
+      val bytes = m.frames.tail.map(_.payload)
+      pushBlock(bytesToObjects(bytes))
+
+    case Closed ⇒ logInfo("received closed ")
+
+  }
+}