diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index d437c16a25b0117ed95cbb5685f5313c6d346bcd..864a9cd3eb89d8506c72a32ef93ff596de3c2244 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -35,89 +35,73 @@ import org.apache.spark.sql.types.StructType
 @Experimental
 final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
   /**
-   * :: Experimental ::
    * Specifies the input data source format.
    *
    * @since 2.0.0
    */
-  @Experimental
   def format(source: String): DataStreamReader = {
     this.source = source
     this
   }
 
   /**
-   * :: Experimental ::
    * Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
    * automatically from data. By specifying the schema here, the underlying data source can
    * skip the schema inference step, and thus speed up data loading.
    *
    * @since 2.0.0
    */
-  @Experimental
   def schema(schema: StructType): DataStreamReader = {
     this.userSpecifiedSchema = Option(schema)
     this
   }
 
   /**
-   * :: Experimental ::
    * Adds an input option for the underlying data source.
    *
    * @since 2.0.0
    */
-  @Experimental
   def option(key: String, value: String): DataStreamReader = {
     this.extraOptions += (key -> value)
     this
   }
 
   /**
-   * :: Experimental ::
    * Adds an input option for the underlying data source.
    *
    * @since 2.0.0
    */
-  @Experimental
   def option(key: String, value: Boolean): DataStreamReader = option(key, value.toString)
 
   /**
-   * :: Experimental ::
    * Adds an input option for the underlying data source.
    *
    * @since 2.0.0
    */
-  @Experimental
   def option(key: String, value: Long): DataStreamReader = option(key, value.toString)
 
   /**
-   * :: Experimental ::
    * Adds an input option for the underlying data source.
    *
    * @since 2.0.0
    */
-  @Experimental
   def option(key: String, value: Double): DataStreamReader = option(key, value.toString)
 
   /**
-   * :: Experimental ::
    * (Scala-specific) Adds input options for the underlying data source.
    *
    * @since 2.0.0
    */
-  @Experimental
   def options(options: scala.collection.Map[String, String]): DataStreamReader = {
     this.extraOptions ++= options
     this
   }
 
   /**
-   * :: Experimental ::
    * Adds input options for the underlying data source.
    *
    * @since 2.0.0
    */
-  @Experimental
   def options(options: java.util.Map[String, String]): DataStreamReader = {
     this.options(options.asScala)
     this
@@ -125,13 +109,11 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
 
 
   /**
-   * :: Experimental ::
    * Loads input data stream in as a [[DataFrame]], for data streams that don't require a path
    * (e.g. external key-value stores).
    *
    * @since 2.0.0
    */
-  @Experimental
   def load(): DataFrame = {
     val dataSource =
       DataSource(
@@ -143,18 +125,15 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
   }
 
   /**
-   * :: Experimental ::
    * Loads input in as a [[DataFrame]], for data streams that read from some path.
    *
    * @since 2.0.0
    */
-  @Experimental
   def load(path: String): DataFrame = {
     option("path", path).load()
   }
 
   /**
-   * :: Experimental ::
    * Loads a JSON file stream (one object per line) and returns the result as a [[DataFrame]].
    *
    * This function goes through the input once to determine the input schema. If you know the
@@ -198,11 +177,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    *
    * @since 2.0.0
    */
-  @Experimental
   def json(path: String): DataFrame = format("json").load(path)
 
   /**
-   * :: Experimental ::
    * Loads a CSV file stream and returns the result as a [[DataFrame]].
    *
    * This function will go through the input once to determine the input schema if `inferSchema`
@@ -262,11 +239,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    *
    * @since 2.0.0
    */
-  @Experimental
   def csv(path: String): DataFrame = format("csv").load(path)
 
   /**
-   * :: Experimental ::
    * Loads a Parquet file stream, returning the result as a [[DataFrame]].
    *
    * You can set the following Parquet-specific option(s) for reading Parquet files:
@@ -281,13 +256,11 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    *
    * @since 2.0.0
    */
-  @Experimental
   def parquet(path: String): DataFrame = {
     format("parquet").load(path)
   }
 
   /**
-   * :: Experimental ::
    * Loads text files and returns a [[DataFrame]] whose schema starts with a string column named
    * "value", and followed by partitioned columns if there are any.
    *
@@ -308,7 +281,6 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    *
    * @since 2.0.0
    */
-  @Experimental
   def text(path: String): DataFrame = format("text").load(path)
 
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index f70c7d08a691c26827276c4b2b38bb4c463ff544..b959444b492981e3f69a846644f7426c3c348cd7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -37,7 +37,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
   private val df = ds.toDF()
 
   /**
-   * :: Experimental ::
    * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
    *   - `OutputMode.Append()`: only the new rows in the streaming DataFrame/Dataset will be
    *                            written to the sink
@@ -46,15 +45,12 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
    *
    * @since 2.0.0
    */
-  @Experimental
   def outputMode(outputMode: OutputMode): DataStreamWriter[T] = {
     this.outputMode = outputMode
     this
   }
 
-
   /**
-   * :: Experimental ::
    * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
    *   - `append`:   only the new rows in the streaming DataFrame/Dataset will be written to
    *                 the sink
@@ -63,7 +59,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
    *
    * @since 2.0.0
    */
-  @Experimental
   def outputMode(outputMode: String): DataStreamWriter[T] = {
     this.outputMode = outputMode.toLowerCase match {
       case "append" =>
@@ -78,7 +73,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
   }
 
   /**
-   * :: Experimental ::
    * Set the trigger for the stream query. The default value is `ProcessingTime(0)` and it will run
    * the query as fast as possible.
    *
@@ -100,7 +94,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
    *
    * @since 2.0.0
    */
-  @Experimental
   def trigger(trigger: Trigger): DataStreamWriter[T] = {
     this.trigger = trigger
     this
@@ -108,25 +101,21 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
 
   /**
-   * :: Experimental ::
    * Specifies the name of the [[StreamingQuery]] that can be started with `start()`.
    * This name must be unique among all the currently active queries in the associated SQLContext.
    *
    * @since 2.0.0
    */
-  @Experimental
   def queryName(queryName: String): DataStreamWriter[T] = {
     this.extraOptions += ("queryName" -> queryName)
     this
   }
 
   /**
-   * :: Experimental ::
    * Specifies the underlying output data source. Built-in options include "parquet" for now.
    *
    * @since 2.0.0
    */
-  @Experimental
   def format(source: String): DataStreamWriter[T] = {
     this.source = source
     this
@@ -156,90 +145,74 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
   }
 
   /**
-   * :: Experimental ::
    * Adds an output option for the underlying data source.
    *
    * @since 2.0.0
    */
-  @Experimental
   def option(key: String, value: String): DataStreamWriter[T] = {
     this.extraOptions += (key -> value)
     this
   }
 
   /**
-   * :: Experimental ::
    * Adds an output option for the underlying data source.
    *
    * @since 2.0.0
    */
-  @Experimental
   def option(key: String, value: Boolean): DataStreamWriter[T] = option(key, value.toString)
 
   /**
-   * :: Experimental ::
    * Adds an output option for the underlying data source.
    *
    * @since 2.0.0
    */
-  @Experimental
   def option(key: String, value: Long): DataStreamWriter[T] = option(key, value.toString)
 
   /**
-   * :: Experimental ::
    * Adds an output option for the underlying data source.
    *
    * @since 2.0.0
    */
-  @Experimental
   def option(key: String, value: Double): DataStreamWriter[T] = option(key, value.toString)
 
   /**
-   * :: Experimental ::
    * (Scala-specific) Adds output options for the underlying data source.
    *
    * @since 2.0.0
    */
-  @Experimental
   def options(options: scala.collection.Map[String, String]): DataStreamWriter[T] = {
     this.extraOptions ++= options
     this
   }
 
   /**
-   * :: Experimental ::
    * Adds output options for the underlying data source.
    *
    * @since 2.0.0
    */
-  @Experimental
   def options(options: java.util.Map[String, String]): DataStreamWriter[T] = {
     this.options(options.asScala)
     this
   }
 
   /**
-   * :: Experimental ::
    * Starts the execution of the streaming query, which will continually output results to the given
    * path as new data arrives. The returned [[StreamingQuery]] object can be used to interact with
    * the stream.
    *
    * @since 2.0.0
    */
-  @Experimental
   def start(path: String): StreamingQuery = {
     option("path", path).start()
   }
 
   /**
-   * :: Experimental ::
    * Starts the execution of the streaming query, which will continually output results to the given
    * path as new data arrives. The returned [[StreamingQuery]] object can be used to interact with
    * the stream.
    *
    * @since 2.0.0
    */
-  @Experimental
   def start(): StreamingQuery = {
     if (source == "memory") {
       assertNotPartitioned("memory")
@@ -297,7 +270,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
   }
 
   /**
-   * :: Experimental ::
    * Starts the execution of the streaming query, which will continually send results to the given
    * [[ForeachWriter]] as as new data arrives. The [[ForeachWriter]] can be used to send the data
    * generated by the [[DataFrame]]/[[Dataset]] to an external system.
@@ -343,7 +315,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
    *
    * @since 2.0.0
    */
-  @Experimental
   def foreach(writer: ForeachWriter[T]): DataStreamWriter[T] = {
     this.source = "foreach"
     this.foreachWriter = if (writer != null) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
index db606abb8ce4316d90fb3698e6f5c17ebeb6a31a..8a8855d85a4c7c55d6ab3cfc88f08ed7f3347449 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -35,7 +35,7 @@ abstract class StreamingQueryListener {
   /**
    * Called when a query is started.
    * @note This is called synchronously with
-   *       [[org.apache.spark.sql.DataStreamWriter `DataStreamWriter.start()`]],
+   *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]],
    *       that is, `onQueryStart` will be called on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]]. Please
    *       don't block this method as it will block your query.
@@ -101,8 +101,6 @@ object StreamingQueryListener {
    * @param queryInfo Information about the status of the query.
    * @param exception The exception message of the [[StreamingQuery]] if the query was terminated
    *                  with an exception. Otherwise, it will be `None`.
-   * @param stackTrace The stack trace of the exception if the query was terminated with an
-   *                   exception. It will be empty if there was no error.
    * @since 2.0.0
    */
   @Experimental