diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 8dfe8ff7028a351ed50df6c435005df98d5c24ed..db2134b02016741eebdbe1b29eab5bba365950ba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -47,6 +47,7 @@ import org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation} import org.apache.spark.sql.execution.datasources.json.JacksonGenerator import org.apache.spark.sql.execution.python.EvaluatePython +import org.apache.spark.sql.execution.streaming.StreamingRelation import org.apache.spark.sql.types._ import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils @@ -449,6 +450,20 @@ class Dataset[T] private[sql]( */ def isLocal: Boolean = logicalPlan.isInstanceOf[LocalRelation] + /** + * Returns true if this [[Dataset]] contains one or more sources that continuously + * return data as it arrives. A [[Dataset]] that reads data from a streaming source + * must be executed as a [[ContinuousQuery]] using the `startStream()` method in + * [[DataFrameWriter]]. Methods that return a single answer (e.g., `count()` or + * `collect()`) will throw an [[AnalysisException]] when there is a streaming + * source present. + * + * @group basic + * @since 2.0.0 + */ + @Experimental + def isStreaming: Boolean = logicalPlan.find(_.isInstanceOf[StreamingRelation]).isDefined + /** + * Displays the [[Dataset]] in a tabular form. Strings more than 20 characters will be truncated, + * and all cells will be aligned right. 
For example: diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 2aa90568c3b1ba5e341db1fe0bdf3544f21f5a00..e8e801084ffa7a5366867e7d746093716dd30656 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -23,6 +23,7 @@ import java.sql.{Date, Timestamp} import scala.language.postfixOps import org.apache.spark.sql.catalyst.encoders.OuterScopes +import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} @@ -602,6 +603,23 @@ class DatasetSuite extends QueryTest with SharedSQLContext { TupleClass(1, "a") ) } + + test("isStreaming returns false for static Dataset") { + val data = Seq(("a", 1), ("b", 2), ("c", 3)).toDS() + assert(!data.isStreaming, "static Dataset returned true for 'isStreaming'.") + } + + test("isStreaming returns true for streaming Dataset") { + val data = MemoryStream[Int].toDS() + assert(data.isStreaming, "streaming Dataset returned false for 'isStreaming'.") + } + + test("isStreaming returns true after static and streaming Dataset join") { + val static = Seq(("a", 1), ("b", 2), ("c", 3)).toDF("a", "b") + val streaming = MemoryStream[Int].toDS().toDF("b") + val df = streaming.join(static, Seq("b")) + assert(df.isStreaming, "streaming Dataset returned false for 'isStreaming'.") + } } case class OtherTuple(_1: String, _2: Int)