Skip to content
Snippets Groups Projects
Commit 015893f0 authored by Nick Pentreath's avatar Nick Pentreath
Browse files

Adding streaming HyperLogLog example using Algebird

parent b53174a6
No related branches found
No related tags found
No related merge requests found
......@@ -24,6 +24,11 @@
<artifactId>twitter4j-stream</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>algebird-core_2.9.2</artifactId>
<version>0.1.8</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
......
package spark.streaming.examples.twitter
import spark.streaming.{Seconds, StreamingContext}
import spark.storage.StorageLevel
import com.twitter.algebird.HyperLogLog._
import com.twitter.algebird.HyperLogLogMonoid
/**
* Example of using HyperLogLog monoid from Twitter's Algebird together with Spark Streaming's
* TwitterInputDStream
*/
object StreamingHLL {

  /**
   * Entry point. Counts distinct tweeting users both approximately (HyperLogLog) and exactly
   * (a Set of user ids), printing per-batch and running totals plus the relative error.
   *
   * @param args `<master> <twitter_username> <twitter_password> [filter1] ... [filter n]`;
   *             the first three are required, anything after them is passed on as filters.
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      System.err.println("Usage: StreamingHLL <master> <twitter_username> <twitter_password>" +
        " [filter1] [filter2] ... [filter n]")
      System.exit(1)
    }

    // Bit-size argument for every HyperLogLogMonoid built here. Defined once so the
    // per-partition monoids and the driver-side monoid cannot drift apart.
    val BIT_SIZE = 12

    val Array(master, username, password) = args.slice(0, 3)
    val filters = args.slice(3, args.length)

    val ssc = new StreamingContext(master, "TwitterStreamingHLL", Seconds(2))
    val stream = new TwitterInputDStream(ssc, username, password, filters,
      StorageLevel.MEMORY_ONLY_SER)
    ssc.registerInputStream(stream)

    val users = stream.map(status => status.getUser.getId)

    // Driver-side state: running HLL sketch and running exact set of user ids.
    val globalHll = new HyperLogLogMonoid(BIT_SIZE)
    var userSet: Set[Long] = Set()

    // One monoid per partition turns each id into an HLL; the HLLs are then merged per batch.
    val approxUsers = users.mapPartitions(ids => {
      val hll = new HyperLogLogMonoid(BIT_SIZE)
      ids.map(id => hll(id))
    }).reduce(_ + _)

    // Exact counterpart: singleton sets unioned per batch.
    val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)

    var h = globalHll.zero

    approxUsers.foreach(rdd => {
      // take(1) replaces the former count() + first() pair: a single pass over the RDD
      // yields the batch's merged HLL if there is one, and nothing for an empty batch.
      rdd.take(1).headOption.foreach { partial =>
        h += partial
        println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt))
        println("Approx distinct users overall: %d".format(globalHll.estimateSize(h).toInt))
      }
    })

    exactUsers.foreach(rdd => {
      rdd.take(1).headOption.foreach { partial =>
        userSet ++= partial
        println("Exact distinct users this batch: %d".format(partial.size))
        println("Exact distinct users overall: %d".format(userSet.size))
        // Reads `h`, which the approximate-count output above updates.
        // NOTE(review): assumes output operations run in registration order - TODO confirm.
        println("Error rate: %2.5f%%".format(((globalHll.estimateSize(h) / userSet.size.toDouble) - 1) * 100))
      }
    })

    ssc.start()
  }
}
......@@ -156,7 +156,8 @@ object SparkBuild extends Build {
// Settings for the "spark-examples" project: the shared settings plus the extra
// libraries that only the examples need (twitter4j streaming and Algebird).
def examplesSettings = sharedSettings ++ Seq(
  name := "spark-examples",
  libraryDependencies ++= Seq(
    // Single twitter4j entry, comma-terminated; the stale pre-change copy of this line
    // (no trailing comma) made the Seq invalid and duplicated the dependency.
    "org.twitter4j" % "twitter4j-stream" % "3.0.3",
    "com.twitter" % "algebird-core_2.9.2" % "0.1.8"
  )
)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment