    Add a `repartition` operator. · 08c1a42d
    Patrick Wendell authored
    This patch adds an operator called repartition with more straightforward
    semantics than the current `coalesce` operator. There are a few use cases
    where this operator is useful:
    
    1. If a user wants to increase the number of partitions in the RDD. This
    is more common now with streaming. E.g. a user is ingesting data on one
    node but they want to add more partitions to ensure parallelism of
    subsequent operations across threads or the cluster.
    
    Right now they have to call rdd.coalesce(numSplits, shuffle=true) - that's
    super confusing.
    
    2. If a user has input data where the number of partitions is not known. E.g.
    
    > sc.textFile("some file").coalesce(50)....
    
    This is semantically vague (am I growing or shrinking this RDD?), and it may also
    not work correctly if the base RDD has fewer than 50 partitions.
    
    The new operator forces a shuffle every time, so it will always produce exactly
    the requested number of partitions. It also throws an exception rather than
    silently failing when given bad input.
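
    For illustration, a minimal sketch of the intended usage (the target of 100
    partitions is an arbitrary choice):

    > rdd.repartition(100)              // always shuffles; yields exactly 100 partitions
    > rdd.coalesce(100, shuffle = true) // the existing, more confusing spelling of the same call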
    
    I am currently adding streaming tests (requires refactoring some of the test
    suite to allow testing at partition granularity), so this is not ready for
    merge yet. But feedback is welcome.
streaming-programming-guide.md
---
layout: global
title: Spark Streaming Programming Guide
---

* This will become a table of contents (this text will be scraped).
{:toc}

# Overview

A Spark Streaming application is very similar to a Spark application; it consists of a driver program that runs the user's main function and continuously executes various parallel operations on input streams of data. The main abstraction Spark Streaming provides is a discretized stream (DStream), which is a continuous sequence of RDDs (distributed collections of elements) representing a continuous stream of data. DStreams can be created from live incoming data (such as data from a socket, Kafka, etc.) or can be generated by transforming existing DStreams using parallel operators like map, reduce, and window. The basic processing model is as follows: (i) While a Spark Streaming driver program is running, the system receives data from various sources and divides it into batches. Each batch of data is treated as an RDD, that is, an immutable parallel collection of data. These input RDDs are saved in memory and replicated to two nodes for fault-tolerance. This sequence of RDDs is collectively called an InputDStream. (ii) Data received by InputDStreams is processed using DStream operations. Since all data is represented as RDDs and all DStream operations as RDD operations, data is automatically recovered in the event of node failures.

This guide shows how to start programming with DStreams.

# Linking with Spark Streaming

Add the following SBT or Maven dependency to your project to use Spark Streaming:

groupId = org.apache.spark
artifactId = spark-streaming_{{site.SCALA_VERSION}}
version = {{site.SPARK_VERSION}}
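
For SBT users, a minimal sketch of what these coordinates look like as a dependency line (the Scala suffix and Spark version come from your own build settings):

{% highlight scala %}
// build.sbt -- a sketch; the Scala suffix and Spark version must match your build
libraryDependencies += "org.apache.spark" % "spark-streaming_{{site.SCALA_VERSION}}" % "{{site.SPARK_VERSION}}"
{% endhighlight %}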

# Initializing Spark Streaming

The first thing a Spark Streaming program must do is create a StreamingContext object, which tells Spark how to access a cluster. A StreamingContext can be created by using

{% highlight scala %}
new StreamingContext(master, appName, batchDuration, [sparkHome], [jars])
{% endhighlight %}

The master parameter is a standard Spark cluster URL and can be "local" for local testing. The appName is the name of your program, which will be shown on your cluster's web UI. The batchDuration is the size of the batches (as explained earlier). This must be set carefully such that the cluster can keep up with the processing of the data streams. Start with something conservative like 5 seconds. See the Performance Tuning section for a detailed discussion. Finally, sparkHome and jars are necessary when running on a cluster to specify the location of your code, as described in the Spark programming guide.

This constructor creates a SparkContext for your job as well, which can be accessed with streamingContext.sparkContext.
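
As a concrete sketch, assuming a local run with two worker threads and an illustrative 5-second batch size (the master URL and application name below are placeholder choices):

{% highlight scala %}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// "local[2]" and the application name are placeholders for local testing;
// on a cluster you would pass a cluster URL plus sparkHome and jars as well.
val ssc = new StreamingContext("local[2]", "MyStreamingApp", Seconds(5))

// The SparkContext created alongside the StreamingContext
val sc = ssc.sparkContext
{% endhighlight %}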

# Attaching Input Sources - InputDStreams

The StreamingContext is used to create InputDStreams from input sources:

{% highlight scala %}
// Assuming ssc is the StreamingContext
ssc.textFileStream(directory)    // Creates a stream by monitoring and processing new files in a HDFS directory
ssc.socketStream(hostname, port) // Creates a stream that uses a TCP socket to read data from hostname:port
{% endhighlight %}

We also provide input streams for Kafka, Flume, Akka actors, etc. For a complete list of input streams, take a look at the StreamingContext API documentation.
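
For example, a sketch that attaches a TCP text source (the hostname and port are placeholders):

{% highlight scala %}
// Assuming ssc is the StreamingContext created above; reads newline-delimited
// text from a TCP socket at the given host and port.
val lines = ssc.socketTextStream("localhost", 9999)
{% endhighlight %}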

# DStream Operations

Data received from the input streams can be processed using DStream operations. There are two kinds of operations - transformations and output operations. Similar to RDD transformations, DStream transformations operate on one or more DStreams to create new DStreams with transformed data. After applying a sequence of transformations to the input streams, you'll need to call output operations, which write data out to an external system.
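
As a minimal sketch, continuing from the illustrative `lines` stream above: flatMap, map, and reduceByKey are transformations, print() is an output operation, and nothing is processed until the context is started.

{% highlight scala %}
// Per-batch word count over the socket stream
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.print()   // output operation: prints a few counts from each batch

ssc.start()          // start receiving data and running the transformations
{% endhighlight %}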

## Transformations