Commit 092c6725 authored by Tathagata Das, committed by Shixiong Zhu

[SPARK-18669][SS][DOCS] Update Apache docs for Structured Streaming regarding watermarking and status

## What changes were proposed in this pull request?

- Extended the Window operation section with code snippet and explanation of watermarking
- Extended the Output Mode section with a table showing the compatibility between query type and output mode
- Rewrote the Monitoring section with the updated JSON output generated by StreamingQuery.progress/status
- Updated API changes in the StreamingQueryListener example

TODO
- [x] Figure showing the watermarking

## How was this patch tested?

N/A

## Screenshots
### Section: Windowed Aggregation with Event Time

<img width="927" alt="screen shot 2016-12-15 at 3 33 10 pm" src="https://cloud.githubusercontent.com/assets/663212/21246197/0e02cb1a-c2dc-11e6-8816-0cd28d8201d7.png">

![image](https://cloud.githubusercontent.com/assets/663212/21246241/45b0f87a-c2dc-11e6-9c29-d0a89e07bf8d.png)

<img width="929" alt="screen shot 2016-12-15 at 3 33 46 pm" src="https://cloud.githubusercontent.com/assets/663212/21246202/1652cefa-c2dc-11e6-8c64-3c05977fb3fc.png">

----------------------------
### Section: Output Modes
![image](https://cloud.githubusercontent.com/assets/663212/21246276/8ee44948-c2dc-11e6-9fa2-30502fcf9a55.png)

----------------------------
### Section: Monitoring
![image](https://cloud.githubusercontent.com/assets/663212/21246535/3c5baeb2-c2de-11e6-88cd-ca71db7c5cf9.png)
![image](https://cloud.githubusercontent.com/assets/663212/21246574/789492c2-c2de-11e6-8471-7bef884e1837.png)

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #16294 from tdas/SPARK-18669.
parent 6a475ae4
docs/img/structured-streaming-watermark.png (246 KiB, binary image, no preview)
@@ -10,7 +10,7 @@ title: Structured Streaming Programming Guide
# Overview
Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the [Dataset/DataFrame API](sql-programming-guide.html) in Scala, Java or Python to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write Ahead Logs. In short, *Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.*

**Structured Streaming is still ALPHA in Spark 2.1** and the APIs are still experimental. In this guide, we are going to walk you through the programming model and the APIs. First, let's start with a simple example - a streaming word count.

# Quick Example
Let’s say you want to maintain a running word count of text data received from a data server listening on a TCP socket. Let’s see how you can express this using Structured Streaming. You can see the full code in
@@ -400,7 +400,14 @@ see how this model handles event-time based processing and late arriving data.
## Handling Event-time and Late Data
Event-time is the time embedded in the data itself. For many applications, you may want to operate on this event-time. For example, if you want to get the number of events generated by IoT devices every minute, then you probably want to use the time when the data was generated (that is, event-time in the data), rather than the time Spark receives them. This event-time is very naturally expressed in this model -- each event from the devices is a row in the table, and event-time is a column value in the row. This allows window-based aggregations (e.g. number of events every minute) to be just a special type of grouping and aggregation on the event-time column -- each time window is a group and each row can belong to multiple windows/groups. Therefore, such event-time-window-based aggregation queries can be defined consistently on both a static dataset (e.g. from collected device event logs) as well as on a data stream, making the life of the user much easier.
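For instance, such a windowed count is just a `groupBy` on a `window` of the event-time column. The sketch below is illustrative and not part of the guide's running example; the `events` DataFrame and its `eventTime` and `deviceId` columns are assumed names, and the same query works whether `events` is static or streaming.

{% highlight scala %}
import spark.implicits._
import org.apache.spark.sql.functions.window

// events: a DataFrame with schema { eventTime: Timestamp, deviceId: String },
// read either from a static source or from a stream -- the query is identical.
val countsPerWindow = events
  .groupBy(
    window($"eventTime", "1 minute"),  // each 1-minute event-time window is a group
    $"deviceId")
  .count()
{% endhighlight %}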
Furthermore, this model naturally handles data that has arrived later than expected based on its event-time. Since Spark is updating the Result Table, it has full control over updating old aggregates when there is late data, as well as cleaning up old aggregates to limit the size of intermediate state data. Since Spark 2.1, we have support for watermarking, which allows the user to specify the threshold of late data and allows the engine to accordingly clean up old state. This is explained in more detail later in the [Window Operations](#window-operations-on-event-time) section.
## Fault Tolerance Semantics
Delivering end-to-end exactly-once semantics was one of the key goals behind the design of Structured Streaming. To achieve that, we have designed the Structured Streaming sources, the sinks and the execution engine to reliably track the exact progress of the processing so that it can handle any kind of failure by restarting and/or reprocessing. Every streaming source is assumed to have offsets (similar to Kafka offsets, or Kinesis sequence numbers)
@@ -671,12 +678,123 @@ windowedCounts = words.groupBy(
</div>
### Handling Late Data and Watermarking
Now consider what happens if one of the events arrives late to the application. For example, say, a word generated at 12:04 (i.e. event time) could be received by the application at 12:11. The application should use the time 12:04 instead of 12:11 to update the older counts for the window `12:00 - 12:10`. This occurs naturally in our window-based grouping – Structured Streaming can maintain the intermediate state for partial aggregates for a long period of time such that late data can update aggregates of old windows correctly, as illustrated below.
![Handling Late Data](img/structured-streaming-late-data.png)
However, to run this query for days, it's necessary for the system to bound the amount of intermediate in-memory state it accumulates. This means the system needs to know when an old aggregate can be dropped from the in-memory state because the application is not going to receive late data for that aggregate any more. To enable this, in Spark 2.1, we have introduced **watermarking**, which lets the engine automatically track the current event time in the data and attempt to clean up old state accordingly. You can define the watermark of a query by specifying the event time column and the threshold on how late the data is expected to be in terms of event time. For a specific window starting at time `T`, the engine will maintain state and allow late data to update the state until `(max event time seen by the engine - late threshold > T)`. In other words, late data within the threshold will be aggregated, but data later than the threshold will be dropped. Let's understand this with an example. We can easily define watermarking on the previous example using `withWatermark()` as shown below.
<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),
        $"word")
    .count()
{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
        words.col("word"))
    .count();
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}
words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(words.timestamp, "10 minutes", "5 minutes"),
        words.word) \
    .count()
{% endhighlight %}
</div>
</div>
In this example, we are defining the watermark of the query on the value of the column "timestamp", and also defining "10 minutes" as the threshold of how late the data is allowed to be. If this query is run in Append output mode (discussed later in the [Output Modes](#output-modes) section), the engine will track the current event time from the column "timestamp" and wait for an additional "10 minutes" in event time before finalizing the windowed counts and adding them to the Result Table. Here is an illustration.
![Watermarking in Append Mode](img/structured-streaming-watermark.png)
As shown in the illustration, the maximum event time tracked by the engine is the *blue dashed line*, and the watermark set as `(max event time - '10 mins')` at the beginning of every trigger is the red line. For example, when the engine observes the data `(12:14, dog)`, it sets the watermark for the next trigger as `12:04`. For the window `12:00 - 12:10`, the partial counts are maintained as internal state while the system is waiting for late data. After the system finds data (i.e. `(12:21, owl)`) such that the watermark exceeds 12:10, the partial count is finalized and appended to the table. This count will not change any further, as all "too-late" data older than 12:10 will be ignored.

Note that in Append output mode, the system has to wait for the "late threshold" time before it can output the aggregation of a window. This may not be ideal if data can be very late (say, 1 day) and you would like to have partial counts without waiting for a day. In the future, we will add an Update output mode, which will allow every update to the aggregates to be written to the sink every trigger.
**Conditions for watermarking to clean aggregation state**

It is important to note that the following conditions must be satisfied for watermarking to clean the state in aggregation queries *(as of Spark 2.1, subject to change in the future)*.

- **Output mode must be Append.** Complete mode requires all aggregate data to be preserved, and hence cannot use watermarking to drop intermediate state. See the [Output Modes](#output-modes) section for a detailed explanation of the semantics of each output mode.

- The aggregation must have either the event-time column, or a `window` on the event-time column.

- `withWatermark` must be called on the same column as the timestamp column used in the aggregate. For example, `df.withWatermark("time", "1 min").groupBy("time2").count()` is invalid in Append output mode, as the watermark is defined on a different column from the aggregation column.

- `withWatermark` must be called before the aggregation for the watermark details to be used. For example, `df.groupBy("time").count().withWatermark("time", "1 min")` is invalid in Append output mode (see the sketch below).
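To make the last two conditions concrete, here is a minimal sketch; the DataFrame `df`, its columns, and the window duration are illustrative assumptions rather than part of the guide's example.

{% highlight scala %}
import spark.implicits._
import org.apache.spark.sql.functions.window

// df: a streaming DataFrame with an event-time column "time" (illustrative)

// Valid: the watermark is declared before the aggregation, on the column being windowed.
val validQuery = df
    .withWatermark("time", "1 min")
    .groupBy(window($"time", "5 minutes"))
    .count()

// Invalid in Append mode: the watermark comes after the aggregation,
// so it cannot be used to clean up the aggregation state.
val invalidQuery = df
    .groupBy("time")
    .count()
    .withWatermark("time", "1 min")
{% endhighlight %}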
### Join Operations
Streaming DataFrames can be joined with static DataFrames to create new streaming DataFrames. Here are a few examples.
@@ -763,16 +881,78 @@ returned through `Dataset.writeStream()`. You will have to specify one or more o
- *Checkpoint location:* For some output sinks where the end-to-end fault-tolerance can be guaranteed, specify the location where the system will write all the checkpoint information. This should be a directory in an HDFS-compatible fault-tolerant file system. The semantics of checkpointing is discussed in more detail in the next section.
#### Output Modes
There are a few types of output modes.

- **Append mode (default)** - This is the default mode, where only the new rows added to the Result Table since the last trigger will be outputted to the sink. This is supported only for those queries where rows added to the Result Table are never going to change. Hence, this mode guarantees that each row will be output only once (assuming a fault-tolerant sink). For example, queries with only `select`, `where`, `map`, `flatMap`, `filter`, `join`, etc. will support Append mode.

- **Complete mode** - The whole Result Table will be outputted to the sink after every trigger. This is supported for aggregation queries.

- **Update mode** - (*not available in Spark 2.1*) Only the rows in the Result Table that were updated since the last trigger will be outputted to the sink. More information to be added in future releases.

Different types of streaming queries support different output modes. Here is the compatibility matrix.
<table class="table">
  <tr>
    <th>Query Type</th>
    <th></th>
    <th>Supported Output Modes</th>
    <th>Notes</th>
  </tr>
  <tr>
    <td colspan="2" valign="middle"><br/>Queries without aggregation</td>
    <td>Append</td>
    <td>
        Complete mode is not supported as it is infeasible to keep all data in the Result Table.
    </td>
  </tr>
  <tr>
    <td rowspan="2">Queries with aggregation</td>
    <td>Aggregation on event-time with watermark</td>
    <td>Append, Complete</td>
    <td>
        Append mode uses the watermark to drop old aggregation state. But the output of a
        windowed aggregation is delayed by the late threshold specified in `withWatermark()` since, by
        the semantics of the mode, rows can be added to the Result Table only once after they are
        finalized (i.e. after the watermark is crossed). See the
        <a href="#handling-late-data">Late Data</a> section for more details.
        <br/><br/>
        Complete mode does not drop old aggregation state since by definition this mode
        preserves all data in the Result Table.
    </td>
  </tr>
  <tr>
    <td>Other aggregations</td>
    <td>Complete</td>
    <td>
        Append mode is not supported as aggregates can update, thus violating the semantics of
        this mode.
        <br/><br/>
        Complete mode does not drop old aggregation state since by definition this mode
        preserves all data in the Result Table.
    </td>
  </tr>
  <tr>
    <td></td>
    <td></td>
    <td></td>
    <td></td>
  </tr>
</table>
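As a usage sketch (not part of the guide's examples), the output mode is simply set on the `DataStreamWriter` before starting the query; the aggregated DataFrame `windowedCounts` and the console sink here are illustrative.

{% highlight scala %}
// windowedCounts: a streaming DataFrame with an aggregation (illustrative)
val query = windowedCounts.writeStream
    .outputMode("complete")   // "append" would additionally require a watermark on the event-time column
    .format("console")        // print each trigger's output, for demonstration only
    .start()
{% endhighlight %}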
#### Output Sinks
There are a few types of built-in output sinks.

- **File sink** - Stores the output to a directory.

- **Foreach sink** - Runs arbitrary computation on the records in the output. See later in the section for more details.
@@ -791,7 +971,7 @@ Here is a table of all the sinks, and the corresponding settings.
    <th>Notes</th>
  </tr>
  <tr>
    <td><b>File Sink</b></td>
    <td>Append</td>
    <td><pre>writeStream<br/> .format("parquet")<br/> .start()</pre></td>
    <td>Yes</td>
@@ -817,7 +997,14 @@ Here is a table of all the sinks, and the corresponding settings.
    <td><pre>writeStream<br/> .format("memory")<br/> .queryName("table")<br/> .start()</pre></td>
    <td>No</td>
    <td>Saves the output data as a table, for interactive querying. Table name is the query name.</td>
  </tr>
<tr>
<td></td>
<td></td>
<td></td>
<td></td>
<td></td>
</tr>
</table>

Finally, you have to call `start()` to actually start the execution of the query. This returns a StreamingQuery object which is a handle to the continuously running execution. You can use this object to manage the query, which we will discuss in the next subsection. For now, let’s understand all this with a few examples.

@@ -947,7 +1134,7 @@ spark.sql("select * from aggregates").show()   # interactively query in-memory t
</div>

#### Using Foreach
The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
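As a rough sketch of implementing the interface, assuming a `Row`-typed output stream; the `streamingCounts` DataFrame and the console-printing body are illustrative, not the guide's example.

{% highlight scala %}
import org.apache.spark.sql.{ForeachWriter, Row}

// streamingCounts: an illustrative streaming DataFrame
val query = streamingCounts.writeStream
    .foreach(new ForeachWriter[Row] {
      // Called once per partition per trigger; return false to skip processing this partition.
      override def open(partitionId: Long, version: Long): Boolean = true

      // Called for every row of the partition's output.
      override def process(record: Row): Unit = println(record)

      // Called after all rows are processed, with the error if one occurred.
      override def close(errorOrNull: Throwable): Unit = ()
    })
    .start()
{% endhighlight %}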
@@ -1089,11 +1276,28 @@ spark.streams().awaitAnyTermination()   # block until any one of them terminates

## Monitoring Streaming Queries
There are two APIs for monitoring and debugging active queries - interactively and asynchronously.

### Interactive APIs

You can directly get the current status and metrics of an active query using
`streamingQuery.lastProgress()` and `streamingQuery.status()`.
`lastProgress()` returns a `StreamingQueryProgress` object
in [Scala](api/scala/index.html#org.apache.spark.sql.streaming.StreamingQueryProgress)
and [Java](api/java/org/apache/spark/sql/streaming/StreamingQueryProgress.html)
and a dictionary with the same fields in Python. It has all the information about
the progress made in the last trigger of the stream - what data was processed,
what were the processing rates, latencies, etc. There is also
`streamingQuery.recentProgress` which returns an array of the last few progresses.

In addition, `streamingQuery.status()` returns a `StreamingQueryStatus` object
in [Scala](api/scala/index.html#org.apache.spark.sql.streaming.StreamingQueryStatus)
and [Java](api/java/org/apache/spark/sql/streaming/StreamingQueryStatus.html)
and a dictionary with the same fields in Python. It gives information about
what the query is immediately doing - is a trigger active, is data being processed, etc.

Here are a few examples.
<div class="codetabs">
<div data-lang="scala" markdown="1">
@@ -1101,34 +1305,65 @@ details of the currently active trigger, etc.

{% highlight scala %}
val query: StreamingQuery = ...
println(query.lastProgress)

/* Will print something like the following.

{
  "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
  "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
  "name" : "MyQuery",
  "timestamp" : "2016-12-14T18:45:24.873Z",
  "numInputRows" : 10,
  "inputRowsPerSecond" : 120.0,
  "processedRowsPerSecond" : 200.0,
  "durationMs" : {
    "triggerExecution" : 3,
    "getOffset" : 2
  },
  "eventTime" : {
    "watermark" : "2016-12-14T18:45:24.873Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-0]]",
    "startOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 1,
        "1" : 1,
        "3" : 1,
        "0" : 1
      }
    },
    "endOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 115,
        "1" : 134,
        "3" : 21,
        "0" : 534
      }
    },
    "numInputRows" : 10,
    "inputRowsPerSecond" : 120.0,
    "processedRowsPerSecond" : 200.0
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
}
*/
println(query.status)

/* Will print something like the following.
{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
*/
{% endhighlight %}

@@ -1138,34 +1373,63 @@ Status of query 'queryName'

{% highlight java %}
StreamingQuery query = ...
System.out.println(query.lastProgress());
/* Will print something like the following.

{
  "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
  "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
  "name" : "MyQuery",
  "timestamp" : "2016-12-14T18:45:24.873Z",
  "numInputRows" : 10,
  "inputRowsPerSecond" : 120.0,
  "processedRowsPerSecond" : 200.0,
  "durationMs" : {
    "triggerExecution" : 3,
    "getOffset" : 2
  },
  "eventTime" : {
    "watermark" : "2016-12-14T18:45:24.873Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-0]]",
    "startOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 1,
        "1" : 1,
        "3" : 1,
        "0" : 1
      }
    },
    "endOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 115,
        "1" : 134,
        "3" : 21,
        "0" : 534
      }
    },
    "numInputRows" : 10,
    "inputRowsPerSecond" : 120.0,
    "processedRowsPerSecond" : 200.0
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
}
*/

System.out.println(query.status());
/* Will print something like the following.
{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
*/
{% endhighlight %}

@@ -1173,43 +1437,27 @@ Status of query 'queryName'

<div data-lang="python" markdown="1">
{% highlight python %}
query = ...  # a StreamingQuery
print(query.lastProgress)

'''
Will print something like the following.

{u'stateOperators': [], u'eventTime': {u'watermark': u'2016-12-14T18:45:24.873Z'}, u'name': u'MyQuery', u'timestamp': u'2016-12-14T18:45:24.873Z', u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'sources': [{u'description': u'KafkaSource[Subscribe[topic-0]]', u'endOffset': {u'topic-0': {u'1': 134, u'0': 534, u'3': 21, u'2': 0, u'4': 115}}, u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'startOffset': {u'topic-0': {u'1': 1, u'0': 1, u'3': 1, u'2': 0, u'4': 1}}}], u'durationMs': {u'getOffset': 2, u'triggerExecution': 3}, u'runId': u'88e2ff94-ede0-45a8-b687-6316fbef529a', u'id': u'ce011fdc-8762-4dcb-84eb-a77333e28109', u'sink': {u'description': u'MemorySink'}}
'''

print(query.status)

'''
Will print something like the following.

{u'message': u'Waiting for data to arrive', u'isTriggerActive': False, u'isDataAvailable': False}
'''
{% endhighlight %}

</div>
</div>
### Asynchronous API
You can also asynchronously monitor all queries associated with a
`SparkSession` by attaching a `StreamingQueryListener`
@@ -1225,15 +1473,14 @@ stopped and when there is progress made in an active query. Here is an example,

val spark: SparkSession = ...

spark.streams.addListener(new StreamingQueryListener() {
    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
        println("Query started: " + queryStarted.id)
    }
    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
        println("Query terminated: " + queryTerminated.id)
    }
    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
        println("Query made progress: " + queryProgress.progress)
    }
})
{% endhighlight %}
@@ -1245,15 +1492,14 @@ spark.streams.addListener(new StreamingQueryListener() {

SparkSession spark = ...

spark.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent queryStarted) {
        System.out.println("Query started: " + queryStarted.id());
    }
    @Override
    public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
        System.out.println("Query terminated: " + queryTerminated.id());
    }
    @Override
    public void onQueryProgress(QueryProgressEvent queryProgress) {
        System.out.println("Query made progress: " + queryProgress.progress());
    }
});
{% endhighlight %}
@@ -1268,7 +1514,7 @@ Not available in Python.
</div>

## Recovering from Failures with Checkpointing
In case of a failure or intentional shutdown, you can recover the previous progress and state of a previous query, and continue where it left off. This is done using checkpointing and write ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information (i.e. range of offsets processed in each trigger) and the running aggregates (e.g. word counts in the [quick example](#quick-example)) to the checkpoint location. This checkpoint location has to be a path in an HDFS compatible file system, and can be set as an option in the DataStreamWriter when [starting a query](#starting-streaming-queries).

<div class="codetabs">
<div data-lang="scala" markdown="1">