Commit d8cee52d authored by Tathagata Das

Merge branch 'mesos-streaming' into streaming

parents 208edaac 24c0cd61
@@ -20,11 +20,10 @@
<artifactId>jetty-server</artifactId>
</dependency>
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-stream</artifactId>
<version>3.0.3</version>
<groupId>com.twitter</groupId>
<artifactId>algebird-core_2.9.2</artifactId>
<version>0.1.9</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.version}</artifactId>
package spark.streaming.examples
import spark.streaming.{Seconds, StreamingContext}
import spark.storage.StorageLevel
import com.twitter.algebird._
import spark.streaming.StreamingContext._
import spark.SparkContext._
/**
* Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute
* windowed and global Top-K estimates of user IDs occurring in a Twitter stream.
* <br>
* <strong>Note</strong> that since Algebird's implementation currently only supports Long inputs,
* the example operates on Long user IDs. Once the implementation supports other input types (such as String),
* the same approach could be used, for example, to compute popular topics.
* <p>
* <p>
* <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
* This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data structure
* for approximate frequency estimation in data streams (e.g. the Top-K elements, the frequency of any given element, etc.)
* that uses space sub-linear in the number of elements in the stream. Once elements are added to the CMS, the
* estimated count of an element can be computed, as well as "heavy-hitters" that occur more than a threshold
* percentage of the overall total count.
* <p><p>
* Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the reduce operation.
*/
object TwitterAlgebirdCMS {
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println("Usage: TwitterAlgebirdCMS <master> <twitter_username> <twitter_password>" +
" [filter1] [filter2] ... [filter n]")
System.exit(1)
}
// CMS parameters
val DELTA = 1E-3
val EPS = 0.01
val SEED = 1
val PERC = 0.001
// K highest frequency elements to take
val TOPK = 10
val Array(master, username, password) = args.slice(0, 3)
val filters = args.slice(3, args.length)
val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10))
val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER)
val users = stream.map(status => status.getUser.getId)
val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC) // CountMinSketchMonoid(eps, delta, seed, heavyHittersPct)
var globalCMS = cms.zero
val mm = new MapMonoid[Long, Int]()
var globalExact = Map[Long, Int]()
val approxTopUsers = users.mapPartitions(ids => {
ids.map(id => cms.create(id))
}).reduce(_ ++ _)
val exactTopUsers = users.map(id => (id, 1))
.reduceByKey((a, b) => a + b)
approxTopUsers.foreach(rdd => {
if (rdd.count() != 0) {
val partial = rdd.first()
val partialTopK = partial.heavyHitters.map(id =>
(id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
globalCMS ++= partial
val globalTopK = globalCMS.heavyHitters.map(id =>
(id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC,
partialTopK.mkString("[", ",", "]")))
println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC,
globalTopK.mkString("[", ",", "]")))
}
})
exactTopUsers.foreach(rdd => {
if (rdd.count() != 0) {
val partialMap = rdd.collect().toMap
val partialTopK = rdd.map(
{case (id, count) => (count, id)})
.sortByKey(ascending = false).take(TOPK)
globalExact = mm.plus(globalExact.toMap, partialMap)
val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK)
println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]")))
println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]")))
}
})
ssc.start()
}
}
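As the comment above notes, Algebird's CMS is a monoid, which is what lets the example merge per-partition sketches with `++` inside `reduce` and fold each batch into the running `globalCMS`. Below is a minimal standalone sketch of that merge, assuming only algebird-core 0.1.9 on the classpath (no Spark); the object name and sample IDs are illustrative, not part of this commit.

package spark.streaming.examples

import com.twitter.algebird._

// Illustrative sketch, not part of this commit.
object CMSMergeSketch {
  def main(args: Array[String]) {
    // eps, delta, seed, heavyHittersPct -- the same values the example above uses
    val cms = new CountMinSketchMonoid(0.01, 1E-3, 1, 0.001)

    // Two sketches built independently (e.g. on two partitions)...
    val left = (1L to 100L).map(id => cms.create(id)).reduce(_ ++ _)
    val right = (50L to 150L).map(id => cms.create(id)).reduce(_ ++ _)

    // ...can be merged with ++ and queried as a single sketch.
    val merged = left ++ right
    println("Estimated frequency of 75: " + merged.frequency(75L).estimate) // ~2
    println("Heavy hitters tracked: " + merged.heavyHitters.size)
  }
}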
package spark.streaming.examples
import spark.streaming.{Seconds, StreamingContext}
import spark.storage.StorageLevel
import com.twitter.algebird.HyperLogLog._
import com.twitter.algebird.HyperLogLogMonoid
import spark.streaming.dstream.TwitterInputDStream
/**
* Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute
* a windowed and global estimate of the unique user IDs occurring in a Twitter stream.
* <p>
* <p>
* This <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
* blog post</a> and this
* <a href="http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html">blog post</a>
* have good overviews of HyperLogLog (HLL). HLL is a memory-efficient data structure for estimating
* the cardinality of a data stream, i.e. the number of unique elements.
* <p><p>
* Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the reduce operation.
*/
object TwitterAlgebirdHLL {
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println("Usage: TwitterAlgebirdHLL <master> <twitter_username> <twitter_password>" +
" [filter1] [filter2] ... [filter n]")
System.exit(1)
}
/** Bit size parameter for HyperLogLog, trades off accuracy vs size */
val BIT_SIZE = 12
val Array(master, username, password) = args.slice(0, 3)
val filters = args.slice(3, args.length)
val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5))
val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER)
val users = stream.map(status => status.getUser.getId)
val hll = new HyperLogLogMonoid(BIT_SIZE)
var globalHll = hll.zero
var userSet: Set[Long] = Set()
val approxUsers = users.mapPartitions(ids => {
ids.map(id => hll(id))
}).reduce(_ + _)
val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)
approxUsers.foreach(rdd => {
if (rdd.count() != 0) {
val partial = rdd.first()
globalHll += partial
println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt))
println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt))
}
})
exactUsers.foreach(rdd => {
if (rdd.count() != 0) {
val partial = rdd.first()
userSet ++= partial
println("Exact distinct users this batch: %d".format(partial.size))
println("Exact distinct users overall: %d".format(userSet.size))
println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1) * 100))
}
})
ssc.start()
}
}
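The HLL example relies on the same monoid structure: per-partition HLL instances are combined with `+` inside `reduce` and again into the running `globalHll`. Here is a minimal standalone sketch of that union, again assuming only algebird-core 0.1.9; the object name and ID ranges are illustrative, not part of this commit.

package spark.streaming.examples

import com.twitter.algebird.HyperLogLog._
import com.twitter.algebird.HyperLogLogMonoid

// Illustrative sketch, not part of this commit.
object HLLMergeSketch {
  def main(args: Array[String]) {
    val hll = new HyperLogLogMonoid(12) // same bit size as BIT_SIZE above

    // Two HLLs built from overlapping ID ranges...
    val a = (1L to 1000L).map(id => hll(id)).reduce(_ + _)
    val b = (501L to 1500L).map(id => hll(id)).reduce(_ + _)

    // ...merge into a single estimate of the union's cardinality (roughly 1500).
    val merged = a + b
    println("Approx distinct ids: " + merged.estimatedSize.toInt)
  }
}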
package spark.streaming.examples
import akka.actor.ActorSystem
import akka.actor.actorRef2Scala
import akka.zeromq._
import spark.streaming.{ Seconds, StreamingContext }
import spark.streaming.StreamingContext._
import akka.zeromq.Subscribe
/**
* A simple publisher for demonstration purposes; it repeatedly publishes a set of messages
* every second.
*/
object SimpleZeroMQPublisher {
def main(args: Array[String]) = {
if (args.length < 2) {
System.err.println("Usage: SimpleZeroMQPublisher <zeroMQUrl> <topic> ")
System.exit(1)
}
val Seq(url, topic) = args.toSeq
val acs: ActorSystem = ActorSystem()
val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, Bind(url))
val messages: Array[String] = Array("words ", "may ", "count ")
while (true) {
Thread.sleep(1000)
pubSocket ! ZMQMessage(Frame(topic) :: messages.map(x => Frame(x.getBytes)).toList)
}
acs.awaitTermination()
}
}
/**
* A sample word count over a ZeroMQ stream.
*
* To work with ZeroMQ, its native libraries have to be installed.
* Install the ZeroMQ (release 2.1) core libraries. [ZeroMQ Install guide](http://www.zeromq.org/intro:get-the-software)
*
* Usage: ZeroMQWordCount <master> <zeroMQurl> <topic>
* In local mode, <master> should be 'local[n]' with n > 1
* <zeroMQurl> and <topic> describe where the ZeroMQ publisher is running.
*
* To run this example locally, you may start the publisher with
* `$ ./run spark.streaming.examples.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar`
* and then run the example with
* `$ ./run spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo`
*/
object ZeroMQWordCount {
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println(
"Usage: ZeroMQWordCount <master> <zeroMQurl> <topic>" +
"In local mode, <master> should be 'local[n]' with n > 1")
System.exit(1)
}
val Seq(master, url, topic) = args.toSeq
// Create the context and set the batch size
val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2))
def bytesToStringIterator(frames: Seq[Seq[Byte]]) = frames.map(frame => new String(frame.toArray)).iterator
// For this stream, a ZeroMQ publisher should be running.
val lines = ssc.zeroMQStream(url, Subscribe(topic), bytesToStringIterator)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
}
}
\ No newline at end of file
@@ -154,17 +154,22 @@ object SparkBuild extends Build {
)
def examplesSettings = sharedSettings ++ Seq(
name := "spark-examples"
name := "spark-examples",
libraryDependencies ++= Seq("com.twitter" % "algebird-core_2.9.2" % "0.1.9")
)
def bagelSettings = sharedSettings ++ Seq(name := "spark-bagel")
def streamingSettings = sharedSettings ++ Seq(
name := "spark-streaming",
resolvers ++= Seq(
"Akka Repository" at "http://repo.akka.io/releases"
),
libraryDependencies ++= Seq(
"org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile",
"com.github.sgroschupf" % "zkclient" % "0.1",
"org.twitter4j" % "twitter4j-stream" % "3.0.3"
"org.twitter4j" % "twitter4j-stream" % "3.0.3",
"com.typesafe.akka" % "akka-zeromq" % "2.0.3"
)
) ++ assemblySettings ++ extraAssemblySettings
@@ -47,6 +47,16 @@
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-stream</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-core</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
@@ -2,12 +2,14 @@ package spark.streaming
import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.zeromq.Subscribe
import spark.streaming.dstream._
import spark.{RDD, Logging, SparkEnv, SparkContext}
import spark.streaming.receivers.ActorReceiver
import spark.streaming.receivers.ReceiverSupervisorStrategy
import spark.streaming.receivers.ZeroMQReceiver
import spark.storage.StorageLevel
import spark.util.MetadataCleaner
import spark.streaming.receivers.ActorReceiver
@@ -161,7 +163,7 @@ class StreamingContext private (
* @param props Props object defining creation of the actor
* @param name Name of the actor
* @param storageLevel RDD storage level. Defaults to memory-only.
*
*
* @note An important point to note:
* Since the Actor may exist outside the Spark framework, it is the user's responsibility
* to ensure type safety, i.e. that the parametrized type of data received and actorStream
@@ -174,6 +176,26 @@ class StreamingContext private (
networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
}
/**
* Create an input stream that receives messages pushed by a ZeroMQ publisher.
* @param publisherUrl URL of the remote ZeroMQ publisher
* @param subscribe Topic to subscribe to
* @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and each frame is a
* sequence of bytes, so a converter (typically a deserializer) is needed to translate
* the sequence of byte sequences, where the outer sequence is the frames of a message
* and each inner sequence is a frame's payload, into an iterator of objects.
* @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_ONLY_SER_2.
*/
def zeroMQStream[T: ClassManifest](publisherUrl: String,
subscribe: Subscribe,
bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T],
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = {
actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)),
"ZeroMQReceiver", storageLevel, supervisorStrategy)
}
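To make the `bytesToObjects` contract above concrete: the receiver hands the converter one `Seq[Seq[Byte]]` per message, where each inner sequence is a frame's payload (the topic frame is dropped), and the converter turns it into records. The helper below is an illustrative sketch only (the name and the line-splitting behaviour are assumptions, not part of this commit), followed by a hypothetical call mirroring ZeroMQWordCount.

// Illustrative only, not part of this commit: decode each frame as UTF-8 and
// emit one record per line, matching the Seq[Seq[Byte]] ⇒ Iterator[T] contract.
def framesToLines(frames: Seq[Seq[Byte]]): Iterator[String] =
  frames.map(frame => new String(frame.toArray, "UTF-8")).flatMap(_.split("\n")).iterator

// Hypothetical usage, mirroring ZeroMQWordCount:
//   val lines = ssc.zeroMQStream(url, Subscribe(topic), framesToLines _)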
/**
* Create an input stream that pulls messages form a Kafka Broker.
* @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
@@ -478,4 +500,3 @@ object StreamingContext {
new Path(sscCheckpointDir, UUID.randomUUID.toString).toString
}
}
package spark.streaming.receivers
import akka.actor.Actor
import akka.zeromq._
import spark.Logging
/**
* A receiver to subscribe to ZeroMQ stream.
*/
private[streaming] class ZeroMQReceiver[T: ClassManifest](publisherUrl: String,
subscribe: Subscribe,
bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T])
extends Actor with Receiver with Logging {
override def preStart() = context.system.newSocket(SocketType.Sub, Listener(self),
Connect(publisherUrl), subscribe)
def receive: Receive = {
case Connecting ⇒ logInfo("connecting ...")
case m: ZMQMessage ⇒
logDebug("Received message for: " + m.firstFrameAsString)
// We ignore the first frame for processing, as it is the topic
val bytes = m.frames.tail.map(_.payload)
pushBlock(bytesToObjects(bytes))
case Closed ⇒ logInfo("received closed ")
}
}