Commit 5a5b83c9 authored by Reynold Xin

[SPARK-15261][SQL] Remove experimental tag from DataFrameReader/Writer

## What changes were proposed in this pull request?
This patch removes the experimental tag from DataFrameReader and DataFrameWriter, and explicitly tags the few methods added for structured streaming as experimental.

## How was this patch tested?
N/A

Author: Reynold Xin <rxin@databricks.com>

Closes #13038 from rxin/SPARK-15261.
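For orientation, a minimal Scala sketch of the split this patch draws, written against the 2.0.0-era API that appears in the diff below; the session setup, paths, and schema reuse are illustrative assumptions rather than part of the change:

```scala
import org.apache.spark.sql.SparkSession

object ExperimentalSplitSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()

    // DataFrameReader/DataFrameWriter as classes drop @Experimental:
    // an ordinary batch read like this is treated as stable API.
    val batch = spark.read.format("parquet").load("/data/events")

    // The structured-streaming entry point keeps the tag:
    // DataFrameReader.stream(path) remains @Experimental after this patch.
    val streaming = spark.read
      .format("parquet")
      .schema(batch.schema) // reuse the batch schema for the stream
      .stream("/data/incoming")

    spark.stop()
  }
}
```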
parent 61e0bdcf
@@ -50,8 +50,6 @@ class DataFrameReader(object):
(e.g. file systems, key-value stores, etc). Use :func:`SQLContext.read`
to access this.
- ::Note: Experimental
.. versionadded:: 1.4
"""
@@ -143,6 +141,8 @@ class DataFrameReader(object):
def stream(self, path=None, format=None, schema=None, **options):
"""Loads a data stream from a data source and returns it as a :class`DataFrame`.
+ .. note:: Experimental.
:param path: optional string for file-system backed data sources.
:param format: optional string for format of the data source. Default to 'parquet'.
:param schema: optional :class:`StructType` for the input schema.
@@ -462,8 +462,6 @@ class DataFrameWriter(object):
(e.g. file systems, key-value stores, etc). Use :func:`DataFrame.write`
to access this.
- ::Note: Experimental
.. versionadded:: 1.4
"""
def __init__(self, df):
@@ -540,7 +538,9 @@ class DataFrameWriter(object):
def queryName(self, queryName):
"""Specifies the name of the :class:`ContinuousQuery` that can be started with
:func:`startStream`. This name must be unique among all the currently active queries
- in the associated SQLContext.
+ in the associated SQLContext
+ .. note:: Experimental.
:param queryName: unique name for the query
@@ -557,6 +557,8 @@ class DataFrameWriter(object):
"""Set the trigger for the stream query. If this is not set it will run the query as fast
as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``.
+ .. note:: Experimental.
:param processingTime: a processing time interval as a string, e.g. '5 seconds', '1 minute'.
>>> # trigger the query for execution every 5 seconds
@@ -614,6 +616,8 @@ class DataFrameWriter(object):
If ``format`` is not specified, the default data source configured by
``spark.sql.sources.default`` will be used.
+ .. note:: Experimental.
:param path: the path in a Hadoop supported file system
:param format: the format used to save
......
@@ -34,13 +34,11 @@ import org.apache.spark.sql.execution.streaming.StreamingRelation
import org.apache.spark.sql.types.StructType
/**
- * :: Experimental ::
- * Interface used to load a [[DataFrame]] from external storage systems (e.g. file systems,
+ * Interface used to load a [[Dataset]] from external storage systems (e.g. file systems,
* key-value stores, etc) or data streams. Use [[SparkSession.read]] to access this.
*
* @since 1.4.0
*/
- @Experimental
class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
/**
@@ -164,11 +162,13 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
}
/**
+ * :: Experimental ::
* Loads input data stream in as a [[DataFrame]], for data streams that don't require a path
* (e.g. external key-value stores).
*
* @since 2.0.0
*/
+ @Experimental
def stream(): DataFrame = {
val dataSource =
DataSource(
@@ -180,10 +180,12 @@
}
/**
+ * :: Experimental ::
* Loads input in as a [[DataFrame]], for data streams that read from some path.
*
* @since 2.0.0
*/
+ @Experimental
def stream(path: String): DataFrame = {
option("path", path).stream()
}
......
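The two reader overloads above differ only in whether a path is set: stream(path) simply adds option("path", path) and delegates to stream(). A hedged sketch of the no-path form for a source configured entirely through options, reusing the spark session from the earlier sketch; the source name and option keys are hypothetical placeholders, not a built-in source:

```scala
// "my-kv-source" and its option keys are hypothetical placeholders for a
// registered DataSource implementation; only the call shape is the point here.
val kvStream = spark.read
  .format("my-kv-source")
  .option("hosts", "kv1:9000,kv2:9000")
  .option("namespace", "events")
  .stream() // no path: the path-less overload shown above
```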
@@ -34,13 +34,11 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils
/**
- * :: Experimental ::
- * Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems,
- * key-value stores, etc) or data streams. Use [[DataFrame.write]] to access this.
+ * Interface used to write a [[Dataset]] to external storage systems (e.g. file systems,
+ * key-value stores, etc) or data streams. Use [[Dataset.write]] to access this.
*
* @since 1.4.0
*/
- @Experimental
final class DataFrameWriter private[sql](df: DataFrame) {
/**
@@ -255,11 +253,13 @@ final class DataFrameWriter private[sql](df: DataFrame) {
}
/**
+ * :: Experimental ::
* Specifies the name of the [[ContinuousQuery]] that can be started with `startStream()`.
* This name must be unique among all the currently active queries in the associated SQLContext.
*
* @since 2.0.0
*/
+ @Experimental
def queryName(queryName: String): DataFrameWriter = {
assertStreaming("queryName() can only be called on continuous queries")
this.extraOptions += ("queryName" -> queryName)
@@ -267,25 +267,29 @@ final class DataFrameWriter private[sql](df: DataFrame) {
}
/**
+ * :: Experimental ::
* Starts the execution of the streaming query, which will continually output results to the given
* path as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with
* the stream.
*
* @since 2.0.0
*/
+ @Experimental
def startStream(path: String): ContinuousQuery = {
option("path", path).startStream()
}
/**
+ * :: Experimental ::
* Starts the execution of the streaming query, which will continually output results to the given
* path as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with
* the stream.
*
* @since 2.0.0
*/
+ @Experimental
def startStream(): ContinuousQuery = {
- assertNotBucketed
+ assertNotBucketed()
assertStreaming("startStream() can only be called on continuous queries")
if (source == "memory") {
......
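On the writer side, the newly tagged methods compose into a streaming write. A minimal sketch, again reusing the streaming DataFrame from the first sketch above; the checkpoint option name, paths, and query name are illustrative assumptions:

```scala
// `streaming` is the streaming DataFrame from the first sketch above.
// queryName() and startStream() stay @Experimental; trigger() (see the Python
// hunk earlier) would set a processing-time interval on the same builder.
val query = streaming.write
  .format("parquet")
  .option("checkpointLocation", "/tmp/checkpoints/incoming-copy") // assumed option name
  .queryName("incoming-copy")
  .startStream("/data/incoming-copy")

// The returned ContinuousQuery handle can be used to stop the stream later.
query.stop()
```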