Commit d489e1dc authored by Liwei Lin, committed by Shixiong Zhu

[SPARK-19041][SS] Fix code snippet compilation issues in Structured Streaming Programming Guide

## What changes were proposed in this pull request?

Currently some code snippets in the programming guide just do not compile. We should fix them.
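
Two of the fixes are worth calling out. First, `type` is a reserved keyword in Scala, so a case class field named `type` cannot even be declared without backtick-escaping; the snippets therefore rename the field to `deviceType`. A minimal standalone sketch (sample values are illustrative, not from the guide):

```scala
// `type` is a reserved keyword in Scala, so this line does not compile:
// case class DeviceData(device: String, type: String, signal: Double)

// Renaming the field, as this patch does throughout the guide, fixes it:
case class DeviceData(device: String, deviceType: String, signal: Double)

object DeviceDataDemo extends App {
  val d = DeviceData("sensor-1", "thermometer", 12.3)
  println(d.deviceType) // prints: thermometer
}
```

Second, the Scala aggregation snippet used a wildcard import (`import ...scalalang.typed._`) and then referred to `typed.avg(...)`; a wildcard import brings an object's members into scope but not the object name itself, so `typed` did not resolve. Importing the `typed` object directly fixes it. A self-contained sketch of the corrected pattern, using a batch Dataset for brevity (the local session and sample rows are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.scalalang.typed // import the object itself, not its members

case class DeviceData(device: String, deviceType: String, signal: Double)

object TypedAggDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("typed-agg").getOrCreate()
    import spark.implicits._

    val ds = Seq(
      DeviceData("d1", "thermometer", 12.0),
      DeviceData("d2", "thermometer", 8.0)
    ).toDS()

    // Running average signal per device type; `typed.avg` now resolves
    ds.groupByKey(_.deviceType).agg(typed.avg(_.signal)).show()

    spark.stop()
  }
}
```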

## How was this patch tested?

```
SKIP_API=1 jekyll build
```

## Screenshot from part of the change:

![snip20161231_37](https://cloud.githubusercontent.com/assets/15843379/21576864/cc52fcd8-cf7b-11e6-8bd6-f935d9ff4a6b.png)

Author: Liwei Lin <lwlin7@gmail.com>

Closes #16442 from lw-lin/ss-pro-guide-.
parent 517f3983
@@ -537,9 +537,9 @@ Most of the common operations on DataFrame/Dataset are supported for streaming.
<div data-lang="scala" markdown="1">
{% highlight scala %}
-case class DeviceData(device: String, type: String, signal: Double, time: DateTime)
+case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)
-val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: string }
+val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData] // streaming Dataset with IOT device data
// Select the devices which have signal more than 10
@@ -547,11 +547,11 @@ df.select("device").where("signal > 10") // using untyped APIs
ds.filter(_.signal > 10).map(_.device) // using typed APIs
// Running count of the number of updates for each device type
-df.groupBy("type").count() // using untyped API
+df.groupBy("deviceType").count() // using untyped API
// Running average signal for each device type
-import org.apache.spark.sql.expressions.scalalang.typed._
-ds.groupByKey(_.type).agg(typed.avg(_.signal)) // using typed API
+import org.apache.spark.sql.expressions.scalalang.typed
+ds.groupByKey(_.deviceType).agg(typed.avg(_.signal)) // using typed API
{% endhighlight %}
</div>
@@ -565,7 +565,7 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
public class DeviceData {
private String device;
-private String type;
+private String deviceType;
private Double signal;
private java.sql.Date time;
...
@@ -590,13 +590,13 @@ ds.filter(new FilterFunction<DeviceData>() { // using typed APIs
}, Encoders.STRING());
// Running count of the number of updates for each device type
-df.groupBy("type").count(); // using untyped API
+df.groupBy("deviceType").count(); // using untyped API
// Running average signal for each device type
ds.groupByKey(new MapFunction<DeviceData, String>() { // using typed API
@Override
public String call(DeviceData value) throws Exception {
-return value.getType();
+return value.getDeviceType();
}
}, Encoders.STRING()).agg(typed.avg(new MapFunction<DeviceData, Double>() {
@Override
@@ -611,13 +611,13 @@ ds.groupByKey(new MapFunction<DeviceData, String>() { // using typed API
<div data-lang="python" markdown="1">
{% highlight python %}
-df = ... # streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: DateType }
+df = ... # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }
# Select the devices which have signal more than 10
df.select("device").where("signal > 10")
# Running count of the number of updates for each device type
-df.groupBy("type").count()
+df.groupBy("deviceType").count()
{% endhighlight %}
</div>
</div>
@@ -973,7 +973,7 @@ Here is a table of all the sinks, and the corresponding settings.
<tr>
<td><b>File Sink</b></td>
<td>Append</td>
-<td><pre>writeStream<br/> .format("parquet")<br/> .start()</pre></td>
+<td><pre>writeStream<br/> .format("parquet")<br/> .option(<br/> "checkpointLocation",<br/> "path/to/checkpoint/dir")<br/> .option(<br/> "path",<br/> "path/to/destination/dir")<br/> .start()</pre></td>
<td>Yes</td>
<td>Supports writes to partitioned tables. Partitioning by time may be useful.</td>
</tr>
@@ -1026,7 +1026,9 @@ noAggDF
// Write new data to Parquet files
noAggDF
.writeStream
-.parquet("path/to/destination/directory")
+.format("parquet")
+.option("checkpointLocation", "path/to/checkpoint/dir")
+.option("path", "path/to/destination/dir")
.start()
// ========== DF with aggregation ==========
@@ -1066,7 +1068,9 @@ noAggDF
// Write new data to Parquet files
noAggDF
.writeStream()
-.parquet("path/to/destination/directory")
+.format("parquet")
+.option("checkpointLocation", "path/to/checkpoint/dir")
+.option("path", "path/to/destination/dir")
.start();
// ========== DF with aggregation ==========
@@ -1106,7 +1110,9 @@ noAggDF \
# Write new data to Parquet files
noAggDF \
.writeStream() \
-.parquet("path/to/destination/directory") \
+.format("parquet") \
+.option("checkpointLocation", "path/to/checkpoint/dir") \
+.option("path", "path/to/destination/dir") \
.start()
# ========== DF with aggregation ==========
@@ -1120,11 +1126,11 @@ aggDF \
.start()
# Have all the aggregates in an in memory table. The query name will be the table name
-aggDF\
-.writeStream()\
-.queryName("aggregates")\
-.outputMode("complete")\
-.format("memory")\
+aggDF \
+.writeStream() \
+.queryName("aggregates") \
+.outputMode("complete") \
+.format("memory") \
.start()
spark.sql("select * from aggregates").show() # interactively query in-memory table
@@ -1159,7 +1165,9 @@ The `StreamingQuery` object created when a query is started can be used to monit
{% highlight scala %}
val query = df.writeStream.format("console").start() // get the query object
-query.id // get the unique identifier of the running query
+query.id // get the unique identifier of the running query that persists across restarts from checkpoint data
+query.runId // get the unique id of this run of the query, which will be generated at every start/restart
query.name // get the name of the auto-generated or user-specified name
@@ -1169,11 +1177,11 @@ query.stop() // stop the query
query.awaitTermination() // block until query is terminated, with stop() or with error
-query.exception() // the exception if the query has been terminated with error
+query.exception // the exception if the query has been terminated with error
-query.sourceStatus() // progress information about data has been read from the input sources
+query.recentProgress // an array of the most recent progress updates for this query
-query.sinkStatus() // progress information about data written to the output sink
+query.lastProgress // the most recent progress update of this streaming query
{% endhighlight %}
@@ -1183,21 +1191,23 @@ query.sinkStatus() // progress information about data written to the output si
{% highlight java %}
StreamingQuery query = df.writeStream().format("console").start(); // get the query object
-query.id(); // get the unique identifier of the running query
+query.id(); // get the unique identifier of the running query that persists across restarts from checkpoint data
+query.runId(); // get the unique id of this run of the query, which will be generated at every start/restart
query.name(); // get the name of the auto-generated or user-specified name
query.explain(); // print detailed explanations of the query
query.stop(); // stop the query
query.awaitTermination(); // block until query is terminated, with stop() or with error
query.exception(); // the exception if the query has been terminated with error
-query.sourceStatus(); // progress information about data has been read from the input sources
+query.recentProgress(); // an array of the most recent progress updates for this query
-query.sinkStatus(); // progress information about data written to the output sink
+query.lastProgress(); // the most recent progress update of this streaming query
{% endhighlight %}
@@ -1207,7 +1217,9 @@ query.sinkStatus(); // progress information about data written to the output s
{% highlight python %}
query = df.writeStream().format("console").start() # get the query object
-query.id() # get the unique identifier of the running query
+query.id() # get the unique identifier of the running query that persists across restarts from checkpoint data
+query.runId() # get the unique id of this run of the query, which will be generated at every start/restart
query.name() # get the name of the auto-generated or user-specified name
@@ -1217,11 +1229,11 @@ query.stop() # stop the query
query.awaitTermination() # block until query is terminated, with stop() or with error
query.exception() # the exception if the query has been terminated with error
-query.sourceStatus() # progress information about data has been read from the input sources
+query.recentProgress() # an array of the most recent progress updates for this query
-query.sinkStatus() # progress information about data written to the output sink
+query.lastProgress() # the most recent progress update of this streaming query
{% endhighlight %}
@@ -1491,14 +1503,17 @@ spark.streams.addListener(new StreamingQueryListener() {
{% highlight java %}
SparkSession spark = ...
-spark.streams.addListener(new StreamingQueryListener() {
-@Overrides void onQueryStarted(QueryStartedEvent queryStarted) {
+spark.streams().addListener(new StreamingQueryListener() {
+@Override
+public void onQueryStarted(QueryStartedEvent queryStarted) {
System.out.println("Query started: " + queryStarted.id());
}
-@Overrides void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
+@Override
+public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
System.out.println("Query terminated: " + queryTerminated.id());
}
-@Overrides void onQueryProgress(QueryProgressEvent queryProgress) {
+@Override
+public void onQueryProgress(QueryProgressEvent queryProgress) {
System.out.println("Query made progress: " + queryProgress.progress());
}
});