Commit e29a74d5 authored by uncleGen's avatar uncleGen Committed by Sean Owen

[DOCS][SS] fix structured streaming python example

## What changes were proposed in this pull request?

- SS Python examples: `readStream` and `writeStream` are properties in PySpark, so calling them with parentheses raised `TypeError: 'xxx' object is not callable`.
- A few other doc issues: a typo in the output-mode compatibility matrix ("does drop not"), a mislabeled value in a FileStreamSource log message, and a stale parameter name (`newFileOnly`) in a FileInputDStream comment.

## How was this patch tested?

Jenkins.

Author: uncleGen <hustyugm@gmail.com>

Closes #17257 from uncleGen/docs-ss-python.
parent f6fdf92d
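Background for the first fix: in PySpark, `readStream` and `writeStream` are properties, not methods as in the Scala/Java APIs, so the trailing parentheses in the old examples ended up calling the returned reader/writer object itself. A minimal reproduction, assuming a local session:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("repro").getOrCreate()

try:
    # The old docs wrote spark.readStream(): the property access returns a
    # DataStreamReader, and the extra () then tries to call that object.
    spark.readStream()
except TypeError as e:
    print(e)  # 'DataStreamReader' object is not callable

reader = spark.readStream  # correct: plain property access
```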
@@ -539,7 +539,7 @@ spark = SparkSession. ...
 # Read text from socket
 socketDF = spark \
-  .readStream() \
+  .readStream \
   .format("socket") \
   .option("host", "localhost") \
   .option("port", 9999) \
@@ -552,7 +552,7 @@ socketDF.printSchema()
 # Read all the csv files written atomically in a directory
 userSchema = StructType().add("name", "string").add("age", "integer")
 csvDF = spark \
-  .readStream() \
+  .readStream \
   .option("sep", ";") \
   .schema(userSchema) \
   .csv("/path/to/directory")  # Equivalent to format("csv").load("/path/to/directory")
@@ -971,7 +971,7 @@ Here is the compatibility matrix.
         <br/><br/>
         Update mode uses watermark to drop old aggregation state.
         <br/><br/>
-        Complete mode does drop not old aggregation state since by definition this mode
+        Complete mode does not drop old aggregation state since by definition this mode
         preserves all data in the Result Table.
         </td>
       </tr>
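To make that matrix row concrete, here is a hedged sketch of the two output modes on a windowed count; it assumes a streaming DataFrame `events` with a `timestamp` column (names are illustrative, not from the patch):

```python
from pyspark.sql.functions import window

windowedCounts = events \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(window("timestamp", "5 minutes")) \
    .count()

# Update mode: the watermark lets Spark drop window state older than ~10 minutes.
windowedCounts.writeStream.outputMode("update").format("console").start()

# Complete mode: the full Result Table is re-emitted every trigger, so no
# aggregation state can be dropped.
windowedCounts.writeStream.outputMode("complete").format("console").start()
```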
@@ -1201,13 +1201,13 @@ noAggDF = deviceDataDf.select("device").where("signal > 10")
 # Print new data to console
 noAggDF \
-  .writeStream() \
+  .writeStream \
   .format("console") \
   .start()
 
 # Write new data to Parquet files
 noAggDF \
-  .writeStream() \
+  .writeStream \
   .format("parquet") \
   .option("checkpointLocation", "path/to/checkpoint/dir") \
   .option("path", "path/to/destination/dir") \
@@ -1218,14 +1218,14 @@ aggDF = df.groupBy("device").count()
 # Print updated aggregations to console
 aggDF \
-  .writeStream() \
+  .writeStream \
   .outputMode("complete") \
   .format("console") \
   .start()
 
 # Have all the aggregates in an in memory table. The query name will be the table name
 aggDF \
-  .writeStream() \
+  .writeStream \
   .queryName("aggregates") \
   .outputMode("complete") \
   .format("memory") \
@@ -1313,7 +1313,7 @@ query.lastProgress();    // the most recent progress update of this streaming query
 <div data-lang="python" markdown="1">
 {% highlight python %}
-query = df.writeStream().format("console").start()   # get the query object
+query = df.writeStream.format("console").start()   # get the query object
 
 query.id()          # get the unique identifier of the running query that persists across restarts from checkpoint data
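A related PySpark quirk, noted as an aside rather than as part of this patch: on the query handle, the inspection points are also properties in the current Python API, while Scala/Java expose them as methods. A sketch:

```python
query = df.writeStream.format("console").start()

print(query.id)            # unique id that persists across restarts from checkpoint data
print(query.runId)         # unique id of this particular run of the query
print(query.status)        # current status of the query
print(query.lastProgress)  # most recent progress update, as a dict

query.awaitTermination()   # block until the query is stopped or fails
```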
@@ -1658,7 +1658,7 @@ aggDF
 {% highlight python %}
 aggDF \
-  .writeStream() \
+  .writeStream \
   .outputMode("complete") \
   .option("checkpointLocation", "path/to/HDFS/dir") \
   .format("memory") \
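The `checkpointLocation` option in this last hunk is what makes a query recoverable; a minimal sketch of the restart semantics, reusing the docs' placeholder path and a console sink:

```python
query = aggDF.writeStream \
    .outputMode("complete") \
    .option("checkpointLocation", "path/to/HDFS/dir") \
    .format("console") \
    .start()

query.stop()

# Starting a query again with the same checkpointLocation resumes from the
# offsets and aggregation state saved there, rather than reprocessing from scratch.
```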
@@ -97,7 +97,7 @@ class FileStreamSource(
   }
   seenFiles.purge()
 
-  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAge = $maxFileAgeMs")
+  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")
 
   /**
    * Returns the maximum offset that can be retrieved from the source.
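The two values in the corrected log line correspond to tunables of the file source; as far as I can tell they map to the reader options `maxFilesPerTrigger` and `maxFileAge` (treat that mapping as an assumption, not something stated in this patch):

```python
# Assumed user-facing option names for the knobs reported in the log line above
csvDF = (spark.readStream
    .option("maxFilesPerTrigger", 100)   # cap on files processed per micro-batch
    .option("maxFileAge", "7d")          # files older than this are ignored
    .schema(userSchema)
    .csv("/path/to/directory"))
```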
@@ -230,7 +230,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
  * - It must pass the user-provided file filter.
  * - It must be newer than the ignore threshold. It is assumed that files older than the ignore
  *   threshold have already been considered or are existing files before start
- *   (when newFileOnly = true).
+ *   (when newFilesOnly = true).
  * - It must not be present in the recently selected files that this class remembers.
  * - It must not be newer than the time of the batch (i.e. `currentTime` for which this
  *   file is being tested. This can occur if the driver was recovered, and the missing batches
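For reference, the criteria in that comment govern DStream file monitoring; a minimal PySpark sketch of such a stream (directory and batch interval are placeholders):

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "FileStreamSketch")
ssc = StreamingContext(sc, batchDuration=10)  # 10-second batches

# Each batch picks up only files that satisfy the rules listed above:
# new, passing the filter, not yet remembered, and not newer than the batch time.
lines = ssc.textFileStream("hdfs://path/to/dir")
lines.pprint()

ssc.start()
ssc.awaitTermination()
```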