Commit ba24d1ee authored by Burak Yavuz's avatar Burak Yavuz Committed by Michael Armbrust

[SPARK-14287] isStreaming method for Dataset

With the addition of StreamExecution (ContinuousQuery) to Datasets, data will become unbounded. With unbounded data, the execution of some methods and operations will not make sense, e.g. `Dataset.count()`.

A simple API is required to check whether the data in a Dataset is bounded or unbounded. This will allow users to check whether their Dataset is in streaming mode or not. ML algorithms may check if the data is unbounded and throw an exception for example.

The implementation of this method is simple; the challenge is naming it. Some possible names for this method are:
 - isStreaming
 - isContinuous
 - isBounded
 - isUnbounded

I've gone with `isStreaming` for now. We can change it before Spark 2.0 if we decide on a different name. For that reason, I've marked it as `Experimental`.
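As a hypothetical sketch of the ML-guard use case mentioned above: an algorithm that needs bounded input could fail fast when handed a streaming Dataset. The names below (`DatasetLike`, `requireBounded`, the simplified `AnalysisException`) are illustrative stand-ins, not part of this patch or Spark's API.

```scala
// Hypothetical fail-fast guard built on an isStreaming-style flag.
// All names here are stand-ins for illustration only.
final case class AnalysisException(message: String) extends Exception(message)

trait DatasetLike {
  def isStreaming: Boolean
}

// Reject unbounded input before running an operation that needs all the data.
def requireBounded(ds: DatasetLike, op: String): Unit = {
  if (ds.isStreaming) {
    throw AnalysisException(s"'$op' is not supported on a streaming Dataset")
  }
}
```

An ML `fit()` method could call `requireBounded(data, "fit")` at its entry point, surfacing the error at analysis time rather than mid-computation.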

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #12080 from brkyvz/is-streaming.
parent 7201f033
@@ -47,6 +47,7 @@ import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.execution.streaming.StreamingRelation
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
@@ -449,6 +450,20 @@ class Dataset[T] private[sql](
   */
  def isLocal: Boolean = logicalPlan.isInstanceOf[LocalRelation]

  /**
   * Returns true if this [[Dataset]] contains one or more sources that continuously
   * return data as it arrives. A [[Dataset]] that reads data from a streaming source
   * must be executed as a [[ContinuousQuery]] using the `startStream()` method in
   * [[DataFrameWriter]]. Methods that return a single answer (e.g., `count()` or
   * `collect()`) will throw an [[AnalysisException]] when there is a streaming
   * source present.
   *
   * @group basic
   * @since 2.0.0
   */
  @Experimental
  def isStreaming: Boolean = logicalPlan.find(_.isInstanceOf[StreamingRelation]).isDefined

  /**
   * Displays the [[Dataset]] in a tabular form. Strings more than 20 characters will be truncated,
   * and all cells will be aligned right. For example:
......
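The one-line implementation above searches the logical plan tree for any `StreamingRelation` node, using Catalyst's `TreeNode.find`. A minimal self-contained sketch of that pattern follows; the `PlanNode` hierarchy is a toy model, not the real Catalyst types.

```scala
// Toy model of the plan-tree search behind isStreaming. PlanNode, Relation,
// Join, and this StreamingRelation are simplified stand-ins for Catalyst types.
sealed trait PlanNode {
  def children: Seq[PlanNode]

  // Pre-order search: check this node first, then recurse into children.
  def find(f: PlanNode => Boolean): Option[PlanNode] =
    if (f(this)) Some(this) else children.flatMap(_.find(f)).headOption
}

case class Relation(name: String) extends PlanNode {
  def children: Seq[PlanNode] = Nil
}
case class StreamingRelation(source: String) extends PlanNode {
  def children: Seq[PlanNode] = Nil
}
case class Join(left: PlanNode, right: PlanNode) extends PlanNode {
  def children: Seq[PlanNode] = Seq(left, right)
}

// Mirrors Dataset.isStreaming: true if any node in the plan is a streaming source.
def isStreaming(plan: PlanNode): Boolean =
  plan.find(_.isInstanceOf[StreamingRelation]).isDefined
```

Because the search covers the whole tree, a plan that joins a static relation with a streaming one still reports `true`, which is exactly what the third test case in the suite verifies.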
@@ -23,6 +23,7 @@ import java.sql.{Date, Timestamp}
import scala.language.postfixOps
import org.apache.spark.sql.catalyst.encoders.OuterScopes
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
@@ -602,6 +603,23 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
      TupleClass(1, "a")
    )
  }

  test("isStreaming returns false for static Dataset") {
    val data = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()
    assert(!data.isStreaming, "static Dataset returned true for 'isStreaming'.")
  }

  test("isStreaming returns true for streaming Dataset") {
    val data = MemoryStream[Int].toDS()
    assert(data.isStreaming, "streaming Dataset returned false for 'isStreaming'.")
  }

  test("isStreaming returns true after static and streaming Dataset join") {
    val static = Seq(("a", 1), ("b", 2), ("c", 3)).toDF("a", "b")
    val streaming = MemoryStream[Int].toDS().toDF("b")
    val df = streaming.join(static, Seq("b"))
    assert(df.isStreaming, "streaming Dataset returned false for 'isStreaming'.")
  }
}
case class OtherTuple(_1: String, _2: Int)
......