diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java index a2cf9389543e8fa376323c7e6e0aba5bf5aa4cfc..346d2182c70b0731213aaad68baa046fbe0e1283 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java @@ -24,7 +24,7 @@ import java.util.Arrays; import java.util.Iterator; /** - * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. + * Counts words in UTF8 encoded, '\n' delimited text received from the network. * * Usage: JavaStructuredNetworkWordCount <hostname> <port> * <hostname> and <port> describe the TCP server that Structured Streaming @@ -40,7 +40,7 @@ public final class JavaStructuredNetworkWordCount { public static void main(String[] args) throws Exception { if (args.length < 2) { - System.err.println("Usage: JavaNetworkWordCount <hostname> <port>"); + System.err.println("Usage: JavaStructuredNetworkWordCount <hostname> <port>"); System.exit(1); } diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java new file mode 100644 index 0000000000000000000000000000000000000000..557d36cff30d74e4c983344a626a914772c60a85 --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.examples.sql.streaming; + +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.sql.*; +import org.apache.spark.sql.functions; +import org.apache.spark.sql.streaming.StreamingQuery; +import scala.Tuple2; + +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * Counts words in UTF8 encoded, '\n' delimited text received from the network over a + * sliding window of configurable duration. Each line from the network is tagged + * with a timestamp that is used to determine the windows into which it falls. + * + * Usage: JavaStructuredNetworkWordCountWindowed <hostname> <port> <window duration> + * [<slide duration>] + * <hostname> and <port> describe the TCP server that Structured Streaming + * would connect to receive data. 
+ * <window duration> gives the size of window, specified as integer number of seconds + * <slide duration> gives the amount of time successive windows are offset from one another, + * given in the same units as above. <slide duration> should be less than or equal to + * <window duration>. If the two are equal, successive windows have no overlap. If + * <slide duration> is not provided, it defaults to <window duration>. + * + * To run this on your local machine, you need to first run a Netcat server + * `$ nc -lk 9999` + * and then run the example + * `$ bin/run-example sql.streaming.JavaStructuredNetworkWordCountWindowed + * localhost 9999 <window duration in seconds> [<slide duration in seconds>]` + * + * One recommended <window duration>, <slide duration> pair is 10, 5 + */ +public final class JavaStructuredNetworkWordCountWindowed { + + public static void main(String[] args) throws Exception { + if (args.length < 3) { + System.err.println("Usage: JavaStructuredNetworkWordCountWindowed <hostname> <port>" + + " <window duration in seconds> [<slide duration in seconds>]"); + System.exit(1); + } + + String host = args[0]; + int port = Integer.parseInt(args[1]); + int windowSize = Integer.parseInt(args[2]); + int slideSize = (args.length == 3) ? windowSize : Integer.parseInt(args[3]); + if (slideSize > windowSize) { + System.err.println("<slide duration> must be less than or equal to <window duration>"); + } + String windowDuration = windowSize + " seconds"; + String slideDuration = slideSize + " seconds"; + + SparkSession spark = SparkSession + .builder() + .appName("JavaStructuredNetworkWordCountWindowed") + .getOrCreate(); + + // Create DataFrame representing the stream of input lines from connection to host:port + Dataset<Tuple2<String, Timestamp>> lines = spark + .readStream() + .format("socket") + .option("host", host) + .option("port", port) + .option("includeTimestamp", true) + .load().as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())); + + // Split the lines into words, retaining timestamps + Dataset<Row> words = lines.flatMap( + new FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>() { + @Override + public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> t) { + List<Tuple2<String, Timestamp>> result = new ArrayList<>(); + for (String word : t._1.split(" ")) { + result.add(new Tuple2<>(word, t._2)); + } + return result.iterator(); + } + }, + Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()) + ).toDF("word", "timestamp"); + + // Group the data by window and word and compute the count of each group + Dataset<Row> windowedCounts = words.groupBy( + functions.window(words.col("timestamp"), windowDuration, slideDuration), + words.col("word") + ).count().orderBy("window"); + + // Start running the query that prints the windowed word counts to the console + StreamingQuery query = windowedCounts.writeStream() + .outputMode("complete") + .format("console") + .option("truncate", "false") + .start(); + + query.awaitTermination(); + } +} diff --git a/examples/src/main/python/sql/streaming/structured_network_wordcount.py b/examples/src/main/python/sql/streaming/structured_network_wordcount.py index 32d63c52c9191a20de865e944eaf118ee6448b94..afde2550587ca1fb35dac1a3d9329d17e5e5898f 100644 --- a/examples/src/main/python/sql/streaming/structured_network_wordcount.py +++ b/examples/src/main/python/sql/streaming/structured_network_wordcount.py @@ -16,7 +16,7 @@ # """ - Counts words in UTF8 encoded, '\n' delimited text received from the network every 
second. + Counts words in UTF8 encoded, '\n' delimited text received from the network. Usage: structured_network_wordcount.py <hostname> <port> <hostname> and <port> describe the TCP server that Structured Streaming would connect to receive data. @@ -58,6 +58,7 @@ if __name__ == "__main__": # Split the lines into words words = lines.select( + # explode turns each item in an array into a separate row explode( split(lines.value, ' ') ).alias('word') diff --git a/examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py b/examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py new file mode 100644 index 0000000000000000000000000000000000000000..02a7d3363d7801e9271afaca0117e715793b226f --- /dev/null +++ b/examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py @@ -0,0 +1,102 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" + Counts words in UTF8 encoded, '\n' delimited text received from the network over a + sliding window of configurable duration. Each line from the network is tagged + with a timestamp that is used to determine the windows into which it falls. + + Usage: structured_network_wordcount_windowed.py <hostname> <port> <window duration> + [<slide duration>] + <hostname> and <port> describe the TCP server that Structured Streaming + would connect to receive data. + <window duration> gives the size of window, specified as integer number of seconds + <slide duration> gives the amount of time successive windows are offset from one another, + given in the same units as above. <slide duration> should be less than or equal to + <window duration>. If the two are equal, successive windows have no overlap. If + <slide duration> is not provided, it defaults to <window duration>. 
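+
+ As an illustration (assuming the default alignment of Spark's window()
+ function, under which windows start at multiples of <slide duration>):
+ with <window duration> = 10 and <slide duration> = 5, a line stamped
+ 12:00:07 falls into the two overlapping windows [12:00:00, 12:00:10)
+ and [12:00:05, 12:00:15), so its words are counted in both.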
+ + To run this on your local machine, you need to first run a Netcat server + `$ nc -lk 9999` + and then run the example + `$ bin/spark-submit + examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py + localhost 9999 <window duration> [<slide duration>]` + + One recommended <window duration>, <slide duration> pair is 10, 5 +""" +from __future__ import print_function + +import sys + +from pyspark.sql import SparkSession +from pyspark.sql.functions import explode +from pyspark.sql.functions import split +from pyspark.sql.functions import window + +if __name__ == "__main__": + if len(sys.argv) != 5 and len(sys.argv) != 4: + msg = ("Usage: structured_network_wordcount_windowed.py <hostname> <port> " + "<window duration in seconds> [<slide duration in seconds>]") + print(msg, file=sys.stderr) + exit(-1) + + host = sys.argv[1] + port = int(sys.argv[2]) + windowSize = int(sys.argv[3]) + slideSize = int(sys.argv[4]) if (len(sys.argv) == 5) else windowSize + if slideSize > windowSize: + print("<slide duration> must be less than or equal to <window duration>", file=sys.stderr) + windowDuration = '{} seconds'.format(windowSize) + slideDuration = '{} seconds'.format(slideSize) + + spark = SparkSession\ + .builder\ + .appName("StructuredNetworkWordCountWindowed")\ + .getOrCreate() + + # Create DataFrame representing the stream of input lines from connection to host:port + lines = spark\ + .readStream\ + .format('socket')\ + .option('host', host)\ + .option('port', port)\ + .option('includeTimestamp', 'true')\ + .load() + + # Split the lines into words, retaining timestamps + # split() splits each line into an array, and explode() turns the array into multiple rows + words = lines.select( + explode(split(lines.value, ' ')).alias('word'), + lines.timestamp + ) + + # Group the data by window and word and compute the count of each group + windowedCounts = words.groupBy( + window(words.timestamp, windowDuration, slideDuration), + words.word + ).count().orderBy('window') + + # Start running the query that prints the windowed word counts to the console + query = windowedCounts\ + .writeStream\ + .outputMode('complete')\ + .format('console')\ + .option('truncate', 'false')\ + .start() + + query.awaitTermination() diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala index 433f7a181bbf85e6759bc358eef4600e15d2a19f..364bff227bc55ad60d741ce55341f9d0ca45ac88 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.SparkSession /** - * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. + * Counts words in UTF8 encoded, '\n' delimited text received from the network. 
* * Usage: StructuredNetworkWordCount <hostname> <port> * <hostname> and <port> describe the TCP server that Structured Streaming diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala new file mode 100644 index 0000000000000000000000000000000000000000..333b0a9d24f40f658363b1478c80ec1a6b1a63e6 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// scalastyle:off println +package org.apache.spark.examples.sql.streaming + +import java.sql.Timestamp + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.functions._ + +/** + * Counts words in UTF8 encoded, '\n' delimited text received from the network over a + * sliding window of configurable duration. Each line from the network is tagged + * with a timestamp that is used to determine the windows into which it falls. + * + * Usage: StructuredNetworkWordCountWindowed <hostname> <port> <window duration> + * [<slide duration>] + * <hostname> and <port> describe the TCP server that Structured Streaming + * would connect to receive data. + * <window duration> gives the size of window, specified as integer number of seconds + * <slide duration> gives the amount of time successive windows are offset from one another, + * given in the same units as above. <slide duration> should be less than or equal to + * <window duration>. If the two are equal, successive windows have no overlap. If + * <slide duration> is not provided, it defaults to <window duration>. 
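+ *
+ * Note that the timestamp attached to each line is assigned by the socket
+ * source at the moment the line is received (i.e. processing time, via the
+ * includeTimestamp option used below); it is not parsed from the line itself.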
+ * + * To run this on your local machine, you need to first run a Netcat server + * `$ nc -lk 9999` + * and then run the example + * `$ bin/run-example sql.streaming.StructuredNetworkWordCountWindowed + * localhost 9999 <window duration in seconds> [<slide duration in seconds>]` + * + * One recommended <window duration>, <slide duration> pair is 10, 5 + */ +object StructuredNetworkWordCountWindowed { + + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println("Usage: StructuredNetworkWordCountWindowed <hostname> <port>" + + " <window duration in seconds> [<slide duration in seconds>]") + System.exit(1) + } + + val host = args(0) + val port = args(1).toInt + val windowSize = args(2).toInt + val slideSize = if (args.length == 3) windowSize else args(3).toInt + if (slideSize > windowSize) { + System.err.println("<slide duration> must be less than or equal to <window duration>") + } + val windowDuration = s"$windowSize seconds" + val slideDuration = s"$slideSize seconds" + + val spark = SparkSession + .builder + .appName("StructuredNetworkWordCountWindowed") + .getOrCreate() + + import spark.implicits._ + + // Create DataFrame representing the stream of input lines from connection to host:port + val lines = spark.readStream + .format("socket") + .option("host", host) + .option("port", port) + .option("includeTimestamp", true) + .load().as[(String, Timestamp)] + + // Split the lines into words, retaining timestamps + val words = lines.flatMap(line => + line._1.split(" ").map(word => (word, line._2)) + ).toDF("word", "timestamp") + + // Group the data by window and word and compute the count of each group + val windowedCounts = words.groupBy( + window($"timestamp", windowDuration, slideDuration), $"word" + ).count().orderBy("window") + + // Start running the query that prints the windowed word counts to the console + val query = windowedCounts.writeStream + .outputMode("complete") + .format("console") + .option("truncate", "false") + .start() + + query.awaitTermination() + } +} +// scalastyle:on println diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala index d07d88dcdcc44114a4b95a0dc88c41e79ef9a426..fb15239f9af98c81fb82c8dffc35b7015d27db2f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala @@ -19,17 +19,24 @@ package org.apache.spark.sql.execution.streaming import java.io.{BufferedReader, InputStreamReader, IOException} import java.net.Socket +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.Calendar import javax.annotation.concurrent.GuardedBy import scala.collection.mutable.ArrayBuffer +import scala.util.{Failure, Success, Try} import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext} import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} -import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} object TextSocketSource { - val SCHEMA = StructType(StructField("value", StringType) :: Nil) + val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil) + val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) :: + StructField("timestamp", TimestampType) :: Nil) + val DATE_FORMAT = new 
SimpleDateFormat("yyyy-MM-dd HH:mm:ss") } /** @@ -37,7 +44,7 @@ object TextSocketSource { * This source will *not* work in production applications due to multiple reasons, including no * support for fault recovery and keeping all of the text read in memory forever. */ -class TextSocketSource(host: String, port: Int, sqlContext: SQLContext) +class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlContext: SQLContext) extends Source with Logging { @GuardedBy("this") @@ -47,7 +54,7 @@ class TextSocketSource(host: String, port: Int, sqlContext: SQLContext) private var readThread: Thread = null @GuardedBy("this") - private var lines = new ArrayBuffer[String] + private var lines = new ArrayBuffer[(String, Timestamp)] initialize() @@ -67,7 +74,10 @@ class TextSocketSource(host: String, port: Int, sqlContext: SQLContext) return } TextSocketSource.this.synchronized { - lines += line + lines += ((line, + Timestamp.valueOf( + TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime())) + )) } } } catch { @@ -79,7 +89,8 @@ class TextSocketSource(host: String, port: Int, sqlContext: SQLContext) } /** Returns the schema of the data from this source */ - override def schema: StructType = TextSocketSource.SCHEMA + override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP + else TextSocketSource.SCHEMA_REGULAR /** Returns the maximum available offset for this source. */ override def getOffset: Option[Offset] = synchronized { @@ -92,7 +103,11 @@ class TextSocketSource(host: String, port: Int, sqlContext: SQLContext) val endIdx = end.asInstanceOf[LongOffset].offset.toInt + 1 val data = synchronized { lines.slice(startIdx, endIdx) } import sqlContext.implicits._ - data.toDF("value") + if (includeTimestamp) { + data.toDF("value", "timestamp") + } else { + data.map(_._1).toDF("value") + } } /** Stop this source. */ @@ -111,6 +126,14 @@ class TextSocketSource(host: String, port: Int, sqlContext: SQLContext) } class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegister with Logging { + private def parseIncludeTimestamp(params: Map[String, String]): Boolean = { + Try(params.getOrElse("includeTimestamp", "false").toBoolean) match { + case Success(bool) => bool + case Failure(_) => + throw new AnalysisException("includeTimestamp must be set to either \"true\" or \"false\"") + } + } + /** Returns the name and schema of the source that can be used to continually read data. */ override def sourceSchema( sqlContext: SQLContext, @@ -125,7 +148,13 @@ class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegis if (!parameters.contains("port")) { throw new AnalysisException("Set a port to read from with option(\"port\", ...).") } - ("textSocket", TextSocketSource.SCHEMA) + val schema = + if (parseIncludeTimestamp(parameters)) { + TextSocketSource.SCHEMA_TIMESTAMP + } else { + TextSocketSource.SCHEMA_REGULAR + } + ("textSocket", schema) } override def createSource( @@ -136,7 +165,7 @@ class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegis parameters: Map[String, String]): Source = { val host = parameters("host") val port = parameters("port").toInt - new TextSocketSource(host, port, sqlContext) + new TextSocketSource(host, port, parseIncludeTimestamp(parameters), sqlContext) } /** String that represents the format that this data source provider uses. 
*/ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala index ca577631854efb86ba76de2c1b0dff96f63dffe7..6b0ba7acb4804796b488ee73d49b5d673a5db48c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming import java.io.{IOException, OutputStreamWriter} import java.net.ServerSocket +import java.sql.Timestamp import java.util.concurrent.LinkedBlockingQueue import org.scalatest.BeforeAndAfterEach @@ -27,7 +28,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.streaming.StreamTest import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} class TextSocketStreamSuite extends StreamTest with SharedSQLContext with BeforeAndAfterEach { import testImplicits._ @@ -85,6 +86,47 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before } } + test("timestamped usage") { + serverThread = new ServerThread() + serverThread.start() + + val provider = new TextSocketSourceProvider + val parameters = Map("host" -> "localhost", "port" -> serverThread.port.toString, + "includeTimestamp" -> "true") + val schema = provider.sourceSchema(sqlContext, None, "", parameters)._2 + assert(schema === StructType(StructField("value", StringType) :: + StructField("timestamp", TimestampType) :: Nil)) + + source = provider.createSource(sqlContext, "", None, "", parameters) + + failAfter(streamingTimeout) { + serverThread.enqueue("hello") + while (source.getOffset.isEmpty) { + Thread.sleep(10) + } + val offset1 = source.getOffset.get + val batch1 = source.getBatch(None, offset1) + val batch1Seq = batch1.as[(String, Timestamp)].collect().toSeq + assert(batch1Seq.map(_._1) === Seq("hello")) + val batch1Stamp = batch1Seq(0)._2 + + serverThread.enqueue("world") + while (source.getOffset.get === offset1) { + Thread.sleep(10) + } + val offset2 = source.getOffset.get + val batch2 = source.getBatch(Some(offset1), offset2) + val batch2Seq = batch2.as[(String, Timestamp)].collect().toSeq + assert(batch2Seq.map(_._1) === Seq("world")) + val batch2Stamp = batch2Seq(0)._2 + assert(!batch2Stamp.before(batch1Stamp)) + + // Try stopping the source to make sure this does not block forever. + source.stop() + source = null + } + } + test("params not given") { val provider = new TextSocketSourceProvider intercept[AnalysisException] { @@ -98,6 +140,14 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before } } + test("non-boolean includeTimestamp") { + val provider = new TextSocketSourceProvider + intercept[AnalysisException] { + provider.sourceSchema(sqlContext, None, "", Map("host" -> "localhost", + "port" -> "1234", "includeTimestamp" -> "fasle")) + } + } + test("no server up") { val provider = new TextSocketSourceProvider val parameters = Map("host" -> "localhost", "port" -> "0")
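A note on the "non-boolean includeTimestamp" test above: "fasle" is a
deliberate misspelling, chosen so that the option value is not a parseable
boolean. Scala's StringOps.toBoolean accepts only "true" and "false"
(case-insensitively) and throws IllegalArgumentException for anything else,
which the new parseIncludeTimestamp helper converts into the AnalysisException
the test intercepts. A minimal self-contained sketch of that check
(hypothetical object name; it mirrors the provider's logic without importing
Spark, so AnalysisException becomes IllegalArgumentException):

    import scala.util.{Failure, Success, Try}

    // Standalone stand-in for TextSocketSourceProvider.parseIncludeTimestamp.
    object IncludeTimestampCheckSketch extends App {
      def parse(value: String): Boolean = Try(value.toBoolean) match {
        case Success(bool) => bool
        case Failure(_) =>
          // The provider throws AnalysisException at this point.
          throw new IllegalArgumentException(
            s"includeTimestamp must be set to either 'true' or 'false', got '$value'")
      }

      println(parse("true"))        // true
      println(parse("False"))       // false -- toBoolean is case-insensitive
      println(Try(parse("fasle")))  // Failure(java.lang.IllegalArgumentException: ...)
    }

Mapping the failure to AnalysisException, rather than silently defaulting to
false, surfaces the misconfiguration at query setup time, which is exactly
what that test pins down.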