From a6a18a4573515e76d78534f1a19fcc2c3819f6c5 Mon Sep 17 00:00:00 2001
From: Tathagata Das <tathagata.das1565@gmail.com>
Date: Mon, 13 Jun 2016 12:47:47 -0700
Subject: [PATCH] [HOTFIX][MINOR][SQL] Revert " Standardize 'continuous queries' to 'streaming D…
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This reverts commit d32e227787338a08741d8064f5dd2db1d60ddc63.
Broke build - https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-branch-2.0-compile-maven-hadoop-2.3/326/console

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #13645 from tdas/build-break.
---
 .../apache/spark/sql/DataFrameWriter.scala    | 47 +++++-----
 .../spark/sql/streaming/StreamTest.scala      |  4 +-
 .../test/DataFrameReaderWriterSuite.scala     | 90 ++++++++-----------
 3 files changed, 64 insertions(+), 77 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 392e3c1e4e..afae0786b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -56,7 +56,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   def mode(saveMode: SaveMode): DataFrameWriter[T] = {
     // mode() is used for non-continuous queries
     // outputMode() is used for continuous queries
-    assertNotStreaming("mode() can only be called on non-streaming Datasets/DataFrames")
+    assertNotStreaming("mode() can only be called on non-continuous queries")
     this.mode = saveMode
     this
   }
@@ -73,7 +73,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   def mode(saveMode: String): DataFrameWriter[T] = {
     // mode() is used for non-continuous queries
     // outputMode() is used for continuous queries
-    assertNotStreaming("mode() can only be called on non-streaming Datasets/DataFrames")
+    assertNotStreaming("mode() can only be called on non-continuous queries")
     this.mode = saveMode.toLowerCase match {
       case "overwrite" => SaveMode.Overwrite
       case "append" => SaveMode.Append
@@ -86,33 +86,33 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   }
 
   /**
-   * Specifies how data of a streaming Dataset/DataFrame is written to a streaming sink.
-   *   - `OutputMode.Append()`: only the new rows in the streaming Dataset/DataFrame will be
+   * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
+   *   - `OutputMode.Append()`: only the new rows in the streaming DataFrame/Dataset will be
    *                            written to the sink
-   *   - `OutputMode.Complete()`: all the rows in the streaming Dataset/DataFrame will be written
+   *   - `OutputMode.Complete()`: all the rows in the streaming DataFrame/Dataset will be written
   *                              to the sink every time there are some updates
    *
    * @since 2.0.0
    */
   @Experimental
   def outputMode(outputMode: OutputMode): DataFrameWriter[T] = {
-    assertStreaming("outputMode() can only be called on streaming Datasets/DataFrames")
+    assertStreaming("outputMode() can only be called on continuous queries")
     this.outputMode = outputMode
     this
   }
 
   /**
-   * Specifies how data of a streaming Dataset/DataFrame is written to a streaming sink.
-   *   - `append`:   only the new rows in the streaming Dataset/DataFrame will be written to
+   * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
+   *   - `append`:   only the new rows in the streaming DataFrame/Dataset will be written to
    *                 the sink
-   *   - `complete`: all the rows in the streaming Dataset/DataFrame will be written to the sink
+   *   - `complete`: all the rows in the streaming DataFrame/Dataset will be written to the sink
   *                 every time there are some updates
    *
    * @since 2.0.0
    */
   @Experimental
   def outputMode(outputMode: String): DataFrameWriter[T] = {
-    assertStreaming("outputMode() can only be called on streaming Datasets/DataFrames")
+    assertStreaming("outputMode() can only be called on continuous queries")
     this.outputMode = outputMode.toLowerCase match {
       case "append" =>
         OutputMode.Append
@@ -150,7 +150,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    */
   @Experimental
   def trigger(trigger: Trigger): DataFrameWriter[T] = {
-    assertStreaming("trigger() can only be called on streaming Datasets/DataFrames")
+    assertStreaming("trigger() can only be called on continuous queries")
     this.trigger = trigger
     this
   }
@@ -284,7 +284,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    */
   def save(): Unit = {
     assertNotBucketed("save")
-    assertNotStreaming("save() can only be called on non-streaming Datasets/DataFrames")
+    assertNotStreaming("save() can only be called on non-continuous queries")
     val dataSource = DataSource(
       df.sparkSession,
       className = source,
@@ -304,7 +304,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    */
   @Experimental
   def queryName(queryName: String): DataFrameWriter[T] = {
-    assertStreaming("queryName() can only be called on streaming Datasets/DataFrames")
+    assertStreaming("queryName() can only be called on continuous queries")
     this.extraOptions += ("queryName" -> queryName)
     this
   }
@@ -333,7 +333,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   @Experimental
   def startStream(): ContinuousQuery = {
     assertNotBucketed("startStream")
-    assertStreaming("startStream() can only be called on streaming Datasets/DataFrames")
+    assertStreaming("startStream() can only be called on continuous queries")
 
     if (source == "memory") {
       val queryName =
@@ -434,7 +434,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   def foreach(writer: ForeachWriter[T]): ContinuousQuery = {
     assertNotPartitioned("foreach")
     assertNotBucketed("foreach")
-    assertStreaming("foreach() can only be called on streaming Datasets/DataFrames.")
+    assertStreaming(
+      "foreach() can only be called on streaming Datasets/DataFrames.")
 
     val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
     val sink = new ForeachSink[T](ds.sparkSession.sparkContext.clean(writer))(ds.exprEnc)
@@ -501,7 +502,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
 
   private def insertInto(tableIdent: TableIdentifier): Unit = {
     assertNotBucketed("insertInto")
-    assertNotStreaming("insertInto() can only be called on non-streaming Datasets/DataFrames")
+    assertNotStreaming("insertInto() can only be called on non-continuous queries")
     val partitions = normalizedParCols.map(_.map(col => col -> (Option.empty[String])).toMap)
     val overwrite = mode == SaveMode.Overwrite
 
@@ -620,7 +621,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   }
 
   private def saveAsTable(tableIdent: TableIdentifier): Unit = {
-    assertNotStreaming("saveAsTable() can only be called on non-streaming Datasets/DataFrames")
+    assertNotStreaming("saveAsTable() can only be called on non-continuous queries")
 
     val tableExists = df.sparkSession.sessionState.catalog.tableExists(tableIdent)
 
@@ -663,7 +664,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
     assertNotPartitioned("jdbc")
     assertNotBucketed("jdbc")
-    assertNotStreaming("jdbc() can only be called on non-streaming Datasets/DataFrames")
+    assertNotStreaming("jdbc() can only be called on non-continuous queries")
 
     val props = new Properties()
     extraOptions.foreach { case (key, value) =>
@@ -722,7 +723,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * @since 1.4.0
    */
   def json(path: String): Unit = {
-    assertNotStreaming("json() can only be called on non-streaming Datasets/DataFrames")
+    assertNotStreaming("json() can only be called on non-continuous queries")
     format("json").save(path)
   }
 
@@ -742,7 +743,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * @since 1.4.0
    */
   def parquet(path: String): Unit = {
-    assertNotStreaming("parquet() can only be called on non-streaming Datasets/DataFrames")
+    assertNotStreaming("parquet() can only be called on non-continuous queries")
     format("parquet").save(path)
   }
 
@@ -762,7 +763,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * @note Currently, this method can only be used after enabling Hive support
    */
   def orc(path: String): Unit = {
-    assertNotStreaming("orc() can only be called on non-streaming Datasets/DataFrames")
+    assertNotStreaming("orc() can only be called on non-continuous queries")
     format("orc").save(path)
   }
 
@@ -786,7 +787,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * @since 1.6.0
    */
   def text(path: String): Unit = {
-    assertNotStreaming("text() can only be called on non-streaming Datasets/DataFrames")
+    assertNotStreaming("text() can only be called on non-continuous queries")
     format("text").save(path)
   }
 
@@ -816,7 +817,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * @since 2.0.0
    */
   def csv(path: String): Unit = {
-    assertNotStreaming("csv() can only be called on non-streaming Datasets/DataFrames")
+    assertNotStreaming("csv() can only be called on non-continuous queries")
     format("csv").save(path)
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index fabb8ba6c6..7f1e5fe613 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -70,7 +70,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
   /** How long to wait for an active stream to catch up when checking a result. */
   val streamingTimeout = 10.seconds
 
-  /** A trait for actions that can be performed while testing a streaming DataSet/DataFrame. */
+  /** A trait for actions that can be performed while testing a streaming DataFrame. */
   trait StreamAction
 
   /** A trait to mark actions that require the stream to be actively running. */
@@ -194,7 +194,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
   }
 
   /**
-   * Executes the specified actions on the given streaming DataSet/DataFrame and provides helpful
+   * Executes the specified actions on the given streaming DataFrame and provides helpful
    * error messages in the case of failures or incorrect answers.
    *
    * Note that if the stream is not explicitly started before an action that requires it to be
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
index 51aa53287c..6e0d66ae7f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
@@ -371,80 +371,66 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
 
   private def newTextInput = Utils.createTempDir(namePrefix = "text").getCanonicalPath
 
-  test("check trigger() can only be called on streaming Datasets/DataFrames") {
+  test("check trigger() can only be called on continuous queries") {
     val df = spark.read.text(newTextInput)
     val w = df.write.option("checkpointLocation", newMetadataDir)
     val e = intercept[AnalysisException](w.trigger(ProcessingTime("10 seconds")))
-    assert(e.getMessage == "trigger() can only be called on streaming Datasets/DataFrames;")
+    assert(e.getMessage == "trigger() can only be called on continuous queries;")
   }
 
-  test("check queryName() can only be called on streaming Datasets/DataFrames") {
+  test("check queryName() can only be called on continuous queries") {
     val df = spark.read.text(newTextInput)
     val w = df.write.option("checkpointLocation", newMetadataDir)
     val e = intercept[AnalysisException](w.queryName("queryName"))
-    assert(e.getMessage == "queryName() can only be called on streaming Datasets/DataFrames;")
+    assert(e.getMessage == "queryName() can only be called on continuous queries;")
   }
 
-  test("check startStream() can only be called on streaming Datasets/DataFrames") {
+  test("check startStream() can only be called on continuous queries") {
     val df = spark.read.text(newTextInput)
     val w = df.write.option("checkpointLocation", newMetadataDir)
     val e = intercept[AnalysisException](w.startStream())
-    assert(e.getMessage == "startStream() can only be called on streaming Datasets/DataFrames;")
+    assert(e.getMessage == "startStream() can only be called on continuous queries;")
   }
 
-  test("check startStream(path) can only be called on streaming Datasets/DataFrames") {
+  test("check startStream(path) can only be called on continuous queries") {
     val df = spark.read.text(newTextInput)
     val w = df.write.option("checkpointLocation", newMetadataDir)
     val e = intercept[AnalysisException](w.startStream("non_exist_path"))
-    assert(e.getMessage == "startStream() can only be called on streaming Datasets/DataFrames;")
+    assert(e.getMessage == "startStream() can only be called on continuous queries;")
   }
 
-  test("check foreach() can only be called on streaming Datasets/DataFrames") {
-    val df = spark.read.text(newTextInput)
-    val w = df.write.option("checkpointLocation", newMetadataDir)
-    val foreachWriter = new ForeachWriter[String] {
-      override def open(partitionId: Long, version: Long): Boolean = false
-      override def process(value: String): Unit = {}
-      override def close(errorOrNull: Throwable): Unit = {}
-    }
-    val e = intercept[AnalysisException](w.foreach(foreachWriter))
-    Seq("foreach()", "streaming Datasets/DataFrames").foreach { s =>
-      assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
-    }
-  }
-
-  test("check mode(SaveMode) can only be called on non-streaming Datasets/DataFrames") {
+  test("check mode(SaveMode) can only be called on non-continuous queries") {
     val df = spark.read
       .format("org.apache.spark.sql.streaming.test")
       .stream()
     val w = df.write
     val e = intercept[AnalysisException](w.mode(SaveMode.Append))
-    assert(e.getMessage == "mode() can only be called on non-streaming Datasets/DataFrames;")
+    assert(e.getMessage == "mode() can only be called on non-continuous queries;")
   }
 
-  test("check mode(string) can only be called on non-streaming Datasets/DataFrames") {
+  test("check mode(string) can only be called on non-continuous queries") {
     val df = spark.read
       .format("org.apache.spark.sql.streaming.test")
       .stream()
     val w = df.write
     val e = intercept[AnalysisException](w.mode("append"))
-    assert(e.getMessage == "mode() can only be called on non-streaming Datasets/DataFrames;")
+    assert(e.getMessage == "mode() can only be called on non-continuous queries;")
   }
 
-  test("check outputMode(OutputMode) can only be called on streaming Datasets/DataFrames") {
+  test("check outputMode(OutputMode) can only be called on continuous queries") {
     val df = spark.read.text(newTextInput)
     val w = df.write.option("checkpointLocation", newMetadataDir)
     val e = intercept[AnalysisException](w.outputMode(OutputMode.Append))
-    Seq("outputmode", "streaming Datasets/DataFrames").foreach { s =>
+    Seq("outputmode", "continuous queries").foreach { s =>
       assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
     }
   }
 
-  test("check outputMode(string) can only be called on streaming Datasets/DataFrames") {
+  test("check outputMode(string) can only be called on continuous queries") {
     val df = spark.read.text(newTextInput)
     val w = df.write.option("checkpointLocation", newMetadataDir)
     val e = intercept[AnalysisException](w.outputMode("append"))
-    Seq("outputmode", "streaming Datasets/DataFrames").foreach { s =>
+    Seq("outputmode", "continuous queries").foreach { s =>
       assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
     }
   }
@@ -464,7 +450,7 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
     testError("Xyz")
   }
 
-  test("check bucketBy() can only be called on non-streaming Datasets/DataFrames") {
+  test("check bucketBy() can only be called on non-continuous queries") {
     val df = spark.read
       .format("org.apache.spark.sql.streaming.test")
       .stream()
@@ -473,7 +459,7 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
     assert(e.getMessage == "'startStream' does not support bucketing right now;")
   }
 
-  test("check sortBy() can only be called on non-streaming Datasets/DataFrames;") {
+  test("check sortBy() can only be called on non-continuous queries;") {
     val df = spark.read
       .format("org.apache.spark.sql.streaming.test")
       .stream()
@@ -482,94 +468,94 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
     assert(e.getMessage == "'startStream' does not support bucketing right now;")
   }
 
-  test("check save(path) can only be called on non-streaming Datasets/DataFrames") {
+  test("check save(path) can only be called on non-continuous queries") {
     val df = spark.read
       .format("org.apache.spark.sql.streaming.test")
       .stream()
     val w = df.write
     val e = intercept[AnalysisException](w.save("non_exist_path"))
-    assert(e.getMessage == "save() can only be called on non-streaming Datasets/DataFrames;")
+    assert(e.getMessage == "save() can only be called on non-continuous queries;")
   }
 
-  test("check save() can only be called on non-streaming Datasets/DataFrames") {
+  test("check save() can only be called on non-continuous queries") {
     val df = spark.read
       .format("org.apache.spark.sql.streaming.test")
       .stream()
     val w = df.write
     val e = intercept[AnalysisException](w.save())
-    assert(e.getMessage == "save() can only be called on non-streaming Datasets/DataFrames;")
+    assert(e.getMessage == "save() can only be called on non-continuous queries;")
   }
 
-  test("check insertInto() can only be called on non-streaming Datasets/DataFrames") {
+  test("check insertInto() can only be called on non-continuous queries") {
     val df = spark.read
       .format("org.apache.spark.sql.streaming.test")
       .stream()
     val w = df.write
     val e = intercept[AnalysisException](w.insertInto("non_exsit_table"))
-    assert(e.getMessage == "insertInto() can only be called on non-streaming Datasets/DataFrames;")
+    assert(e.getMessage == "insertInto() can only be called on non-continuous queries;")
   }
 
-  test("check saveAsTable() can only be called on non-streaming Datasets/DataFrames") {
+  test("check saveAsTable() can only be called on non-continuous queries") {
     val df = spark.read
       .format("org.apache.spark.sql.streaming.test")
       .stream()
     val w = df.write
     val e = intercept[AnalysisException](w.saveAsTable("non_exsit_table"))
-    assert(e.getMessage == "saveAsTable() can only be called on non-streaming Datasets/DataFrames;")
+    assert(e.getMessage == "saveAsTable() can only be called on non-continuous queries;")
   }
 
-  test("check jdbc() can only be called on non-streaming Datasets/DataFrames") {
+  test("check jdbc() can only be called on non-continuous queries") {
     val df = spark.read
       .format("org.apache.spark.sql.streaming.test")
       .stream()
     val w = df.write
     val e = intercept[AnalysisException](w.jdbc(null, null, null))
-    assert(e.getMessage == "jdbc() can only be called on non-streaming Datasets/DataFrames;")
+    assert(e.getMessage == "jdbc() can only be called on non-continuous queries;")
   }
 
-  test("check json() can only be called on non-streaming Datasets/DataFrames") {
+  test("check json() can only be called on non-continuous queries") {
     val df = spark.read
       .format("org.apache.spark.sql.streaming.test")
       .stream()
     val w = df.write
     val e = intercept[AnalysisException](w.json("non_exist_path"))
-    assert(e.getMessage == "json() can only be called on non-streaming Datasets/DataFrames;")
+    assert(e.getMessage == "json() can only be called on non-continuous queries;")
   }
 
-  test("check parquet() can only be called on non-streaming Datasets/DataFrames") {
+  test("check parquet() can only be called on non-continuous queries") {
     val df = spark.read
       .format("org.apache.spark.sql.streaming.test")
       .stream()
     val w = df.write
     val e = intercept[AnalysisException](w.parquet("non_exist_path"))
-    assert(e.getMessage == "parquet() can only be called on non-streaming Datasets/DataFrames;")
+    assert(e.getMessage == "parquet() can only be called on non-continuous queries;")
   }
 
-  test("check orc() can only be called on non-streaming Datasets/DataFrames") {
+  test("check orc() can only be called on non-continuous queries") {
     val df = spark.read
       .format("org.apache.spark.sql.streaming.test")
       .stream()
     val w = df.write
     val e = intercept[AnalysisException](w.orc("non_exist_path"))
-    assert(e.getMessage == "orc() can only be called on non-streaming Datasets/DataFrames;")
+    assert(e.getMessage == "orc() can only be called on non-continuous queries;")
   }
 
-  test("check text() can only be called on non-streaming Datasets/DataFrames") {
+  test("check text() can only be called on non-continuous queries") {
     val df = spark.read
       .format("org.apache.spark.sql.streaming.test")
       .stream()
     val w = df.write
     val e = intercept[AnalysisException](w.text("non_exist_path"))
-    assert(e.getMessage == "text() can only be called on non-streaming Datasets/DataFrames;")
+    assert(e.getMessage == "text() can only be called on non-continuous queries;")
   }
 
-  test("check csv() can only be called on non-streaming Datasets/DataFrames") {
+  test("check csv() can only be called on non-continuous queries") {
     val df = spark.read
       .format("org.apache.spark.sql.streaming.test")
       .stream()
     val w = df.write
     val e = intercept[AnalysisException](w.csv("non_exist_path"))
-    assert(e.getMessage == "csv() can only be called on non-streaming Datasets/DataFrames;")
+    assert(e.getMessage == "csv() can only be called on non-continuous queries;")
   }
 
   test("check foreach() does not support partitioning or bucketing") {
-- 
GitLab