From 015893f0e8983a7e249709d9820d1bf0dd74d607 Mon Sep 17 00:00:00 2001
From: Nick Pentreath <nick.pentreath@gmail.com>
Date: Tue, 19 Feb 2013 13:21:33 +0200
Subject: [PATCH] Adding streaming HyperLogLog example using Algebird

---
 examples/pom.xml                              |  5 ++
 .../examples/twitter/StreamingHLL.scala       | 62 +++++++++++++++++++
 project/SparkBuild.scala                      |  3 +-
 3 files changed, 69 insertions(+), 1 deletion(-)
 create mode 100644 examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala

diff --git a/examples/pom.xml b/examples/pom.xml
index f43af670c6..28da3dbde4 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -24,6 +24,11 @@
       <artifactId>twitter4j-stream</artifactId>
       <version>3.0.3</version>
     </dependency>
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>algebird-core_2.9.2</artifactId>
+      <version>0.1.8</version>
+    </dependency>
 
     <dependency>
       <groupId>org.scalatest</groupId>
diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala b/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala
new file mode 100644
index 0000000000..f67bb029c6
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala
@@ -0,0 +1,62 @@
+package spark.streaming.examples.twitter
+
+import spark.streaming.{Seconds, StreamingContext}
+import spark.storage.StorageLevel
+import com.twitter.algebird.HyperLogLog._
+import com.twitter.algebird.HyperLogLogMonoid
+
+/**
+ * HyperLogLog monoid (Twitter's Algebird) with Spark Streaming's TwitterInputDStream: prints
+ * per-batch and running approximate vs. exact distinct user counts and the relative error.
+ */
+object StreamingHLL {
+  /** @param args master, Twitter username, Twitter password, then optional stream filters */
+  def main(args: Array[String]) {
+    if (args.length < 3) {
+      System.err.println("Usage: StreamingHLL <master> <twitter_username> <twitter_password>" +
+        " [filter1] [filter2] ... [filter n]")
+      System.exit(1)
+    }
+
+    val Array(master, username, password) = args.slice(0, 3)
+    val filters = args.slice(3, args.length)
+
+    val ssc = new StreamingContext(master, "TwitterStreamingHLL", Seconds(2))
+    val stream = new TwitterInputDStream(ssc, username, password, filters,
+      StorageLevel.MEMORY_ONLY_SER)
+    ssc.registerInputStream(stream)
+
+    val users = stream.map(status => status.getUser.getId)
+
+    // Shared by the global and the per-partition monoids so that both use the same size.
+    val hllBits = 12
+    val globalHll = new HyperLogLogMonoid(hllBits)
+
+    // One sketch (resp. one singleton set) per user id, reduced to a single value per batch.
+    val approxUsers = users.mapPartitions(ids => {
+      val hll = new HyperLogLogMonoid(hllBits)
+      ids.map(id => hll(id))
+    }).reduce(_ + _)
+    val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)
+
+    // Running totals across batches; updated by the two output operations below.
+    var h = globalHll.zero
+    var userSet: Set[Long] = Set()
+
+    // take(1) yields an empty array for an empty batch, so nothing is printed in that case and
+    // no separate count() pass over the RDD is needed before fetching the element.
+    approxUsers.foreach(rdd => rdd.take(1).foreach(partial => {
+      h += partial
+      println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt))
+      println("Approx distinct users overall: %d".format(globalHll.estimateSize(h).toInt))
+    }))
+
+    exactUsers.foreach(rdd => rdd.take(1).foreach(partial => {
+      userSet ++= partial
+      println("Exact distinct users this batch: %d".format(partial.size))
+      println("Exact distinct users overall: %d".format(userSet.size))
+      println("Error rate: %2.5f%%".format(((globalHll.estimateSize(h) / userSet.size.toDouble) - 1) * 100))
+    }))
+
+    ssc.start()
+  }
+}
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index af8b5ba017..18cc9ea90e 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -156,7 +156,8 @@ object SparkBuild extends Build {
   def examplesSettings = sharedSettings ++ Seq(
     name := "spark-examples",
     libraryDependencies ++= Seq(
-      "org.twitter4j" % "twitter4j-stream" % "3.0.3"
+      "org.twitter4j" % "twitter4j-stream" % "3.0.3",
+      "com.twitter" % "algebird-core_2.9.2" % "0.1.8"
     )
   )
 
-- 
GitLab