From b8302ccd02265f9d7a7895c7b033441fa2d8ffd1 Mon Sep 17 00:00:00 2001
From: Felix Cheung <felixcheung_m@hotmail.com>
Date: Thu, 4 May 2017 00:27:10 -0700
Subject: [PATCH] [SPARK-20015][SPARKR][SS][DOC][EXAMPLE] Document R Structured
 Streaming (experimental) in R vignettes and R & SS programming guide, R
 example

## What changes were proposed in this pull request?

Add
- R vignettes
- R programming guide
- SS programming guide
- R example

Also disable spark.als in vignettes for now since it's failing (SPARK-20402)

## How was this patch tested?

manually

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #17814 from felixcheung/rdocss.
---
 R/pkg/vignettes/sparkr-vignettes.Rmd          |  79 ++++-
 docs/sparkr.md                                |   4 +
 .../structured-streaming-programming-guide.md | 285 +++++++++++++++---
 .../streaming/structured_network_wordcount.R  |  57 ++++
 4 files changed, 381 insertions(+), 44 deletions(-)
 create mode 100644 examples/src/main/r/streaming/structured_network_wordcount.R

diff --git a/R/pkg/vignettes/sparkr-vignettes.Rmd b/R/pkg/vignettes/sparkr-vignettes.Rmd
index 4b9d6c3806..d38ec4f1b6 100644
--- a/R/pkg/vignettes/sparkr-vignettes.Rmd
+++ b/R/pkg/vignettes/sparkr-vignettes.Rmd
@@ -182,7 +182,7 @@ head(df)
 ```
 
 ### Data Sources
-SparkR supports operating on a variety of data sources through the `SparkDataFrame` interface. You can check the Spark SQL programming guide for more [specific options](https://spark.apache.org/docs/latest/sql-programming-guide.html#manually-specifying-options) that are available for the built-in data sources.
+SparkR supports operating on a variety of data sources through the `SparkDataFrame` interface. You can check the Spark SQL Programming Guide for more [specific options](https://spark.apache.org/docs/latest/sql-programming-guide.html#manually-specifying-options) that are available for the built-in data sources.
 
 The general method for creating `SparkDataFrame` from data sources is `read.df`. This method takes in the path for the file to load and the type of data source, and the currently active Spark Session will be used automatically. SparkR supports reading CSV, JSON and Parquet files natively and through Spark Packages you can find data source connectors for popular file formats like Avro. These packages can be added with `sparkPackages` parameter when initializing SparkSession using `sparkR.session`.
 
@@ -232,7 +232,7 @@ write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite"
 ```
 
 ### Hive Tables
-You can also create SparkDataFrames from Hive tables. To do this we will need to create a SparkSession with Hive support which can access tables in the Hive MetaStore. Note that Spark should have been built with Hive support and more details can be found in the [SQL programming guide](https://spark.apache.org/docs/latest/sql-programming-guide.html). In SparkR, by default it will attempt to create a SparkSession with Hive support enabled (`enableHiveSupport = TRUE`).
+You can also create SparkDataFrames from Hive tables. To do this we will need to create a SparkSession with Hive support which can access tables in the Hive MetaStore. Note that Spark should have been built with Hive support and more details can be found in the [SQL Programming Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html). In SparkR, by default it will attempt to create a SparkSession with Hive support enabled (`enableHiveSupport = TRUE`).
 
 ```{r, eval=FALSE}
 sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
@@ -314,7 +314,7 @@ Use `cube` or `rollup` to compute subtotals across multiple dimensions.
 mean(cube(carsDF, "cyl", "gear", "am"), "mpg")
 ```
 
-generates groupings for {(`cyl`, `gear`, `am`), (`cyl`, `gear`), (`cyl`), ()}, while 
+generates groupings for {(`cyl`, `gear`, `am`), (`cyl`, `gear`), (`cyl`), ()}, while
 
 ```{r}
 mean(rollup(carsDF, "cyl", "gear", "am"), "mpg")
@@ -672,6 +672,7 @@ head(select(naiveBayesPrediction, "Class", "Sex", "Age", "Survived", "prediction
 Survival analysis studies the expected duration of time until an event happens, and often the relationship with risk factors or treatment taken on the subject. In contrast to standard regression analysis, survival modeling has to deal with special characteristics in the data including non-negative survival time and censoring.
 
 Accelerated Failure Time (AFT) model is a parametric survival model for censored data that assumes the effect of a covariate is to accelerate or decelerate the life course of an event by some constant. For more information, refer to the Wikipedia page [AFT Model](https://en.wikipedia.org/wiki/Accelerated_failure_time_model) and the references there. Different from a [Proportional Hazards Model](https://en.wikipedia.org/wiki/Proportional_hazards_model) designed for the same purpose, the AFT model is easier to parallelize because each instance contributes to the objective function independently.
+
 ```{r, warning=FALSE}
 library(survival)
 ovarianDF <- createDataFrame(ovarian)
@@ -902,7 +903,7 @@ perplexity
 
 There are multiple options that can be configured in `spark.als`, including `rank`, `reg`, `nonnegative`. For a complete list, refer to the help file.
 
-```{r}
+```{r, eval=FALSE}
 ratings <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0), list(1, 2, 4.0),
                 list(2, 1, 1.0), list(2, 2, 5.0))
 df <- createDataFrame(ratings, c("user", "item", "rating"))
@@ -910,7 +911,7 @@ model <- spark.als(df, "rating", "user", "item", rank = 10, reg = 0.1, nonnegati
 ```
 
 Extract latent factors.
-```{r}
+```{r, eval=FALSE}
 stats <- summary(model)
 userFactors <- stats$userFactors
 itemFactors <- stats$itemFactors
@@ -920,7 +921,7 @@ head(itemFactors)
 
 Make predictions.
 
-```{r}
+```{r, eval=FALSE}
 predicted <- predict(model, df)
 head(predicted)
 ```
@@ -1002,6 +1003,72 @@ unlink(modelPath)
 ```
 
 
+## Structured Streaming
+
+SparkR supports the Structured Streaming API (experimental).
+
+You can check the Structured Streaming Programming Guide for [an introduction](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#programming-model) to its programming model and basic concepts.
+
+### Simple Source and Sink
+
+Spark has a few built-in input sources. As an example, to test with a socket source reading text into words and displaying the computed word counts:
+
+```{r, eval=FALSE}
+# Create DataFrame representing the stream of input lines from connection
+lines <- read.stream("socket", host = hostname, port = port)
+
+# Split the lines into words
+words <- selectExpr(lines, "explode(split(value, ' ')) as word")
+
+# Generate running word count
+wordCounts <- count(groupBy(words, "word"))
+
+# Start running the query that prints the running counts to the console
+query <- write.stream(wordCounts, "console", outputMode = "complete")
+```
+
+### Kafka Source
+
+It is simple to read data from Kafka. For more information, see [Input Sources](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources) supported by Structured Streaming.
+
+```{r, eval=FALSE}
+topic <- read.stream("kafka",
+                     kafka.bootstrap.servers = "host1:port1,host2:port2",
+                     subscribe = "topic1")
+keyvalue <- selectExpr(topic, "CAST(key AS STRING)", "CAST(value AS STRING)")
+```
+
+### Operations and Sinks
+
+Most of the common operations on `SparkDataFrame` are supported for streaming, including selection, projection, and aggregation. Once you have defined the final result, to start the streaming computation, you will call the `write.stream` method setting a sink and `outputMode`.
+
+A streaming `SparkDataFrame` can be written for debugging to the console, to a temporary in-memory table, or for further processing in a fault-tolerant manner to a File Sink in different formats.
+
+```{r, eval=FALSE}
+noAggDF <- select(where(deviceDataStreamingDf, "signal > 10"), "device")
+
+# Print new data to console
+write.stream(noAggDF, "console")
+
+# Write new data to Parquet files
+write.stream(noAggDF,
+             "parquet",
+             path = "path/to/destination/dir",
+             checkpointLocation = "path/to/checkpoint/dir")
+
+# Aggregate
+aggDF <- count(groupBy(noAggDF, "device"))
+
+# Print updated aggregations to console
+write.stream(aggDF, "console", outputMode = "complete")
+
+# Have all the aggregates in an in memory table. The query name will be the table name
+write.stream(aggDF, "memory", queryName = "aggregates", outputMode = "complete")
+
+head(sql("select * from aggregates"))
+```
+
+
 ## Advanced Topics
 
 ### SparkR Object Classes
diff --git a/docs/sparkr.md b/docs/sparkr.md
index 6dbd02a488..569b85e72c 100644
--- a/docs/sparkr.md
+++ b/docs/sparkr.md
@@ -593,6 +593,10 @@ The following example shows how to save/load a MLlib model by SparkR.
 </tr>
 </table>
 
+# Structured Streaming
+
+SparkR supports the Structured Streaming API (experimental). Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. For more information see the R API on the [Structured Streaming Programming Guide](structured-streaming-programming-guide.html)
+
 # R Function Name Conflicts
 
 When loading and attaching a new package in R, it is possible to have a name [conflict](https://stat.ethz.ch/R-manual/R-devel/library/base/html/library.html), where a
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 5b18cf2f3c..53b3db21da 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -8,13 +8,13 @@ title: Structured Streaming Programming Guide
 {:toc}
 
 # Overview
-Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the [Dataset/DataFrame API](sql-programming-guide.html) in Scala, Java or Python to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write Ahead Logs. In short, *Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.*
+Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the [Dataset/DataFrame API](sql-programming-guide.html) in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write Ahead Logs. In short, *Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.*
 
-**Structured Streaming is still ALPHA in Spark 2.1** and the APIs are still experimental. In this guide, we are going to walk you through the programming model and the APIs. First, let's start with a simple example - a streaming word count. 
+**Structured Streaming is still ALPHA in Spark 2.1** and the APIs are still experimental. In this guide, we are going to walk you through the programming model and the APIs. First, let's start with a simple example - a streaming word count.
 
 # Quick Example
-Let’s say you want to maintain a running word count of text data received from a data server listening on a TCP socket. Let’s see how you can express this using Structured Streaming. You can see the full code in 
-[Scala]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala)/[Java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java)/[Python]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/python/sql/streaming/structured_network_wordcount.py).
+Let’s say you want to maintain a running word count of text data received from a data server listening on a TCP socket. Let’s see how you can express this using Structured Streaming. You can see the full code in
+[Scala]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala)/[Java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java)/[Python]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/python/sql/streaming/structured_network_wordcount.py)/[R]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/r/streaming/structured_network_wordcount.R).
 And if you [download Spark](http://spark.apache.org/downloads.html), you can directly run the example. In any case, let’s walk through the example step-by-step and understand how it works. First, we have to import the necessary classes and create a local SparkSession, the starting point of all functionalities related to Spark.
 
 <div class="codetabs">
@@ -63,6 +63,13 @@ spark = SparkSession \
     .getOrCreate()
 {% endhighlight %}
 
+</div>
+<div data-lang="r"  markdown="1">
+
+{% highlight r %}
+sparkR.session(appName = "StructuredNetworkWordCount")
+{% endhighlight %}
+
 </div>
 </div>
 
@@ -136,6 +143,22 @@ wordCounts = words.groupBy("word").count()
 
 This `lines` DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named "value", and each line in the streaming text data becomes a row in the table. Note, that this is not currently receiving any data as we are just setting up the transformation, and have not yet started it. Next, we have used two built-in SQL functions - split and explode, to split each line into multiple rows with a word each. In addition, we use the function `alias` to name the new column as "word". Finally, we have defined the `wordCounts` DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.
 
+</div>
+<div data-lang="r"  markdown="1">
+
+{% highlight r %}
+# Create DataFrame representing the stream of input lines from connection to localhost:9999
+lines <- read.stream("socket", host = "localhost", port = 9999)
+
+# Split the lines into words
+words <- selectExpr(lines, "explode(split(value, ' ')) as word")
+
+# Generate running word count
+wordCounts <- count(group_by(words, "word"))
+{% endhighlight %}
+
+This `lines` SparkDataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named "value", and each line in the streaming text data becomes a row in the table. Note, that this is not currently receiving any data as we are just setting up the transformation, and have not yet started it. Next, we have a SQL expression with two SQL functions - split and explode, to split each line into multiple rows with a word each. In addition, we name the new column as "word". Finally, we have defined the `wordCounts` SparkDataFrame by grouping by the unique values in the SparkDataFrame and counting them. Note that this is a streaming SparkDataFrame which represents the running word counts of the stream.
+
 </div>
 </div>
 
@@ -181,10 +204,20 @@ query = wordCounts \
 query.awaitTermination()
 {% endhighlight %}
 
+</div>
+<div data-lang="r"  markdown="1">
+
+{% highlight r %}
+# Start running the query that prints the running counts to the console
+query <- write.stream(wordCounts, "console", outputMode = "complete")
+
+awaitTermination(query)
+{% endhighlight %}
+
 </div>
 </div>
 
-After this code is executed, the streaming computation will have started in the background. The `query` object is a handle to that active streaming query, and we have decided to wait for the termination of the query using `query.awaitTermination()` to prevent the process from exiting while the query is active.
+After this code is executed, the streaming computation will have started in the background. The `query` object is a handle to that active streaming query, and we have decided to wait for the termination of the query using `awaitTermination()` to prevent the process from exiting while the query is active.
 
 To actually execute this example code, you can either compile the code in your own 
 [Spark application](quick-start.html#self-contained-applications), or simply 
@@ -211,6 +244,11 @@ $ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetwor
 $ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999
 {% endhighlight %}
 </div>
+<div data-lang="r"  markdown="1">
+{% highlight bash %}
+$ ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999
+{% endhighlight %}
+</div>
 </div>
 
 Then, any lines typed in the terminal running the netcat server will be counted and printed on screen every second. It will look something like the following.
@@ -325,6 +363,35 @@ Batch: 0
 | spark|    1|
 +------+-----+
 
+-------------------------------------------
+Batch: 1
+-------------------------------------------
++------+-----+
+| value|count|
++------+-----+
+|apache|    2|
+| spark|    1|
+|hadoop|    1|
++------+-----+
+...
+{% endhighlight %}
+</div>
+<div data-lang="r" markdown="1">
+{% highlight bash %}
+# TERMINAL 2: RUNNING structured_network_wordcount.R
+
+$ ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999
+
+-------------------------------------------
+Batch: 0
+-------------------------------------------
++------+-----+
+| value|count|
++------+-----+
+|apache|    1|
+| spark|    1|
++------+-----+
+
 -------------------------------------------
 Batch: 1
 -------------------------------------------
@@ -409,14 +476,14 @@ to track the read position in the stream. The engine uses checkpointing and writ
 
 # API using Datasets and DataFrames
 Since Spark 2.0, DataFrames and Datasets can represent static, bounded data, as well as streaming, unbounded data. Similar to static Datasets/DataFrames, you can use the common entry point `SparkSession`
-([Scala](api/scala/index.html#org.apache.spark.sql.SparkSession)/[Java](api/java/org/apache/spark/sql/SparkSession.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.SparkSession) docs)
+([Scala](api/scala/index.html#org.apache.spark.sql.SparkSession)/[Java](api/java/org/apache/spark/sql/SparkSession.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.SparkSession)/[R](api/R/sparkR.session.html) docs)
 to create streaming DataFrames/Datasets from streaming sources, and apply the same operations on them as static DataFrames/Datasets. If you are not familiar with Datasets/DataFrames, you are strongly advised to familiarize yourself with them using the
 [DataFrame/Dataset Programming Guide](sql-programming-guide.html).
 
 ## Creating streaming DataFrames and streaming Datasets
-Streaming DataFrames can be created through the `DataStreamReader` interface 
+Streaming DataFrames can be created through the `DataStreamReader` interface
 ([Scala](api/scala/index.html#org.apache.spark.sql.streaming.DataStreamReader)/[Java](api/java/org/apache/spark/sql/streaming/DataStreamReader.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamReader) docs)
-returned by `SparkSession.readStream()`. Similar to the read interface for creating static DataFrame, you can specify the details of the source – data format, schema, options, etc.
+returned by `SparkSession.readStream()`. In [R](api/R/read.stream.html), with the `read.stream()` method. Similar to the read interface for creating static DataFrame, you can specify the details of the source – data format, schema, options, etc.
 
 #### Input Sources
 In Spark 2.0, there are a few built-in sources.
@@ -445,7 +512,8 @@ Here are the details of all the sources in Spark.
         <code>path</code>: path to the input directory, and common to all file formats.
         <br/><br/>
         For file-format-specific options, see the related methods in <code>DataStreamReader</code>
-        (<a href="api/scala/index.html#org.apache.spark.sql.streaming.DataStreamReader">Scala</a>/<a href="api/java/org/apache/spark/sql/streaming/DataStreamReader.html">Java</a>/<a href="api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamReader">Python</a>).
+        (<a href="api/scala/index.html#org.apache.spark.sql.streaming.DataStreamReader">Scala</a>/<a href="api/java/org/apache/spark/sql/streaming/DataStreamReader.html">Java</a>/<a href="api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamReader">Python</a>/<a
+        href="api/R/read.stream.html">R</a>).
         E.g. for "parquet" format options see <code>DataStreamReader.parquet()</code></td>
     <td>Yes</td>
     <td>Supports glob paths, but does not support multiple comma-separated paths/globs.</td>
@@ -483,7 +551,7 @@ Here are some examples.
 {% highlight scala %}
 val spark: SparkSession = ...
 
-// Read text from socket 
+// Read text from socket
 val socketDF = spark
   .readStream
   .format("socket")
@@ -493,7 +561,7 @@ val socketDF = spark
 
 socketDF.isStreaming    // Returns True for DataFrames that have streaming sources
 
-socketDF.printSchema 
+socketDF.printSchema
 
 // Read all the csv files written atomically in a directory
 val userSchema = new StructType().add("name", "string").add("age", "integer")
@@ -510,7 +578,7 @@ val csvDF = spark
 {% highlight java %}
 SparkSession spark = ...
 
-// Read text from socket 
+// Read text from socket
 Dataset<Row> socketDF = spark
   .readStream()
   .format("socket")
@@ -537,7 +605,7 @@ Dataset<Row> csvDF = spark
 {% highlight python %}
 spark = SparkSession. ...
 
-# Read text from socket 
+# Read text from socket
 socketDF = spark \
     .readStream \
     .format("socket") \
@@ -547,7 +615,7 @@ socketDF = spark \
 
 socketDF.isStreaming()    # Returns True for DataFrames that have streaming sources
 
-socketDF.printSchema() 
+socketDF.printSchema()
 
 # Read all the csv files written atomically in a directory
 userSchema = StructType().add("name", "string").add("age", "integer")
@@ -558,6 +626,25 @@ csvDF = spark \
     .csv("/path/to/directory")  # Equivalent to format("csv").load("/path/to/directory")
 {% endhighlight %}
 
+</div>
+<div data-lang="r"  markdown="1">
+
+{% highlight r %}
+sparkR.session(...)
+
+# Read text from socket
+socketDF <- read.stream("socket", host = hostname, port = port)
+
+isStreaming(socketDF)    # Returns TRUE for SparkDataFrames that have streaming sources
+
+printSchema(socketDF)
+
+# Read all the csv files written atomically in a directory
+schema <- structType(structField("name", "string"),
+                     structField("age", "integer"))
+csvDF <- read.stream("csv", path = "/path/to/directory", schema = schema, sep = ";")
+{% endhighlight %}
+
 </div>
 </div>
 
@@ -638,12 +725,24 @@ ds.groupByKey((MapFunction<DeviceData, String>) value -> value.getDeviceType(),
 df = ...  # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }
 
 # Select the devices which have signal more than 10
-df.select("device").where("signal > 10")                              
+df.select("device").where("signal > 10")
 
 # Running count of the number of updates for each device type
 df.groupBy("deviceType").count()
 {% endhighlight %}
 </div>
+<div data-lang="r"  markdown="1">
+
+{% highlight r %}
+df <- ...  # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }
+
+# Select the devices which have signal more than 10
+select(where(df, "signal > 10"), "device")
+
+# Running count of the number of updates for each device type
+count(groupBy(df, "deviceType"))
+{% endhighlight %}
+</div>
 </div>
 
 ### Window Operations on Event Time
@@ -840,7 +939,7 @@ Streaming DataFrames can be joined with static DataFrames to create new streamin
 
 {% highlight scala %}
 val staticDf = spark.read. ...
-val streamingDf = spark.readStream. ... 
+val streamingDf = spark.readStream. ...
 
 streamingDf.join(staticDf, "type")          // inner equi-join with a static DF
 streamingDf.join(staticDf, "type", "right_join")  // right outer join with a static DF  
@@ -972,7 +1071,7 @@ Once you have defined the final result DataFrame/Dataset, all that is left is fo
 ([Scala](api/scala/index.html#org.apache.spark.sql.streaming.DataStreamWriter)/[Java](api/java/org/apache/spark/sql/streaming/DataStreamWriter.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamWriter) docs)
 returned through `Dataset.writeStream()`. You will have to specify one or more of the following in this interface.
 
-- *Details of the output sink:* Data format, location, etc. 
+- *Details of the output sink:* Data format, location, etc.
 
 - *Output mode:* Specify what gets written to the output sink.
 
@@ -1077,7 +1176,7 @@ Here is the compatibility matrix.
 #### Output Sinks
 There are a few types of built-in output sinks.
 
-- **File sink** - Stores the output to a directory. 
+- **File sink** - Stores the output to a directory.
 
 {% highlight scala %}
 writeStream
@@ -1145,7 +1244,8 @@ Here are the details of all the sinks in Spark.
         · "s3a://a/b/c/dataset.txt"<br/>
         <br/>
         For file-format-specific options, see the related methods in DataFrameWriter
-        (<a href="api/scala/index.html#org.apache.spark.sql.DataFrameWriter">Scala</a>/<a href="api/java/org/apache/spark/sql/DataFrameWriter.html">Java</a>/<a href="api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter">Python</a>).
+        (<a href="api/scala/index.html#org.apache.spark.sql.DataFrameWriter">Scala</a>/<a href="api/java/org/apache/spark/sql/DataFrameWriter.html">Java</a>/<a href="api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter">Python</a>/<a
+        href="api/R/write.stream.html">R</a>).
         E.g. for "parquet" format options see <code>DataFrameWriter.parquet()</code>
     </td>
     <td>Yes</td>
@@ -1208,7 +1308,7 @@ noAggDF
   .option("checkpointLocation", "path/to/checkpoint/dir")
   .option("path", "path/to/destination/dir")
   .start()
-   
+
 // ========== DF with aggregation ==========
 val aggDF = df.groupBy("device").count()
 
@@ -1219,7 +1319,7 @@ aggDF
   .format("console")
   .start()
 
-// Have all the aggregates in an in-memory table 
+// Have all the aggregates in an in-memory table
 aggDF
   .writeStream
   .queryName("aggregates")    // this query name will be the table name
@@ -1250,7 +1350,7 @@ noAggDF
   .option("checkpointLocation", "path/to/checkpoint/dir")
   .option("path", "path/to/destination/dir")
   .start();
-   
+
 // ========== DF with aggregation ==========
 Dataset<Row> aggDF = df.groupBy("device").count();
 
@@ -1261,7 +1361,7 @@ aggDF
   .format("console")
   .start();
 
-// Have all the aggregates in an in-memory table 
+// Have all the aggregates in an in-memory table
 aggDF
   .writeStream()
   .queryName("aggregates")    // this query name will be the table name
@@ -1292,7 +1392,7 @@ noAggDF \
     .option("checkpointLocation", "path/to/checkpoint/dir") \
     .option("path", "path/to/destination/dir") \
     .start()
-   
+
 # ========== DF with aggregation ==========
 aggDF = df.groupBy("device").count()
 
@@ -1314,6 +1414,35 @@ aggDF \
 spark.sql("select * from aggregates").show()   # interactively query in-memory table
 {% endhighlight %}
 
+</div>
+<div data-lang="r"  markdown="1">
+
+{% highlight r %}
+# ========== DF with no aggregations ==========
+noAggDF <- select(where(deviceDataDf, "signal > 10"), "device")
+
+# Print new data to console
+write.stream(noAggDF, "console")
+
+# Write new data to Parquet files
+write.stream(noAggDF,
+             "parquet",
+             path = "path/to/destination/dir",
+             checkpointLocation = "path/to/checkpoint/dir")
+
+# ========== DF with aggregation ==========
+aggDF <- count(groupBy(df, "device"))
+
+# Print updated aggregations to console
+write.stream(aggDF, "console", outputMode = "complete")
+
+# Have all the aggregates in an in memory table. The query name will be the table name
+write.stream(aggDF, "memory", queryName = "aggregates", outputMode = "complete")
+
+# Interactively query in-memory table
+head(sql("select * from aggregates"))
+{% endhighlight %}
+
 </div>
 </div>
 
@@ -1351,7 +1480,7 @@ query.name        // get the name of the auto-generated or user-specified name
 
 query.explain()   // print detailed explanations of the query
 
-query.stop()      // stop the query 
+query.stop()      // stop the query
 
 query.awaitTermination()   // block until query is terminated, with stop() or with error
 
@@ -1403,7 +1532,7 @@ query.name()        # get the name of the auto-generated or user-specified name
 
 query.explain()   # print detailed explanations of the query
 
-query.stop()      # stop the query 
+query.stop()      # stop the query
 
 query.awaitTermination()   # block until query is terminated, with stop() or with error
 
@@ -1415,6 +1544,24 @@ query.lastProgress()    # the most recent progress update of this streaming quer
 
 {% endhighlight %}
 
+</div>
+<div data-lang="r"  markdown="1">
+
+{% highlight r %}
+query <- write.stream(df, "console")  # get the query object
+
+queryName(query)          # get the name of the auto-generated or user-specified name
+
+explain(query)            # print detailed explanations of the query
+
+stopQuery(query)          # stop the query
+
+awaitTermination(query)   # block until query is terminated, with stop() or with error
+
+lastProgress(query)       # the most recent progress update of this streaming query
+
+{% endhighlight %}
+
 </div>
 </div>
 
@@ -1461,6 +1608,12 @@ spark.streams().get(id)  # get a query object by its unique id
 spark.streams().awaitAnyTermination()  # block until any one of them terminates
 {% endhighlight %}
 
+</div>
+<div data-lang="r"  markdown="1">
+{% highlight bash %}
+Not available in R.
+{% endhighlight %}
+
 </div>
 </div>
 
@@ -1644,6 +1797,58 @@ Will print something like the following.
 '''
 {% endhighlight %}
 
+</div>
+<div data-lang="r"  markdown="1">
+
+{% highlight r %}
+query <- ...  # a StreamingQuery
+lastProgress(query)
+
+'''
+Will print something like the following.
+
+{
+  "id" : "8c57e1ec-94b5-4c99-b100-f694162df0b9",
+  "runId" : "ae505c5a-a64e-4896-8c28-c7cbaf926f16",
+  "name" : null,
+  "timestamp" : "2017-04-26T08:27:28.835Z",
+  "numInputRows" : 0,
+  "inputRowsPerSecond" : 0.0,
+  "processedRowsPerSecond" : 0.0,
+  "durationMs" : {
+    "getOffset" : 0,
+    "triggerExecution" : 1
+  },
+  "stateOperators" : [ {
+    "numRowsTotal" : 4,
+    "numRowsUpdated" : 0
+  } ],
+  "sources" : [ {
+    "description" : "TextSocketSource[host: localhost, port: 9999]",
+    "startOffset" : 1,
+    "endOffset" : 1,
+    "numInputRows" : 0,
+    "inputRowsPerSecond" : 0.0,
+    "processedRowsPerSecond" : 0.0
+  } ],
+  "sink" : {
+    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@76b37531"
+  }
+}
+'''
+
+status(query)
+'''
+Will print something like the following.
+
+{
+  "message" : "Waiting for data to arrive",
+  "isDataAvailable" : false,
+  "isTriggerActive" : false
+}
+'''
+{% endhighlight %}
+
 </div>
 </div>
 
@@ -1703,11 +1908,17 @@ spark.streams().addListener(new StreamingQueryListener() {
 Not available in Python.
 {% endhighlight %}
 
+</div>
+<div data-lang="r"  markdown="1">
+{% highlight bash %}
+Not available in R.
+{% endhighlight %}
+
 </div>
 </div>
 
 ## Recovering from Failures with Checkpointing 
-In case of a failure or intentional shutdown, you can recover the previous progress and state of a previous query, and continue where it left off. This is done using checkpointing and write ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information (i.e. range of offsets processed in each trigger) and the running aggregates (e.g. word counts in the [quick example](#quick-example)) to the checkpoint location. This checkpoint location has to be a path in an HDFS compatible file system, and can be set as an option in the DataStreamWriter when [starting a query](#starting-streaming-queries). 
+In case of a failure or intentional shutdown, you can recover the previous progress and state of a previous query, and continue where it left off. This is done using checkpointing and write ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information (i.e. range of offsets processed in each trigger) and the running aggregates (e.g. word counts in the [quick example](#quick-example)) to the checkpoint location. This checkpoint location has to be a path in an HDFS compatible file system, and can be set as an option in the DataStreamWriter when [starting a query](#starting-streaming-queries).
 
 <div class="codetabs">
 <div data-lang="scala"  markdown="1">
@@ -1745,20 +1956,18 @@ aggDF \
     .start()
 {% endhighlight %}
 
+</div>
+<div data-lang="r"  markdown="1">
+
+{% highlight r %}
+write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "path/to/HDFS/dir")
+{% endhighlight %}
+
 </div>
 </div>
 
 # Where to go from here
-- Examples: See and run the 
-[Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming)/[Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/sql/streaming)/[Python]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python/sql/streaming) 
+- Examples: See and run the
+[Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming)/[Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/sql/streaming)/[Python]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python/sql/streaming)/[R]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/r/streaming)
 examples.
 - Spark Summit 2016 Talk - [A Deep Dive into Structured Streaming](https://spark-summit.org/2016/events/a-deep-dive-into-structured-streaming/)
-
-
-
-
-
-
-
-
-
diff --git a/examples/src/main/r/streaming/structured_network_wordcount.R b/examples/src/main/r/streaming/structured_network_wordcount.R
new file mode 100644
index 0000000000..cda18ebc07
--- /dev/null
+++ b/examples/src/main/r/streaming/structured_network_wordcount.R
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Counts words in UTF8 encoded, '\n' delimited text received from the network.
+
+# To run this on your local machine, you need to first run a Netcat server
+# $ nc -lk 9999
+# and then run the example
+# ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999
+
+# Load SparkR library into your R session
+library(SparkR)
+
+# Initialize SparkSession
+sparkR.session(appName = "SparkR-Streaming-structured-network-wordcount-example")
+
+args <- commandArgs(trailing = TRUE)
+
+if (length(args) != 2) {
+  print("Usage: structured_network_wordcount.R <hostname> <port>")
+  print("<hostname> and <port> describe the TCP server that Structured Streaming")
+  print("would connect to receive data.")
+  q("no")
+}
+
+hostname <- args[[1]]
+port <- as.integer(args[[2]])
+
+# Create DataFrame representing the stream of input lines from connection to localhost:9999
+lines <- read.stream("socket", host = hostname, port = port)
+
+# Split the lines into words
+words <- selectExpr(lines, "explode(split(value, ' ')) as word")
+
+# Generate running word count
+wordCounts <- count(groupBy(words, "word"))
+
+# Start running the query that prints the running counts to the console
+query <- write.stream(wordCounts, "console", outputMode = "complete")
+
+awaitTermination(query)
+
+sparkR.session.stop()
-- 
GitLab