diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala
index e641e09b56adfa8d89f58c9bac001a0412d18654..2571b59be54f925f788d02e4cf4af164bea02904 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala
@@ -30,6 +30,9 @@ trait Sink {
    * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
    * this method is called more than once with the same batchId (which will happen in the case of
    * failures), then `data` should only be added once.
+   *
+   * Note: `data` should only be consumed (e.g., via `collect`/`foreach`); applying any other
+   * operators to it may produce incorrect results.
    */
   def addBatch(batchId: Long, data: DataFrame): Unit
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index 2ec2a3c3c4a490af7c68a673914b37ede74d25c0..e8b9712d19cd559745dd98ce7174642ef0328bb8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -45,7 +45,11 @@ class ConsoleSink(options: Map[String, String]) extends Sink with Logging {
     println(batchIdStr)
     println("-------------------------------------------")
     // scalastyle:off println
-    data.show(numRowsToShow, isTruncated)
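+    // Per the note on Sink.addBatch, `data` must only be consumed; showing it directly may
+    // give wrong results, so collect the rows and display them via a fresh local DataFrame.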
+    data.sparkSession.createDataFrame(
+      data.sparkSession.sparkContext.parallelize(data.collect()), data.schema)
+      .show(numRowsToShow, isTruncated)
   }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ConsoleSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ConsoleSinkSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..e853d8c465419c30cd680fe5c95bff2eb5802bcd
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ConsoleSinkSuite.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.nio.charset.StandardCharsets.UTF_8
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.streaming.StreamTest
+
+class ConsoleSinkSuite extends StreamTest with BeforeAndAfter {
+
+  import testImplicits._
+
+  after {
+    sqlContext.streams.active.foreach(_.stop())
+  }
+
+  test("SPARK-16020 Complete mode aggregation with console sink") {
+    withTempDir { checkpointLocation =>
+      val origOut = System.out
+      val stdout = new ByteArrayOutputStream()
+      try {
+        // Hook Java System.out.println
+        System.setOut(new PrintStream(stdout))
+        // Hook Scala println
+        Console.withOut(stdout) {
+          val input = MemoryStream[String]
+          val df = input.toDF().groupBy("value").count()
+          val query = df.writeStream
+            .format("console")
+            .outputMode("complete")
+            .option("checkpointLocation", checkpointLocation.getAbsolutePath)
+            .start()
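+          // processAllAvailable() blocks until all data added so far has been processed,
+          // so each batch is fully printed before the next one is added.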
+          input.addData("a")
+          query.processAllAvailable()
+          input.addData("a", "b")
+          query.processAllAvailable()
+          input.addData("a", "b", "c")
+          query.processAllAvailable()
+          query.stop()
+        }
+        System.out.flush()
+      } finally {
+        System.setOut(origOut)
+      }
+
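+      // In complete output mode every batch prints the full aggregated counts seen so far.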
+      val expected = """-------------------------------------------
+        |Batch: 0
+        |-------------------------------------------
+        |+-----+-----+
+        ||value|count|
+        |+-----+-----+
+        ||    a|    1|
+        |+-----+-----+
+        |
+        |-------------------------------------------
+        |Batch: 1
+        |-------------------------------------------
+        |+-----+-----+
+        ||value|count|
+        |+-----+-----+
+        ||    a|    2|
+        ||    b|    1|
+        |+-----+-----+
+        |
+        |-------------------------------------------
+        |Batch: 2
+        |-------------------------------------------
+        |+-----+-----+
+        ||value|count|
+        |+-----+-----+
+        ||    a|    3|
+        ||    b|    2|
+        ||    c|    1|
+        |+-----+-----+
+        |
+        |""".stripMargin
+      assert(expected === new String(stdout.toByteArray, UTF_8))
+    }
+  }
+
+}