From e55a105ae04f1d1c35ee8f02005a3ab71d789124 Mon Sep 17 00:00:00 2001
From: Lubo Zhang <lubo.zhang@intel.com>
Date: Thu, 22 Jun 2017 11:18:58 -0700
Subject: [PATCH] [SPARK-20599][SS] ConsoleSink should work with write (batch)

## What changes were proposed in this pull request?

Currently, if we read a batch and want to display it on the console sink,
it leads to a runtime exception.

Changes:
- In this PR, we add a match rule to check whether the sink provider is a
  ConsoleSinkProvider; if so, we display the Dataset when the console format
  is used.

## How was this patch tested?

spark.read.schema().json(path).write.format("console").save

Author: Lubo Zhang <lubo.zhang@intel.com>
Author: lubozhan <lubo.zhang@intel.com>

Closes #18347 from lubozhan/dev.
---
 .../sql/execution/streaming/console.scala | 28 +++++++++++++++++--
 1 file changed, 26 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index 38c6319110..9e889ff679 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -19,8 +19,10 @@ package org.apache.spark.sql.execution.streaming
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
+import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, StreamSinkProvider}
 import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.types.StructType
 
 class ConsoleSink(options: Map[String, String]) extends Sink with Logging {
   // Number of rows to display, by default 20 rows
@@ -51,7 +53,14 @@ class ConsoleSink(options: Map[String, String]) extends Sink with Logging {
   }
 }
 
-class ConsoleSinkProvider extends StreamSinkProvider with DataSourceRegister {
+case class ConsoleRelation(override val sqlContext: SQLContext, data: DataFrame)
+  extends BaseRelation {
+  override def schema: StructType = data.schema
+}
+
+class ConsoleSinkProvider extends StreamSinkProvider
+  with DataSourceRegister
+  with CreatableRelationProvider {
   def createSink(
       sqlContext: SQLContext,
       parameters: Map[String, String],
@@ -60,5 +69,20 @@ class ConsoleSinkProvider extends StreamSinkProvider with DataSourceRegister {
     new ConsoleSink(parameters)
   }
 
+  def createRelation(
+      sqlContext: SQLContext,
+      mode: SaveMode,
+      parameters: Map[String, String],
+      data: DataFrame): BaseRelation = {
+    // Number of rows to display, by default 20 rows
+    val numRowsToShow = parameters.get("numRows").map(_.toInt).getOrElse(20)
+
+    // Truncate the displayed data if it is too long, by default it is true
+    val isTruncated = parameters.get("truncate").map(_.toBoolean).getOrElse(true)
+    data.showInternal(numRowsToShow, isTruncated)
+
+    ConsoleRelation(sqlContext, data)
+  }
+
   def shortName(): String = "console"
 }
-- 
GitLab
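
For context, a minimal end-to-end sketch of the batch write path this patch enables. The input path, schema, and session setup below are illustrative assumptions, not part of the patch; only `format("console")` with the `numRows` and `truncate` options reflects the code added above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object ConsoleBatchExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("console-batch-sink")
      .master("local[*]")
      .getOrCreate()

    // Hypothetical schema and input path, used only to produce a batch DataFrame.
    val schema = StructType(Seq(StructField("name", StringType)))
    val df = spark.read.schema(schema).json("/tmp/people.json")

    // Before this patch, format("console") on a batch DataFrame threw a runtime
    // exception because ConsoleSinkProvider only implemented StreamSinkProvider.
    // With CreatableRelationProvider, createRelation prints the rows eagerly.
    df.write
      .format("console")
      .option("numRows", 5)       // picked up by createRelation; defaults to 20
      .option("truncate", false)  // defaults to true
      .save()

    spark.stop()
  }
}
```

Note that ConsoleRelation only needs to expose the DataFrame's schema: the visible side effect (printing the rows) happens eagerly inside createRelation before the relation is returned.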