Commit 22f68b7e authored by Matei Zaharia

Merge pull request #494 from mesos/streaming

Final set of Spark Streaming changes
parents 066ce0ff f9881b10
Showing changed files with 822 additions and 109 deletions
@@ -22,10 +22,10 @@ class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: Stri
   override def getPartitions: Array[Partition] = {
     val dirContents = fs.listStatus(new Path(checkpointPath))
-    val splitFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted
-    val numPartitions = splitFiles.size
-    if (!splitFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) ||
-        !splitFiles(numPartitions-1).endsWith(CheckpointRDD.splitIdToFile(numPartitions-1))) {
+    val partitionFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted
+    val numPartitions = partitionFiles.size
+    if (numPartitions > 0 && (! partitionFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) ||
+        ! partitionFiles(numPartitions-1).endsWith(CheckpointRDD.splitIdToFile(numPartitions-1)))) {
       throw new SparkException("Invalid checkpoint directory: " + checkpointPath)
     }
     Array.tabulate(numPartitions)(i => new CheckpointRDDPartition(i))
...
@@ -38,7 +38,7 @@ class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging
 object MetadataCleaner {
-  def getDelaySeconds = System.getProperty("spark.cleaner.delay", "-1").toInt
-  def setDelaySeconds(delay: Int) { System.setProperty("spark.cleaner.delay", delay.toString) }
+  def getDelaySeconds = System.getProperty("spark.cleaner.ttl", "-1").toInt
+  def setDelaySeconds(delay: Int) { System.setProperty("spark.cleaner.ttl", delay.toString) }
 }
@@ -162,6 +162,16 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
       rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)), true, false)
   }
+  test("CheckpointRDD with zero partitions") {
+    val rdd = new BlockRDD[Int](sc, Array[String]())
+    assert(rdd.partitions.size === 0)
+    assert(rdd.isCheckpointed === false)
+    rdd.checkpoint()
+    assert(rdd.count() === 0)
+    assert(rdd.isCheckpointed === true)
+    assert(rdd.partitions.size === 0)
+  }
   /**
    * Test checkpointing of the final RDD generated by the given operation. By default,
    * this method tests whether the size of serialized RDD has reduced after checkpointing or not.
...
@@ -183,7 +183,7 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td>spark.broadcast.factory</td>
-  <td>spark.broadcast. HttpBroadcastFactory</td>
+  <td>spark.broadcast.HttpBroadcastFactory</td>
   <td>
     Which broadcast implementation to use.
   </td>
@@ -244,10 +244,10 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td>spark.cleaner.delay</td>
+  <td>spark.cleaner.ttl</td>
   <td>(disable)</td>
   <td>
-    Duration (minutes) of how long Spark will remember any metadata (stages generated, tasks generated, etc.).
+    Duration (seconds) of how long Spark will remember any metadata (stages generated, tasks generated, etc.).
     Periodic cleanups will ensure that metadata older than this duration will be forgetten. This is
     useful for running Spark for many hours / days (for example, running 24/7 in case of Spark Streaming
     applications). Note that any RDD that persists in memory for more than this duration will be cleared as well.
...
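For context on the renamed property, here is a minimal sketch of how an application might opt into periodic metadata cleanup before creating its context. The one-hour value and the application name are illustrative and not part of this commit:

{% highlight scala %}
import spark.SparkContext

// Illustrative only: enable periodic cleanup of old Spark metadata.
// The property is read in seconds; the default of -1 leaves cleanup disabled.
System.setProperty("spark.cleaner.ttl", "3600")  // forget metadata older than one hour

val sc = new SparkContext("local[2]", "CleanerTtlExample")
{% endhighlight %}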
@@ -87,7 +87,7 @@ By default, the `pyspark` shell creates SparkContext that runs jobs locally.
 To connect to a non-local cluster, set the `MASTER` environment variable.
 For example, to use the `pyspark` shell with a [standalone Spark cluster](spark-standalone.html):
-{% highlight shell %}
+{% highlight bash %}
 $ MASTER=spark://IP:PORT ./pyspark
 {% endhighlight %}
...
---
layout: global
title: Tutorial - Spark Streaming, Plugging in a custom receiver.
---
A Spark Streaming receiver can ingest a simple network stream, streams of messages from a message queue, files, and so on. A receiver can also take on roles beyond just receiving data, such as filtering and preprocessing. An API for plugging in user-defined custom receivers is therefore provided, to encourage the development of receivers well suited to one's specific needs.

This guide illustrates the programming model and its features by walking through a simple sample receiver and a corresponding Spark Streaming application.
## A quick and naive walk-through
### Write a simple receiver
This starts with implementing an [Actor](#References).

The following is a simple socket text-stream receiver which, admittedly, is overly simplified; it is built on Akka's IO API.
{% highlight scala %}
import akka.actor.{Actor, IO, IOManager}
import akka.util.ByteString

import spark.streaming.receivers.Receiver

class SocketTextStreamReceiver(host: String,
    port: Int,
    bytesToString: ByteString => String) extends Actor with Receiver {

  // Open the socket connection when the actor starts.
  override def preStart = IOManager(context.system).connect(host, port)

  // Push each chunk read from the socket into Spark Streaming as a block.
  def receive = {
    case IO.Read(socket, bytes) => pushBlock(bytesToString(bytes))
  }
}
{% endhighlight %}
All we did here was mix in the Receiver trait and call the pushBlock API method to push our blocks of data. Please refer to the Scaladoc of Receiver for more details.
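As a hedged variation (the class name and the line-splitting logic here are illustrative, not part of the original example), the same trait could push one block per received line rather than one per socket read:

{% highlight scala %}
// Hypothetical variant: split the incoming bytes into lines and push each line separately.
class SocketLineStreamReceiver(host: String, port: Int)
  extends Actor with Receiver {

  override def preStart = IOManager(context.system).connect(host, port)

  def receive = {
    case IO.Read(socket, bytes) =>
      bytes.utf8String.split("\n").foreach(line => pushBlock(line))
  }
}
{% endhighlight %}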
### A sample Spark application

* First, create a Spark Streaming context with the master URL and batch duration.
{% highlight scala %}
val ssc = new StreamingContext(master, "WordCountCustomStreamSource",
Seconds(batchDuration))
{% endhighlight %}
* Plug the actor configuration into the Spark Streaming context and create a DStream.
{% highlight scala %}
val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
"localhost",8445, z => z.utf8String)),"SocketReceiver")
{% endhighlight %}
* Process it.
{% highlight scala %}
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
{% endhighlight %}
* Once the application is running, the stream can be fed test data using the netcat utility.
$ nc -l localhost 8445
hello world
hello hello
## Multiple homogeneous/heterogeneous receivers.
A DStream union operation is provided for taking the union of multiple input streams.
{% highlight scala %}
val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
"localhost",8445, z => z.utf8String)),"SocketReceiver")
// Another socket stream receiver
val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
"localhost",8446, z => z.utf8String)),"SocketReceiver")
val union = lines.union(lines2)
{% endhighlight %}
The above unioned stream can then be processed just as described earlier.
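A minimal sketch of that processing, reusing the word-count steps from the walk-through above on the unioned stream:

{% highlight scala %}
// Same word-count pipeline as before, applied to the unioned stream.
val words = union.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
{% endhighlight %}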
_A more comprehensive example is provided in the Spark Streaming examples._
## References
1. [Akka Actor documentation](http://doc.akka.io/docs/akka/2.0.5/scala/actors.html)
@@ -20,11 +20,10 @@
     <artifactId>jetty-server</artifactId>
   </dependency>
   <dependency>
-    <groupId>org.twitter4j</groupId>
-    <artifactId>twitter4j-stream</artifactId>
-    <version>3.0.3</version>
+    <groupId>com.twitter</groupId>
+    <artifactId>algebird-core_2.9.2</artifactId>
+    <version>0.1.8</version>
   </dependency>
   <dependency>
     <groupId>org.scalatest</groupId>
     <artifactId>scalatest_${scala.version}</artifactId>
...
@@ -23,7 +23,7 @@ import spark.streaming.api.java.JavaStreamingContext;
  */
 public class JavaNetworkWordCount {
   public static void main(String[] args) {
-    if (args.length < 2) {
+    if (args.length < 3) {
       System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
           "In local mode, <master> should be 'local[n]' with n > 1");
       System.exit(1);
@@ -35,7 +35,7 @@ public class JavaNetworkWordCount {
     // Create a NetworkInputDStream on target ip:port and count the
     // words in input stream of \n delimited test (eg. generated by 'nc')
-    JavaDStream<String> lines = ssc.networkTextStream(args[1], Integer.parseInt(args[2]));
+    JavaDStream<String> lines = ssc.socketTextStream(args[1], Integer.parseInt(args[2]));
     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
       @Override
       public Iterable<String> call(String x) {
...
package spark.streaming.examples
import scala.collection.mutable.LinkedList
import scala.util.Random
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.actorRef2Scala
import spark.streaming.Seconds
import spark.streaming.StreamingContext
import spark.streaming.StreamingContext.toPairDStreamFunctions
import spark.streaming.receivers.Receiver
import spark.util.AkkaUtils
case class SubscribeReceiver(receiverActor: ActorRef)
case class UnsubscribeReceiver(receiverActor: ActorRef)
/**
 * Sends random content to every subscribed receiver, with a half-second
 * delay between messages.
 */
class FeederActor extends Actor {
val rand = new Random()
var receivers: LinkedList[ActorRef] = new LinkedList[ActorRef]()
val strings: Array[String] = Array("words ", "may ", "count ")
def makeMessage(): String = {
val x = rand.nextInt(3)
strings(x) + strings(2 - x)
}
/*
* A thread to generate random messages
*/
new Thread() {
override def run() {
while (true) {
Thread.sleep(500)
receivers.foreach(_ ! makeMessage)
}
}
}.start()
def receive: Receive = {
case SubscribeReceiver(receiverActor: ActorRef) =>
println("received subscribe from %s".format(receiverActor.toString))
receivers = LinkedList(receiverActor) ++ receivers
case UnsubscribeReceiver(receiverActor: ActorRef) =>
println("received unsubscribe from %s".format(receiverActor.toString))
receivers = receivers.dropWhile(x => x eq receiverActor)
}
}
/**
 * A sample actor receiver, and also the simplest one. This receiver actor
 * subscribes to a typical publisher/feeder actor and receives data from it.
 *
 * @see [[spark.streaming.examples.FeederActor]]
 */
class SampleActorReceiver[T: ClassManifest](urlOfPublisher: String)
extends Actor with Receiver {
lazy private val remotePublisher = context.actorFor(urlOfPublisher)
override def preStart = remotePublisher ! SubscribeReceiver(context.self)
def receive = {
case msg => context.parent ! pushBlock(msg.asInstanceOf[T])
}
override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)
}
/**
* A sample feeder actor
*
* Usage: FeederActor <hostname> <port>
* <hostname> and <port> describe the Akka system that the Spark sample feeder will start on.
*/
object FeederActor {
def main(args: Array[String]) {
if(args.length < 2){
System.err.println(
"Usage: FeederActor <hostname> <port>\n"
)
System.exit(1)
}
val Seq(host, port) = args.toSeq
val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt)._1
val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")
println("Feeder started as:" + feeder)
actorSystem.awaitTermination();
}
}
/**
* A sample word count program demonstrating how to plug in
* an Actor as a Receiver.
* Usage: ActorWordCount <master> <hostname> <port>
* <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
* <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on.
*
* To run this example locally, you may run Feeder Actor as
* `$ ./run spark.streaming.examples.FeederActor 127.0.1.1 9999`
* and then run the example
* `$ ./run spark.streaming.examples.ActorWordCount local[2] 127.0.1.1 9999`
*/
object ActorWordCount {
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println(
"Usage: ActorWordCount <master> <hostname> <port>" +
"In local mode, <master> should be 'local[n]' with n > 1")
System.exit(1)
}
val Seq(master, host, port) = args.toSeq
// Create the context and set the batch size
val ssc = new StreamingContext(master, "ActorWordCount", Seconds(2))
/*
 * The following shows how actorStream can be used to plug in a custom actor as a receiver.
 *
 * An important point to note:
 * Since the actor may exist outside the Spark framework, it is the user's responsibility
 * to ensure type safety, i.e. the type of the data received must match the type parameter
 * of the InputDStream.
 *
 * For example, both actorStream and SampleActorReceiver are parameterized with the
 * same type to ensure type safety.
 */
val lines = ssc.actorStream[String](
Props(new SampleActorReceiver[String]("akka://test@%s:%s/user/FeederActor".format(
host, port.toInt))), "SampleReceiver")
//compute wordcount
lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()
ssc.start()
}
}
@@ -10,22 +10,34 @@ import spark.streaming.StreamingContext._
 import spark.storage.StorageLevel
 import spark.streaming.util.RawTextHelper._
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ * Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>
+ *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ *   <zkQuorum> is a list of one or more zookeeper servers that make quorum
+ *   <group> is the name of kafka consumer group
+ *   <topics> is a list of one or more kafka topics to consume from
+ *   <numThreads> is the number of threads the kafka consumer should use
+ *
+ * Example:
+ *   `./run spark.streaming.examples.KafkaWordCount local[2] zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1`
+ */
 object KafkaWordCount {
   def main(args: Array[String]) {
-    if (args.length < 6) {
-      System.err.println("Usage: KafkaWordCount <master> <hostname> <port> <group> <topics> <numThreads>")
+    if (args.length < 5) {
+      System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>")
       System.exit(1)
     }
-    val Array(master, hostname, port, group, topics, numThreads) = args
+    val Array(master, zkQuorum, group, topics, numThreads) = args
     val sc = new SparkContext(master, "KafkaWordCount")
     val ssc = new StreamingContext(sc, Seconds(2))
     ssc.checkpoint("checkpoint")
     val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
-    val lines = ssc.kafkaStream[String](hostname, port.toInt, group, topicpMap)
+    val lines = ssc.kafkaStream[String](zkQuorum, group, topicpMap)
     val words = lines.flatMap(_.split(" "))
     val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
     wordCounts.print()
@@ -38,16 +50,16 @@ object KafkaWordCount {
 object KafkaWordCountProducer {
   def main(args: Array[String]) {
-    if (args.length < 3) {
-      System.err.println("Usage: KafkaWordCountProducer <hostname> <port> <topic> <messagesPerSec> <wordsPerMessage>")
+    if (args.length < 2) {
+      System.err.println("Usage: KafkaWordCountProducer <zkQuorum> <topic> <messagesPerSec> <wordsPerMessage>")
       System.exit(1)
     }
-    val Array(hostname, port, topic, messagesPerSec, wordsPerMessage) = args
+    val Array(zkQuorum, topic, messagesPerSec, wordsPerMessage) = args
     // Zookeper connection properties
     val props = new Properties()
-    props.put("zk.connect", hostname + ":" + port)
+    props.put("zk.connect", zkQuorum)
     props.put("serializer.class", "kafka.serializer.StringEncoder")
     val config = new ProducerConfig(props)
...
@@ -16,7 +16,7 @@ import spark.streaming.StreamingContext._
  */
 object NetworkWordCount {
   def main(args: Array[String]) {
-    if (args.length < 2) {
+    if (args.length < 3) {
       System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
         "In local mode, <master> should be 'local[n]' with n > 1")
       System.exit(1)
@@ -27,7 +27,7 @@ object NetworkWordCount {
     // Create a NetworkInputDStream on target ip:port and count the
     // words in input stream of \n delimited test (eg. generated by 'nc')
-    val lines = ssc.networkTextStream(args(1), args(2).toInt)
+    val lines = ssc.socketTextStream(args(1), args(2).toInt)
     val words = lines.flatMap(_.split(" "))
     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
     wordCounts.print()
...
@@ -37,7 +37,7 @@ object RawNetworkGrep {
     RawTextHelper.warmUp(ssc.sc)
     val rawStreams = (1 to numStreams).map(_ =>
-      ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
+      ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
     val union = ssc.union(rawStreams)
     union.filter(_.contains("the")).count().foreach(r =>
       println("Grep count: " + r.collect().mkString))
...
package spark.streaming.examples
import spark.streaming.{Seconds, StreamingContext}
import spark.storage.StorageLevel
import com.twitter.algebird._
import spark.streaming.StreamingContext._
import spark.SparkContext._
/**
* Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute
* windowed and global Top-K estimates of user IDs occurring in a Twitter stream.
* <br>
* <strong>Note</strong> that since Algebird's implementation currently only supports Long inputs,
* the example operates on Long IDs. Once the implementation supports other inputs (such as String),
* the same approach could be used for computing popular topics for example.
* <p>
* <p>
* <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
* This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a datastructure
* for approximate frequency estimation in data streams (e.g. Top-K elements, frequency of any given element, etc),
* that uses space sub-linear in the number of elements in the stream. Once elements are added to the CMS, the
* estimated count of an element can be computed, as well as "heavy-hitters" that occur more than a threshold
* percentage of the overall total count.
* <p><p>
* Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the reduce operation.
*/
object TwitterAlgebirdCMS {
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println("Usage: TwitterAlgebirdCMS <master> <twitter_username> <twitter_password>" +
" [filter1] [filter2] ... [filter n]")
System.exit(1)
}
// CMS parameters
val DELTA = 1E-3
val EPS = 0.01
val SEED = 1
val PERC = 0.001
// K highest frequency elements to take
val TOPK = 10
val Array(master, username, password) = args.slice(0, 3)
val filters = args.slice(3, args.length)
val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10))
val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER)
val users = stream.map(status => status.getUser.getId)
val cms = new CountMinSketchMonoid(DELTA, EPS, SEED, PERC)
var globalCMS = cms.zero
val mm = new MapMonoid[Long, Int]()
var globalExact = Map[Long, Int]()
val approxTopUsers = users.mapPartitions(ids => {
ids.map(id => cms.create(id))
}).reduce(_ ++ _)
val exactTopUsers = users.map(id => (id, 1))
.reduceByKey((a, b) => a + b)
approxTopUsers.foreach(rdd => {
if (rdd.count() != 0) {
val partial = rdd.first()
val partialTopK = partial.heavyHitters.map(id =>
(id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
globalCMS ++= partial
val globalTopK = globalCMS.heavyHitters.map(id =>
(id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC,
partialTopK.mkString("[", ",", "]")))
println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC,
globalTopK.mkString("[", ",", "]")))
}
})
exactTopUsers.foreach(rdd => {
if (rdd.count() != 0) {
val partialMap = rdd.collect().toMap
val partialTopK = rdd.map(
{case (id, count) => (count, id)})
.sortByKey(ascending = false).take(TOPK)
globalExact = mm.plus(globalExact.toMap, partialMap)
val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK)
println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]")))
println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]")))
}
})
ssc.start()
}
}
package spark.streaming.examples
import spark.streaming.{Seconds, StreamingContext}
import spark.storage.StorageLevel
import com.twitter.algebird.HyperLogLog._
import com.twitter.algebird.HyperLogLogMonoid
import spark.streaming.dstream.TwitterInputDStream
/**
* Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute
* a windowed and global estimate of the unique user IDs occurring in a Twitter stream.
* <p>
* <p>
* This <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
* blog post</a> and this
* <a href="http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html">blog post</a>
* have good overviews of HyperLogLog (HLL). HLL is a memory-efficient datastructure for estimating
* the cardinality of a data stream, i.e. the number of unique elements.
* <p><p>
* Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the reduce operation.
*/
object TwitterAlgebirdHLL {
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println("Usage: TwitterAlgebirdHLL <master> <twitter_username> <twitter_password>" +
" [filter1] [filter2] ... [filter n]")
System.exit(1)
}
/** Bit size parameter for HyperLogLog, trades off accuracy vs size */
val BIT_SIZE = 12
val Array(master, username, password) = args.slice(0, 3)
val filters = args.slice(3, args.length)
val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5))
val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER)
val users = stream.map(status => status.getUser.getId)
val hll = new HyperLogLogMonoid(BIT_SIZE)
var globalHll = hll.zero
var userSet: Set[Long] = Set()
val approxUsers = users.mapPartitions(ids => {
ids.map(id => hll(id))
}).reduce(_ + _)
val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)
approxUsers.foreach(rdd => {
if (rdd.count() != 0) {
val partial = rdd.first()
globalHll += partial
println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt))
println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt))
}
})
exactUsers.foreach(rdd => {
if (rdd.count() != 0) {
val partial = rdd.first()
userSet ++= partial
println("Exact distinct users this batch: %d".format(partial.size))
println("Exact distinct users overall: %d".format(userSet.size))
println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1) * 100))
}
})
ssc.start()
}
}
-package spark.streaming.examples.twitter
+package spark.streaming.examples
-import spark.streaming.StreamingContext._
 import spark.streaming.{Seconds, StreamingContext}
+import StreamingContext._
 import spark.SparkContext._
-import spark.storage.StorageLevel
 /**
  * Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter
  * stream. The stream is instantiated with credentials and optionally filters supplied by the
  * command line arguments.
+ *
  */
-object TwitterBasic {
+object TwitterPopularTags {
   def main(args: Array[String]) {
     if (args.length < 3) {
-      System.err.println("Usage: TwitterBasic <master> <twitter_username> <twitter_password>" +
+      System.err.println("Usage: TwitterPopularTags <master> <twitter_username> <twitter_password>" +
         " [filter1] [filter2] ... [filter n]")
       System.exit(1)
     }
@@ -21,10 +21,8 @@ object TwitterBasic {
     val Array(master, username, password) = args.slice(0, 3)
     val filters = args.slice(3, args.length)
-    val ssc = new StreamingContext(master, "TwitterBasic", Seconds(2))
-    val stream = new TwitterInputDStream(ssc, username, password, filters,
-      StorageLevel.MEMORY_ONLY_SER)
-    ssc.registerInputStream(stream)
+    val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2))
+    val stream = ssc.twitterStream(username, password, filters)
     val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
@@ -39,22 +37,17 @@ object TwitterBasic {
     // Print popular hashtags
     topCounts60.foreach(rdd => {
-      if (rdd.count() != 0) {
-        val topList = rdd.take(5)
-        println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
-        topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
-      }
+      val topList = rdd.take(5)
+      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
+      topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
     })
     topCounts10.foreach(rdd => {
-      if (rdd.count() != 0) {
-        val topList = rdd.take(5)
-        println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
-        topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
-      }
+      val topList = rdd.take(5)
+      println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
+      topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
     })
     ssc.start()
   }
 }
package spark.streaming.examples
import akka.actor.ActorSystem
import akka.actor.actorRef2Scala
import akka.zeromq._
import spark.streaming.{ Seconds, StreamingContext }
import spark.streaming.StreamingContext._
import akka.zeromq.Subscribe
/**
* A simple publisher for demonstration purposes, repeatedly publishes random Messages
* every one second.
*/
object SimpleZeroMQPublisher {
def main(args: Array[String]) = {
if (args.length < 2) {
System.err.println("Usage: SimpleZeroMQPublisher <zeroMQUrl> <topic> ")
System.exit(1)
}
val Seq(url, topic) = args.toSeq
val acs: ActorSystem = ActorSystem()
val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, Bind(url))
val messages: Array[String] = Array("words ", "may ", "count ")
while (true) {
Thread.sleep(1000)
pubSocket ! ZMQMessage(Frame(topic) :: messages.map(x => Frame(x.getBytes)).toList)
}
acs.awaitTermination()
}
}
/**
* A sample wordcount with ZeroMQStream stream
*
* To work with zeroMQ, some native libraries have to be installed.
* Install zeroMQ (release 2.1) core libraries. [ZeroMQ Install guide](http://www.zeromq.org/intro:get-the-software)
*
* Usage: ZeroMQWordCount <master> <zeroMQurl> <topic>
* In local mode, <master> should be 'local[n]' with n > 1
* <zeroMQurl> and <topic> describe where zeroMq publisher is running.
*
* To run this example locally, you may run publisher as
* `$ ./run spark.streaming.examples.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar`
* and run the example as
* `$ ./run spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo`
*/
object ZeroMQWordCount {
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println(
"Usage: ZeroMQWordCount <master> <zeroMQurl> <topic>" +
"In local mode, <master> should be 'local[n]' with n > 1")
System.exit(1)
}
val Seq(master, url, topic) = args.toSeq
// Create the context and set the batch size
val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2))
def bytesToStringIterator(x: Seq[Seq[Byte]]) = (x.map(x => new String(x.toArray))).iterator
//For this stream, a zeroMQ publisher should be running.
val lines = ssc.zeroMQStream(url, Subscribe(topic), bytesToStringIterator)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
}
}
\ No newline at end of file
@@ -27,17 +27,16 @@ object PageViewStream {
     val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1))
     // Create a NetworkInputDStream on target host:port and convert each line to a PageView
-    val pageViews = ssc.networkTextStream(host, port)
+    val pageViews = ssc.socketTextStream(host, port)
                        .flatMap(_.split("\n"))
                        .map(PageView.fromString(_))
     // Return a count of views per URL seen in each batch
-    val pageCounts = pageViews.map(view => ((view.url, 1))).countByKey()
+    val pageCounts = pageViews.map(view => view.url).countByValue()
     // Return a sliding window of page views per URL in the last ten seconds
-    val slidingPageCounts = pageViews.map(view => ((view.url, 1)))
-                              .window(Seconds(10), Seconds(2))
-                              .countByKey()
+    val slidingPageCounts = pageViews.map(view => view.url)
+                              .countByValueAndWindow(Seconds(10), Seconds(2))
     // Return the rate of error pages (a non 200 status) in each zip code over the last 30 seconds
...