diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index b7f35b3af4cfadc3481da9b0262e4023e9957f52..2a20651459d786e418b164b317ac0b97ad0b870d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -81,9 +81,9 @@ object CatalogStorageFormat { */ case class CatalogColumn( name: String, - // This may be null when used to create views. TODO: make this type-safe; this is left - // as a string due to issues in converting Hive varchars to and from SparkSQL strings. - @Nullable dataType: String, + // TODO: make this type-safe; this is left as a string due to issues in converting Hive + // varchars to and from SparkSQL strings. + dataType: String, nullable: Boolean = true, comment: Option[String] = None) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index b28ecb753f2263609d92e12c87a7dff7cf8e004c..8b6443c8b96f825461e3aaa895570914d8822bc5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2421,13 +2421,7 @@ class Dataset[T] private[sql]( */ @throws[AnalysisException] def createTempView(viewName: String): Unit = withPlan { - val tableDesc = CatalogTable( - identifier = sparkSession.sessionState.sqlParser.parseTableIdentifier(viewName), - tableType = CatalogTableType.VIEW, - schema = Seq.empty[CatalogColumn], - storage = CatalogStorageFormat.empty) - CreateViewCommand(tableDesc, logicalPlan, allowExisting = false, replace = false, - isTemporary = true) + createViewCommand(viewName, replace = false) } /** @@ -2438,12 +2432,19 @@ class Dataset[T] private[sql]( * @since 2.0.0 */ def createOrReplaceTempView(viewName: String): Unit = withPlan { - val tableDesc = CatalogTable( - identifier = sparkSession.sessionState.sqlParser.parseTableIdentifier(viewName), - tableType = CatalogTableType.VIEW, - schema = Seq.empty[CatalogColumn], - storage = CatalogStorageFormat.empty) - CreateViewCommand(tableDesc, logicalPlan, allowExisting = false, replace = true, + createViewCommand(viewName, replace = true) + } + + private def createViewCommand(viewName: String, replace: Boolean): CreateViewCommand = { + CreateViewCommand( + name = sparkSession.sessionState.sqlParser.parseTableIdentifier(viewName), + userSpecifiedColumns = Nil, + comment = None, + properties = Map.empty, + originalText = None, + child = logicalPlan, + allowExisting = false, + replace = replace, isTemporary = true) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 9b098018965c9dceffa2d72535c3f7f52533ed9b..5e1ad9b885b1ac7d897f4bb22ffd0eccaabfaa7a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -1235,20 +1235,21 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { if (ctx.identifierList != null) { operationNotAllowed("CREATE VIEW ... 
PARTITIONED ON", ctx) } else { - val identifiers = Option(ctx.identifierCommentList).toSeq.flatMap(_.identifierComment.asScala) - val schema = identifiers.map { ic => - CatalogColumn(ic.identifier.getText, null, nullable = true, Option(ic.STRING).map(string)) + val userSpecifiedColumns = Option(ctx.identifierCommentList).toSeq.flatMap { icl => + icl.identifierComment.asScala.map { ic => + ic.identifier.getText -> Option(ic.STRING).map(string) + } } createView( ctx, ctx.tableIdentifier, comment = Option(ctx.STRING).map(string), - schema, + userSpecifiedColumns, ctx.query, Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty), - ctx.EXISTS != null, - ctx.REPLACE != null, - ctx.TEMPORARY != null + allowExisting = ctx.EXISTS != null, + replace = ctx.REPLACE != null, + isTemporary = ctx.TEMPORARY != null ) } } @@ -1259,12 +1260,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { override def visitAlterViewQuery(ctx: AlterViewQueryContext): LogicalPlan = withOrigin(ctx) { createView( ctx, - ctx.tableIdentifier, + name = ctx.tableIdentifier, comment = None, - Seq.empty, - ctx.query, - Map.empty, - allowExist = false, + userSpecifiedColumns = Seq.empty, + query = ctx.query, + properties = Map.empty, + allowExisting = false, replace = true, isTemporary = false) } @@ -1276,23 +1277,23 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { ctx: ParserRuleContext, name: TableIdentifierContext, comment: Option[String], - schema: Seq[CatalogColumn], + userSpecifiedColumns: Seq[(String, Option[String])], query: QueryContext, properties: Map[String, String], - allowExist: Boolean, + allowExisting: Boolean, replace: Boolean, isTemporary: Boolean): LogicalPlan = { - val sql = Option(source(query)) - val tableDesc = CatalogTable( - identifier = visitTableIdentifier(name), - tableType = CatalogTableType.VIEW, - schema = schema, - storage = CatalogStorageFormat.empty, - properties = properties, - viewOriginalText = sql, - viewText = sql, - comment = comment) - CreateViewCommand(tableDesc, plan(query), allowExist, replace, isTemporary) + val originalText = source(query) + CreateViewCommand( + visitTableIdentifier(name), + userSpecifiedColumns, + comment, + properties, + Some(originalText), + plan(query), + allowExisting = allowExisting, + replace = replace, + isTemporary = isTemporary) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 16b333a40288dbd04ed9ad0c771f09c7e6746154..312a1f691b2d0ca61759d6c5ba75f1e616b42ad6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -21,7 +21,7 @@ import scala.util.control.NonFatal import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier} -import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} @@ -31,7 +31,13 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} * Create Hive view on non-hive-compatible tables by specifying schema ourselves 
instead of
 * depending on Hive meta-store.
 *
- * @param tableDesc the catalog table
+ * @param name the name of this view.
+ * @param userSpecifiedColumns the output column names and optional comments specified by users,
+ *                             or Nil if not specified.
+ * @param comment the comment of this view.
+ * @param properties the properties of this view.
+ * @param originalText the original SQL text of this view, or None if this view was created via
+ *                     the Dataset API.
 * @param child the logical plan that represents the view; this is used to generate a canonicalized
 * version of the SQL that can be saved in the catalog.
 * @param allowExisting if true, and if the view already exists, noop; if false, and if the view
@@ -44,7 +50,11 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 * unless they are specified with full qualified table name with database prefix.
 */
case class CreateViewCommand(
-    tableDesc: CatalogTable,
+    name: TableIdentifier,
+    userSpecifiedColumns: Seq[(String, Option[String])],
+    comment: Option[String],
+    properties: Map[String, String],
+    originalText: Option[String],
    child: LogicalPlan,
    allowExisting: Boolean,
    replace: Boolean,
@@ -58,11 +68,9 @@ case class CreateViewCommand(

  override def output: Seq[Attribute] = Seq.empty[Attribute]

-  require(tableDesc.tableType == CatalogTableType.VIEW,
-    "The type of the table to created with CREATE VIEW must be 'CatalogTableType.VIEW'.")
  if (!isTemporary) {
-    require(tableDesc.viewText.isDefined,
-      "The table to created with CREATE VIEW must have 'viewText'.")
+    require(originalText.isDefined,
+      "The view to be created with CREATE VIEW must have 'originalText'.")
  }

  if (allowExisting && replace) {
@@ -76,8 +84,8 @@ case class CreateViewCommand(
  }

  // Temporary view names should NOT contain database prefix like "database.table"
-  if (isTemporary && tableDesc.identifier.database.isDefined) {
-    val database = tableDesc.identifier.database.get
+  if (isTemporary && name.database.isDefined) {
+    val database = name.database.get
    throw new AnalysisException(
      s"It is not allowed to add database prefix `$database` for the TEMPORARY view name.")
  }
@@ -88,23 +96,23 @@ case class CreateViewCommand(
      qe.assertAnalyzed()
      val analyzedPlan = qe.analyzed

-    if (tableDesc.schema != Nil && tableDesc.schema.length != analyzedPlan.output.length) {
+    if (userSpecifiedColumns.nonEmpty &&
+        userSpecifiedColumns.length != analyzedPlan.output.length) {
      throw new AnalysisException(s"The number of columns produced by the SELECT clause " +
        s"(num: `${analyzedPlan.output.length}`) does not match the number of column names " +
-        s"specified by CREATE VIEW (num: `${tableDesc.schema.length}`).")
+        s"specified by CREATE VIEW (num: `${userSpecifiedColumns.length}`).")
    }

    val sessionState = sparkSession.sessionState

    if (isTemporary) {
-      createTemporaryView(tableDesc.identifier, sparkSession, analyzedPlan)
+      createTemporaryView(sparkSession, analyzedPlan)
    } else {
      // Adds default database for permanent table if it doesn't exist, so that tableExists()
      // only check permanent tables. 
- val database = tableDesc.identifier.database.getOrElse( - sessionState.catalog.getCurrentDatabase) - val tableIdentifier = tableDesc.identifier.copy(database = Option(database)) + val database = name.database.getOrElse(sessionState.catalog.getCurrentDatabase) + val qualifiedName = name.copy(database = Option(database)) - if (sessionState.catalog.tableExists(tableIdentifier)) { + if (sessionState.catalog.tableExists(qualifiedName)) { if (allowExisting) { // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view // already exists. @@ -115,7 +123,7 @@ case class CreateViewCommand( // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already // exists. throw new AnalysisException( - s"View $tableIdentifier already exists. If you want to update the view definition, " + + s"View $qualifiedName already exists. If you want to update the view definition, " + "please use ALTER VIEW AS or CREATE OR REPLACE VIEW AS") } } else { @@ -127,25 +135,20 @@ case class CreateViewCommand( Seq.empty[Row] } - private def createTemporaryView( - table: TableIdentifier, sparkSession: SparkSession, analyzedPlan: LogicalPlan): Unit = { - - val sessionState = sparkSession.sessionState - val catalog = sessionState.catalog + private def createTemporaryView(sparkSession: SparkSession, analyzedPlan: LogicalPlan): Unit = { + val catalog = sparkSession.sessionState.catalog // Projects column names to alias names - val logicalPlan = { - if (tableDesc.schema.isEmpty) { - analyzedPlan - } else { - val projectList = analyzedPlan.output.zip(tableDesc.schema).map { - case (attr, col) => Alias(attr, col.name)() - } - sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed + val logicalPlan = if (userSpecifiedColumns.isEmpty) { + analyzedPlan + } else { + val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map { + case (attr, (colName, _)) => Alias(attr, colName)() } + sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed } - catalog.createTempView(table.table, logicalPlan, replace) + catalog.createTempView(name.table, logicalPlan, replace) } /** @@ -154,15 +157,14 @@ case class CreateViewCommand( */ private def prepareTable(sparkSession: SparkSession, analyzedPlan: LogicalPlan): CatalogTable = { val viewSQL: String = { - val logicalPlan = - if (tableDesc.schema.isEmpty) { - analyzedPlan - } else { - val projectList = analyzedPlan.output.zip(tableDesc.schema).map { - case (attr, col) => Alias(attr, col.name)() - } - sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed + val logicalPlan = if (userSpecifiedColumns.isEmpty) { + analyzedPlan + } else { + val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map { + case (attr, (colName, _)) => Alias(attr, colName)() } + sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed + } new SQLBuilder(logicalPlan).toSQL } @@ -176,21 +178,26 @@ case class CreateViewCommand( "Failed to analyze the canonicalized SQL. 
It is possible there is a bug in Spark.", e) } - val viewSchema: Seq[CatalogColumn] = { - if (tableDesc.schema.isEmpty) { - analyzedPlan.output.map { a => - CatalogColumn(a.name, a.dataType.catalogString) - } - } else { - analyzedPlan.output.zip(tableDesc.schema).map { case (a, col) => - CatalogColumn(col.name, a.dataType.catalogString, nullable = true, col.comment) - } + val viewSchema = if (userSpecifiedColumns.isEmpty) { + analyzedPlan.output.map { a => + CatalogColumn(a.name, a.dataType.catalogString) + } + } else { + analyzedPlan.output.zip(userSpecifiedColumns).map { + case (a, (name, comment)) => + CatalogColumn(name, a.dataType.catalogString, comment = comment) } } - tableDesc.copy(schema = viewSchema, viewText = Some(viewSQL)) + CatalogTable( + identifier = name, + tableType = CatalogTableType.VIEW, + storage = CatalogStorageFormat.empty, + schema = viewSchema, + properties = properties, + viewOriginalText = originalText, + viewText = Some(viewSQL), + comment = comment + ) } - - /** Escape backtick with double-backtick in column name and wrap it with backtick. */ - private def quote(name: String) = s"`${name.replaceAll("`", "``")}`" } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index d308a31061fb12f0785ead34cf5d3fd1f268069f..db970785a716bc9cdebcb8a22c2c2650af44a7e6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -171,8 +171,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log } else if (table.tableType == CatalogTableType.VIEW) { val viewText = table.viewText.getOrElse(sys.error("Invalid view without text.")) alias match { - // because hive use things like `_c0` to build the expanded text - // currently we cannot support view from "create view v1(c1) as ..." 
case None => SubqueryAlias(table.identifier.table, sparkSession.sessionState.sqlParser.parsePlan(viewText)) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index 9d99d960accc3abd50419753c1ee83dd6b7d30d5..a708434f5e13a6db56aef6636ad63b298565c949 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -37,7 +37,6 @@ class HiveDDLCommandSuite extends PlanTest { parser.parsePlan(sql).collect { case c: CreateTableCommand => (c.table, c.ifNotExists) case c: CreateHiveTableAsSelectLogicalPlan => (c.tableDesc, c.allowExisting) - case c: CreateViewCommand => (c.tableDesc, c.allowExisting) }.head } @@ -470,47 +469,30 @@ class HiveDDLCommandSuite extends PlanTest { test("create view -- basic") { val v1 = "CREATE VIEW view1 AS SELECT * FROM tab1" - val (desc, exists) = extractTableDesc(v1) - assert(!exists) - assert(desc.identifier.database.isEmpty) - assert(desc.identifier.table == "view1") - assert(desc.tableType == CatalogTableType.VIEW) - assert(desc.storage.locationUri.isEmpty) - assert(desc.schema == Seq.empty[CatalogColumn]) - assert(desc.viewText == Option("SELECT * FROM tab1")) - assert(desc.viewOriginalText == Option("SELECT * FROM tab1")) - assert(desc.storage.properties == Map()) - assert(desc.storage.inputFormat.isEmpty) - assert(desc.storage.outputFormat.isEmpty) - assert(desc.storage.serde.isEmpty) - assert(desc.properties == Map()) + val command = parser.parsePlan(v1).asInstanceOf[CreateViewCommand] + assert(!command.allowExisting) + assert(command.name.database.isEmpty) + assert(command.name.table == "view1") + assert(command.originalText == Some("SELECT * FROM tab1")) + assert(command.userSpecifiedColumns.isEmpty) } test("create view - full") { val v1 = """ |CREATE OR REPLACE VIEW view1 - |(col1, col3) + |(col1, col3 COMMENT 'hello') |COMMENT 'BLABLA' |TBLPROPERTIES('prop1Key'="prop1Val") |AS SELECT * FROM tab1 """.stripMargin - val (desc, exists) = extractTableDesc(v1) - assert(desc.identifier.database.isEmpty) - assert(desc.identifier.table == "view1") - assert(desc.tableType == CatalogTableType.VIEW) - assert(desc.storage.locationUri.isEmpty) - assert(desc.schema == - CatalogColumn("col1", null, nullable = true, None) :: - CatalogColumn("col3", null, nullable = true, None) :: Nil) - assert(desc.viewText == Option("SELECT * FROM tab1")) - assert(desc.viewOriginalText == Option("SELECT * FROM tab1")) - assert(desc.storage.properties == Map()) - assert(desc.storage.inputFormat.isEmpty) - assert(desc.storage.outputFormat.isEmpty) - assert(desc.storage.serde.isEmpty) - assert(desc.properties == Map("prop1Key" -> "prop1Val")) - assert(desc.comment == Option("BLABLA")) + val command = parser.parsePlan(v1).asInstanceOf[CreateViewCommand] + assert(command.name.database.isEmpty) + assert(command.name.table == "view1") + assert(command.userSpecifiedColumns == Seq("col1" -> None, "col3" -> Some("hello"))) + assert(command.originalText == Some("SELECT * FROM tab1")) + assert(command.properties == Map("prop1Key" -> "prop1Val")) + assert(command.comment == Some("BLABLA")) } test("create view -- partitioned view") {
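
Reviewer note, not part of the patch: a minimal sketch of how a caller constructs the refactored `CreateViewCommand` after this change. The field names come from the new signature in views.scala above; the `tempViewCommand` helper and the pre-built `TableIdentifier` are illustrative assumptions (`Dataset.createViewCommand` parses the view name through `sessionState`, which is `private[sql]`, so the sketch takes a plain name and plan instead):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.CreateViewCommand

// Hypothetical helper mirroring what Dataset.createOrReplaceTempView now builds:
// each piece of view metadata is an explicit field rather than a bundled CatalogTable.
def tempViewCommand(viewName: String, plan: LogicalPlan): CreateViewCommand =
  CreateViewCommand(
    name = TableIdentifier(viewName),
    userSpecifiedColumns = Nil, // no user-supplied (column name, comment) pairs
    comment = None,             // no COMMENT clause
    properties = Map.empty,     // no TBLPROPERTIES
    originalText = None,        // may be None because isTemporary = true
    child = plan,
    allowExisting = false,
    replace = true,             // CREATE OR REPLACE semantics
    isTemporary = true)
```

Only the permanent-view path (`prepareTable`) requires `originalText`; the temporary path never consults it, which is what lets the Dataset API pass `None`.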
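The same column-aliasing step now appears in both `createTemporaryView` and `prepareTable`. A standalone sketch of that shared logic follows, under an assumed name `aliasToUserColumns`; the patch additionally re-analyzes the resulting `Project` via `sessionState.executePlan(...).analyzed`, omitted here because the sketch has no `SparkSession`:

```scala
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}

// Rename each output attribute to the matching user-specified column name.
// Column comments are not needed in the plan itself, hence the ignored `_`.
// Assumes the arity check in CreateViewCommand.run has already passed.
def aliasToUserColumns(
    analyzedPlan: LogicalPlan,
    userSpecifiedColumns: Seq[(String, Option[String])]): LogicalPlan =
  if (userSpecifiedColumns.isEmpty) {
    analyzedPlan
  } else {
    val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
      case (attr, (colName, _)) => Alias(attr, colName)()
    }
    Project(projectList, analyzedPlan)
  }
```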