Commit 79accf45 authored by Reynold Xin, committed by Shixiong Zhu

[SPARK-17798][SQL] Remove redundant Experimental annotations in sql.streaming

## What changes were proposed in this pull request?
While looking through API annotations to catch mislabeled APIs, I realized that the DataStreamReader and DataStreamWriter classes are already annotated as Experimental, so there is no need to annotate each individual method within them.

## How was this patch tested?
N/A

Author: Reynold Xin <rxin@databricks.com>

Closes #15373 from rxin/SPARK-17798.
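
The change itself is mechanical. A minimal sketch of the reasoning, using a hypothetical `ExampleStreamReader` rather than the actual Spark classes:

```scala
import org.apache.spark.annotation.Experimental

// Hypothetical class for illustration only: the class-level @Experimental
// already marks the whole public API surface as experimental, so repeating
// the annotation (and the ":: Experimental ::" doc tag) on each method is
// redundant.
@Experimental
final class ExampleStreamReader {

  // No method-level @Experimental needed; the class annotation covers it.
  def format(source: String): ExampleStreamReader = this

  def option(key: String, value: String): ExampleStreamReader = this
}
```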
parent 92b7e572
@@ -35,89 +35,73 @@ import org.apache.spark.sql.types.StructType
@Experimental
final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
/**
* :: Experimental ::
* Specifies the input data source format.
*
* @since 2.0.0
*/
@Experimental
def format(source: String): DataStreamReader = {
this.source = source
this
}
/**
* :: Experimental ::
* Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
* automatically from data. By specifying the schema here, the underlying data source can
* skip the schema inference step, and thus speed up data loading.
*
* @since 2.0.0
*/
@Experimental
def schema(schema: StructType): DataStreamReader = {
this.userSpecifiedSchema = Option(schema)
this
}
/**
* :: Experimental ::
* Adds an input option for the underlying data source.
*
* @since 2.0.0
*/
@Experimental
def option(key: String, value: String): DataStreamReader = {
this.extraOptions += (key -> value)
this
}
/**
* :: Experimental ::
* Adds an input option for the underlying data source.
*
* @since 2.0.0
*/
@Experimental
def option(key: String, value: Boolean): DataStreamReader = option(key, value.toString)
/**
* :: Experimental ::
* Adds an input option for the underlying data source.
*
* @since 2.0.0
*/
@Experimental
def option(key: String, value: Long): DataStreamReader = option(key, value.toString)
/**
* :: Experimental ::
* Adds an input option for the underlying data source.
*
* @since 2.0.0
*/
@Experimental
def option(key: String, value: Double): DataStreamReader = option(key, value.toString)
/**
* :: Experimental ::
* (Scala-specific) Adds input options for the underlying data source.
*
* @since 2.0.0
*/
@Experimental
def options(options: scala.collection.Map[String, String]): DataStreamReader = {
this.extraOptions ++= options
this
}
/**
* :: Experimental ::
* Adds input options for the underlying data source.
*
* @since 2.0.0
*/
@Experimental
def options(options: java.util.Map[String, String]): DataStreamReader = {
this.options(options.asScala)
this
@@ -125,13 +109,11 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
/**
* :: Experimental ::
* Loads input data stream in as a [[DataFrame]], for data streams that don't require a path
* (e.g. external key-value stores).
*
* @since 2.0.0
*/
@Experimental
def load(): DataFrame = {
val dataSource =
DataSource(
@@ -143,18 +125,15 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
}
/**
* :: Experimental ::
* Loads input in as a [[DataFrame]], for data streams that read from some path.
*
* @since 2.0.0
*/
@Experimental
def load(path: String): DataFrame = {
option("path", path).load()
}
/**
* :: Experimental ::
* Loads a JSON file stream (one object per line) and returns the result as a [[DataFrame]].
*
* This function goes through the input once to determine the input schema. If you know the
@@ -198,11 +177,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
*
* @since 2.0.0
*/
@Experimental
def json(path: String): DataFrame = format("json").load(path)
/**
* :: Experimental ::
* Loads a CSV file stream and returns the result as a [[DataFrame]].
*
* This function will go through the input once to determine the input schema if `inferSchema`
@@ -262,11 +239,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
*
* @since 2.0.0
*/
@Experimental
def csv(path: String): DataFrame = format("csv").load(path)
/**
* :: Experimental ::
* Loads a Parquet file stream, returning the result as a [[DataFrame]].
*
* You can set the following Parquet-specific option(s) for reading Parquet files:
@@ -281,13 +256,11 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
*
* @since 2.0.0
*/
@Experimental
def parquet(path: String): DataFrame = {
format("parquet").load(path)
}
/**
* :: Experimental ::
* Loads text files and returns a [[DataFrame]] whose schema starts with a string column named
* "value", and followed by partitioned columns if there are any.
*
@@ -308,7 +281,6 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
*
* @since 2.0.0
*/
@Experimental
def text(path: String): DataFrame = format("text").load(path)
......
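
For context (not part of this commit), a hedged sketch of how the DataStreamReader methods shown above compose in user code; the SparkSession, schema, and paths below are illustrative only:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType, TimestampType}

// Illustrative only: the app name, schema, and input path are made up.
val spark = SparkSession.builder().appName("stream-read-sketch").getOrCreate()

val schema = new StructType()
  .add("ts", TimestampType)
  .add("value", StringType)

val input = spark.readStream
  .format("json")                   // equivalent to the json(path) shortcut
  .schema(schema)                   // skip schema inference and speed up loading
  .option("maxFilesPerTrigger", 1L) // Long overload of option()
  .load("/tmp/streaming-input")
```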
@@ -37,7 +37,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
private val df = ds.toDF()
/**
* :: Experimental ::
* Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
* - `OutputMode.Append()`: only the new rows in the streaming DataFrame/Dataset will be
* written to the sink
@@ -46,15 +45,12 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
*
* @since 2.0.0
*/
@Experimental
def outputMode(outputMode: OutputMode): DataStreamWriter[T] = {
this.outputMode = outputMode
this
}
/**
* :: Experimental ::
* Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
* - `append`: only the new rows in the streaming DataFrame/Dataset will be written to
* the sink
@@ -63,7 +59,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
*
* @since 2.0.0
*/
@Experimental
def outputMode(outputMode: String): DataStreamWriter[T] = {
this.outputMode = outputMode.toLowerCase match {
case "append" =>
@@ -78,7 +73,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
}
/**
* :: Experimental ::
* Set the trigger for the stream query. The default value is `ProcessingTime(0)` and it will run
* the query as fast as possible.
*
@@ -100,7 +94,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
*
* @since 2.0.0
*/
@Experimental
def trigger(trigger: Trigger): DataStreamWriter[T] = {
this.trigger = trigger
this
@@ -108,25 +101,21 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
/**
* :: Experimental ::
* Specifies the name of the [[StreamingQuery]] that can be started with `start()`.
* This name must be unique among all the currently active queries in the associated SQLContext.
*
* @since 2.0.0
*/
@Experimental
def queryName(queryName: String): DataStreamWriter[T] = {
this.extraOptions += ("queryName" -> queryName)
this
}
/**
* :: Experimental ::
* Specifies the underlying output data source. Built-in options include "parquet" for now.
*
* @since 2.0.0
*/
@Experimental
def format(source: String): DataStreamWriter[T] = {
this.source = source
this
@@ -156,90 +145,74 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
}
/**
* :: Experimental ::
* Adds an output option for the underlying data source.
*
* @since 2.0.0
*/
@Experimental
def option(key: String, value: String): DataStreamWriter[T] = {
this.extraOptions += (key -> value)
this
}
/**
* :: Experimental ::
* Adds an output option for the underlying data source.
*
* @since 2.0.0
*/
@Experimental
def option(key: String, value: Boolean): DataStreamWriter[T] = option(key, value.toString)
/**
* :: Experimental ::
* Adds an output option for the underlying data source.
*
* @since 2.0.0
*/
@Experimental
def option(key: String, value: Long): DataStreamWriter[T] = option(key, value.toString)
/**
* :: Experimental ::
* Adds an output option for the underlying data source.
*
* @since 2.0.0
*/
@Experimental
def option(key: String, value: Double): DataStreamWriter[T] = option(key, value.toString)
/**
* :: Experimental ::
* (Scala-specific) Adds output options for the underlying data source.
*
* @since 2.0.0
*/
@Experimental
def options(options: scala.collection.Map[String, String]): DataStreamWriter[T] = {
this.extraOptions ++= options
this
}
/**
* :: Experimental ::
* Adds output options for the underlying data source.
*
* @since 2.0.0
*/
@Experimental
def options(options: java.util.Map[String, String]): DataStreamWriter[T] = {
this.options(options.asScala)
this
}
/**
* :: Experimental ::
* Starts the execution of the streaming query, which will continually output results to the given
* path as new data arrives. The returned [[StreamingQuery]] object can be used to interact with
* the stream.
*
* @since 2.0.0
*/
@Experimental
def start(path: String): StreamingQuery = {
option("path", path).start()
}
/**
* :: Experimental ::
* Starts the execution of the streaming query, which will continually output results to the given
* path as new data arrives. The returned [[StreamingQuery]] object can be used to interact with
* the stream.
*
* @since 2.0.0
*/
@Experimental
def start(): StreamingQuery = {
if (source == "memory") {
assertNotPartitioned("memory")
@@ -297,7 +270,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
}
/**
* :: Experimental ::
* Starts the execution of the streaming query, which will continually send results to the given
* [[ForeachWriter]] as new data arrives. The [[ForeachWriter]] can be used to send the data
* generated by the [[DataFrame]]/[[Dataset]] to an external system.
@@ -343,7 +315,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
*
* @since 2.0.0
*/
@Experimental
def foreach(writer: ForeachWriter[T]): DataStreamWriter[T] = {
this.source = "foreach"
this.foreachWriter = if (writer != null) {
......
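
And the DataStreamWriter side, continuing the sketch above; the sink format, trigger interval, and paths are again placeholders:

```scala
import org.apache.spark.sql.streaming.ProcessingTime

// Illustrative only: writes the `input` stream from the reader sketch to
// Parquet, checkpointing to a made-up location.
val query = input.writeStream
  .format("parquet")
  .outputMode("append")                   // string form; OutputMode.Append() also works
  .trigger(ProcessingTime("10 seconds"))  // default is ProcessingTime(0)
  .queryName("example-parquet-sink")
  .option("checkpointLocation", "/tmp/checkpoints")
  .start("/tmp/streaming-output")

query.awaitTermination()
```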
@@ -35,7 +35,7 @@ abstract class StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.DataStreamWriter `DataStreamWriter.start()`]],
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]],
* that is, `onQueryStart` will be called on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]]. Please
* don't block this method as it will block your query.
@@ -101,8 +101,6 @@ object StreamingQueryListener {
* @param queryInfo Information about the status of the query.
* @param exception The exception message of the [[StreamingQuery]] if the query was terminated
* with an exception. Otherwise, it will be `None`.
* @param stackTrace The stack trace of the exception if the query was terminated with an
* exception. It will be empty if there was no error.
* @since 2.0.0
*/
@Experimental
......