Commit a975a19f authored by Tathagata Das

[SPARK-1504], [SPARK-1505], [SPARK-1558] Updated Spark Streaming guide

- SPARK-1558: Updated custom receiver guide to match it with the new API
- SPARK-1504: Added deployment and monitoring subsection to streaming
- SPARK-1505: Added migration guide for migrating from 0.9.x and below to Spark 1.0
- Updated various Java streaming examples to use JavaReceiverInputDStream to highlight the API change.
- Removed the requirement for cleaner ttl from streaming guide

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #652 from tdas/doc-fix and squashes the following commits:

cb4f4b7 [Tathagata Das] Possible fix for flaky graceful shutdown test.
ab71f7f [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into doc-fix
8d6ff9b [Tathagata Das] Addded migration guide to Spark Streaming.
7d171df [Tathagata Das] Added reference to JavaReceiverInputStream in examples and streaming guide.
49edd7c [Tathagata Das] Change java doc links to use Java docs.
11528d7 [Tathagata Das] Updated links on index page.
ff80970 [Tathagata Das] More updates to streaming guide.
4dc42e9 [Tathagata Das] Added monitoring and other documentation in the streaming guide.
14c6564 [Tathagata Das] Updated custom receiver guide.
parent 3292e2a7
@@ -462,7 +462,7 @@ Apart from these, the following properties are also available, and may be useful
    <td>(infinite)</td>
    <td>
      Duration (seconds) of how long Spark will remember any metadata (stages generated, tasks generated, etc.).
      Periodic cleanups will ensure that metadata older than this duration will be forgotten. This is
      useful for running Spark for many hours / days (for example, running 24/7 in case of Spark Streaming
      applications). Note that any RDD that persists in memory for more than this duration will be cleared as well.
    </td>
@@ -471,8 +471,8 @@ Apart from these, the following properties are also available, and may be useful
    <td>spark.streaming.blockInterval</td>
    <td>200</td>
    <td>
      Interval (milliseconds) at which data received by Spark Streaming receivers is coalesced
      into blocks of data before storing them in Spark.
    </td>
  </tr>
  <tr>
...
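As a minimal sketch (the application name below is a placeholder), such a property can also be set programmatically on the `SparkConf` used to create the streaming context:

{% highlight scala %}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Coalesce received data into blocks every 200 ms (the value is in milliseconds).
val conf = new SparkConf()
  .setAppName("StreamingConfigExample")
  .set("spark.streaming.blockInterval", "200")

val ssc = new StreamingContext(conf, Seconds(1))
{% endhighlight %}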
@@ -112,10 +112,10 @@ Note that on Windows, you need to set the environment variables on separate line
* [Shark](http://shark.cs.berkeley.edu): Apache Hive over Spark
* [Mailing Lists](http://spark.apache.org/mailing-lists.html): ask questions about Spark here
* [AMP Camps](http://ampcamp.berkeley.edu/): a series of training camps at UC Berkeley that featured talks and
  exercises about Spark, Shark, Spark Streaming, Mesos, and more. [Videos](http://ampcamp.berkeley.edu/3/),
  [slides](http://ampcamp.berkeley.edu/3/) and [exercises](http://ampcamp.berkeley.edu/3/exercises/) are
  available online for free.
* [Code Examples](http://spark.apache.org/examples.html): more are also available in the [examples subfolder](https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/) of Spark
* [Paper Describing Spark](http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf)
* [Paper Describing Spark Streaming](http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf)
...
@@ -3,126 +3,219 @@ layout: global
title: Spark Streaming Custom Receivers
---

Spark Streaming can receive streaming data from any arbitrary data source beyond
the ones for which it has built-in support (that is, beyond Flume, Kafka, files, sockets, etc.).
This requires the developer to implement a *receiver* that is customized for receiving data from
the concerned data source. This guide walks through the process of implementing a custom receiver
and using it in a Spark Streaming application.

### Implementing a Custom Receiver

This starts with implementing a [Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver).
A custom receiver must extend this abstract class by implementing two methods

- `onStart()`: Things to do to start receiving data.
- `onStop()`: Things to do to stop receiving data.

Note that `onStart()` and `onStop()` must not block indefinitely. Typically, `onStart()` would start the threads
that are responsible for receiving the data, and `onStop()` would ensure that receiving by those threads
is stopped. The receiving threads can also use `isStopped()`, a `Receiver` method, to check whether they
should stop receiving data.

Once the data is received, it can be stored inside Spark
by calling `store(data)`, which is a method provided by the
[Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver) class.
There are a number of flavours of `store()` which allow storing the received data
record-at-a-time or as a whole collection of objects / serialized bytes.

Any exception in the receiving threads should be caught and handled properly to avoid silent
failures of the receiver. `restart(<exception>)` will restart the receiver by
asynchronously calling `onStop()` and then calling `onStart()` after a delay.
`stop(<exception>)` will call `onStop()` and terminate the receiver. Also, `reportError(<error>)`
reports an error message to the driver (visible in the logs and UI) without stopping / restarting
the receiver.

The following is a custom receiver that receives a stream of text over a socket. It treats
'\n'-delimited lines in the text stream as records and stores them with Spark. If the receiving thread
has any error connecting or receiving, the receiver is restarted to make another attempt to connect.

<div class="codetabs">
<div data-lang="scala" markdown="1" >

{% highlight scala %}

class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself once isStopped() returns true
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private def receive() {
    var socket: Socket = null
    var userInput: String = null
    try {
      // Connect to host:port
      socket = new Socket(host, port)

      // Until stopped or connection broken continue reading
      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
      userInput = reader.readLine()
      while(!isStopped && userInput != null) {
        store(userInput)
        userInput = reader.readLine()
      }
      reader.close()
      socket.close()

      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        // restart if could not connect to server
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        // restart if there is any other error
        restart("Error receiving data", t)
    }
  }
}

{% endhighlight %}

</div>
<div data-lang="java" markdown="1">

{% highlight java %}

public class JavaCustomReceiver extends Receiver<String> {

  String host = null;
  int port = -1;

  public JavaCustomReceiver(String host_ , int port_) {
    super(StorageLevel.MEMORY_AND_DISK_2());
    host = host_;
    port = port_;
  }

  public void onStart() {
    // Start the thread that receives data over a connection
    new Thread() {
      @Override public void run() {
        receive();
      }
    }.start();
  }

  public void onStop() {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself once isStopped() returns true
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private void receive() {
    Socket socket = null;
    String userInput = null;

    try {
      // connect to the server
      socket = new Socket(host, port);

      BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));

      // Until stopped or connection broken continue reading
      while (!isStopped() && (userInput = reader.readLine()) != null) {
        System.out.println("Received data '" + userInput + "'");
        store(userInput);
      }
      reader.close();
      socket.close();

      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again");
    } catch(ConnectException ce) {
      // restart if could not connect to server
      restart("Could not connect", ce);
    } catch(Throwable t) {
      // restart if there is any other error
      restart("Error receiving data", t);
    }
  }
}

{% endhighlight %}

</div>
</div>

### Using the custom receiver in a Spark Streaming application

The custom receiver can be used in a Spark Streaming application by using
`streamingContext.receiverStream(<instance of custom receiver>)`. This will create
an input DStream using data received by the instance of the custom receiver, as shown below.

<div class="codetabs">
<div data-lang="scala" markdown="1" >

{% highlight scala %}
// Assuming ssc is the StreamingContext
val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port))
val words = customReceiverStream.flatMap(_.split(" "))
...
{% endhighlight %}

The full source code is in the example [CustomReceiver.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala).

</div>
<div data-lang="java" markdown="1">

{% highlight java %}
// Assuming ssc is the JavaStreamingContext
JavaDStream<String> customReceiverStream = ssc.receiverStream(new JavaCustomReceiver(host, port));
JavaDStream<String> words = customReceiverStream.flatMap(new FlatMapFunction<String, String>() { ... });
...
{% endhighlight %}

The full source code is in the example [JavaCustomReceiver.java](https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java).

</div>
</div>

### Implementing and Using a Custom Actor-based Receiver

Custom [Akka Actors](http://doc.akka.io/docs/akka/2.2.4/scala/actors.html) can also be used to
receive data. The [`ActorHelper`](api/scala/index.html#org.apache.spark.streaming.receiver.ActorHelper)
trait can be applied on any Akka actor, which allows received data to be stored in Spark using
`store(...)` methods. The supervisor strategy of this actor can be configured to handle failures, etc.

{% highlight scala %}
class CustomActor extends Actor with ActorHelper {
  def receive = {
    case data: String => store(data)
  }
}
{% endhighlight %}

And a new input stream can be created with this custom actor as

{% highlight scala %}
// Assuming ssc is the StreamingContext
val lines = ssc.actorStream[String](Props(new CustomActor()), "CustomReceiver")
{% endhighlight %}

See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala)
for an end-to-end example.
@@ -136,7 +136,7 @@ The complete code can be found in the Spark Streaming example
<div data-lang="java" markdown="1">

First, we create a
[JavaStreamingContext](api/java/org/apache/spark/streaming/api/java/JavaStreamingContext.html) object,
which is the main entry point for all streaming
functionality. Besides Spark's configuration, we specify that any DStream would be processed
in 1 second batches.

@@ -155,7 +155,7 @@ by specifying the IP address and port of the data server.

{% highlight java %}
// Create a DStream that will connect to serverIP:serverPort, like localhost:9999
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
{% endhighlight %}

This `lines` DStream represents the stream of data that will be received from the data
@@ -863,6 +863,51 @@ For DStreams that must be checkpointed (that is, DStreams created by `updateStateByKey` and
`reduceByKeyAndWindow` with inverse function), the checkpoint interval of the DStream is by
default set to a multiple of the DStream's sliding interval such that it is at least 10 seconds.
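As a minimal sketch of overriding that default on a particular DStream (assuming `stateDStream` was created by a stateful operation such as `updateStateByKey`):

{% highlight scala %}
// Checkpoint this DStream's data every 10 seconds instead of the default interval.
stateDStream.checkpoint(Seconds(10))
{% endhighlight %}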
## Deployment
A Spark Streaming application is deployed on a cluster in the same way as any other Spark application.
Please refer to the [deployment guide](cluster-overview.html) for more details.

If a running Spark Streaming application needs to be upgraded (with new application code), then
there are two possible mechanisms.

- The upgraded Spark Streaming application is started and run in parallel to the existing application.
Once the new one (receiving the same data as the old one) has been warmed up and is ready
for prime time, the old one can be brought down. Note that this can be done only for data sources that support
sending the data to two destinations (i.e., the earlier and upgraded applications).

- The existing application is shut down gracefully (see
[`StreamingContext.stop(...)`](api/scala/index.html#org.apache.spark.streaming.StreamingContext)
or [`JavaStreamingContext.stop(...)`](api/java/org/apache/spark/streaming/api/java/JavaStreamingContext.html)
for graceful shutdown options), which ensures that data that has been received is completely
processed before shutdown. Then the
upgraded application can be started, and it will start processing from the same point where the earlier
application left off. Note that this can be done only with input sources that support source-side buffering
(like Kafka and Flume), as data needs to be buffered while the previous application is down and
the upgraded application is not yet up. A minimal sketch of a graceful shutdown follows this list.
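As a rough illustration (assuming `ssc` is the running `StreamingContext`):

{% highlight scala %}
// Stop gracefully: wait for all received data to be processed (second flag),
// and also stop the underlying SparkContext (first flag).
ssc.stop(stopSparkContext = true, stopGracefully = true)
{% endhighlight %}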
## Monitoring
Beyond Spark's [monitoring capabilities](monitoring.html), there are additional capabilities
specific to Spark Streaming. When a StreamingContext is used, the
[Spark web UI](monitoring.html#web-interfaces) shows
an additional `Streaming` tab which shows statistics about running receivers (whether
receivers are active, number of records received, receiver errors, etc.)
and completed batches (batch processing times, queueing delays, etc.). This can be used to
monitor the progress of the streaming application.

The following two metrics in the web UI are particularly important:
*Processing Time* and *Scheduling Delay* (under *Batch Processing Statistics*). The first is the
time to process each batch of data, and the second is the time a batch waits in a queue
for the processing of previous batches to finish. If the batch processing time is consistently more
than the batch interval and/or the queueing delay keeps increasing, then it indicates that the system is
not able to process the batches as fast as they are being generated and is falling behind.
In that case, consider
[reducing](#reducing-the-processing-time-of-each-batch) the batch processing time.

The progress of a Spark Streaming program can also be monitored using the
[StreamingListener](api/scala/index.html#org.apache.spark.streaming.scheduler.StreamingListener) interface,
which allows you to get receiver status and processing times. Note that this is a developer API
and it is likely to be improved upon (i.e., more information reported) in the future.
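As a rough sketch of using this interface (the `BatchInfo` fields printed here, `batchTime`, `processingDelay`, and `totalDelay`, are taken from the Scala API and may evolve), a listener can be registered on the context:

{% highlight scala %}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Logs timing information for every completed batch.
class BatchStatsListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    val info = batchCompleted.batchInfo
    println("Batch " + info.batchTime +
      ": processing delay = " + info.processingDelay +
      ", total delay = " + info.totalDelay)
  }
}

// Register the listener (assuming ssc is the StreamingContext).
ssc.addStreamingListener(new BatchStatsListener)
{% endhighlight %}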
***************************************************************************************************

# Performance Tuning
@@ -875,7 +920,8 @@ improve the performance of your application. At a high level, you need to consider
    Reducing the processing time of each batch of data by efficiently using cluster resources.
  </li>
  <li>
    Setting the right batch size such that the batches of data can be processed as fast as they
    are received (that is, data processing keeps up with the data ingestion).
  </li>
</ol>

@@ -884,7 +930,30 @@ There are a number of optimizations that can be done in Spark to minimize the processing time of
each batch. These have been discussed in detail in [Tuning Guide](tuning.html). This section
highlights some of the most important ones.

### Level of Parallelism in Data Receiving
Receiving data over the network (like Kafka, Flume, socket, etc.) requires the data to be deserialized
and stored in Spark. If the data receiving becomes a bottleneck in the system, then consider
parallelizing the data receiving. Note that each input DStream
creates a single receiver (running on a worker machine) that receives a single stream of data.
Receiving multiple data streams can therefore be achieved by creating multiple input DStreams
and configuring them to receive different partitions of the data stream from the source(s).
For example, a single Kafka input stream receiving two topics of data can be split into two
Kafka input streams, each receiving only one topic. This would run two receivers on two workers,
thus allowing data to be received in parallel and increasing overall throughput, as sketched below.
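A rough sketch of this pattern, assuming `ssc` is the `StreamingContext` and the ZooKeeper quorum, consumer group, and topic names are placeholders:

{% highlight scala %}
// Two input DStreams, each backed by its own receiver consuming one Kafka topic.
val stream1 = KafkaUtils.createStream(ssc, "zk-quorum:2181", "consumer-group", Map("topic1" -> 1))
val stream2 = KafkaUtils.createStream(ssc, "zk-quorum:2181", "consumer-group", Map("topic2" -> 1))

// Union the two streams so that the rest of the application sees a single DStream.
val unifiedStream = ssc.union(Seq(stream1, stream2))
{% endhighlight %}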
Another parameter that should be considered is the receiver's block interval. For most receivers,
the received data is coalesced into large blocks of data before being stored inside Spark's memory.
The number of blocks in each batch determines the number of tasks that will be used to process
the received data in a map-like transformation. This block interval is determined by the
[configuration parameter](configuration.html) `spark.streaming.blockInterval`, and the default value
is 200 milliseconds.
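For instance, with a batch interval of 2 seconds and the default block interval of 200 milliseconds, each receiver generates roughly 2000 / 200 = 10 blocks per batch, so the received data of each batch is processed by about 10 map tasks per receiver.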
An alternative to receiving data with multiple input streams / receivers is to explicitly repartition
the input data stream (using `inputStream.repartition(<number of partitions>)`).
This distributes the received batches of data across all the machines in the cluster
before further processing.
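For instance, a stream such as the `unifiedStream` from the sketch above could be spread across more partitions before processing (the partition count here is arbitrary):

{% highlight scala %}
// Redistribute the received data into 16 partitions before further processing.
val repartitionedStream = unifiedStream.repartition(16)
{% endhighlight %}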
### Level of Parallelism in Data Processing
Cluster resources may be under-utilized if the number of parallel tasks used in any stage of the
computation is not high enough. For example, for distributed reduce operations like `reduceByKey`
and `reduceByKeyAndWindow`, the default number of parallel tasks is 8. You can pass the level of
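As a rough sketch of passing an explicit level of parallelism to such an operation (assuming `pairs` is a DStream of `(String, Int)` pairs):

{% highlight scala %}
// Use 16 parallel reduce tasks instead of the default.
val wordCounts = pairs.reduceByKey((a: Int, b: Int) => a + b, 16)
{% endhighlight %}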
@@ -921,16 +990,22 @@ These changes may reduce batch processing time by 100s of milliseconds,
thus allowing sub-second batch size to be viable.

## Setting the Right Batch Size
For a Spark Streaming application running on a cluster to be stable, the system should be able to
process data as fast as it is being received. In other words, batches of data should be processed
as fast as they are being generated. Whether this is true for an application can be found by
[monitoring](#monitoring) the processing times in the streaming web UI, where the batch
processing time should be less than the batch interval.

Depending on the nature of the streaming
computation, the batch interval used may have a significant impact on the data rates that can be
sustained by the application on a fixed set of cluster resources. For example, let us
consider the earlier WordCountNetwork example. For a particular data rate, the system may be able
to keep up with reporting word counts every 2 seconds (i.e., batch interval of 2 seconds), but not
every 500 milliseconds. So the batch interval needs to be set such that the expected data rate in
production can be sustained.

A good approach to figure out the right batch size for your application is to test it with a
conservative batch interval (say, 5-10 seconds) and a low data rate. To verify whether the system
is able to keep up with the data rate, you can check the value of the end-to-end delay experienced
by each processed batch (either look for "Total delay" in Spark driver log4j logs, or use the
[StreamingListener](api/scala/index.html#org.apache.spark.streaming.scheduler.StreamingListener)
@@ -942,29 +1017,6 @@ data rate and/or reducing the batch size. Note that momentary increase in the delay due to
temporary data rate increases may be fine as long as the delay reduces back to a low value
(i.e., less than batch size).

## Memory Tuning
Tuning the memory usage and GC behavior of Spark applications has been discussed in great detail
in the [Tuning Guide](tuning.html). It is recommended that you read that. In this section,
@@ -1249,18 +1301,80 @@ in the file. This is what the sequence of outputs would be with and without a driver failure.
If the driver had crashed in the middle of the processing of time 3, then it will process time 3
and output 30 after recovery.
***************************************************************************************************

# Migration Guide from 0.9.1 or below to 1.x
Between Spark 0.9.1 and Spark 1.0, there were a few API changes made to ensure future API stability.
This section elaborates the steps required to migrate your existing code to 1.0.

**Input DStreams**: All operations that create an input stream (e.g., `StreamingContext.socketStream`,
`FlumeUtils.createStream`, etc.) now return
[InputDStream](api/scala/index.html#org.apache.spark.streaming.dstream.InputDStream) /
[ReceiverInputDStream](api/scala/index.html#org.apache.spark.streaming.dstream.ReceiverInputDStream)
(instead of DStream) for Scala, and [JavaInputDStream](api/java/org/apache/spark/streaming/api/java/JavaInputDStream.html) /
[JavaPairInputDStream](api/java/org/apache/spark/streaming/api/java/JavaPairInputDStream.html) /
[JavaReceiverInputDStream](api/java/org/apache/spark/streaming/api/java/JavaReceiverInputDStream.html) /
[JavaPairReceiverInputDStream](api/java/org/apache/spark/streaming/api/java/JavaPairReceiverInputDStream.html)
(instead of JavaDStream) for Java. This ensures that functionality specific to input streams can
be added to these classes in the future without breaking binary compatibility.
Note that your existing Spark Streaming applications should not require any change
(as these new classes are subclasses of DStream/JavaDStream) but may require recompilation with Spark 1.0.
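For instance, a minimal illustration of the new return type in Scala (existing code that assigns the result to a `DStream[String]` continues to compile, since the new class is a subclass):

{% highlight scala %}
// socketTextStream now returns a ReceiverInputDStream[String] instead of a plain DStream[String].
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
{% endhighlight %}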
**Custom Network Receivers**: Since the release of Spark Streaming, custom network receivers could be defined
in Scala using the class NetworkReceiver. However, the API was limited in terms of error handling
and reporting, and could not be used from Java. Starting with Spark 1.0, this class has been
replaced by [Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver), which has
the following advantages.

* Methods like `stop` and `restart` have been added for better control of the lifecycle of a receiver. See
the [custom receiver guide](streaming-custom-receivers.html) for more details.
* Custom receivers can be implemented using both Scala and Java.

To migrate your existing custom receivers from the earlier NetworkReceiver to the new Receiver, you have
to do the following.

* Make your custom receiver class extend
[`org.apache.spark.streaming.receiver.Receiver`](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver)
instead of `org.apache.spark.streaming.dstream.NetworkReceiver`.
* Earlier, a BlockGenerator object had to be created by the custom receiver, and received data was
added to it to be stored in Spark. It had to be explicitly started and stopped from the `onStart()` and `onStop()`
methods. The new Receiver class makes this unnecessary as it adds a set of methods named `store(<data>)`
that can be called to store the data in Spark. So, to migrate your custom network receiver, remove any
BlockGenerator object (it no longer exists in Spark 1.0 anyway), and use the `store(...)` methods on
received data.
**Actor-based Receivers**: Data could have been received using any Akka Actors by extending the actor class with
the `org.apache.spark.streaming.receivers.Receiver` trait. This has been renamed to
[`org.apache.spark.streaming.receiver.ActorHelper`](api/scala/index.html#org.apache.spark.streaming.receiver.ActorHelper),
and the `pushBlock(...)` methods to store received data have been renamed to `store(...)`. Other helper classes in
the `org.apache.spark.streaming.receivers` package were also moved
to the [`org.apache.spark.streaming.receiver`](api/scala/index.html#org.apache.spark.streaming.receiver.package)
package and renamed for better clarity.
***************************************************************************************************

# Where to Go from Here

* API documentation
  - Scala docs
    * [StreamingContext](api/scala/index.html#org.apache.spark.streaming.StreamingContext) and
      [DStream](api/scala/index.html#org.apache.spark.streaming.dstream.DStream)
    * [KafkaUtils](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$),
      [FlumeUtils](api/scala/index.html#org.apache.spark.streaming.flume.FlumeUtils$),
      [TwitterUtils](api/scala/index.html#org.apache.spark.streaming.twitter.TwitterUtils$),
      [ZeroMQUtils](api/scala/index.html#org.apache.spark.streaming.zeromq.ZeroMQUtils$), and
      [MQTTUtils](api/scala/index.html#org.apache.spark.streaming.mqtt.MQTTUtils$)
  - Java docs
    * [JavaStreamingContext](api/java/org/apache/spark/streaming/api/java/JavaStreamingContext.html),
      [JavaDStream](api/java/org/apache/spark/streaming/api/java/JavaDStream.html) and
      [JavaPairDStream](api/java/org/apache/spark/streaming/api/java/JavaPairDStream.html)
    * [KafkaUtils](api/java/org/apache/spark/streaming/kafka/KafkaUtils.html),
      [FlumeUtils](api/java/org/apache/spark/streaming/flume/FlumeUtils.html),
      [TwitterUtils](api/java/org/apache/spark/streaming/twitter/TwitterUtils.html),
      [ZeroMQUtils](api/java/org/apache/spark/streaming/zeromq/ZeroMQUtils.html), and
      [MQTTUtils](api/java/org/apache/spark/streaming/mqtt/MQTTUtils.html)
* More examples in [Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/streaming/examples)
  and [Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/streaming/examples)
* [Paper](http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf) and
  [video](http://youtu.be/g171ndOHgJ0) describing Spark Streaming.
@@ -26,6 +26,7 @@ import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.receiver.Receiver;
import scala.Tuple2;
@@ -69,7 +70,7 @@ public class JavaCustomReceiver extends Receiver<String> {
    // Create an input stream with the custom receiver on target ip:port and count the
    // words in the input stream of \n delimited text (eg. generated by 'nc')
    JavaReceiverInputDStream<String> lines = ssc.receiverStream(
      new JavaCustomReceiver(args[1], Integer.parseInt(args[2])));
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
...
@@ -58,7 +58,7 @@ public final class JavaFlumeEventCount {
    JavaStreamingContext ssc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
            System.getenv("SPARK_HOME"),
            JavaStreamingContext.jarOfClass(JavaFlumeEventCount.class));
    JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, "localhost", port);

    flumeStream.count();
...
@@ -29,6 +29,7 @@ import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
@@ -73,7 +74,8 @@ public final class JavaKafkaWordCount {
      topicMap.put(topic, numThreads);
    }

    JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jssc, args[1], args[2], topicMap);

    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      @Override
...
@@ -18,6 +18,7 @@
package org.apache.spark.streaming.examples;

import com.google.common.collect.Lists;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import scala.Tuple2;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
@@ -57,9 +58,9 @@ public final class JavaNetworkWordCount {
            new Duration(1000), System.getenv("SPARK_HOME"),
            JavaStreamingContext.jarOfClass(JavaNetworkWordCount.class));

    // Create a JavaReceiverInputDStream on target ip:port and count the
    // words in the input stream of \n delimited text (eg. generated by 'nc')
    JavaReceiverInputDStream<String> lines = ssc.socketTextStream(args[1], Integer.parseInt(args[2]));
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
...
@@ -28,9 +28,11 @@ import org.apache.spark.annotation.DeveloperApi
/**
 * :: DeveloperApi ::
 * Abstract class of a receiver that can be run on worker nodes to receive external data. A
 * custom receiver can be defined by defining the functions `onStart()` and `onStop()`. `onStart()`
 * should define the setup steps necessary to start receiving data,
 * and `onStop()` should define the cleanup steps necessary to stop receiving data.
 * Exceptions while receiving can be handled either by restarting the receiver with `restart(...)`
 * or by stopping it completely with `stop(...)`.
 *
 * A custom receiver in Scala would look like this.
 *
...
@@ -183,6 +183,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
        "Received records = " + TestReceiver.counter.get() + ", " +
          "processed records = " + runningCount
      )
      Thread.sleep(100)
    }
  }
...
......