diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 8c14c3d220a23a716eb2abc1fa5eeccf83dca5c6..99d50e51e2af3919b818b873854b8de3f0454aa2 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -46,9 +46,9 @@ import java.util.Arrays;
 import java.util.Iterator;
 
 SparkSession spark = SparkSession
-    .builder()
-    .appName("JavaStructuredNetworkWordCount")
-    .getOrCreate();
+  .builder()
+  .appName("JavaStructuredNetworkWordCount")
+  .getOrCreate();
 {% endhighlight %}
 
 </div>
@@ -95,7 +95,7 @@ This `lines` DataFrame represents an unbounded table containing the streaming te
 
 {% highlight java %}
 // Create DataFrame representing the stream of input lines from connection to localhost:9999
-Dataset<String> lines = spark
+Dataset<Row> lines = spark
   .readStream()
   .format("socket")
   .option("host", "localhost")
@@ -104,14 +104,14 @@ Dataset<String> lines = spark
 
 // Split the lines into words
 Dataset<String> words = lines
-    .as(Encoders.STRING())
-    .flatMap(
-        new FlatMapFunction<String, String>() {
-          @Override
-          public Iterator<String> call(String x) {
-            return Arrays.asList(x.split(" ")).iterator();
-          }
-        }, Encoders.STRING());
+  .as(Encoders.STRING())
+  .flatMap(
+    new FlatMapFunction<String, String>() {
+      @Override
+      public Iterator<String> call(String x) {
+        return Arrays.asList(x.split(" ")).iterator();
+      }
+    }, Encoders.STRING());
 
 // Generate running word count
 Dataset<Row> wordCounts = words.groupBy("value").count();
@@ -125,11 +125,11 @@ This `lines` DataFrame represents an unbounded table containing the streaming te
 {% highlight python %}
 # Create DataFrame representing the stream of input lines from connection to localhost:9999
 lines = spark\
-    .readStream\
-    .format('socket')\
-    .option('host', 'localhost')\
-    .option('port', 9999)\
-    .load()
+  .readStream\
+  .format('socket')\
+  .option('host', 'localhost')\
+  .option('port', 9999)\
+  .load()
 
 # Split the lines into words
 words = lines.select(
@@ -434,11 +434,11 @@ val spark: SparkSession = ...
 
 // Read text from socket
 val socketDF = spark
-    .readStream
-    .format("socket")
-    .option("host", "localhost")
-    .option("port", 9999)
-    .load()
+  .readStream
+  .format("socket")
+  .option("host", "localhost")
+  .option("port", 9999)
+  .load()
 
 socketDF.isStreaming // Returns True for DataFrames that have streaming sources
 
@@ -447,10 +447,10 @@ socketDF.printSchema
 
 // Read all the csv files written atomically in a directory
 val userSchema = new StructType().add("name", "string").add("age", "integer")
 val csvDF = spark
-    .readStream
-    .option("sep", ";")
-    .schema(userSchema) // Specify schema of the csv files
-    .csv("/path/to/directory") // Equivalent to format("csv").load("/path/to/directory")
+  .readStream
+  .option("sep", ";")
+  .schema(userSchema) // Specify schema of the csv files
+  .csv("/path/to/directory") // Equivalent to format("csv").load("/path/to/directory")
 {% endhighlight %}
 </div>
@@ -461,11 +461,11 @@ SparkSession spark = ...
 
 // Read text from socket
 Dataset[Row] socketDF = spark
-    .readStream()
-    .format("socket")
-    .option("host", "localhost")
-    .option("port", 9999)
-    .load();
+  .readStream()
+  .format("socket")
+  .option("host", "localhost")
+  .option("port", 9999)
+  .load();
 
 socketDF.isStreaming(); // Returns True for DataFrames that have streaming sources
 
@@ -474,10 +474,10 @@ socketDF.printSchema();
 
 // Read all the csv files written atomically in a directory
 StructType userSchema = new StructType().add("name", "string").add("age", "integer");
 Dataset[Row] csvDF = spark
-    .readStream()
-    .option("sep", ";")
-    .schema(userSchema) // Specify schema of the csv files
-    .csv("/path/to/directory"); // Equivalent to format("csv").load("/path/to/directory")
+  .readStream()
+  .option("sep", ";")
+  .schema(userSchema) // Specify schema of the csv files
+  .csv("/path/to/directory"); // Equivalent to format("csv").load("/path/to/directory")
 {% endhighlight %}
 </div>
@@ -549,12 +549,12 @@ import org.apache.spark.sql.expressions.javalang.typed;
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
 
 public class DeviceData {
-    private String device;
-    private String type;
-    private Double signal;
-    private java.sql.Date time;
-    ...
-    // Getter and setter methods for each field
+  private String device;
+  private String type;
+  private Double signal;
+  private java.sql.Date time;
+  ...
+  // Getter and setter methods for each field
 }
 
 Dataset<Row> df = ...; // streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: DateType }
@@ -828,33 +828,33 @@ val noAggDF = deviceDataDf.select("device").where("signal > 10")
 
 // Print new data to console
 noAggDF
-    .writeStream
-    .format("console")
-    .start()
+  .writeStream
+  .format("console")
+  .start()
 
 // Write new data to Parquet files
 noAggDF
-    .writeStream
-    .parquet("path/to/destination/directory")
-    .start()
+  .writeStream
+  .parquet("path/to/destination/directory")
+  .start()
 
 // ========== DF with aggregation ==========
 val aggDF = df.groupBy(“device”).count()
 
 // Print updated aggregations to console
 aggDF
-    .writeStream
-    .outputMode("complete")
-    .format("console")
-    .start()
+  .writeStream
+  .outputMode("complete")
+  .format("console")
+  .start()
 
 // Have all the aggregates in an in-memory table
 aggDF
-    .writeStream
-    .queryName("aggregates") // this query name will be the table name
-    .outputMode("complete")
-    .format("memory")
-    .start()
+  .writeStream
+  .queryName("aggregates") // this query name will be the table name
+  .outputMode("complete")
+  .format("memory")
+  .start()
 
 spark.sql("select * from aggregates").show() // interactively query in-memory table
 {% endhighlight %}
@@ -868,33 +868,33 @@ Dataset<Row> noAggDF = deviceDataDf.select("device").where("signal > 10");
 
 // Print new data to console
 noAggDF
-    .writeStream()
-    .format("console")
-    .start();
+  .writeStream()
+  .format("console")
+  .start();
 
 // Write new data to Parquet files
 noAggDF
-    .writeStream()
-    .parquet("path/to/destination/directory")
-    .start();
+  .writeStream()
+  .parquet("path/to/destination/directory")
+  .start();
 
 // ========== DF with aggregation ==========
 Dataset<Row> aggDF = df.groupBy(“device”).count();
 
 // Print updated aggregations to console
 aggDF
-    .writeStream()
-    .outputMode("complete")
-    .format("console")
-    .start();
+  .writeStream()
+  .outputMode("complete")
+  .format("console")
+  .start();
 
 // Have all the aggregates in an in-memory table
 aggDF
-    .writeStream()
-    .queryName("aggregates") // this query name will be the table name
-    .outputMode("complete")
-    .format("memory")
-    .start();
+  .writeStream()
+  .queryName("aggregates") // this query name will be the table name
+  .outputMode("complete")
+  .format("memory")
+  .start();
 
 spark.sql("select * from aggregates").show(); // interactively query in-memory table
 {% endhighlight %}
@@ -908,33 +908,33 @@ noAggDF = deviceDataDf.select("device").where("signal > 10")
 
 # Print new data to console
 noAggDF\
-    .writeStream()\
-    .format("console")\
-    .start()
+  .writeStream()\
+  .format("console")\
+  .start()
 
 # Write new data to Parquet files
 noAggDF\
-    .writeStream()\
-    .parquet("path/to/destination/directory")\
-    .start()
+  .writeStream()\
+  .parquet("path/to/destination/directory")\
+  .start()
 
 # ========== DF with aggregation ==========
 aggDF = df.groupBy(“device”).count()
 
 # Print updated aggregations to console
 aggDF\
-    .writeStream()\
-    .outputMode("complete")\
-    .format("console")\
-    .start()
+  .writeStream()\
+  .outputMode("complete")\
+  .format("console")\
+  .start()
 
 # Have all the aggregates in an in memory table. The query name will be the table name
 aggDF\
-    .writeStream()\
-    .queryName("aggregates")\
-    .outputMode("complete")\
-    .format("memory")\
-    .start()
+  .writeStream()\
+  .queryName("aggregates")\
+  .outputMode("complete")\
+  .format("memory")\
+  .start()
 
 spark.sql("select * from aggregates").show() # interactively query in-memory table
 {% endhighlight %}
@@ -1093,11 +1093,11 @@ In case of a failure or intentional shutdown, you can recover the previous progr
 
 {% highlight scala %}
 aggDF
-    .writeStream
-    .outputMode("complete")
-    .option(“checkpointLocation”, “path/to/HDFS/dir”)
-    .format("memory")
-    .start()
+  .writeStream
+  .outputMode("complete")
+  .option(“checkpointLocation”, “path/to/HDFS/dir”)
+  .format("memory")
+  .start()
 {% endhighlight %}
 
 </div>
@@ -1105,11 +1105,11 @@ aggDF
 
 {% highlight java %}
 aggDF
-    .writeStream()
-    .outputMode("complete")
-    .option(“checkpointLocation”, “path/to/HDFS/dir”)
-    .format("memory")
-    .start();
+  .writeStream()
+  .outputMode("complete")
+  .option(“checkpointLocation”, “path/to/HDFS/dir”)
+  .format("memory")
+  .start();
 {% endhighlight %}
 
 </div>
@@ -1117,11 +1117,11 @@ aggDF
 
 {% highlight python %}
 aggDF\
-    .writeStream()\
-    .outputMode("complete")\
-    .option(“checkpointLocation”, “path/to/HDFS/dir”)\
-    .format("memory")\
-    .start()
+  .writeStream()\
+  .outputMode("complete")\
+  .option(“checkpointLocation”, “path/to/HDFS/dir”)\
+  .format("memory")\
+  .start()
 {% endhighlight %}
 
 </div>
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
index 346d2182c70b0731213aaad68baa046fbe0e1283..c913ee065850450dbdcbb83f84cba7d61b832a45 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
@@ -53,15 +53,15 @@ public final class JavaStructuredNetworkWordCount {
       .getOrCreate();
 
     // Create DataFrame representing the stream of input lines from connection to host:port
-    Dataset<String> lines = spark
+    Dataset<Row> lines = spark
       .readStream()
       .format("socket")
       .option("host", host)
       .option("port", port)
-      .load().as(Encoders.STRING());
+      .load();
 
     // Split the lines into words
-    Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+    Dataset<String> words = lines.as(Encoders.STRING()).flatMap(new FlatMapFunction<String, String>() {
       @Override
       public Iterator<String> call(String x) {
         return Arrays.asList(x.split(" ")).iterator();
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
index 557d36cff30d74e4c983344a626a914772c60a85..172d053c29a1f3ee3a3665c9719b7b003d2e4b12 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
@@ -75,28 +75,30 @@ public final class JavaStructuredNetworkWordCountWindowed {
       .getOrCreate();
 
     // Create DataFrame representing the stream of input lines from connection to host:port
-    Dataset<Tuple2<String, Timestamp>> lines = spark
+    Dataset<Row> lines = spark
      .readStream()
       .format("socket")
       .option("host", host)
       .option("port", port)
       .option("includeTimestamp", true)
-      .load().as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()));
+      .load();
 
     // Split the lines into words, retaining timestamps
-    Dataset<Row> words = lines.flatMap(
-      new FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>() {
-        @Override
-        public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> t) {
-          List<Tuple2<String, Timestamp>> result = new ArrayList<>();
-          for (String word : t._1.split(" ")) {
-            result.add(new Tuple2<>(word, t._2));
+    Dataset<Row> words = lines
+      .as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
+      .flatMap(
+        new FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>() {
+          @Override
+          public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> t) {
+            List<Tuple2<String, Timestamp>> result = new ArrayList<>();
+            for (String word : t._1.split(" ")) {
+              result.add(new Tuple2<>(word, t._2));
+            }
+            return result.iterator();
+          }
-          }
-          return result.iterator();
-        }
-      },
-      Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())
-    ).toDF("word", "timestamp");
+        },
+        Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())
+      ).toDF("word", "timestamp");
 
     // Group the data by window and word and compute the count of each group
     Dataset<Row> windowedCounts = words.groupBy(
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
index 364bff227bc55ad60d741ce55341f9d0ca45ac88..f0756c4e183c9f141078a1f9e783ce2093ec6946 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
@@ -56,10 +56,10 @@ object StructuredNetworkWordCount {
       .format("socket")
       .option("host", host)
       .option("port", port)
-      .load().as[String]
+      .load()
 
     // Split the lines into words
-    val words = lines.flatMap(_.split(" "))
+    val words = lines.as[String].flatMap(_.split(" "))
 
     // Generate running word count
     val wordCounts = words.groupBy("value").count()
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala
index 333b0a9d24f40f658363b1478c80ec1a6b1a63e6..b4dad21dd75b0049bcf08b667df77f8aa2b88b2a 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala
@@ -78,10 +78,10 @@ object StructuredNetworkWordCountWindowed {
       .option("host", host)
       .option("port", port)
       .option("includeTimestamp", true)
-      .load().as[(String, Timestamp)]
+      .load()
 
     // Split the lines into words, retaining timestamps
-    val words = lines.flatMap(line =>
+    val words = lines.as[(String, Timestamp)].flatMap(line =>
       line._1.split(" ").map(word => (word, line._2))
     ).toDF("word", "timestamp")
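The hunks above converge on one pattern: `load()` returns an untyped DataFrame, and a typed view is taken explicitly with `as[...]` / `as(Encoders...)` only where typed operators such as `flatMap` are needed. A minimal, self-contained Scala sketch of that pattern follows; the object name, host/port, window sizes, and console sink are illustrative choices for the sketch, not part of the patch.

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

object UntypedLoadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("UntypedLoadSketch").getOrCreate()
    import spark.implicits._

    // load() yields an untyped streaming DataFrame, as in the updated examples
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .option("includeTimestamp", true)
      .load()

    // Take a typed view only for the typed flatMap, then go back to a DataFrame
    val words = lines.as[(String, Timestamp)]
      .flatMap(line => line._1.split(" ").map(word => (word, line._2)))
      .toDF("word", "timestamp")

    // Untyped aggregation over sliding event-time windows
    val windowedCounts = words
      .groupBy(window($"timestamp", "10 minutes", "5 minutes"), $"word")
      .count()

    val query = windowedCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}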