---
layout: global
title: Spark Streaming Programming Guide
---

* This will become a table of contents (this text will be scraped).
{:toc}
# Overview
A Spark Streaming application is very similar to a Spark application; it consists of a driver program that runs the user's `main` function and continuously executes various parallel operations on input streams of data. The main abstraction Spark Streaming provides is a *discretized stream* (DStream), which is a continuous sequence of RDDs (distributed collections of elements) representing a continuous stream of data. DStreams can be created from live incoming data (such as data from a socket, Kafka, etc.) or can be generated by transforming existing DStreams using parallel operators like `map`, `reduce`, and `window`. The basic processing model is as follows:
(i) While a Spark Streaming driver program is running, the system receives data from various sources and divides it into batches. Each batch of data is treated as an RDD, that is, an immutable, parallel collection of data. These input RDDs are saved in memory and replicated to two nodes for fault-tolerance. This sequence of RDDs is collectively called an InputDStream.

(ii) Data received by InputDStreams is processed using DStream operations. Since all data is represented as RDDs and all DStream operations as RDD operations, data is automatically recovered in the event of node failures.
This guide shows how to start programming with DStreams.
# Linking with Spark Streaming
Add the following SBT or Maven dependency to your project to use Spark Streaming:
    groupId = org.apache.spark
    artifactId = spark-streaming_{{site.SCALA_VERSION}}
    version = {{site.SPARK_VERSION}}
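
For example, with SBT this corresponds to a single dependency line; a minimal sketch (the site variables stand in for the concrete Scala and Spark versions shown above):

{% highlight scala %}
// In the SBT build definition; the artifact name carries the Scala version, as shown above
libraryDependencies += "org.apache.spark" % "spark-streaming_{{site.SCALA_VERSION}}" % "{{site.SPARK_VERSION}}"
{% endhighlight %}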
# Initializing Spark Streaming
The first thing a Spark Streaming program must do is create a `StreamingContext` object, which tells Spark how to access a cluster. A `StreamingContext` can be created by using

{% highlight scala %}
new StreamingContext(master, appName, batchDuration, [sparkHome], [jars])
{% endhighlight %}
The `master` parameter is a standard Spark cluster URL and can be "local" for local testing. The `appName` is the name of your program, which will be shown on your cluster's web UI. The `batchDuration` is the size of the batches (as explained earlier). This must be set carefully such that the cluster can keep up with the processing of the data streams. Start with something conservative like 5 seconds. See the Performance Tuning section for a detailed discussion. Finally, `sparkHome` and `jars` are necessary when running on a cluster to specify the location of your code, as described in the Spark programming guide.

This constructor also creates a SparkContext for your job, which can be accessed with `streamingContext.sparkContext`.
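
For instance, a context for local testing with a 1-second batch interval could be created as follows (a minimal sketch; the application name and batch duration are arbitrary placeholder choices):

{% highlight scala %}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Run locally with two threads and cut the input streams into 1-second batches
val ssc = new StreamingContext("local[2]", "MyStreamingApp", Seconds(1))
{% endhighlight %}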
# Attaching Input Sources - InputDStreams
The StreamingContext is used to create InputDStreams from input sources:
{% highlight scala %}
// Assuming ssc is the StreamingContext
ssc.textFileStream(directory)      // Creates a stream by monitoring and processing new files in an HDFS directory
ssc.socketStream(hostname, port)   // Creates a stream that uses a TCP socket to read data from hostname:port
{% endhighlight %}
We also provide input streams for Kafka, Flume, Akka actors, etc. For a complete list of input streams, take a look at the StreamingContext API documentation.
# DStream Operations
Data received from the input streams can be processed using DStream operations. There are two kinds of operations - transformations and output operations. Similar to RDD transformations, DStream transformations operate on one or more DStreams to create new DStreams with transformed data. After applying a sequence of transformations to the input streams, you'll need to call output operations, which write data out to an external system.
## Transformations
DStreams support many of the transformations available on normal Spark RDDs:
Transformation | Meaning |
---|---|
map(func) | Returns a new DStream formed by passing each element of the source DStream through a function func. |
filter(func) | Returns a new DStream formed by selecting those elements of the source DStream on which func returns true. |
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item). |
mapPartitions(func) | Similar to map, but runs separately on each partition (block) of the DStream, so func must be of type Iterator[T] => Iterator[U] when running on a DStream of type T. |
union(otherStream) | Return a new DStream that contains the union of the elements in the source DStream and the argument DStream. |
count() | Returns a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream. |
reduce(func) | Returns a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative so that it can be computed in parallel. |
countByValue() | When called on a DStream of elements of type K, returns a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream. |
groupByKey([numTasks]) | When called on a DStream of (K, V) pairs, returns a new DStream of (K, Seq[V]) pairs by grouping together all the values of each key in the RDDs of the source DStream. Note: By default, this uses Spark's default number of parallel tasks (2 for local machine, 8 for a cluster) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. |
reduceByKey(func, [numTasks]) | When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Like in groupByKey , the number of reduce tasks is configurable through an optional second argument. |
join(otherStream, [numTasks]) | When called on two DStreams of (K, V) and (K, W) pairs, returns a new DStream of (K, (V, W)) pairs with all pairs of elements for each key. |
cogroup(otherStream, [numTasks]) | When called on DStream of (K, V) and (K, W) pairs, returns a new DStream of (K, Seq[V], Seq[W]) tuples. |
transform(func) | Returns a new DStream by applying func (a RDD-to-RDD function) to every RDD of the stream. This can be used to do arbitrary RDD operations on the DStream. |
updateStateByKey(func) | Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values of each key. This can be used to track session state by using the session-id as the key and updating the session state as new data is received. |
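
As a concrete illustration of the stateful updateStateByKey transformation, the following is a minimal sketch that keeps a running count per key. It assumes a DStream of (String, Int) pairs named pairs (e.g. built with words.map(w => (w, 1))); note that stateful operations typically also require a checkpoint directory to be set on the context.

{% highlight scala %}
// Maintain a running count per key across batches.
// `pairs` is assumed to be a DStream[(String, Int)].
ssc.checkpoint("checkpoint")   // stateful operations need periodic checkpointing

val runningCounts = pairs.updateStateByKey[Int] { (newValues: Seq[Int], state: Option[Int]) =>
  Some(state.getOrElse(0) + newValues.sum)
}
{% endhighlight %}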
Spark Streaming also features windowed computations, which allow you to apply transformations over a sliding window of data. All window functions take a windowDuration, which represents the width of the window, and a slideDuration, which represents the interval at which the windowed computation is performed.
Transformation | Meaning |
---|---|
window(windowDuration, slideDuration) | Return a new DStream which is computed based on windowed batches of the source DStream. windowDuration is the width of the window and slideDuration is the interval at which the windowed computation is performed. Both durations must be multiples of the batch interval. |
countByWindow(windowDuration, slideDuration) | Return a sliding count of elements in the stream. windowDuration and slideDuration are exactly as defined in window(). |
reduceByWindow(func, windowDuration, slideDuration) | Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using func. The function should be associative so that it can be computed correctly in parallel. windowDuration and slideDuration are exactly as defined in window(). |
groupByKeyAndWindow(windowDuration, slideDuration, [numTasks]) | When called on a DStream of (K, V) pairs, returns a new DStream of (K, Seq[V]) pairs by grouping together the values of each key over batches in a sliding window. Note: By default, this uses Spark's default number of parallel tasks (2 for local machine, 8 for a cluster) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. |
reduceByKeyAndWindow(func, windowDuration, slideDuration, [numTasks]) | When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function func over batches in a sliding window. Like in groupByKeyAndWindow, the number of reduce tasks is configurable through an optional argument. windowDuration and slideDuration are exactly as defined in window(). |
reduceByKeyAndWindow(func, invFunc, windowDuration, slideDuration, [numTasks]) | A more efficient version of the above reduceByKeyAndWindow() where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enters the sliding window and "inverse reducing" the old data that leaves the window. An example would be "adding" and "subtracting" counts of keys as the window slides. However, it is applicable only to "invertible reduce functions", that is, reduce functions that have a corresponding "inverse reduce" function (taken as the parameter invFunc). Like in groupByKeyAndWindow, the number of reduce tasks is configurable through an optional argument. windowDuration and slideDuration are exactly as defined in window(). |
countByValueAndWindow(windowDuration, slideDuration, [numTasks]) | When called on a DStream of elements of type K, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. Like in groupByKeyAndWindow, the number of reduce tasks is configurable through an optional argument. windowDuration and slideDuration are exactly as defined in window(). |
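
For example, a windowed count over the last 30 seconds of data, computed every 10 seconds, could look like the following minimal sketch (it assumes a DStream of (String, Int) pairs named pairs, and a batch interval that evenly divides both durations):

{% highlight scala %}
import org.apache.spark.streaming.Seconds

// Reduce (K, V) pairs over a 30-second window, sliding every 10 seconds.
// Both durations must be multiples of the batch interval.
val windowedCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
{% endhighlight %}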
A complete list of DStream operations is available in the API documentation of DStream and PairDStreamFunctions.
## Output Operations
When an output operator is called, it triggers the computation of a stream. Currently the following output operators are defined:
Operator | Meaning |
---|---|
foreach(func) | The fundamental output operator. Applies a function, func, to each RDD generated from the stream. This function should have side effects, such as printing output, saving the RDD to external files, or writing it over the network to an external system. |
print() | Prints the first ten elements of every batch of data in a DStream on the driver. |
saveAsObjectFiles(prefix, [suffix]) | Save this DStream's contents as a SequenceFile of serialized objects. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". |
saveAsTextFiles(prefix, [suffix]) | Save this DStream's contents as text files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". |
saveAsHadoopFiles(prefix, [suffix]) | Save this DStream's contents as a Hadoop file. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". |
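
As a rough sketch of the foreach operator, each batch's RDD can be handed to arbitrary user code on the driver (the wordCounts name below is illustrative, a DStream of (String, Int) pairs):

{% highlight scala %}
// Apply side-effecting code to every RDD generated by the stream.
wordCounts.foreach { rdd =>
  // Pull a few records back to the driver and print them; a real job would
  // typically write each RDD out to an external system instead.
  rdd.take(10).foreach(println)
}
{% endhighlight %}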
# Starting the Streaming computation
All the above DStream operations are completely lazy, that is, the operations will start executing only after the context is started by using

{% highlight scala %}
ssc.start()
{% endhighlight %}

Conversely, the computation can be stopped by using

{% highlight scala %}
ssc.stop()
{% endhighlight %}
# Example
A simple example to start off with is NetworkWordCount. This example counts the words received from a network server every second. Given below are the relevant sections of the source code. You can find the full source code in `<Spark repo>/streaming/src/main/scala/spark/streaming/examples/NetworkWordCount.scala`.
{% highlight scala %}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import StreamingContext._
...

// Create the context and set up a network input stream to receive from a host:port
val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1))
val lines = ssc.socketTextStream(args(1), args(2).toInt)

// Split the lines into words, count them, and print some of the counts on the master
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()

// Start the computation
ssc.start()
{% endhighlight %}
The `socketTextStream` returns a DStream of lines received from a TCP socket-based source. The `lines` DStream is transformed into a DStream of words using the `flatMap` operation, where each line is split into words. This `words` DStream is then mapped to a DStream of `(word, 1)` pairs, which is finally reduced to get the word counts. `wordCounts.print()` will print 10 of the counts generated every second.
To run this example on your local machine, you need to first run a Netcat server by using
{% highlight bash %}
$ nc -lk 9999
{% endhighlight %}
Then, in a different terminal, you can start NetworkWordCount by using
{% highlight bash %}
$ ./run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999
{% endhighlight %}
This will make NetworkWordCount connect to the netcat server. Any lines typed in the terminal running the netcat server will be counted and printed on screen.
{% highlight bash %}
# TERMINAL 1: RUNNING NETCAT

$ nc -lk 9999
hello world
...
{% endhighlight %}
{% highlight bash %}
# TERMINAL 2: RUNNING NetworkWordCount
...
2012-12-31 18:47:10,446 INFO SparkContext: Job finished: run at ThreadPoolExecutor.java:886, took 0.038817 s
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
2012-12-31 18:47:10,447 INFO JobManager: Total delay: 0.44700 s for job 8 (execution: 0.44000 s)
...
{% endhighlight %}
You can find more examples in `<Spark repo>/streaming/src/main/scala/spark/streaming/examples/`. They can be run in a similar manner using `./run-example org.apache.spark.streaming.examples....`. Executing an example without any parameters prints the list of required parameters. Further instructions for running them can be found in the comments in the files.
# DStream Persistence
Similar to RDDs, DStreams also allow developers to persist the stream's data in memory. That is, calling the `persist()` method on a DStream will automatically persist every RDD of that DStream in memory. This is useful if the data in the DStream will be computed multiple times (e.g., multiple operations on the same data). For window-based operations like `reduceByWindow` and `reduceByKeyAndWindow` and state-based operations like `updateStateByKey`, this is implicitly true. Hence, DStreams generated by window-based operations are automatically persisted in memory, without the developer calling `persist()`.
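
If a non-default storage level is needed, it can be passed to persist() explicitly; a minimal sketch (the wordCounts name is illustrative):

{% highlight scala %}
import org.apache.spark.storage.StorageLevel

// Keep the stream's RDDs in memory as serialized bytes, stated explicitly here
// for a DStream that is reused by several downstream operations.
wordCounts.persist(StorageLevel.MEMORY_ONLY_SER)
{% endhighlight %}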
For input streams that receive data from the network (that is, subclasses of NetworkInputDStream like FlumeInputDStream and KafkaInputDStream), the default persistence level is set to replicate the data to two nodes for fault-tolerance.
Note that, unlike RDDs, the default persistence level of DStreams keeps the data serialized in memory. This is further discussed in the Performance Tuning section. More information on the different persistence levels can be found in the Spark Programming Guide.