Skip to content
Snippets Groups Projects
Commit e55a105a authored by Lubo Zhang's avatar Lubo Zhang Committed by Shixiong Zhu
Browse files

[SPARK-20599][SS] ConsoleSink should work with (batch)

## What changes were proposed in this pull request?

Currently, if we read a batch and want to display it on the console sink, it will lead a runtime exception.

Changes:

- In this PR, we add a match rule to check whether it is a ConsoleSinkProvider, we will display the Dataset
 if using console format.

## How was this patch tested?

spark.read.schema().json(path).write.format("console").save

Author: Lubo Zhang <lubo.zhang@intel.com>
Author: lubozhan <lubo.zhang@intel.com>

Closes #18347 from lubozhan/dev.
parent 19331b8e
No related branches found
No related tags found
No related merge requests found
......@@ -19,8 +19,10 @@ package org.apache.spark.sql.execution.streaming
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.types.StructType
class ConsoleSink(options: Map[String, String]) extends Sink with Logging {
// Number of rows to display, by default 20 rows
......@@ -51,7 +53,14 @@ class ConsoleSink(options: Map[String, String]) extends Sink with Logging {
}
}
class ConsoleSinkProvider extends StreamSinkProvider with DataSourceRegister {
case class ConsoleRelation(override val sqlContext: SQLContext, data: DataFrame)
extends BaseRelation {
override def schema: StructType = data.schema
}
class ConsoleSinkProvider extends StreamSinkProvider
with DataSourceRegister
with CreatableRelationProvider {
def createSink(
sqlContext: SQLContext,
parameters: Map[String, String],
......@@ -60,5 +69,20 @@ class ConsoleSinkProvider extends StreamSinkProvider with DataSourceRegister {
new ConsoleSink(parameters)
}
def createRelation(
sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
// Number of rows to display, by default 20 rows
val numRowsToShow = parameters.get("numRows").map(_.toInt).getOrElse(20)
// Truncate the displayed data if it is too long, by default it is true
val isTruncated = parameters.get("truncate").map(_.toBoolean).getOrElse(true)
data.showInternal(numRowsToShow, isTruncated)
ConsoleRelation(sqlContext, data)
}
def shortName(): String = "console"
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment