From c06110187b3e41405fc13aba367abdd4183ed9a6 Mon Sep 17 00:00:00 2001
From: Reynold Xin <rxin@databricks.com>
Date: Fri, 22 Apr 2016 20:30:51 -0700
Subject: [PATCH] [SPARK-14842][SQL] Implement view creation in sql/core

## What changes were proposed in this pull request?
This patch re-implements view creation command in sql/core, based on the pre-existing view creation command in the Hive module. This consolidates the view creation logical command and physical command into a single one, called CreateViewCommand.

## How was this patch tested?
All the code should've been tested by existing tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #12615 from rxin/SPARK-14842-2.
---
 .../org/apache/spark/sql/types/DataType.scala |   3 +
 .../spark/sql/types/UserDefinedType.scala     |   2 +
 .../spark/sql/execution/SparkSqlParser.scala  |   8 +-
 .../spark/sql/execution/command/views.scala   | 132 ++++++++++++++++-
 .../spark/sql/hive/HiveMetastoreCatalog.scala |  19 +--
 .../spark/sql/hive/client/HiveClient.scala    |   6 -
 .../sql/hive/client/HiveClientImpl.scala      |   8 -
 .../hive/execution/CreateViewAsSelect.scala   | 137 ------------------
 .../spark/sql/hive/HiveDDLCommandSuite.scala  |   7 +-
 9 files changed, 140 insertions(+), 182 deletions(-)
 delete mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
index 3d4a02b0ff..4fc65cbce1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
@@ -62,6 +62,9 @@ abstract class DataType extends AbstractDataType {
   /** Readable string representation for the type. */
   def simpleString: String = typeName
 
+  /** String representation for the type saved in external catalogs. */
+  def catalogString: String = simpleString
+
   /** Readable string representation for the type with truncation */
   private[sql] def simpleString(maxNumberFields: Int): String = simpleString
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala
index 71a9b9f808..aa36121bde 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala
@@ -91,6 +91,8 @@ abstract class UserDefinedType[UserType >: Null] extends DataType with Serializa
     case that: UserDefinedType[_] => this.acceptsType(that)
     case _ => false
   }
+
+  override def catalogString: String = sqlType.simpleString
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index e983a4cee6..7dc888cdde 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -1101,7 +1101,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create or replace a view. This creates a [[CreateViewAsSelectLogicalCommand]] command.
+   * Create or replace a view. This creates a [[CreateViewCommand]] command.
    *
    * For example:
    * {{{
@@ -1134,7 +1134,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Alter the query of a view. This creates a [[CreateViewAsSelectLogicalCommand]] command.
+   * Alter the query of a view. This creates a [[CreateViewCommand]] command.
    */
   override def visitAlterViewQuery(ctx: AlterViewQueryContext): LogicalPlan = withOrigin(ctx) {
     createView(
@@ -1149,7 +1149,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create a [[CreateViewAsSelectLogicalCommand]] command.
+   * Create a [[CreateViewCommand]] command.
    */
   private def createView(
       ctx: ParserRuleContext,
@@ -1170,7 +1170,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       viewOriginalText = sql,
       viewText = sql,
       comment = comment)
-    CreateViewAsSelectLogicalCommand(tableDesc, plan(query), allowExist, replace, command(ctx))
+    CreateViewCommand(tableDesc, plan(query), allowExist, replace, command(ctx))
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index aa6112c7f0..082f944f99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -17,16 +17,136 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode}
+import scala.util.control.NonFatal
 
-case class CreateViewAsSelectLogicalCommand(
+import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
+import org.apache.spark.sql.catalyst.SQLBuilder
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+
+
+/**
+ * Create Hive view on non-hive-compatible tables by specifying schema ourselves instead of
+ * depending on Hive meta-store.
+ *
+ * @param tableDesc the catalog table
+ * @param child the logical plan that represents the view; this is used to generate a canonicalized
+ *              version of the SQL that can be saved in the catalog.
+ * @param allowExisting if true, and if the view already exists, noop; if false, and if the view
+ *                already exists, throws analysis exception.
+ * @param replace if true, and if the view already exists, updates it; if false, and if the view
+ *                already exists, throws analysis exception.
+ * @param sql the original sql
+ */
+case class CreateViewCommand(
     tableDesc: CatalogTable,
     child: LogicalPlan,
     allowExisting: Boolean,
     replace: Boolean,
-    sql: String) extends UnaryNode with Command {
+    sql: String)
+  extends RunnableCommand {
+
+  // TODO: Note that this class can NOT canonicalize the view SQL string entirely, which is
+  // different from Hive and may not work for some cases like create view on self join.
+
   override def output: Seq[Attribute] = Seq.empty[Attribute]
-  override lazy val resolved: Boolean = false
+
+  require(tableDesc.tableType == CatalogTableType.VIRTUAL_VIEW)
+  require(tableDesc.viewText.isDefined)
+
+  private val tableIdentifier = tableDesc.identifier
+
+  if (allowExisting && replace) {
+    throw new AnalysisException(
+      "It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.")
+  }
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val analzyedPlan = sqlContext.executePlan(child).analyzed
+
+    require(tableDesc.schema == Nil || tableDesc.schema.length == analzyedPlan.output.length)
+    val sessionState = sqlContext.sessionState
+
+    if (sessionState.catalog.tableExists(tableIdentifier)) {
+      if (allowExisting) {
+        // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
+        // already exists.
+      } else if (replace) {
+        // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
+        sessionState.catalog.alterTable(prepareTable(sqlContext, analzyedPlan))
+      } else {
+        // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
+        // exists.
+        throw new AnalysisException(s"View $tableIdentifier already exists. " +
+          "If you want to update the view definition, please use ALTER VIEW AS or " +
+          "CREATE OR REPLACE VIEW AS")
+      }
+    } else {
+      // Create the view if it doesn't exist.
+      sessionState.catalog.createTable(
+        prepareTable(sqlContext, analzyedPlan), ignoreIfExists = false)
+    }
+
+    Seq.empty[Row]
+  }
+
+  private def prepareTable(sqlContext: SQLContext, analzyedPlan: LogicalPlan): CatalogTable = {
+    val expandedText = if (sqlContext.conf.canonicalView) {
+      try rebuildViewQueryString(sqlContext, analzyedPlan) catch {
+        case NonFatal(e) => wrapViewTextWithSelect(analzyedPlan)
+      }
+    } else {
+      wrapViewTextWithSelect(analzyedPlan)
+    }
+
+    val viewSchema = {
+      if (tableDesc.schema.isEmpty) {
+        analzyedPlan.output.map { a =>
+          CatalogColumn(a.name, a.dataType.simpleString)
+        }
+      } else {
+        analzyedPlan.output.zip(tableDesc.schema).map { case (a, col) =>
+          CatalogColumn(col.name, a.dataType.simpleString, nullable = true, col.comment)
+        }
+      }
+    }
+
+    tableDesc.copy(schema = viewSchema, viewText = Some(expandedText))
+  }
+
+  private def wrapViewTextWithSelect(analzyedPlan: LogicalPlan): String = {
+    // When user specified column names for view, we should create a project to do the renaming.
+    // When no column name specified, we still need to create a project to declare the columns
+    // we need, to make us more robust to top level `*`s.
+    val viewOutput = {
+      val columnNames = analzyedPlan.output.map(f => quote(f.name))
+      if (tableDesc.schema.isEmpty) {
+        columnNames.mkString(", ")
+      } else {
+        columnNames.zip(tableDesc.schema.map(f => quote(f.name))).map {
+          case (name, alias) => s"$name AS $alias"
+        }.mkString(", ")
+      }
+    }
+
+    val viewText = tableDesc.viewText.get
+    val viewName = quote(tableDesc.identifier.table)
+    s"SELECT $viewOutput FROM ($viewText) $viewName"
+  }
+
+  private def rebuildViewQueryString(sqlContext: SQLContext, analzyedPlan: LogicalPlan): String = {
+    val logicalPlan = if (tableDesc.schema.isEmpty) {
+      analzyedPlan
+    } else {
+      val projectList = analzyedPlan.output.zip(tableDesc.schema).map {
+        case (attr, col) => Alias(attr, col.name)()
+      }
+      sqlContext.executePlan(Project(projectList, analzyedPlan)).analyzed
+    }
+    new SQLBuilder(logicalPlan).toSQL
+  }
+
+  // escape backtick with double-backtick in column name and wrap it with backtick.
+  private def quote(name: String) = s"`${name.replaceAll("`", "``")}`"
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index df2b6beac6..6ccff454b1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.parser.DataTypeParser
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand, HiveNativeCommand}
+import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewCommand, HiveNativeCommand}
 import org.apache.spark.sql.execution.datasources.{Partition => _, _}
 import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetDefaultSource, ParquetRelation}
 import org.apache.spark.sql.hive.orc.{DefaultSource => OrcDefaultSource}
@@ -629,22 +629,7 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
       case p: LogicalPlan if !p.childrenResolved => p
       case p: LogicalPlan if p.resolved => p
 
-      case CreateViewAsSelectLogicalCommand(table, child, allowExisting, replace, sql)
-          if conf.nativeView =>
-        if (allowExisting && replace) {
-          throw new AnalysisException(
-            "It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.")
-        }
-
-        val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
-
-        execution.CreateViewAsSelect(
-          table.copy(identifier = TableIdentifier(tblName, Some(dbName))),
-          child,
-          allowExisting,
-          replace)
-
-      case CreateViewAsSelectLogicalCommand(table, child, allowExisting, replace, sql) =>
+      case CreateViewCommand(table, child, allowExisting, replace, sql) if !conf.nativeView =>
         HiveNativeCommand(sql)
 
       case p @ CreateTableAsSelectLogicalPlan(table, child, allowExisting) =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 6f7e7bf451..ae719f86aa 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -75,12 +75,6 @@ private[hive] trait HiveClient {
   /** Returns the metadata for the specified table or None if it doesn't exist. */
   def getTableOption(dbName: String, tableName: String): Option[CatalogTable]
 
-  /** Creates a view with the given metadata. */
-  def createView(view: CatalogTable): Unit
-
-  /** Updates the given view with new metadata. */
-  def alertView(view: CatalogTable): Unit
-
   /** Creates a table with the given metadata. */
   def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 703d991829..6327431368 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -342,14 +342,6 @@ private[hive] class HiveClientImpl(
     }
   }
 
-  override def createView(view: CatalogTable): Unit = withHiveState {
-    client.createTable(toHiveViewTable(view))
-  }
-
-  override def alertView(view: CatalogTable): Unit = withHiveState {
-    client.alterTable(view.qualifiedName, toHiveViewTable(view))
-  }
-
   override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState {
     client.createTable(toHiveTable(table), ignoreIfExists)
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
deleted file mode 100644
index fa830a1a0e..0000000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution
-
-import scala.util.control.NonFatal
-
-import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
-import org.apache.spark.sql.catalyst.SQLBuilder
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable}
-import org.apache.spark.sql.catalyst.expressions.Alias
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.hive.{HiveMetastoreTypes, HiveSessionState}
-
-/**
- * Create Hive view on non-hive-compatible tables by specifying schema ourselves instead of
- * depending on Hive meta-store.
- */
-// TODO: Note that this class can NOT canonicalize the view SQL string entirely, which is different
-// from Hive and may not work for some cases like create view on self join.
-private[hive] case class CreateViewAsSelect(
-    tableDesc: CatalogTable,
-    child: LogicalPlan,
-    allowExisting: Boolean,
-    orReplace: Boolean) extends RunnableCommand {
-
-  private val childSchema = child.output
-
-  assert(tableDesc.schema == Nil || tableDesc.schema.length == childSchema.length)
-  assert(tableDesc.viewText.isDefined)
-
-  private val tableIdentifier = tableDesc.identifier
-
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
-
-    sessionState.catalog.tableExists(tableIdentifier) match {
-      case true if allowExisting =>
-        // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
-        // already exists.
-
-      case true if orReplace =>
-        // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
-        sessionState.metadataHive.alertView(prepareTable(sqlContext))
-
-      case true =>
-        // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
-        // exists.
-        throw new AnalysisException(s"View $tableIdentifier already exists. " +
-          "If you want to update the view definition, please use ALTER VIEW AS or " +
-          "CREATE OR REPLACE VIEW AS")
-
-      case false =>
-        sessionState.metadataHive.createView(prepareTable(sqlContext))
-    }
-
-    Seq.empty[Row]
-  }
-
-  private def prepareTable(sqlContext: SQLContext): CatalogTable = {
-    val expandedText = if (sqlContext.conf.canonicalView) {
-      try rebuildViewQueryString(sqlContext) catch {
-        case NonFatal(e) => wrapViewTextWithSelect
-      }
-    } else {
-      wrapViewTextWithSelect
-    }
-
-    val viewSchema = {
-      if (tableDesc.schema.isEmpty) {
-        childSchema.map { a =>
-          CatalogColumn(a.name, HiveMetastoreTypes.toMetastoreType(a.dataType))
-        }
-      } else {
-        childSchema.zip(tableDesc.schema).map { case (a, col) =>
-          CatalogColumn(
-            col.name,
-            HiveMetastoreTypes.toMetastoreType(a.dataType),
-            nullable = true,
-            col.comment)
-        }
-      }
-    }
-
-    tableDesc.copy(schema = viewSchema, viewText = Some(expandedText))
-  }
-
-  private def wrapViewTextWithSelect: String = {
-    // When user specified column names for view, we should create a project to do the renaming.
-    // When no column name specified, we still need to create a project to declare the columns
-    // we need, to make us more robust to top level `*`s.
-    val viewOutput = {
-      val columnNames = childSchema.map(f => quote(f.name))
-      if (tableDesc.schema.isEmpty) {
-        columnNames.mkString(", ")
-      } else {
-        columnNames.zip(tableDesc.schema.map(f => quote(f.name))).map {
-          case (name, alias) => s"$name AS $alias"
-        }.mkString(", ")
-      }
-    }
-
-    val viewText = tableDesc.viewText.get
-    val viewName = quote(tableDesc.identifier.table)
-    s"SELECT $viewOutput FROM ($viewText) $viewName"
-  }
-
-  private def rebuildViewQueryString(sqlContext: SQLContext): String = {
-    val logicalPlan = if (tableDesc.schema.isEmpty) {
-      child
-    } else {
-      val projectList = childSchema.zip(tableDesc.schema).map {
-        case (attr, col) => Alias(attr, col.name)()
-      }
-      sqlContext.executePlan(Project(projectList, child)).analyzed
-    }
-    new SQLBuilder(logicalPlan).toSQL
-  }
-
-  // escape backtick with double-backtick in column name and wrap it with backtick.
-  private def quote(name: String) = s"`${name.replaceAll("`", "``")}`"
-}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index f6e3a4bd2d..e3204ff793 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.JsonTuple
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation}
-import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewAsSelectLogicalCommand, HiveNativeCommand, LoadData}
+import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewCommand, HiveNativeCommand, LoadData}
 import org.apache.spark.sql.hive.test.TestHive
 
 class HiveDDLCommandSuite extends PlanTest {
@@ -39,7 +39,7 @@ class HiveDDLCommandSuite extends PlanTest {
     parser.parsePlan(sql).collect {
       case CreateTable(desc, allowExisting) => (desc, allowExisting)
       case CreateTableAsSelectLogicalPlan(desc, _, allowExisting) => (desc, allowExisting)
-      case CreateViewAsSelectLogicalCommand(desc, _, allowExisting, _, _) => (desc, allowExisting)
+      case CreateViewCommand(desc, _, allowExisting, _, _) => (desc, allowExisting)
     }.head
   }
 
@@ -521,14 +521,13 @@ class HiveDDLCommandSuite extends PlanTest {
   test("create view - full") {
     val v1 =
       """
-        |CREATE OR REPLACE VIEW IF NOT EXISTS view1
+        |CREATE OR REPLACE VIEW view1
         |(col1, col3)
         |COMMENT 'BLABLA'
         |TBLPROPERTIES('prop1Key'="prop1Val")
         |AS SELECT * FROM tab1
       """.stripMargin
     val (desc, exists) = extractTableDesc(v1)
-    assert(exists)
     assert(desc.identifier.database.isEmpty)
     assert(desc.identifier.table == "view1")
     assert(desc.tableType == CatalogTableType.VIRTUAL_VIEW)
-- 
GitLab