diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index e7bbc7d5db49397fd11dcc91352cdf37df381ace..8f0f8910b36ab2751de2ff55a74666768a803e4d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -319,6 +319,15 @@ private[spark] object SQLConf {
     doc = "When true, some predicates will be pushed down into the Hive metastore so that " +
       "unmatching partitions can be eliminated earlier.")
 
+  val CANONICALIZE_VIEW = booleanConf("spark.sql.canonicalizeView",
+    defaultValue = Some(false),
+    doc = "When true, CREATE VIEW will be handled by Spark SQL instead of Hive native commands. " +
+      "Note that this feature is experimental and should only be used when you are using " +
+      "non-Hive-compatible tables written by Spark SQL. The SQL string used to create the " +
+      "view should be fully qualified, i.e. use `tbl1`.`col1` instead of `*` whenever " +
+      "possible, or you may get wrong results.",
+    isPublic = false)
+
   val COLUMN_NAME_OF_CORRUPT_RECORD = stringConf("spark.sql.columnNameOfCorruptRecord",
     defaultValue = Some("_corrupt_record"),
     doc = "<TODO>")
@@ -362,7 +371,7 @@ private[spark] object SQLConf {
 
   val PARTITION_DISCOVERY_ENABLED = booleanConf("spark.sql.sources.partitionDiscovery.enabled",
     defaultValue = Some(true),
-    doc = "When true, automtically discover data partitions.")
+    doc = "When true, automatically discover data partitions.")
 
   val PARTITION_COLUMN_TYPE_INFERENCE =
     booleanConf("spark.sql.sources.partitionColumnTypeInference.enabled",
@@ -372,7 +381,7 @@ private[spark] object SQLConf {
   val PARTITION_MAX_FILES =
     intConf("spark.sql.sources.maxConcurrentWrites",
       defaultValue = Some(5),
-      doc = "The maximum number of concurent files to open before falling back on sorting when " +
+      doc = "The maximum number of concurrent files to open before falling back on sorting when " +
        "writing out files using dynamic partitioning.")
 
   // The output committer class used by HadoopFsRelation. The specified class needs to be a
@@ -471,6 +480,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
   private[spark] def metastorePartitionPruning: Boolean =
     getConf(HIVE_METASTORE_PARTITION_PRUNING)
 
+  private[spark] def canonicalizeView: Boolean = getConf(CANONICALIZE_VIEW)
+
   private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN)
 
   private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, getConf(TUNGSTEN_ENABLED))
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index ea1521a48c8a7189eeb0a7fbcd3a5df24d1c369b..cf59bc0d590b0e4aff6901511a2d898aa668cc9d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
 import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
 import org.apache.spark.sql.execution.{FileRelation, datasources}
 import org.apache.spark.sql.hive.client._
+import org.apache.spark.sql.hive.execution.HiveNativeCommand
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}
@@ -588,6 +589,28 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
       // Wait until children are resolved.
       case p: LogicalPlan if !p.childrenResolved => p
       case p: LogicalPlan if p.resolved => p
+
+      case CreateViewAsSelect(table, child, allowExisting, replace, sql) =>
+        if (conf.canonicalizeView) {
+          if (allowExisting && replace) {
+            throw new AnalysisException(
+              "It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.")
+          }
+
+          val (dbName, tblName) = processDatabaseAndTableName(
+            table.specifiedDatabase.getOrElse(client.currentDatabase), table.name)
+
+          execution.CreateViewAsSelect(
+            table.copy(
+              specifiedDatabase = Some(dbName),
+              name = tblName),
+            child.output,
+            allowExisting,
+            replace)
+        } else {
+          HiveNativeCommand(sql)
+        }
+
       case p @ CreateTableAsSelect(table, child, allowExisting) =>
         val schema = if (table.schema.nonEmpty) {
           table.schema
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 256440a9a2e97a90725ff19547c8efd13babea89..2bf22f54496411b567fe691964f7036a63cf60e4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -77,6 +77,16 @@ private[hive] case class CreateTableAsSelect(
     childrenResolved
 }
 
+private[hive] case class CreateViewAsSelect(
+    tableDesc: HiveTable,
+    child: LogicalPlan,
+    allowExisting: Boolean,
+    replace: Boolean,
+    sql: String) extends UnaryNode with Command {
+  override def output: Seq[Attribute] = Seq.empty[Attribute]
+  override lazy val resolved: Boolean = false
+}
+
 /** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
 private[hive] object HiveQl extends Logging {
   protected val nativeCommands = Seq(
@@ -99,7 +109,6 @@ private[hive] object HiveQl extends Logging {
     "TOK_ALTERTABLE_SKEWED",
     "TOK_ALTERTABLE_TOUCH",
     "TOK_ALTERTABLE_UNARCHIVE",
-    "TOK_ALTERVIEW",
     "TOK_ALTERVIEW_ADDPARTS",
     "TOK_ALTERVIEW_AS",
     "TOK_ALTERVIEW_DROPPARTS",
@@ -110,7 +119,6 @@ private[hive] object HiveQl extends Logging {
     "TOK_CREATEFUNCTION",
     "TOK_CREATEINDEX",
     "TOK_CREATEROLE",
-    "TOK_CREATEVIEW",
 
     "TOK_DESCDATABASE",
     "TOK_DESCFUNCTION",
@@ -254,12 +262,17 @@ private[hive] object HiveQl extends Logging {
      * Otherwise, there will be Null pointer exception,
      * when retrieving properties form HiveConf.
      */
-    val hContext = new Context(SessionState.get().getConf())
-    val node = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql, hContext))
+    val hContext = createContext()
+    val node = getAst(sql, hContext)
     hContext.clear()
     node
   }
 
+  private def createContext(): Context = new Context(SessionState.get().getConf())
+
+  private def getAst(sql: String, context: Context) =
+    ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql, context))
+
   /**
    * Returns the HiveConf
    */
@@ -280,15 +293,18 @@ private[hive] object HiveQl extends Logging {
   /** Creates LogicalPlan for a given HiveQL string. */
   def createPlan(sql: String): LogicalPlan = {
     try {
-      val tree = getAst(sql)
-      if (nativeCommands contains tree.getText) {
+      val context = createContext()
+      val tree = getAst(sql, context)
+      val plan = if (nativeCommands contains tree.getText) {
         HiveNativeCommand(sql)
       } else {
-        nodeToPlan(tree) match {
+        nodeToPlan(tree, context) match {
           case NativePlaceholder => HiveNativeCommand(sql)
           case other => other
         }
       }
+      context.clear()
+      plan
     } catch {
       case pe: org.apache.hadoop.hive.ql.parse.ParseException =>
         pe.getMessage match {
@@ -342,7 +358,9 @@ private[hive] object HiveQl extends Logging {
     }
   }
 
-  protected def getClauses(clauseNames: Seq[String], nodeList: Seq[ASTNode]): Seq[Option[Node]] = {
+  protected def getClauses(
+      clauseNames: Seq[String],
+      nodeList: Seq[ASTNode]): Seq[Option[ASTNode]] = {
     var remainingNodes = nodeList
     val clauses = clauseNames.map { clauseName =>
       val (matches, nonMatches) = remainingNodes.partition(_.getText.toUpperCase == clauseName)
@@ -489,7 +507,43 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
     }
   }
 
-  protected def nodeToPlan(node: Node): LogicalPlan = node match {
+  private def createView(
+      view: ASTNode,
+      context: Context,
+      viewNameParts: ASTNode,
+      query: ASTNode,
+      schema: Seq[HiveColumn],
+      properties: Map[String, String],
+      allowExist: Boolean,
+      replace: Boolean): CreateViewAsSelect = {
+    val (db, viewName) = extractDbNameTableName(viewNameParts)
+
+    val originalText = context.getTokenRewriteStream
+      .toString(query.getTokenStartIndex, query.getTokenStopIndex)
+
+    val tableDesc = HiveTable(
+      specifiedDatabase = db,
+      name = viewName,
+      schema = schema,
+      partitionColumns = Seq.empty[HiveColumn],
+      properties = properties,
+      serdeProperties = Map[String, String](),
+      tableType = VirtualView,
+      location = None,
+      inputFormat = None,
+      outputFormat = None,
+      serde = None,
+      viewText = Some(originalText))
+
+    // We need to keep the original SQL string so that if `spark.sql.canonicalizeView` is
+    // false, we can fall back to the Hive native command later.
+    // We can remove this when the parser is configurable (can access SQLConf) in the future.
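+    // For example (illustrative, not from the original patch): given `CREATE VIEW v AS SELECT
+    // a FROM t`, `originalText` above captures only `SELECT a FROM t`, while `sql` below
+    // captures the whole CREATE VIEW statement.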
+    val sql = context.getTokenRewriteStream
+      .toString(view.getTokenStartIndex, view.getTokenStopIndex)
+    CreateViewAsSelect(tableDesc, nodeToPlan(query, context), allowExist, replace, sql)
+  }
+
+  protected def nodeToPlan(node: ASTNode, context: Context): LogicalPlan = node match {
     // Special drop table that also uncaches.
     case Token("TOK_DROPTABLE",
            Token("TOK_TABNAME", tableNameParts) ::
@@ -521,14 +575,14 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       val Some(crtTbl) :: _ :: extended :: Nil =
         getClauses(Seq("TOK_CREATETABLE", "FORMATTED", "EXTENDED"), explainArgs)
       ExplainCommand(
-        nodeToPlan(crtTbl),
+        nodeToPlan(crtTbl, context),
         extended = extended.isDefined)
     case Token("TOK_EXPLAIN", explainArgs) =>
       // Ignore FORMATTED if present.
       val Some(query) :: _ :: extended :: Nil =
         getClauses(Seq("TOK_QUERY", "FORMATTED", "EXTENDED"), explainArgs)
       ExplainCommand(
-        nodeToPlan(query),
+        nodeToPlan(query, context),
         extended = extended.isDefined)
 
     case Token("TOK_DESCTABLE", describeArgs) =>
@@ -563,6 +617,73 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
         }
       }
 
+    case view @ Token("TOK_ALTERVIEW", children) =>
+      val Some(viewNameParts) :: maybeQuery :: ignores =
+        getClauses(Seq(
+          "TOK_TABNAME",
+          "TOK_QUERY",
+          "TOK_ALTERVIEW_ADDPARTS",
+          "TOK_ALTERVIEW_DROPPARTS",
+          "TOK_ALTERVIEW_PROPERTIES",
+          "TOK_ALTERVIEW_RENAME"), children)
+
+      // If ALTER VIEW doesn't have a query part, let Hive handle it.
+      maybeQuery.map { query =>
+        createView(view, context, viewNameParts, query, Nil, Map(), false, true)
+      }.getOrElse(NativePlaceholder)
+
+    case view @ Token("TOK_CREATEVIEW", children)
+        if children.collect { case t @ Token("TOK_QUERY", _) => t }.nonEmpty =>
+      val Seq(
+        Some(viewNameParts),
+        Some(query),
+        maybeComment,
+        replace,
+        allowExisting,
+        maybeProperties,
+        maybeColumns,
+        maybePartCols
+      ) = getClauses(Seq(
+        "TOK_TABNAME",
+        "TOK_QUERY",
+        "TOK_TABLECOMMENT",
+        "TOK_ORREPLACE",
+        "TOK_IFNOTEXISTS",
+        "TOK_TABLEPROPERTIES",
+        "TOK_TABCOLNAME",
+        "TOK_VIEWPARTCOLS"), children)
+
+      // If the view is partitioned, we let Hive handle it.
+      if (maybePartCols.isDefined) {
+        NativePlaceholder
+      } else {
+        val schema = maybeColumns.map { cols =>
+          BaseSemanticAnalyzer.getColumns(cols, true).asScala.map { field =>
+            // We can't specify column types when creating a view, so fill the type with null
+            // first, and update it after the schema has been resolved later.
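+            // For example (illustrative): a declared column `c1 COMMENT 'blabla'` becomes
+            // HiveColumn("c1", null, "blabla") here; the null type is filled in by
+            // execution.CreateViewAsSelect once the child plan has been resolved.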
+            HiveColumn(field.getName, null, field.getComment)
+          }
+        }.getOrElse(Seq.empty[HiveColumn])
+
+        val properties = scala.collection.mutable.Map.empty[String, String]
+
+        maybeProperties.foreach {
+          case Token("TOK_TABLEPROPERTIES", list :: Nil) =>
+            properties ++= getProperties(list)
+        }
+
+        maybeComment.foreach {
+          case Token("TOK_TABLECOMMENT", child :: Nil) =>
+            val comment = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
+            if (comment ne null) {
+              properties += ("comment" -> comment)
+            }
+        }
+
+        createView(view, context, viewNameParts, query, schema, properties.toMap,
+          allowExisting.isDefined, replace.isDefined)
+      }
+
     case Token("TOK_CREATETABLE", children)
         if children.collect { case t @ Token("TOK_QUERY", _) => t }.nonEmpty =>
       // Reference: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
@@ -774,7 +895,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
         case _ => // Unsupport features
       }
 
-      CreateTableAsSelect(tableDesc, nodeToPlan(query), allowExisting != None)
+      CreateTableAsSelect(tableDesc, nodeToPlan(query, context), allowExisting != None)
 
     // If its not a "CTAS" like above then take it as a native command
     case Token("TOK_CREATETABLE", _) => NativePlaceholder
@@ -793,7 +914,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       insertClauses.last match {
         case Token("TOK_CTE", cteClauses) =>
           val cteRelations = cteClauses.map(node => {
-            val relation = nodeToRelation(node).asInstanceOf[Subquery]
+            val relation = nodeToRelation(node, context).asInstanceOf[Subquery]
             (relation.alias, relation)
           }).toMap
           (Some(args.head), insertClauses.init, Some(cteRelations))
@@ -847,7 +968,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       }
 
       val relations = fromClause match {
-        case Some(f) => nodeToRelation(f)
+        case Some(f) => nodeToRelation(f, context)
         case None => OneRowRelation
       }
 
@@ -1094,7 +1215,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       cteRelations.map(With(query, _)).getOrElse(query)
 
     // HIVE-9039 renamed TOK_UNION => TOK_UNIONALL while adding TOK_UNIONDISTINCT
-    case Token("TOK_UNIONALL", left :: right :: Nil) => Union(nodeToPlan(left), nodeToPlan(right))
+    case Token("TOK_UNIONALL", left :: right :: Nil) =>
+      Union(nodeToPlan(left, context), nodeToPlan(right, context))
 
     case a: ASTNode =>
       throw new NotImplementedError(s"No parse rules for $node:\n ${dumpTree(a).toString} ")
@@ -1102,10 +1224,10 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
   val allJoinTokens = "(TOK_.*JOIN)".r
   val laterViewToken = "TOK_LATERAL_VIEW(.*)".r
 
-  def nodeToRelation(node: Node): LogicalPlan = node match {
+  def nodeToRelation(node: Node, context: Context): LogicalPlan = node match {
     case Token("TOK_SUBQUERY", query :: Token(alias, Nil) :: Nil) =>
-      Subquery(cleanIdentifier(alias), nodeToPlan(query))
+      Subquery(cleanIdentifier(alias), nodeToPlan(query, context))
 
     case Token(laterViewToken(isOuter), selectClause :: relationClause :: Nil) =>
       val Token("TOK_SELECT",
@@ -1121,7 +1243,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
         outer = isOuter.nonEmpty,
         Some(alias.toLowerCase),
         attributes.map(UnresolvedAttribute(_)),
-        nodeToRelation(relationClause))
+        nodeToRelation(relationClause, context))
 
     /* All relations, possibly with aliases or sampling clauses. */
     case Token("TOK_TABREF", clauses) =>
@@ -1189,7 +1311,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       }.map(_._2)
 
       val isPreserved = tableOrdinals.map(i => (i - 1 < 0) || joinArgs(i - 1).getText == "PRESERVE")
-      val tables = tableOrdinals.map(i => nodeToRelation(joinArgs(i)))
+      val tables = tableOrdinals.map(i => nodeToRelation(joinArgs(i), context))
       val joinExpressions =
         tableOrdinals.map(i => joinArgs(i + 1).getChildren.asScala.map(nodeToExpr))
 
@@ -1244,8 +1366,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
         case "TOK_FULLOUTERJOIN" => FullOuter
         case "TOK_LEFTSEMIJOIN" => LeftSemi
       }
-      Join(nodeToRelation(relation1),
-        nodeToRelation(relation2),
+      Join(nodeToRelation(relation1, context),
+        nodeToRelation(relation2, context),
         joinType,
         other.headOption.map(nodeToExpr))
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
index 3811c152a7ae6c15f96dbd0000265d8dcd8dffaa..915eae9d21e23d8181286046288283e86e4fd097 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
@@ -19,13 +19,12 @@ package org.apache.spark.sql.hive.client
 
 import java.io.PrintStream
 import java.util.{Map => JMap}
+import javax.annotation.Nullable
 
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}
 import org.apache.spark.sql.catalyst.expressions.Expression
 
-private[hive] case class HiveDatabase(
-    name: String,
-    location: String)
+private[hive] case class HiveDatabase(name: String, location: String)
 
 private[hive] abstract class TableType { val name: String }
 private[hive] case object ExternalTable extends TableType { override val name = "EXTERNAL_TABLE" }
@@ -45,7 +44,7 @@ private[hive] case class HivePartition(
     values: Seq[String],
     storage: HiveStorageDescriptor)
 
-private[hive] case class HiveColumn(name: String, hiveType: String, comment: String)
+private[hive] case class HiveColumn(name: String, @Nullable hiveType: String, comment: String)
 private[hive] case class HiveTable(
     specifiedDatabase: Option[String],
     name: String,
@@ -126,6 +125,12 @@ private[hive] trait ClientInterface {
   /** Returns the metadata for the specified table or None if it doens't exist. */
   def getTableOption(dbName: String, tableName: String): Option[HiveTable]
 
+  /** Creates a view with the given metadata. */
+  def createView(view: HiveTable): Unit
+
+  /** Updates the given view with new metadata. */
+  def alterView(view: HiveTable): Unit
+
   /** Creates a table with the given metadata. */
   def createTable(table: HiveTable): Unit
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index 4d1e3ed9198e684653a8fe01510d56e5f9e9899e..8f6d448b2aef4cfadb5bdbfc31fb2f2dda52ed44 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -354,6 +354,37 @@ private[hive] class ClientWrapper(
     qlTable
   }
 
+  private def toViewTable(view: HiveTable): metadata.Table = {
+    // TODO: this duplicates `toQlTable`, except for the table type handling.
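+    // A Hive VIRTUAL_VIEW carries no physical storage, which is why the serde info is nulled
+    // out and cleared below (explanatory note, not from the original patch).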
+    val tbl = new metadata.Table(view.database, view.name)
+    tbl.setTableType(HTableType.VIRTUAL_VIEW)
+    tbl.setSerializationLib(null)
+    tbl.clearSerDeInfo()
+
+    // TODO: we save the same SQL string as both the original and the expanded text, which is
+    // different from Hive.
+    tbl.setViewOriginalText(view.viewText.get)
+    tbl.setViewExpandedText(view.viewText.get)
+
+    tbl.setFields(view.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
+    view.properties.foreach { case (k, v) => tbl.setProperty(k, v) }
+
+    // set owner
+    tbl.setOwner(conf.getUser)
+    // set create time
+    tbl.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int])
+
+    tbl
+  }
+
+  override def createView(view: HiveTable): Unit = withHiveState {
+    client.createTable(toViewTable(view))
+  }
+
+  override def alterView(view: HiveTable): Unit = withHiveState {
+    client.alterTable(view.qualifiedName, toViewTable(view))
+  }
+
   override def createTable(table: HiveTable): Unit = withHiveState {
     val qlTable = toQlTable(table)
     client.createTable(qlTable)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
new file mode 100644
index 0000000000000000000000000000000000000000..2b504ac974f071433883b5531330c5e47e7d6e22
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.hive.{HiveMetastoreTypes, HiveContext}
+import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
+import org.apache.spark.sql.execution.RunnableCommand
+import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable}
+
+/**
+ * Creates a Hive view on non-Hive-compatible tables by specifying the schema ourselves instead
+ * of depending on the Hive metastore.
+ */
+// TODO: Note that this class can NOT canonicalize the view SQL string entirely, which is
+// different from Hive and may not work for some cases, e.g. creating a view on a self join.
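+// An illustrative example (assumed, not from the original patch): for
+// `CREATE VIEW v (a, b) AS SELECT i, j FROM t`, the stored expanded text becomes
+// `SELECT `i` AS `a`, `j` AS `b` FROM (SELECT i, j FROM t) `v``.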
+private[hive] case class CreateViewAsSelect(
+    tableDesc: HiveTable,
+    childSchema: Seq[Attribute],
+    allowExisting: Boolean,
+    orReplace: Boolean) extends RunnableCommand {
+
+  assert(tableDesc.schema == Nil || tableDesc.schema.length == childSchema.length)
+  assert(tableDesc.viewText.isDefined)
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val hiveContext = sqlContext.asInstanceOf[HiveContext]
+    val database = tableDesc.database
+    val viewName = tableDesc.name
+
+    if (hiveContext.catalog.tableExists(Seq(database, viewName))) {
+      if (allowExisting) {
+        // The view already exists; do nothing, to stay consistent with Hive.
+      } else if (orReplace) {
+        hiveContext.catalog.client.alterView(prepareTable())
+      } else {
+        throw new AnalysisException(s"View $database.$viewName already exists. " +
+          "If you want to update the view definition, please use ALTER VIEW AS or " +
+          "CREATE OR REPLACE VIEW AS")
+      }
+    } else {
+      hiveContext.catalog.client.createView(prepareTable())
+    }
+
+    Seq.empty[Row]
+  }
+
+  private def prepareTable(): HiveTable = {
+    // Set up column types according to the schema of the child.
+    val schema = if (tableDesc.schema == Nil) {
+      childSchema.map { attr =>
+        HiveColumn(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), null)
+      }
+    } else {
+      childSchema.zip(tableDesc.schema).map { case (attr, col) =>
+        HiveColumn(col.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), col.comment)
+      }
+    }
+
+    val columnNames = childSchema.map(f => verbose(f.name))
+
+    // When the user specifies column names for the view, we create a projection to do the
+    // renaming. When no column names are specified, we still create a projection to declare
+    // the columns we need, to be more robust against top-level `*`s.
+    val projectList = if (tableDesc.schema == Nil) {
+      columnNames.mkString(", ")
+    } else {
+      columnNames.zip(tableDesc.schema.map(f => verbose(f.name))).map {
+        case (name, alias) => s"$name AS $alias"
+      }.mkString(", ")
+    }
+
+    val viewName = verbose(tableDesc.name)
+
+    val expandedText = s"SELECT $projectList FROM (${tableDesc.viewText.get}) $viewName"
+
+    tableDesc.copy(schema = schema, viewText = Some(expandedText))
+  }
+
+  // Escapes backticks in the column name by doubling them, then wraps the name in backticks.
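+  // For example (illustrative): verbose("col`1") returns "`col``1`".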
+  private def verbose(name: String) = s"`${name.replaceAll("`", "``")}`"
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 8c3f9ac20263786a2fd4b3002f569b7f160aa7ad..ec5b83b98e4013661926ce550d1aab1eba759511 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1248,4 +1248,121 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         """.stripMargin), Row("b", 6.0) :: Row("a", 7.0) :: Nil)
     }
   }
+
+  test("correctly parse CREATE VIEW statement") {
+    withSQLConf(SQLConf.CANONICALIZE_VIEW.key -> "true") {
+      withTable("jt") {
+        val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+        df.write.format("json").saveAsTable("jt")
+        sql(
+          """CREATE VIEW IF NOT EXISTS
+            |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
+            |COMMENT 'blabla'
+            |TBLPROPERTIES ('a' = 'b')
+            |AS SELECT * FROM jt""".stripMargin)
+        checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
+        sql("DROP VIEW testView")
+      }
+    }
+  }
+
+  test("correctly handle CREATE VIEW IF NOT EXISTS") {
+    withSQLConf(SQLConf.CANONICALIZE_VIEW.key -> "true") {
+      withTable("jt", "jt2") {
+        sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+        sql("CREATE VIEW testView AS SELECT id FROM jt")
+
+        val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+        df.write.format("json").saveAsTable("jt2")
+        sql("CREATE VIEW IF NOT EXISTS testView AS SELECT * FROM jt2")
+
+        // make sure our view doesn't change.
+        checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
+        sql("DROP VIEW testView")
+      }
+    }
+  }
+
+  test("correctly handle CREATE OR REPLACE VIEW") {
+    withSQLConf(SQLConf.CANONICALIZE_VIEW.key -> "true") {
+      withTable("jt", "jt2") {
+        sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+        sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt")
+        checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
+
+        val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+        df.write.format("json").saveAsTable("jt2")
+        sql("CREATE OR REPLACE VIEW testView AS SELECT * FROM jt2")
+        // make sure the view has been changed.
+        checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
+
+        sql("DROP VIEW testView")
+
+        val e = intercept[AnalysisException] {
+          sql("CREATE OR REPLACE VIEW IF NOT EXISTS testView AS SELECT id FROM jt")
+        }
+        assert(e.message.contains("not allowed to define a view"))
+      }
+    }
+  }
+
+  test("correctly handle ALTER VIEW") {
+    withSQLConf(SQLConf.CANONICALIZE_VIEW.key -> "true") {
+      withTable("jt", "jt2") {
+        sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+        sql("CREATE VIEW testView AS SELECT id FROM jt")
+
+        val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+        df.write.format("json").saveAsTable("jt2")
+        sql("ALTER VIEW testView AS SELECT * FROM jt2")
+        // make sure the view has been changed.
+        checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
+
+        sql("DROP VIEW testView")
+      }
+    }
+  }
+
+  test("create hive view for json table") {
+    // A JSON table is not Hive-compatible; make sure the new flag fixes it.
+    withSQLConf(SQLConf.CANONICALIZE_VIEW.key -> "true") {
+      withTable("jt") {
+        sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+        sql("CREATE VIEW testView AS SELECT id FROM jt")
+        checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
+        sql("DROP VIEW testView")
+      }
+    }
+  }
+
+  test("create hive view for partitioned parquet table") {
+    // A partitioned Parquet table is not Hive-compatible; make sure the new flag fixes it.
+    withSQLConf(SQLConf.CANONICALIZE_VIEW.key -> "true") {
+      withTable("parTable") {
+        val df = Seq(1 -> "a").toDF("i", "j")
+        df.write.format("parquet").partitionBy("i").saveAsTable("parTable")
+        sql("CREATE VIEW testView AS SELECT i, j FROM parTable")
+        checkAnswer(sql("SELECT * FROM testView"), Row(1, "a"))
+        sql("DROP VIEW testView")
+      }
+    }
+  }
+
+  test("create hive view for joined tables") {
+    // Make sure the new flag can handle complex cases like joins and schema changes.
+    withSQLConf(SQLConf.CANONICALIZE_VIEW.key -> "true") {
+      withTable("jt1", "jt2") {
+        sqlContext.range(1, 10).toDF("id1").write.format("json").saveAsTable("jt1")
+        sqlContext.range(1, 10).toDF("id2").write.format("json").saveAsTable("jt2")
+        sql("CREATE VIEW testView AS SELECT * FROM jt1 JOIN jt2 ON id1 == id2")
+        checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i)))
+
+        val df = (1 until 10).map(i => i -> i).toDF("id1", "newCol")
+        df.write.format("json").mode(SaveMode.Overwrite).saveAsTable("jt1")
+        checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i)))
+
+        sql("DROP VIEW testView")
+      }
+    }
+  }
 }
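A minimal usage sketch of the new flag (illustrative only: it assumes a HiveContext in scope as `sqlContext` and a hypothetical table name `jt`, mirroring the tests above):

    // Enable the experimental Spark SQL handling of CREATE VIEW.
    sqlContext.setConf("spark.sql.canonicalizeView", "true")

    // `jt` is a JSON table written by Spark SQL, so it is not Hive-compatible.
    sqlContext.range(1, 10).write.format("json").saveAsTable("jt")

    sqlContext.sql("CREATE VIEW v1 (id_copy COMMENT 'copy of id') AS SELECT id FROM jt")
    sqlContext.sql("SELECT * FROM v1 ORDER BY id_copy").show()

    sqlContext.sql("ALTER VIEW v1 AS SELECT id, id + 1 AS next_id FROM jt")
    sqlContext.sql("DROP VIEW v1")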