Skip to content
Snippets Groups Projects
Commit d9bdae8c authored by Nick Pentreath's avatar Nick Pentreath
Browse files

Adding documentation for HLL and CMS examples. More efficient and clear use of the monoids.

parent 718474b9
No related branches found
No related tags found
No related merge requests found
......@@ -7,8 +7,22 @@ import spark.streaming.StreamingContext._
import spark.SparkContext._
/**
* Example of using CountMinSketch monoid from Twitter's Algebird together with Spark Streaming's
* TwitterInputDStream
* Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute
* windowed and global Top-K estimates of user IDs occurring in a Twitter stream.
* <br>
* <strong>Note</strong> that since Algebird's implementation currently only supports Long inputs,
* the example operates on Long IDs. Once the implementation supports other inputs (such as String),
* the same approach could be used for computing popular topics for example.
* <p>
* <p>
* <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
* This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a datastructure
* for approximate frequency estimation in data streams (e.g. Top-K elements, frequency of any given element, etc),
* that uses space sub-linear in the number of elements in the stream. Once elements are added to the CMS, the
* estimated count of an element can be computed, as well as "heavy-hitters" that occur more than a threshold
* percentage of the overall total count.
* <p><p>
* Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the reduce operation.
*/
object TwitterAlgebirdCMS {
def main(args: Array[String]) {
......@@ -18,27 +32,28 @@ object TwitterAlgebirdCMS {
System.exit(1)
}
// CMS parameters
val DELTA = 1E-3
val EPS = 0.01
val SEED = 1
val PERC = 0.001
// K highest frequency elements to take
val TOPK = 10
val Array(master, username, password) = args.slice(0, 3)
val filters = args.slice(3, args.length)
val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10))
val stream = ssc.twitterStream(username, password, filters,
StorageLevel.MEMORY_ONLY_SER)
val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER)
val users = stream.map(status => status.getUser.getId)
var globalCMS = new CountMinSketchMonoid(DELTA, EPS, SEED, PERC).zero
var globalExact = Map[Long, Int]()
val cms = new CountMinSketchMonoid(DELTA, EPS, SEED, PERC)
var globalCMS = cms.zero
val mm = new MapMonoid[Long, Int]()
var globalExact = Map[Long, Int]()
val approxTopUsers = users.mapPartitions(ids => {
val cms = new CountMinSketchMonoid(DELTA, EPS, SEED, PERC)
ids.map(id => cms.create(id))
}).reduce(_ ++ _)
......
......@@ -7,8 +7,17 @@ import com.twitter.algebird.HyperLogLogMonoid
import spark.streaming.dstream.TwitterInputDStream
/**
* Example using HyperLogLog monoid from Twitter's Algebird together with Spark Streaming's
* TwitterInputDStream to compute approximate distinct counts of userids.
* Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute
* a windowed and global estimate of the unique user IDs occurring in a Twitter stream.
* <p>
* <p>
* This <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
* blog post</a> and this
* <a href="http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html">blog post</a>
* have good overviews of HyperLogLog (HLL). HLL is a memory-efficient datastructure for estimating
* the cardinality of a data stream, i.e. the number of unique elements.
* <p><p>
* Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the reduce operation.
*/
object TwitterAlgebirdHLL {
def main(args: Array[String]) {
......@@ -18,7 +27,7 @@ object TwitterAlgebirdHLL {
System.exit(1)
}
/** Bit size parameter for HyperLogLog */
/** Bit size parameter for HyperLogLog, trades off accuracy vs size */
val BIT_SIZE = 12
val Array(master, username, password) = args.slice(0, 3)
val filters = args.slice(3, args.length)
......@@ -28,11 +37,11 @@ object TwitterAlgebirdHLL {
val users = stream.map(status => status.getUser.getId)
var globalHll = new HyperLogLogMonoid(BIT_SIZE).zero
val hll = new HyperLogLogMonoid(BIT_SIZE)
var globalHll = hll.zero
var userSet: Set[Long] = Set()
val approxUsers = users.mapPartitions(ids => {
val hll = new HyperLogLogMonoid(BIT_SIZE)
ids.map(id => hll(id))
}).reduce(_ + _)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment