Skip to content
Snippets Groups Projects
Commit f1b220ee authored by Liang-Chi Hsieh's avatar Liang-Chi Hsieh Committed by Reynold Xin
Browse files

[SPARK-15553][SQL] Dataset.createTempView should use CreateViewCommand

## What changes were proposed in this pull request?

Let `Dataset.createTempView` and `Dataset.createOrReplaceTempView` use `CreateViewCommand`, rather than calling `SparkSession.createTempView`. Besides, this patch also removes `SparkSession.createTempView`.

## How was this patch tested?
Existing tests.

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>

Closes #13327 from viirya/dataset-createtempview.
parent 73178c75
No related branches found
No related tags found
No related merge requests found
......@@ -50,6 +50,11 @@ case class CatalogStorageFormat(
compressed: Boolean,
serdeProperties: Map[String, String])
object CatalogStorageFormat {
  /**
   * A shared, fully-empty storage format: no location, formats, or serde, not compressed,
   * and no serde properties. Used as a default value and as a base for `.copy(...)` calls.
   */
  val EmptyStorageFormat: CatalogStorageFormat = CatalogStorageFormat(
    locationUri = None,
    inputFormat = None,
    outputFormat = None,
    serde = None,
    compressed = false,
    serdeProperties = Map.empty)
}
/**
* A column in a table.
......
......@@ -35,6 +35,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
......@@ -44,7 +45,7 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.usePrettyExpression
import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution}
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand}
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.execution.python.EvaluatePython
......@@ -2329,8 +2330,14 @@ class Dataset[T] private[sql](
* @since 2.0.0
*/
@throws[AnalysisException]
def createTempView(viewName: String): Unit = withPlan {
  // Temporary views carry no schema or storage of their own: the plan supplies both,
  // so the descriptor uses an empty column list and the shared empty storage format.
  val tableDesc = CatalogTable(
    identifier = sparkSession.sessionState.sqlParser.parseTableIdentifier(viewName),
    tableType = CatalogTableType.VIEW,
    schema = Seq.empty[CatalogColumn],
    storage = CatalogStorageFormat.EmptyStorageFormat)
  // allowExisting = false and replace = false: creating a view whose name already exists
  // raises AnalysisException (hence the @throws above). sql = "" — no original view text.
  CreateViewCommand(tableDesc, logicalPlan, allowExisting = false, replace = false,
    isTemporary = true, sql = "")
}
/**
......@@ -2340,8 +2347,14 @@ class Dataset[T] private[sql](
* @group basic
* @since 2.0.0
*/
def createOrReplaceTempView(viewName: String): Unit = withPlan {
  // Same minimal descriptor as createTempView: temporary views take their schema from the
  // plan, so columns are empty and storage is the shared empty format.
  val tableDesc = CatalogTable(
    identifier = sparkSession.sessionState.sqlParser.parseTableIdentifier(viewName),
    tableType = CatalogTableType.VIEW,
    schema = Seq.empty[CatalogColumn],
    storage = CatalogStorageFormat.EmptyStorageFormat)
  // replace = true is the only difference from createTempView: an existing view with the
  // same name is silently overwritten instead of raising an error.
  CreateViewCommand(tableDesc, logicalPlan, allowExisting = false, replace = true,
    isTemporary = true, sql = "")
}
/**
......
......@@ -552,7 +552,7 @@ class SQLContext private[sql](val sparkSession: SparkSession)
* only during the lifetime of this instance of SQLContext.
*/
private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = {
  // Delegate to the public Dataset API (which goes through CreateViewCommand) rather than
  // the removed SparkSession.createTempView helper. Replaces any existing view of this name.
  df.createOrReplaceTempView(tableName)
}
/**
......
......@@ -583,17 +583,6 @@ class SparkSession private(
Dataset.ofRows(self, sessionState.catalog.lookupRelation(tableIdent))
}
/**
 * Creates a temporary view with a DataFrame. The lifetime of this temporary view is tied to
 * this [[SparkSession]].
 *
 * NOTE(review): superseded by `Dataset.createTempView` / `createOrReplaceTempView`, which
 * build a `CreateViewCommand` directly — confirm no remaining callers before removal.
 */
private[sql] def createTempView(
viewName: String, df: DataFrame, replaceIfExists: Boolean) = {
// Only the unqualified table name of the parsed identifier is used; any database
// qualifier in `viewName` is dropped for temporary views.
sessionState.catalog.createTempView(
sessionState.sqlParser.parseTableIdentifier(viewName).table,
df.logicalPlan, replaceIfExists)
}
/* ----------------- *
| Everything else |
* ----------------- */
......
......@@ -902,8 +902,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx)
val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
.getOrElse(EmptyStorageFormat)
val rowStorage = Option(ctx.rowFormat).map(visitRowFormat).getOrElse(EmptyStorageFormat)
.getOrElse(CatalogStorageFormat.EmptyStorageFormat)
val rowStorage = Option(ctx.rowFormat).map(visitRowFormat)
.getOrElse(CatalogStorageFormat.EmptyStorageFormat)
val location = Option(ctx.locationSpec).map(visitLocationSpec)
// If we are creating an EXTERNAL table, then the LOCATION field is required
if (external && location.isEmpty) {
......@@ -976,15 +977,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
}
/** Empty storage format for default values and copies. */
private val EmptyStorageFormat = CatalogStorageFormat(None, None, None, None, false, Map.empty)
/**
 * Create a [[CatalogStorageFormat]] from an explicit `INPUTFORMAT ... OUTPUTFORMAT ...`
 * clause. Starts from the shared empty format and fills in only the two format classes.
 */
override def visitTableFileFormat(
    ctx: TableFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
  CatalogStorageFormat.EmptyStorageFormat.copy(
    inputFormat = Option(string(ctx.inFmt)),
    outputFormat = Option(string(ctx.outFmt)))
}
......@@ -997,7 +995,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
val source = ctx.identifier.getText
HiveSerDe.sourceToSerDe(source, conf) match {
case Some(s) =>
EmptyStorageFormat.copy(
CatalogStorageFormat.EmptyStorageFormat.copy(
inputFormat = s.inputFormat,
outputFormat = s.outputFormat,
serde = s.serde)
......@@ -1037,7 +1035,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
/**
 * Create a [[CatalogStorageFormat]] from a `ROW FORMAT SERDE ...` clause: only the serde
 * class name and its (optional) property list are populated on the shared empty format.
 */
override def visitRowFormatSerde(
    ctx: RowFormatSerdeContext): CatalogStorageFormat = withOrigin(ctx) {
  import ctx._
  CatalogStorageFormat.EmptyStorageFormat.copy(
    serde = Option(string(name)),
    // A missing property list yields an empty map rather than null.
    serdeProperties = Option(tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty))
}
......@@ -1067,7 +1065,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
ctx)
"line.delim" -> value
}
EmptyStorageFormat.copy(serdeProperties = entries.toMap)
CatalogStorageFormat.EmptyStorageFormat.copy(serdeProperties = entries.toMap)
}
/**
......@@ -1181,7 +1179,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
identifier = visitTableIdentifier(name),
tableType = CatalogTableType.VIEW,
schema = schema,
storage = EmptyStorageFormat,
storage = CatalogStorageFormat.EmptyStorageFormat,
properties = properties,
viewOriginalText = sql,
viewText = sql,
......
......@@ -30,8 +30,7 @@ case class CacheTableCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
plan.foreach { logicalPlan =>
sparkSession.createTempView(
tableName, Dataset.ofRows(sparkSession, logicalPlan), replaceIfExists = true)
Dataset.ofRows(sparkSession, logicalPlan).createOrReplaceTempView(tableName)
}
sparkSession.catalog.cacheTable(tableName)
......
......@@ -57,8 +57,12 @@ case class CreateViewCommand(
override def output: Seq[Attribute] = Seq.empty[Attribute]
require(tableDesc.tableType == CatalogTableType.VIEW)
require(tableDesc.viewText.isDefined)
require(tableDesc.tableType == CatalogTableType.VIEW,
"The type of the table to created with CREATE VIEW must be 'CatalogTableType.VIEW'.")
if (!isTemporary) {
require(tableDesc.viewText.isDefined,
"The table to created with CREATE VIEW must have 'viewText'.")
}
if (allowExisting && replace) {
throw new AnalysisException("CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed.")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment