Commit d0ac0e6f authored by Shixiong Zhu

[SPARK-16020][SQL] Fix complete mode aggregation with console sink

## What changes were proposed in this pull request?

We cannot use `limit` on a streaming `DataFrame` in ConsoleSink because it would re-plan the query with the wrong planner. This PR instead collects the `DataFrame` and calls `show` on a batch `DataFrame` built from the collected rows. This is fine since ConsoleSink is only for debugging.
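In isolation, the pattern looks like this (a sketch with hypothetical method names, not the patch itself; the real change is in the `ConsoleSink` diff below):

```scala
import org.apache.spark.sql.DataFrame

// WRONG for a streaming batch: show() goes through limit()/take() under the
// hood, which re-plans `data` and can produce incorrect rows for stateful
// (e.g., complete mode) queries.
def addBatchUnsafe(batchId: Long, data: DataFrame): Unit = {
  data.show(20)
}

// SAFE: consume `data` exactly once with collect(), then rebuild an ordinary
// batch DataFrame from the collected rows and show that instead.
def addBatchSafe(batchId: Long, data: DataFrame): Unit = {
  val spark = data.sparkSession
  spark
    .createDataFrame(spark.sparkContext.parallelize(data.collect()), data.schema)
    .show(20)
}
```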

## How was this patch tested?

Manually confirmed ConsoleSink now works with complete mode aggregation.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #13740 from zsxwing/complete-console.
parent 8c198e24
@@ -30,6 +30,9 @@ trait Sink {
    * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
    * this method is called more than once with the same batchId (which will happen in the case of
    * failures), then `data` should only be added once.
+   *
+   * Note: You cannot apply any operators on `data` except consuming it (e.g., `collect/foreach`).
+   * Otherwise, you may get a wrong result.
    */
   def addBatch(batchId: Long, data: DataFrame): Unit
 }
@@ -45,7 +45,9 @@ class ConsoleSink(options: Map[String, String]) extends Sink with Logging {
     println(batchIdStr)
     println("-------------------------------------------")
     // scalastyle:off println
-    data.show(numRowsToShow, isTruncated)
+    data.sparkSession.createDataFrame(
+      data.sparkSession.sparkContext.parallelize(data.collect()), data.schema)
+      .show(numRowsToShow, isTruncated)
     // scalastyle:on println
   }
 }
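For context, the path above is exercised whenever a streaming query writes to the console sink. A usage sketch (the socket source, local master, and checkpoint path are placeholders):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("console-sink-demo")
  .getOrCreate()

// Any streaming source works; a socket source is used here for illustration.
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Complete mode re-emits the full aggregate every batch; this is exactly the
// case that used to print wrong results through ConsoleSink.
val query = lines.groupBy("value").count()
  .writeStream
  .format("console")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/console-demo-checkpoint")
  .start()

query.awaitTermination()
```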
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming

import java.io.{ByteArrayOutputStream, PrintStream}
import java.nio.charset.StandardCharsets.UTF_8

import org.scalatest.BeforeAndAfter

import org.apache.spark.sql.streaming.StreamTest

class ConsoleSinkSuite extends StreamTest with BeforeAndAfter {

  import testImplicits._

  after {
    sqlContext.streams.active.foreach(_.stop())
  }

  test("SPARK-16020 Complete mode aggregation with console sink") {
    withTempDir { checkpointLocation =>
      val origOut = System.out
      val stdout = new ByteArrayOutputStream()
      try {
        // Hook Java System.out.println
        System.setOut(new PrintStream(stdout))
        // Hook Scala println
        Console.withOut(stdout) {
          val input = MemoryStream[String]
          val df = input.toDF().groupBy("value").count()
          val query = df.writeStream
            .format("console")
            .outputMode("complete")
            .option("checkpointLocation", checkpointLocation.getAbsolutePath)
            .start()
          input.addData("a")
          query.processAllAvailable()
          input.addData("a", "b")
          query.processAllAvailable()
          input.addData("a", "b", "c")
          query.processAllAvailable()
          query.stop()
        }
        System.out.flush()
      } finally {
        System.setOut(origOut)
      }

      val expected = """-------------------------------------------
        |Batch: 0
        |-------------------------------------------
        |+-----+-----+
        ||value|count|
        |+-----+-----+
        ||    a|    1|
        |+-----+-----+
        |
        |-------------------------------------------
        |Batch: 1
        |-------------------------------------------
        |+-----+-----+
        ||value|count|
        |+-----+-----+
        ||    a|    2|
        ||    b|    1|
        |+-----+-----+
        |
        |-------------------------------------------
        |Batch: 2
        |-------------------------------------------
        |+-----+-----+
        ||value|count|
        |+-----+-----+
        ||    a|    3|
        ||    b|    2|
        ||    c|    1|
        |+-----+-----+
        |
        |""".stripMargin
      assert(expected === new String(stdout.toByteArray, UTF_8))
    }
  }
}