Skip to content
Snippets Groups Projects
Commit d8ee184d authored by Nick Pentreath
Browse files

Dependencies and refactoring for streaming HLL example, and using context.twitterStream method

parent 315ea069
No related branches found
No related tags found
No related merge requests found
......@@ -19,17 +19,11 @@
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</dependency>
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-stream</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>algebird-core_2.9.2</artifactId>
<version>0.1.8</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.version}</artifactId>
......
package spark.streaming.examples.twitter
package spark.streaming.examples
import spark.streaming.{Seconds, StreamingContext}
import spark.storage.StorageLevel
......@@ -7,44 +7,43 @@ import com.twitter.algebird.HyperLogLogMonoid
import spark.streaming.dstream.TwitterInputDStream
/**
* Example of using HyperLogLog monoid from Twitter's Algebird together with Spark Streaming's
* TwitterInputDStream
* Example using HyperLogLog monoid from Twitter's Algebird together with Spark Streaming's
* TwitterInputDStream to compute approximate distinct counts of userids.
*/
object StreamingHLL {
object TwitterAlgebirdHLL {
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println("Usage: TwitterStreamingHLL <master> <twitter_username> <twitter_password>" +
System.err.println("Usage: TwitterAlgebirdHLL <master> <twitter_username> <twitter_password>" +
" [filter1] [filter2] ... [filter n]")
System.exit(1)
}
/** Bit size parameter for HyperLogLog */
val BIT_SIZE = 12
val Array(master, username, password) = args.slice(0, 3)
val filters = args.slice(3, args.length)
val ssc = new StreamingContext(master, "TwitterStreamingHLL", Seconds(2))
val stream = new TwitterInputDStream(ssc, username, password, filters,
StorageLevel.MEMORY_ONLY_SER)
ssc.registerInputStream(stream)
val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5))
val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER)
val users = stream.map(status => status.getUser.getId)
val globalHll = new HyperLogLogMonoid(12)
var globalHll = new HyperLogLogMonoid(BIT_SIZE).zero
var userSet: Set[Long] = Set()
val approxUsers = users.mapPartitions(ids => {
val hll = new HyperLogLogMonoid(12)
val hll = new HyperLogLogMonoid(BIT_SIZE)
ids.map(id => hll(id))
}).reduce(_ + _)
val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)
var h = globalHll.zero
approxUsers.foreach(rdd => {
if (rdd.count() != 0) {
val partial = rdd.first()
h += partial
globalHll += partial
println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt))
println("Approx distinct users overall: %d".format(globalHll.estimateSize(h).toInt))
println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt))
}
})
......@@ -54,7 +53,7 @@ object StreamingHLL {
userSet ++= partial
println("Exact distinct users this batch: %d".format(partial.size))
println("Exact distinct users overall: %d".format(userSet.size))
println("Error rate: %2.5f%%".format(((globalHll.estimateSize(h) / userSet.size.toDouble) - 1) * 100))
println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1) * 100))
}
})
......
......@@ -47,6 +47,16 @@
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-stream</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-core</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment