Commit 58855991 authored by hyukjinkwon, committed by Sean Owen


[MINOR][DOC] Use standard quotes instead of "curly quote" marks from Mac in structured streaming programming guides

## What changes were proposed in this pull request?

This PR replaces curly quotes (`“` and `”`) with standard quotes (`"`).

This is an actual problem when users copy and paste the examples: the pasted code does not work.

This seems to happen only in `structured-streaming-programming-guide.md`.
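
As a rough illustration (not part of this patch), here is a hypothetical `spark-shell` paste of the guide's word-count example; the application name, host, and port below are made up for the sketch. With standard quotes the example compiles and runs, while the curly-quote variant currently rendered in the guide is rejected by the Scala compiler:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical session; in spark-shell a `spark` session already exists.
val spark = SparkSession.builder().appName("QuoteCheck").getOrCreate()
import spark.implicits._

// Pasted with standard quotes, the guide's example works as intended:
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

val wordCounts = lines.as[String].flatMap(_.split(" ")).groupBy("value").count()

// Pasted with the curly quotes the guide currently renders, the same code
// fails to compile, e.g.:
//   .format(“socket”)   // error: illegal character '\u201C'
```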

## How was this patch tested?

Manually built.

This also changes some examples so that they render correctly in Markdown, as shown below:

![2016-08-23 3 24 13](https://cloud.githubusercontent.com/assets/6477701/17882878/2a38332e-694a-11e6-8e84-76bdb89151e0.png)

to

![2016-08-23 3 26 06](https://cloud.githubusercontent.com/assets/6477701/17882888/376eaa28-694a-11e6-8b88-32ea83997037.png)

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #14770 from HyukjinKwon/minor-quotes.
parent 8fd63e80
@@ -88,7 +88,7 @@ val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
{% endhighlight %}
-This `lines` DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named value, and each line in the streaming text data becomes a row in the table. Note, that this is not currently receiving any data as we are just setting up the transformation, and have not yet started it. Next, we have converted the DataFrame to a Dataset of String using `.as[String]`, so that we can apply the `flatMap` operation to split each line into multiple words. The resultant `words` Dataset contains all the words. Finally, we have defined the `wordCounts` DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.
+This `lines` DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named "value", and each line in the streaming text data becomes a row in the table. Note, that this is not currently receiving any data as we are just setting up the transformation, and have not yet started it. Next, we have converted the DataFrame to a Dataset of String using `.as[String]`, so that we can apply the `flatMap` operation to split each line into multiple words. The resultant `words` Dataset contains all the words. Finally, we have defined the `wordCounts` DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.
</div>
<div data-lang="java" markdown="1">
@@ -117,7 +117,7 @@ Dataset<String> words = lines
Dataset<Row> wordCounts = words.groupBy("value").count();
{% endhighlight %}
-This `lines` DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named value, and each line in the streaming text data becomes a row in the table. Note, that this is not currently receiving any data as we are just setting up the transformation, and have not yet started it. Next, we have converted the DataFrame to a Dataset of String using `.as(Encoders.STRING())`, so that we can apply the `flatMap` operation to split each line into multiple words. The resultant `words` Dataset contains all the words. Finally, we have defined the `wordCounts` DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.
+This `lines` DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named "value", and each line in the streaming text data becomes a row in the table. Note, that this is not currently receiving any data as we are just setting up the transformation, and have not yet started it. Next, we have converted the DataFrame to a Dataset of String using `.as(Encoders.STRING())`, so that we can apply the `flatMap` operation to split each line into multiple words. The resultant `words` Dataset contains all the words. Finally, we have defined the `wordCounts` DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.
</div>
<div data-lang="python" markdown="1">
@@ -142,12 +142,12 @@ words = lines.select(
wordCounts = words.groupBy('word').count()
{% endhighlight %}
-This `lines` DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named value, and each line in the streaming text data becomes a row in the table. Note, that this is not currently receiving any data as we are just setting up the transformation, and have not yet started it. Next, we have used two built-in SQL functions - split and explode, to split each line into multiple rows with a word each. In addition, we use the function `alias` to name the new column as word. Finally, we have defined the `wordCounts` DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.
+This `lines` DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named "value", and each line in the streaming text data becomes a row in the table. Note, that this is not currently receiving any data as we are just setting up the transformation, and have not yet started it. Next, we have used two built-in SQL functions - split and explode, to split each line into multiple rows with a word each. In addition, we use the function `alias` to name the new column as "word". Finally, we have defined the `wordCounts` DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.
</div>
</div>
-We have now set up the query on the streaming data. All that is left is to actually start receiving data and computing the counts. To do this, we set it up to print the complete set of counts (specified by `outputMode(complete)`) to the console every time they are updated. And then start the streaming computation using `start()`.
+We have now set up the query on the streaming data. All that is left is to actually start receiving data and computing the counts. To do this, we set it up to print the complete set of counts (specified by `outputMode("complete")`) to the console every time they are updated. And then start the streaming computation using `start()`.
<div class="codetabs">
<div data-lang="scala" markdown="1">
@@ -361,16 +361,16 @@ table, and Spark runs it as an *incremental* query on the *unbounded* input
table. Let’s understand this model in more detail.
## Basic Concepts
-Consider the input data stream as the Input Table. Every data item that is
+Consider the input data stream as the "Input Table". Every data item that is
arriving on the stream is like a new row being appended to the Input Table.
![Stream as a Table](img/structured-streaming-stream-as-a-table.png "Stream as a Table")
-A query on the input will generate the Result Table. Every trigger interval (say, every 1 second), new rows get appended to the Input Table, which eventually updates the Result Table. Whenever the result table gets updated, we would want to write the changed result rows to an external sink.
+A query on the input will generate the "Result Table". Every trigger interval (say, every 1 second), new rows get appended to the Input Table, which eventually updates the Result Table. Whenever the result table gets updated, we would want to write the changed result rows to an external sink.
![Model](img/structured-streaming-model.png)
-The Output is defined as what gets written out to the external storage. The output can be defined in different modes
+The "Output" is defined as what gets written out to the external storage. The output can be defined in different modes
- *Complete Mode* - The entire updated Result Table will be written to the external storage. It is up to the storage connector to decide how to handle writing of the entire table.
@@ -386,7 +386,7 @@ the final `wordCounts` DataFrame is the result table. Note that the query on
streaming `lines` DataFrame to generate `wordCounts` is *exactly the same* as
it would be a static DataFrame. However, when this query is started, Spark
will continuously check for new data from the socket connection. If there is
-new data, Spark will run an incremental query that combines the previous
+new data, Spark will run an "incremental" query that combines the previous
running counts with the new data to compute updated counts, as shown below.
![Model](img/structured-streaming-example-model.png)
@@ -682,8 +682,8 @@ Streaming DataFrames can be joined with static DataFrames to create new streamin
val staticDf = spark.read. ...
val streamingDf = spark.readStream. ...
-streamingDf.join(staticDf, type) // inner equi-join with a static DF
-streamingDf.join(staticDf, type, right_join) // right outer join with a static DF
+streamingDf.join(staticDf, "type") // inner equi-join with a static DF
+streamingDf.join(staticDf, "type", "right_join") // right outer join with a static DF
{% endhighlight %}
@@ -789,7 +789,7 @@ Here is a table of all the sinks, and the corresponding settings.
<tr>
<td><b>File Sink</b><br/>(only parquet in Spark 2.0)</td>
<td>Append</td>
-<td><pre>writeStream<br/> .format(parquet)<br/> .start()</pre></td>
+<td><pre>writeStream<br/> .format("parquet")<br/> .start()</pre></td>
<td>Yes</td>
<td>Supports writes to partitioned tables. Partitioning by time may be useful.</td>
</tr>
@@ -803,14 +803,14 @@ Here is a table of all the sinks, and the corresponding settings.
<tr>
<td><b>Console Sink</b></td>
<td>Append, Complete</td>
-<td><pre>writeStream<br/> .format(console)<br/> .start()</pre></td>
+<td><pre>writeStream<br/> .format("console")<br/> .start()</pre></td>
<td>No</td>
<td></td>
</tr>
<tr>
<td><b>Memory Sink</b></td>
<td>Append, Complete</td>
-<td><pre>writeStream<br/> .format(memory)<br/> .queryName(table)<br/> .start()</pre></td>
+<td><pre>writeStream<br/> .format("memory")<br/> .queryName("table")<br/> .start()</pre></td>
<td>No</td>
<td>Saves the output data as a table, for interactive querying. Table name is the query name.</td>
</tr>
@@ -839,7 +839,7 @@ noAggDF
  .start()
// ========== DF with aggregation ==========
-val aggDF = df.groupBy(device).count()
+val aggDF = df.groupBy("device").count()
// Print updated aggregations to console
aggDF
@@ -879,7 +879,7 @@ noAggDF
  .start();
// ========== DF with aggregation ==========
-Dataset<Row> aggDF = df.groupBy(device).count();
+Dataset<Row> aggDF = df.groupBy("device").count();
// Print updated aggregations to console
aggDF
@@ -919,7 +919,7 @@ noAggDF\
  .start()
# ========== DF with aggregation ==========
-aggDF = df.groupBy(device).count()
+aggDF = df.groupBy("device").count()
# Print updated aggregations to console
aggDF\
@@ -1095,7 +1095,7 @@ In case of a failure or intentional shutdown, you can recover the previous progr
aggDF
  .writeStream
  .outputMode("complete")
-  .option(checkpointLocation, path/to/HDFS/dir)
+  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .start()
{% endhighlight %}
@@ -1107,7 +1107,7 @@ aggDF
aggDF
  .writeStream()
  .outputMode("complete")
-  .option(checkpointLocation, path/to/HDFS/dir)
+  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .start();
{% endhighlight %}
@@ -1119,7 +1119,7 @@ aggDF
aggDF\
  .writeStream()\
  .outputMode("complete")\
-  .option(checkpointLocation, path/to/HDFS/dir)\
+  .option("checkpointLocation", "path/to/HDFS/dir")\
  .format("memory")\
  .start()
{% endhighlight %}