Commit b8302ccd authored by Felix Cheung, committed by Felix Cheung

[SPARK-20015][SPARKR][SS][DOC][EXAMPLE] Document R Structured Streaming (experimental) in R vignettes and R & SS programming guide, R example

## What changes were proposed in this pull request?

Add
- R vignettes
- R programming guide
- SS programming guide
- R example

Also disable spark.als in vignettes for now since it's failing (SPARK-20402)

## How was this patch tested?

manually

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #17814 from felixcheung/rdocss.
parent fc472bdd
@@ -182,7 +182,7 @@ head(df)
```
### Data Sources
SparkR supports operating on a variety of data sources through the `SparkDataFrame` interface. You can check the Spark SQL Programming Guide for more [specific options](https://spark.apache.org/docs/latest/sql-programming-guide.html#manually-specifying-options) that are available for the built-in data sources.
The general method for creating a `SparkDataFrame` from a data source is `read.df`. This method takes the path of the file to load and the type of data source, and the currently active SparkSession is used automatically. SparkR supports reading CSV, JSON, and Parquet files natively, and through Spark Packages you can find data source connectors for popular file formats such as Avro. These packages can be added with the `sparkPackages` parameter when initializing a SparkSession with `sparkR.session`.
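For example, a JSON file can be loaded as shown below (a minimal sketch; the file path is a placeholder):
```{r, eval=FALSE}
# Load a JSON file into a SparkDataFrame; the path is hypothetical
people <- read.df("path/to/people.json", source = "json")
head(people)
```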
@@ -232,7 +232,7 @@ write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite"
```
### Hive Tables
You can also create SparkDataFrames from Hive tables. To do this, we need to create a SparkSession with Hive support, which can access tables in the Hive MetaStore. Note that Spark should have been built with Hive support; more details can be found in the [SQL Programming Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html). By default, SparkR will attempt to create a SparkSession with Hive support enabled (`enableHiveSupport = TRUE`).
```{r, eval=FALSE}
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
@@ -314,7 +314,7 @@ Use `cube` or `rollup` to compute subtotals across multiple dimensions.
mean(cube(carsDF, "cyl", "gear", "am"), "mpg")
```
generates groupings for {(`cyl`, `gear`, `am`), (`cyl`, `gear`), (`cyl`), ()}, while
```{r}
mean(rollup(carsDF, "cyl", "gear", "am"), "mpg")
@@ -672,6 +672,7 @@ head(select(naiveBayesPrediction, "Class", "Sex", "Age", "Survived", "prediction
Survival analysis studies the expected duration of time until an event happens, and often its relationship with risk factors or treatment applied to the subject. In contrast to standard regression analysis, survival modeling has to deal with special characteristics of the data, including non-negative survival time and censoring.
The Accelerated Failure Time (AFT) model is a parametric survival model for censored data that assumes the effect of a covariate is to accelerate or decelerate the life course of an event by some constant. For more information, refer to the Wikipedia page on the [AFT Model](https://en.wikipedia.org/wiki/Accelerated_failure_time_model) and the references there. Unlike a [Proportional Hazards Model](https://en.wikipedia.org/wiki/Proportional_hazards_model), which is designed for the same purpose, the AFT model is easier to parallelize because each instance contributes to the objective function independently.
```{r, warning=FALSE}
library(survival)
ovarianDF <- createDataFrame(ovarian)
@@ -902,7 +903,7 @@ perplexity
There are multiple options that can be configured in `spark.als`, including `rank`, `reg`, and `nonnegative`. For a complete list, refer to the help file.
```{r, eval=FALSE}
ratings <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0), list(1, 2, 4.0),
list(2, 1, 1.0), list(2, 2, 5.0))
df <- createDataFrame(ratings, c("user", "item", "rating"))
@@ -910,7 +911,7 @@ model <- spark.als(df, "rating", "user", "item", rank = 10, reg = 0.1, nonnegati
```
Extract latent factors.
```{r, eval=FALSE}
stats <- summary(model)
userFactors <- stats$userFactors
itemFactors <- stats$itemFactors
@@ -920,7 +921,7 @@ head(itemFactors)
Make predictions.
```{r, eval=FALSE}
predicted <- predict(model, df)
head(predicted)
```
@@ -1002,6 +1003,72 @@ unlink(modelPath)
```
## Structured Streaming
SparkR supports the Structured Streaming API (experimental).
You can check the Structured Streaming Programming Guide for [an introduction](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#programming-model) to its programming model and basic concepts.
### Simple Source and Sink
Spark has a few built-in input sources. As an example, here is how to test with a socket source, reading lines of text into words and displaying the computed word counts:
```{r, eval=FALSE}
# Create DataFrame representing the stream of input lines from connection
lines <- read.stream("socket", host = hostname, port = port)
# Split the lines into words
words <- selectExpr(lines, "explode(split(value, ' ')) as word")
# Generate running word count
wordCounts <- count(groupBy(words, "word"))
# Start running the query that prints the running counts to the console
query <- write.stream(wordCounts, "console", outputMode = "complete")
```
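The returned `query` is a handle to the active streaming query. A minimal follow-up sketch using the SparkR streaming-query functions:
```{r, eval=FALSE}
# Block until the query terminates, with stopQuery() or with an error
awaitTermination(query)

# Alternatively, stop the query explicitly
stopQuery(query)
```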
### Kafka Source
It is simple to read data from Kafka. For more information, see [Input Sources](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources) supported by Structured Streaming.
```{r, eval=FALSE}
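# Note: the bootstrap server list and topic name below are placeholders;
# substitute the values for your own Kafka deployment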
topic <- read.stream("kafka",
kafka.bootstrap.servers = "host1:port1,host2:port2",
subscribe = "topic1")
keyvalue <- selectExpr(topic, "CAST(key AS STRING)", "CAST(value AS STRING)")
```
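To start the computation, attach a sink with `write.stream`, for example printing the parsed key/value pairs to the console (a sketch using the console sink shown in the next section):
```{r, eval=FALSE}
# Start the streaming query, printing new rows to the console
query <- write.stream(keyvalue, "console")
```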
### Operations and Sinks
Most of the common operations on `SparkDataFrame` are supported for streaming, including selection, projection, and aggregation. Once you have defined the final result, to start the streaming computation you call the `write.stream` method, setting a sink and an `outputMode`.
A streaming `SparkDataFrame` can be written to the console for debugging, to a temporary in-memory table, or, for further processing in a fault-tolerant manner, to a file sink in different formats.
```{r, eval=FALSE}
noAggDF <- select(where(deviceDataStreamingDf, "signal > 10"), "device")
# Print new data to console
write.stream(noAggDF, "console")
# Write new data to Parquet files
write.stream(noAggDF,
"parquet",
path = "path/to/destination/dir",
checkpointLocation = "path/to/checkpoint/dir")
# Aggregate
aggDF <- count(groupBy(noAggDF, "device"))
# Print updated aggregations to console
write.stream(aggDF, "console", outputMode = "complete")
# Have all the aggregates in an in-memory table. The query name will be the table name
write.stream(aggDF, "memory", queryName = "aggregates", outputMode = "complete")
head(sql("select * from aggregates"))
```
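Once a query has started, it can be inspected with the SparkR streaming-query functions (a short sketch; `query` is any handle returned by `write.stream`):
```{r, eval=FALSE}
explain(query)       # print detailed explanations of the query
status(query)        # the current status of the query
lastProgress(query)  # the most recent progress update of this streaming query
```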
## Advanced Topics
### SparkR Object Classes
@@ -593,6 +593,10 @@ The following example shows how to save/load a MLlib model by SparkR.
</tr>
</table>
# Structured Streaming
SparkR supports the Structured Streaming API (experimental). Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. For more information, see the R API in the [Structured Streaming Programming Guide](structured-streaming-programming-guide.html).
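A minimal sketch of the R API, mirroring the quick example from that guide:

{% highlight r %}
# Read lines from a socket, compute running word counts, and print them to the console
lines <- read.stream("socket", host = "localhost", port = 9999)
words <- selectExpr(lines, "explode(split(value, ' ')) as word")
wordCounts <- count(groupBy(words, "word"))
query <- write.stream(wordCounts, "console", outputMode = "complete")
{% endhighlight %}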
# R Function Name Conflicts
When loading and attaching a new package in R, it is possible to have a name [conflict](https://stat.ethz.ch/R-manual/R-devel/library/base/html/library.html), where a
@@ -8,13 +8,13 @@ title: Structured Streaming Programming Guide
{:toc}
# Overview
Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the [Dataset/DataFrame API](sql-programming-guide.html) in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write Ahead Logs. In short, *Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.*
**Structured Streaming is still ALPHA in Spark 2.1** and the APIs are still experimental. In this guide, we are going to walk you through the programming model and the APIs. First, let's start with a simple example - a streaming word count.
# Quick Example
Let’s say you want to maintain a running word count of text data received from a data server listening on a TCP socket. Let’s see how you can express this using Structured Streaming. You can see the full code in
[Scala]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala)/[Java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java)/[Python]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/python/sql/streaming/structured_network_wordcount.py)/[R]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/r/streaming/structured_network_wordcount.R).
And if you [download Spark](http://spark.apache.org/downloads.html), you can directly run the example. In any case, let’s walk through the example step-by-step and understand how it works. First, we have to import the necessary classes and create a local SparkSession, the starting point of all functionalities related to Spark.
<div class="codetabs">
@@ -63,6 +63,13 @@ spark = SparkSession \
.getOrCreate()
{% endhighlight %}
</div>
<div data-lang="r" markdown="1">
{% highlight r %}
sparkR.session(appName = "StructuredNetworkWordCount")
{% endhighlight %}
</div>
</div>
@@ -136,6 +143,22 @@ wordCounts = words.groupBy("word").count()
This `lines` DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named "value", and each line in the streaming text data becomes a row in the table. Note that this is not currently receiving any data, as we are just setting up the transformation and have not yet started it. Next, we have used two built-in SQL functions, split and explode, to split each line into multiple rows with a word each. In addition, we use the function `alias` to name the new column "word". Finally, we have defined the `wordCounts` DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.
</div>
<div data-lang="r" markdown="1">
{% highlight r %}
# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines <- read.stream("socket", host = "localhost", port = 9999)
# Split the lines into words
words <- selectExpr(lines, "explode(split(value, ' ')) as word")
# Generate running word count
wordCounts <- count(group_by(words, "word"))
{% endhighlight %}
This `lines` SparkDataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named "value", and each line in the streaming text data becomes a row in the table. Note that this is not currently receiving any data, as we are just setting up the transformation and have not yet started it. Next, we have a SQL expression with two SQL functions, split and explode, to split each line into multiple rows with a word each. In addition, we name the new column "word". Finally, we have defined the `wordCounts` SparkDataFrame by grouping by the unique values in the SparkDataFrame and counting them. Note that this is a streaming SparkDataFrame which represents the running word counts of the stream.
</div>
</div>
@@ -181,10 +204,20 @@ query = wordCounts \
query.awaitTermination()
{% endhighlight %}
</div>
<div data-lang="r" markdown="1">
{% highlight r %}
# Start running the query that prints the running counts to the console
query <- write.stream(wordCounts, "console", outputMode = "complete")
awaitTermination(query)
{% endhighlight %}
</div>
</div>
After this code is executed, the streaming computation will have started in the background. The `query` object is a handle to that active streaming query, and we have decided to wait for the termination of the query using `awaitTermination()` to prevent the process from exiting while the query is active.
To actually execute this example code, you can either compile the code in your own
[Spark application](quick-start.html#self-contained-applications), or simply
@@ -211,6 +244,11 @@ $ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetwor
$ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999
{% endhighlight %}
</div>
<div data-lang="r" markdown="1">
{% highlight bash %}
$ ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999
{% endhighlight %}
</div>
</div>
Then, any lines typed in the terminal running the netcat server will be counted and printed on screen every second. It will look something like the following.
@@ -325,6 +363,35 @@ Batch: 0
| spark| 1|
+------+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache| 2|
| spark| 1|
|hadoop| 1|
+------+-----+
...
{% endhighlight %}
</div>
<div data-lang="r" markdown="1">
{% highlight bash %}
# TERMINAL 2: RUNNING structured_network_wordcount.R
$ ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999
-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache| 1|
| spark| 1|
+------+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
@@ -409,14 +476,14 @@ to track the read position in the stream. The engine uses checkpointing and writ
# API using Datasets and DataFrames
Since Spark 2.0, DataFrames and Datasets can represent static, bounded data, as well as streaming, unbounded data. Similar to static Datasets/DataFrames, you can use the common entry point `SparkSession`
([Scala](api/scala/index.html#org.apache.spark.sql.SparkSession)/[Java](api/java/org/apache/spark/sql/SparkSession.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.SparkSession)/[R](api/R/sparkR.session.html) docs)
to create streaming DataFrames/Datasets from streaming sources, and apply the same operations on them as static DataFrames/Datasets. If you are not familiar with Datasets/DataFrames, you are strongly advised to familiarize yourself with them using the
[DataFrame/Dataset Programming Guide](sql-programming-guide.html).
## Creating streaming DataFrames and streaming Datasets
Streaming DataFrames can be created through the `DataStreamReader` interface
([Scala](api/scala/index.html#org.apache.spark.sql.streaming.DataStreamReader)/[Java](api/java/org/apache/spark/sql/streaming/DataStreamReader.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamReader) docs)
returned by `SparkSession.readStream()` (in [R](api/R/read.stream.html), by the `read.stream()` method). Similar to the read interface for creating a static DataFrame, you can specify the details of the source – data format, schema, options, etc.
#### Input Sources
In Spark 2.0, there are a few built-in sources.
@@ -445,7 +512,8 @@ Here are the details of all the sources in Spark.
<code>path</code>: path to the input directory, and common to all file formats.
<br/><br/>
For file-format-specific options, see the related methods in <code>DataStreamReader</code>
(<a href="api/scala/index.html#org.apache.spark.sql.streaming.DataStreamReader">Scala</a>/<a href="api/java/org/apache/spark/sql/streaming/DataStreamReader.html">Java</a>/<a href="api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamReader">Python</a>/<a href="api/R/read.stream.html">R</a>).
E.g. for "parquet" format options see <code>DataStreamReader.parquet()</code></td>
<td>Yes</td>
<td>Supports glob paths, but does not support multiple comma-separated paths/globs.</td>
@@ -483,7 +551,7 @@ Here are some examples.
{% highlight scala %}
val spark: SparkSession = ...
// Read text from socket
val socketDF = spark
.readStream
.format("socket")
@@ -493,7 +561,7 @@ val socketDF = spark
socketDF.isStreaming // Returns True for DataFrames that have streaming sources
socketDF.printSchema
// Read all the csv files written atomically in a directory
val userSchema = new StructType().add("name", "string").add("age", "integer")
@@ -510,7 +578,7 @@ val csvDF = spark
{% highlight java %}
SparkSession spark = ...
// Read text from socket
Dataset<Row> socketDF = spark
.readStream()
.format("socket")
@@ -537,7 +605,7 @@ Dataset<Row> csvDF = spark
{% highlight python %}
spark = SparkSession. ...
# Read text from socket
socketDF = spark \
.readStream \
.format("socket") \
@@ -547,7 +615,7 @@ socketDF = spark \
socketDF.isStreaming() # Returns True for DataFrames that have streaming sources
socketDF.printSchema()
# Read all the csv files written atomically in a directory
userSchema = StructType().add("name", "string").add("age", "integer")
@@ -558,6 +626,25 @@ csvDF = spark \
.csv("/path/to/directory") # Equivalent to format("csv").load("/path/to/directory")
{% endhighlight %}
</div>
<div data-lang="r" markdown="1">
{% highlight r %}
sparkR.session(...)
# Read text from socket
socketDF <- read.stream("socket", host = hostname, port = port)
isStreaming(socketDF) # Returns TRUE for SparkDataFrames that have streaming sources
printSchema(socketDF)
# Read all the csv files written atomically in a directory
schema <- structType(structField("name", "string"),
structField("age", "integer"))
csvDF <- read.stream("csv", path = "/path/to/directory", schema = schema, sep = ";")
{% endhighlight %}
</div>
</div>
@@ -638,12 +725,24 @@ ds.groupByKey((MapFunction<DeviceData, String>) value -> value.getDeviceType(),
df = ... # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }
# Select the devices which have signal more than 10
df.select("device").where("signal > 10")
# Running count of the number of updates for each device type
df.groupBy("deviceType").count()
{% endhighlight %}
</div>
<div data-lang="r" markdown="1">
{% highlight r %}
df <- ... # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }
# Select the devices which have signal more than 10
select(where(df, "signal > 10"), "device")
# Running count of the number of updates for each device type
count(groupBy(df, "deviceType"))
{% endhighlight %}
</div>
</div>
### Window Operations on Event Time
@@ -840,7 +939,7 @@ Streaming DataFrames can be joined with static DataFrames to create new streamin
{% highlight scala %}
val staticDf = spark.read. ...
val streamingDf = spark.readStream. ...
streamingDf.join(staticDf, "type") // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "right_join") // right outer join with a static DF
@@ -972,7 +1071,7 @@ Once you have defined the final result DataFrame/Dataset, all that is left is fo
([Scala](api/scala/index.html#org.apache.spark.sql.streaming.DataStreamWriter)/[Java](api/java/org/apache/spark/sql/streaming/DataStreamWriter.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamWriter) docs)
returned through `Dataset.writeStream()`. You will have to specify one or more of the following in this interface.
- *Details of the output sink:* Data format, location, etc.
- *Output mode:* Specify what gets written to the output sink.
@@ -1077,7 +1176,7 @@ Here is the compatibility matrix.
#### Output Sinks
There are a few types of built-in output sinks.
- **File sink** - Stores the output to a directory.
{% highlight scala %}
writeStream
@@ -1145,7 +1244,8 @@ Here are the details of all the sinks in Spark.
· "s3a://a/b/c/dataset.txt"<br/>
<br/>
For file-format-specific options, see the related methods in DataFrameWriter
(<a href="api/scala/index.html#org.apache.spark.sql.DataFrameWriter">Scala</a>/<a href="api/java/org/apache/spark/sql/DataFrameWriter.html">Java</a>/<a href="api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter">Python</a>/<a href="api/R/write.stream.html">R</a>).
E.g. for "parquet" format options see <code>DataFrameWriter.parquet()</code>
</td>
<td>Yes</td>
@@ -1208,7 +1308,7 @@ noAggDF
.option("checkpointLocation", "path/to/checkpoint/dir")
.option("path", "path/to/destination/dir")
.start()
// ========== DF with aggregation ==========
val aggDF = df.groupBy("device").count()
@@ -1219,7 +1319,7 @@ aggDF
.format("console")
.start()
// Have all the aggregates in an in-memory table
aggDF
.writeStream
.queryName("aggregates") // this query name will be the table name
@@ -1250,7 +1350,7 @@ noAggDF
.option("checkpointLocation", "path/to/checkpoint/dir")
.option("path", "path/to/destination/dir")
.start();
// ========== DF with aggregation ==========
Dataset<Row> aggDF = df.groupBy("device").count();
@@ -1261,7 +1361,7 @@ aggDF
.format("console")
.start();
// Have all the aggregates in an in-memory table
aggDF
.writeStream()
.queryName("aggregates") // this query name will be the table name
@@ -1292,7 +1392,7 @@ noAggDF \
.option("checkpointLocation", "path/to/checkpoint/dir") \
.option("path", "path/to/destination/dir") \
.start()
# ========== DF with aggregation ==========
aggDF = df.groupBy("device").count()
@@ -1314,6 +1414,35 @@ aggDF \
spark.sql("select * from aggregates").show() # interactively query in-memory table
{% endhighlight %}
</div>
<div data-lang="r" markdown="1">
{% highlight r %}
# ========== DF with no aggregations ==========
noAggDF <- select(where(deviceDataDf, "signal > 10"), "device")
# Print new data to console
write.stream(noAggDF, "console")
# Write new data to Parquet files
write.stream(noAggDF,
"parquet",
path = "path/to/destination/dir",
checkpointLocation = "path/to/checkpoint/dir")
# ========== DF with aggregation ==========
aggDF <- count(groupBy(df, "device"))
# Print updated aggregations to console
write.stream(aggDF, "console", outputMode = "complete")
# Have all the aggregates in an in-memory table. The query name will be the table name
write.stream(aggDF, "memory", queryName = "aggregates", outputMode = "complete")
# Interactively query in-memory table
head(sql("select * from aggregates"))
{% endhighlight %}
</div>
</div>
@@ -1351,7 +1480,7 @@ query.name // get the name of the auto-generated or user-specified name
query.explain() // print detailed explanations of the query
query.stop() // stop the query
query.awaitTermination() // block until query is terminated, with stop() or with error
@@ -1403,7 +1532,7 @@ query.name() # get the name of the auto-generated or user-specified name
query.explain() # print detailed explanations of the query
query.stop() # stop the query
query.awaitTermination() # block until query is terminated, with stop() or with error
@@ -1415,6 +1544,24 @@ query.lastProgress() # the most recent progress update of this streaming quer
{% endhighlight %}
</div>
<div data-lang="r" markdown="1">
{% highlight r %}
query <- write.stream(df, "console") # get the query object
queryName(query) # get the name of the auto-generated or user-specified name
explain(query) # print detailed explanations of the query
stopQuery(query) # stop the query
awaitTermination(query) # block until query is terminated, with stop() or with error
lastProgress(query) # the most recent progress update of this streaming query
{% endhighlight %}
</div>
</div>
@@ -1461,6 +1608,12 @@ spark.streams().get(id) # get a query object by its unique id
spark.streams().awaitAnyTermination() # block until any one of them terminates
{% endhighlight %}
</div>
<div data-lang="r" markdown="1">
{% highlight bash %}
Not available in R.
{% endhighlight %}
</div>
</div>
@@ -1644,6 +1797,58 @@ Will print something like the following.
'''
{% endhighlight %}
</div>
<div data-lang="r" markdown="1">
{% highlight r %}
query <- ... # a StreamingQuery
lastProgress(query)
'''
Will print something like the following.
{
"id" : "8c57e1ec-94b5-4c99-b100-f694162df0b9",
"runId" : "ae505c5a-a64e-4896-8c28-c7cbaf926f16",
"name" : null,
"timestamp" : "2017-04-26T08:27:28.835Z",
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"getOffset" : 0,
"triggerExecution" : 1
},
"stateOperators" : [ {
"numRowsTotal" : 4,
"numRowsUpdated" : 0
} ],
"sources" : [ {
"description" : "TextSocketSource[host: localhost, port: 9999]",
"startOffset" : 1,
"endOffset" : 1,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@76b37531"
}
}
'''
status(query)
'''
Will print something like the following.
{
"message" : "Waiting for data to arrive",
"isDataAvailable" : false,
"isTriggerActive" : false
}
'''
{% endhighlight %}
</div>
</div>
@@ -1703,11 +1908,17 @@ spark.streams().addListener(new StreamingQueryListener() {
Not available in Python.
{% endhighlight %}
</div>
<div data-lang="r" markdown="1">
{% highlight bash %}
Not available in R.
{% endhighlight %}
</div>
</div>
## Recovering from Failures with Checkpointing
In case of a failure or intentional shutdown, you can recover the progress and state of a previous query, and continue where it left off. This is done using checkpointing and write ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information (i.e. range of offsets processed in each trigger) and the running aggregates (e.g. word counts in the [quick example](#quick-example)) to the checkpoint location. This checkpoint location has to be a path in an HDFS compatible file system, and can be set as an option in the DataStreamWriter when [starting a query](#starting-streaming-queries).
<div class="codetabs">
<div data-lang="scala" markdown="1">
@@ -1745,20 +1956,18 @@ aggDF \
.start()
{% endhighlight %}
</div>
<div data-lang="r" markdown="1">
{% highlight r %}
write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "path/to/HDFS/dir")
{% endhighlight %}
</div>
</div>
# Where to go from here
- Examples: See and run the
[Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming)/[Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/sql/streaming)/[Python]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python/sql/streaming)/[R]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/r/streaming)
examples.
- Spark Summit 2016 Talk - [A Deep Dive into Structured Streaming](https://spark-summit.org/2016/events/a-deep-dive-into-structured-streaming/)
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Counts words in UTF8 encoded, '\n' delimited text received from the network.
# To run this on your local machine, you need to first run a Netcat server
# $ nc -lk 9999
# and then run the example
# ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999
# Load SparkR library into your R session
library(SparkR)
# Initialize SparkSession
sparkR.session(appName = "SparkR-Streaming-structured-network-wordcount-example")
args <- commandArgs(trailing = TRUE)
if (length(args) != 2) {
print("Usage: structured_network_wordcount.R <hostname> <port>")
print("<hostname> and <port> describe the TCP server that Structured Streaming")
print("would connect to receive data.")
q("no")
}
hostname <- args[[1]]
port <- as.integer(args[[2]])
# Create DataFrame representing the stream of input lines from connection to hostname:port
lines <- read.stream("socket", host = hostname, port = port)
# Split the lines into words
words <- selectExpr(lines, "explode(split(value, ' ')) as word")
# Generate running word count
wordCounts <- count(groupBy(words, "word"))
# Start running the query that prints the running counts to the console
query <- write.stream(wordCounts, "console", outputMode = "complete")
awaitTermination(query)
sparkR.session.stop()