Commit def8126d authored by Tathagata Das

Added TwitterInputDStream from example to StreamingContext. Renamed example TwitterBasic to TwitterPopularTags.

parent 2eacf224
TwitterBasic.scala → TwitterPopularTags.scala

-package spark.streaming.examples.twitter
+package spark.streaming.examples

+import spark.streaming.StreamingContext._
 import spark.streaming.{Seconds, StreamingContext}
-import StreamingContext._
 import spark.SparkContext._
-import spark.storage.StorageLevel

 /**
  * Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter
  * stream. The stream is instantiated with credentials and optionally filters supplied by the
  * command line arguments.
  *
  */
-object TwitterBasic {
+object TwitterPopularTags {
   def main(args: Array[String]) {
     if (args.length < 3) {
-      System.err.println("Usage: TwitterBasic <master> <twitter_username> <twitter_password>" +
+      System.err.println("Usage: TwitterPopularTags <master> <twitter_username> <twitter_password>" +
         " [filter1] [filter2] ... [filter n]")
       System.exit(1)
     }

@@ -21,10 +21,8 @@ object TwitterBasic {
     val Array(master, username, password) = args.slice(0, 3)
     val filters = args.slice(3, args.length)

-    val ssc = new StreamingContext(master, "TwitterBasic", Seconds(2))
-    val stream = new TwitterInputDStream(ssc, username, password, filters,
-      StorageLevel.MEMORY_ONLY_SER)
-    ssc.registerInputStream(stream)
+    val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2))
+    val stream = ssc.twitterStream(username, password, filters)

     val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

@@ -39,22 +37,17 @@ object TwitterBasic {
     // Print popular hashtags
     topCounts60.foreach(rdd => {
-      if (rdd.count() != 0) {
-        val topList = rdd.take(5)
-        println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
-        topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
-      }
+      val topList = rdd.take(5)
+      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
+      topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
     })

     topCounts10.foreach(rdd => {
-      if (rdd.count() != 0) {
-        val topList = rdd.take(5)
-        println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
-        topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
-      }
+      val topList = rdd.take(5)
+      println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
+      topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
    })

     ssc.start()
   }
 }
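The hunk collapsed at `@@ -39,22 +37,17 @@` is where `topCounts60` and `topCounts10` are computed from `hashTags`. For orientation, a hedged sketch of that sliding-window pattern, using the `reduceByKeyAndWindow` and `transform` operators available through the example's imports (the shapes below are assumed, not copied from the elided lines):

    // Sketch only: count each tag over a 60-second sliding window, then
    // flip to (count, tag) and sort descending so rdd.take(5) in the
    // foreach above yields the most popular tags first.
    val topCounts60 = hashTags.map(tag => (tag, 1))
      .reduceByKeyAndWindow(_ + _, Seconds(60))
      .map{case (tag, count) => (count, tag)}
      .transform(rdd => rdd.sortByKey(false))

`topCounts10` would follow the same shape with `Seconds(10)`.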
SparkBuild.scala

@@ -154,10 +154,7 @@ object SparkBuild extends Build {
   )

   def examplesSettings = sharedSettings ++ Seq(
-    name := "spark-examples",
-    libraryDependencies ++= Seq(
-      "org.twitter4j" % "twitter4j-stream" % "3.0.3"
-    )
+    name := "spark-examples"
   )

   def bagelSettings = sharedSettings ++ Seq(name := "spark-bagel")

@@ -166,7 +163,8 @@ object SparkBuild extends Build {
     name := "spark-streaming",
     libraryDependencies ++= Seq(
       "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile",
-      "com.github.sgroschupf" % "zkclient" % "0.1"
+      "com.github.sgroschupf" % "zkclient" % "0.1",
+      "org.twitter4j" % "twitter4j-stream" % "3.0.3"
     )
   ) ++ assemblySettings ++ extraAssemblySettings
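The twitter4j dependency moves rather than disappears: since TwitterInputDStream now lives in the streaming project, twitter4j-stream has to sit on the streaming classpath, and the examples project, which builds against streaming, presumably picks it up from there. The streaming module's dependency list after this commit (versions exactly as in the diff):

    libraryDependencies ++= Seq(
      "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile",
      "com.github.sgroschupf" % "zkclient" % "0.1",
      "org.twitter4j" % "twitter4j-stream" % "3.0.3"
    )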
StreamingContext.scala

@@ -17,6 +17,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
 import org.apache.hadoop.fs.Path
 import java.util.UUID
+import twitter4j.Status

 /**
  * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic

@@ -30,14 +31,14 @@ class StreamingContext private (
 ) extends Logging {

   /**
-   * Creates a StreamingContext using an existing SparkContext.
+   * Create a StreamingContext using an existing SparkContext.
    * @param sparkContext Existing SparkContext
    * @param batchDuration The time interval at which streaming data will be divided into batches
    */
   def this(sparkContext: SparkContext, batchDuration: Duration) = this(sparkContext, null, batchDuration)

   /**
-   * Creates a StreamingContext by providing the details necessary for creating a new SparkContext.
+   * Create a StreamingContext by providing the details necessary for creating a new SparkContext.
    * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
    * @param frameworkName A name for your job, to display on the cluster web UI
    * @param batchDuration The time interval at which streaming data will be divided into batches

@@ -46,7 +47,7 @@ class StreamingContext private (
     this(StreamingContext.createNewSparkContext(master, frameworkName), null, batchDuration)

   /**
-   * Re-creates a StreamingContext from a checkpoint file.
+   * Re-create a StreamingContext from a checkpoint file.
    * @param path Path either to the directory that was specified as the checkpoint directory, or
    *             to the checkpoint file 'graph' or 'graph.bk'.
    */

@@ -101,12 +102,12 @@ class StreamingContext private (
   protected[streaming] var scheduler: Scheduler = null

   /**
-   * Returns the associated Spark context
+   * Return the associated Spark context
    */
   def sparkContext = sc

   /**
-   * Sets each DStreams in this context to remember RDDs it generated in the last given duration.
+   * Set each DStreams in this context to remember RDDs it generated in the last given duration.
    * DStreams remember RDDs only for a limited duration of time and releases them for garbage
    * collection. This method allows the developer to specify how to long to remember the RDDs (
    * if the developer wishes to query old data outside the DStream computation).

@@ -117,7 +118,7 @@ class StreamingContext private (
   }

   /**
-   * Sets the context to periodically checkpoint the DStream operations for master
+   * Set the context to periodically checkpoint the DStream operations for master
    * fault-tolerance. By default, the graph will be checkpointed every batch interval.
    * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
    * @param interval checkpoint interval

@@ -200,7 +201,7 @@ class StreamingContext private (
   }

   /**
-   * Creates a input stream from a Flume source.
+   * Create a input stream from a Flume source.
    * @param hostname Hostname of the slave machine to which the flume data will be sent
    * @param port Port of the slave machine to which the flume data will be sent
    * @param storageLevel Storage level to use for storing the received objects

@@ -236,7 +237,7 @@ class StreamingContext private (
   }

   /**
-   * Creates a input stream that monitors a Hadoop-compatible filesystem
+   * Create a input stream that monitors a Hadoop-compatible filesystem
    * for new files and reads them using the given key-value types and input format.
    * File names starting with . are ignored.
    * @param directory HDFS directory to monitor for new file

@@ -255,7 +256,7 @@ class StreamingContext private (
   }

   /**
-   * Creates a input stream that monitors a Hadoop-compatible filesystem
+   * Create a input stream that monitors a Hadoop-compatible filesystem
    * for new files and reads them using the given key-value types and input format.
    * @param directory HDFS directory to monitor for new file
    * @param filter Function to filter paths to process

@@ -274,9 +275,8 @@ class StreamingContext private (
     inputStream
   }

   /**
-   * Creates a input stream that monitors a Hadoop-compatible filesystem
+   * Create a input stream that monitors a Hadoop-compatible filesystem
    * for new files and reads them as text files (using key as LongWritable, value
    * as Text and input format as TextInputFormat). File names starting with . are ignored.
    * @param directory HDFS directory to monitor for new file

@@ -286,7 +286,25 @@ class StreamingContext private (
   }

   /**
-   * Creates an input stream from a queue of RDDs. In each batch,
+   * Create a input stream that returns tweets received from Twitter.
+   * @param username Twitter username
+   * @param password Twitter password
+   * @param filters Set of filter strings to get only those tweets that match them
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def twitterStream(
+      username: String,
+      password: String,
+      filters: Seq[String],
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+    ): DStream[Status] = {
+    val inputStream = new TwitterInputDStream(this, username, password, filters, storageLevel)
+    registerInputStream(inputStream)
+    inputStream
+  }
+
+  /**
+   * Create an input stream from a queue of RDDs. In each batch,
    * it will process either one or all of the RDDs returned by the queue.
    * @param queue Queue of RDDs
    * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval

@@ -300,7 +318,7 @@ class StreamingContext private (
   }

   /**
-   * Creates an input stream from a queue of RDDs. In each batch,
+   * Create an input stream from a queue of RDDs. In each batch,
    * it will process either one or all of the RDDs returned by the queue.
    * @param queue Queue of RDDs
    * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval

@@ -325,7 +343,7 @@ class StreamingContext private (
   }

   /**
-   * Registers an input stream that will be started (InputDStream.start() called) to get the
+   * Register an input stream that will be started (InputDStream.start() called) to get the
    * input data.
    */
   def registerInputStream(inputStream: InputDStream[_]) {

@@ -333,7 +351,7 @@ class StreamingContext private (
   }

   /**
-   * Registers an output stream that will be computed every interval
+   * Register an output stream that will be computed every interval
    */
   def registerOutputStream(outputStream: DStream[_]) {
     graph.addOutputStream(outputStream)

@@ -351,7 +369,7 @@ class StreamingContext private (
   }

   /**
-   * Starts the execution of the streams.
+   * Start the execution of the streams.
    */
   def start() {
     if (checkpointDir != null && checkpointDuration == null && graph != null) {

@@ -379,7 +397,7 @@ class StreamingContext private (
   }

   /**
-   * Stops the execution of the streams.
+   * Stop the execution of the streams.
    */
   def stop() {
     try {
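With `twitterStream` on StreamingContext, callers no longer construct TwitterInputDStream or call registerInputStream themselves. A minimal sketch of the new call path (master URL, credentials, and the filter term are placeholders):

    import spark.streaming.{Seconds, StreamingContext}
    import spark.storage.StorageLevel

    object TwitterStreamSketch {
      def main(args: Array[String]) {
        val ssc = new StreamingContext("local[2]", "TwitterStreamSketch", Seconds(2))
        // storageLevel defaults to MEMORY_AND_DISK_SER_2; override it explicitly if needed.
        val tweets = ssc.twitterStream("username", "password", Seq("spark"), StorageLevel.MEMORY_ONLY_SER)
        tweets.map(_.getText).print()  // dump a few tweet texts every batch
        ssc.start()
      }
    }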
TwitterInputDStream.scala (moved into the streaming project)

-package spark.streaming.examples.twitter
+package spark.streaming.dstream

 import spark._
 import spark.streaming._

@@ -6,7 +6,6 @@ import dstream.{NetworkReceiver, NetworkInputDStream}
 import storage.StorageLevel
 import twitter4j._
 import twitter4j.auth.BasicAuthorization
-import collection.JavaConversions._

 /* A stream of Twitter statuses, potentially filtered by one or more keywords.
  *

@@ -50,7 +49,7 @@ class TwitterReceiver(
     def onTrackLimitationNotice(i: Int) {}
     def onScrubGeo(l: Long, l1: Long) {}
     def onStallWarning(stallWarning: StallWarning) {}
-    def onException(e: Exception) {}
+    def onException(e: Exception) { stopOnError(e) }
   })

   val query: FilterQuery = new FilterQuery
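With this change, a stream error now stops the receiver through NetworkReceiver's stopOnError instead of being silently swallowed. The listener itself is plain twitter4j; a standalone sketch of the same callback surface outside Spark (credentials and the track term are placeholders, and error handling is reduced to a stack trace):

    import twitter4j._
    import twitter4j.auth.BasicAuthorization

    object ListenerSketch {
      def main(args: Array[String]) {
        val stream = new TwitterStreamFactory().getInstance(new BasicAuthorization("username", "password"))
        stream.addListener(new StatusListener {
          def onStatus(status: Status) { println(status.getText) }
          def onDeletionNotice(notice: StatusDeletionNotice) {}
          def onTrackLimitationNotice(i: Int) {}
          def onScrubGeo(l: Long, l1: Long) {}
          def onStallWarning(w: StallWarning) {}
          // Mirror the fix above: surface the error instead of dropping it.
          def onException(e: Exception) { e.printStackTrace() }
        })
        // The receiver uses the same FilterQuery mechanism for keyword filters.
        stream.filter(new FilterQuery().track(Array("spark")))
      }
    }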