From d0ac0e6f433bfccf4ced3743a2526f67fdb5c38e Mon Sep 17 00:00:00 2001
From: Shixiong Zhu <shixiong@databricks.com>
Date: Fri, 17 Jun 2016 21:58:10 -0700
Subject: [PATCH] [SPARK-16020][SQL] Fix complete mode aggregation with console
 sink

## What changes were proposed in this pull request?

We cannot use `limit` on DataFrame in ConsoleSink because it will use a wrong planner. This PR just collects `DataFrame` and calls `show` on a batch DataFrame based on the result. This is fine since ConsoleSink is only for debugging.

## How was this patch tested?

Manually confirmed ConsoleSink now works with complete mode aggregation.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #13740 from zsxwing/complete-console.
---
 .../spark/sql/execution/streaming/Sink.scala  |  3 +
 .../sql/execution/streaming/console.scala     |  4 +-
 .../streaming/ConsoleSinkSuite.scala          | 99 +++++++++++++++++++
 3 files changed, 105 insertions(+), 1 deletion(-)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ConsoleSinkSuite.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala
index e641e09b56..2571b59be5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala
@@ -30,6 +30,9 @@ trait Sink {
    * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
    * this method is called more than once with the same batchId (which will happen in the case of
    * failures), then `data` should only be added once.
+   *
+   * Note: You cannot apply any operators on `data` except consuming it (e.g., `collect/foreach`).
+   * Otherwise, you may get a wrong result.
    */
   def addBatch(batchId: Long, data: DataFrame): Unit
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index 2ec2a3c3c4..e8b9712d19 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -45,7 +45,9 @@ class ConsoleSink(options: Map[String, String]) extends Sink with Logging {
     println(batchIdStr)
     println("-------------------------------------------")
     // scalastyle:off println
-    data.show(numRowsToShow, isTruncated)
+    data.sparkSession.createDataFrame(
+      data.sparkSession.sparkContext.parallelize(data.collect()), data.schema)
+      .show(numRowsToShow, isTruncated)
   }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ConsoleSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ConsoleSinkSuite.scala
new file mode 100644
index 0000000000..e853d8c465
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ConsoleSinkSuite.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.nio.charset.StandardCharsets.UTF_8
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.streaming.StreamTest
+
+class ConsoleSinkSuite extends StreamTest with BeforeAndAfter {
+
+  import testImplicits._
+
+  after {
+    sqlContext.streams.active.foreach(_.stop())
+  }
+
+  test("SPARK-16020 Complete mode aggregation with console sink") {
+    withTempDir { checkpointLocation =>
+      val origOut = System.out
+      val stdout = new ByteArrayOutputStream()
+      try {
+        // Hook Java System.out.println
+        System.setOut(new PrintStream(stdout))
+        // Hook Scala println
+        Console.withOut(stdout) {
+          val input = MemoryStream[String]
+          val df = input.toDF().groupBy("value").count()
+          val query = df.writeStream
+            .format("console")
+            .outputMode("complete")
+            .option("checkpointLocation", checkpointLocation.getAbsolutePath)
+            .start()
+          input.addData("a")
+          query.processAllAvailable()
+          input.addData("a", "b")
+          query.processAllAvailable()
+          input.addData("a", "b", "c")
+          query.processAllAvailable()
+          query.stop()
+        }
+        System.out.flush()
+      } finally {
+        System.setOut(origOut)
+      }
+
+      val expected = """-------------------------------------------
+        |Batch: 0
+        |-------------------------------------------
+        |+-----+-----+
+        ||value|count|
+        |+-----+-----+
+        ||    a|    1|
+        |+-----+-----+
+        |
+        |-------------------------------------------
+        |Batch: 1
+        |-------------------------------------------
+        |+-----+-----+
+        ||value|count|
+        |+-----+-----+
+        ||    a|    2|
+        ||    b|    1|
+        |+-----+-----+
+        |
+        |-------------------------------------------
+        |Batch: 2
+        |-------------------------------------------
+        |+-----+-----+
+        ||value|count|
+        |+-----+-----+
+        ||    a|    3|
+        ||    b|    2|
+        ||    c|    1|
+        |+-----+-----+
+        |
+        |""".stripMargin
+      assert(expected === new String(stdout.toByteArray, UTF_8))
+    }
+  }
+
+}
-- 
GitLab