diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
index 3d4a02b0ffebd0f91696d821923b565c9a0bd8cf..4fc65cbce15bd54e526a68d004292d7dc3ac7d0e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
@@ -62,6 +62,9 @@ abstract class DataType extends AbstractDataType {
   /** Readable string representation for the type. */
   def simpleString: String = typeName
 
+  /** String representation for the type saved in external catalogs. */
+  def catalogString: String = simpleString
+
   /** Readable string representation for the type with truncation */
   private[sql] def simpleString(maxNumberFields: Int): String = simpleString
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala
index 71a9b9f8082a462a064940b03a665e8a481745ea..aa36121bdee7fcceb3ac3574dcfcb14bc5071bd5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala
@@ -91,6 +91,8 @@ abstract class UserDefinedType[UserType >: Null] extends DataType with Serializa
     case that: UserDefinedType[_] => this.acceptsType(that)
     case _ => false
   }
+
+  override def catalogString: String = sqlType.simpleString
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index e983a4cee6d7f424d5e861ad06a292710460cd54..7dc888cdde4aacce7039cdbc6484256c8f2dc8b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -1101,7 +1101,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create or replace a view. This creates a [[CreateViewAsSelectLogicalCommand]] command.
+   * Create or replace a view. This creates a [[CreateViewCommand]] command.
    *
    * For example:
    * {{{
@@ -1134,7 +1134,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Alter the query of a view. This creates a [[CreateViewAsSelectLogicalCommand]] command.
+   * Alter the query of a view. This creates a [[CreateViewCommand]] command.
    */
   override def visitAlterViewQuery(ctx: AlterViewQueryContext): LogicalPlan = withOrigin(ctx) {
     createView(
@@ -1149,7 +1149,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create a [[CreateViewAsSelectLogicalCommand]] command.
+   * Create a [[CreateViewCommand]] command.
    */
   private def createView(
       ctx: ParserRuleContext,
@@ -1170,7 +1170,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       viewOriginalText = sql,
       viewText = sql,
       comment = comment)
-    CreateViewAsSelectLogicalCommand(tableDesc, plan(query), allowExist, replace, command(ctx))
+    CreateViewCommand(tableDesc, plan(query), allowExist, replace, command(ctx))
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index aa6112c7f043494c483dbc2557aa44999c8e43bb..082f944f99bb04a7f1e3a10cd23442e4c2c37e9d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -17,16 +17,136 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode}
+import scala.util.control.NonFatal
 
-case class CreateViewAsSelectLogicalCommand(
+import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
+import org.apache.spark.sql.catalyst.SQLBuilder
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+
+
+/**
+ * Create Hive view on non-hive-compatible tables by specifying schema ourselves instead of
+ * depending on Hive meta-store.
+ *
+ * @param tableDesc the catalog table
+ * @param child the logical plan that represents the view; this is used to generate a canonicalized
+ *              version of the SQL that can be saved in the catalog.
+ * @param allowExisting if true, and if the view already exists, noop; if false, and if the view
+ *                already exists, throws analysis exception.
+ * @param replace if true, and if the view already exists, updates it; if false, and if the view
+ *                already exists, throws analysis exception.
+ * @param sql the original sql
+ */
+case class CreateViewCommand(
     tableDesc: CatalogTable,
     child: LogicalPlan,
     allowExisting: Boolean,
     replace: Boolean,
-    sql: String) extends UnaryNode with Command {
+    sql: String)
+  extends RunnableCommand {
+
+  // TODO: Note that this class can NOT canonicalize the view SQL string entirely, which is
+  // different from Hive and may not work for some cases like create view on self join.
+
   override def output: Seq[Attribute] = Seq.empty[Attribute]
-  override lazy val resolved: Boolean = false
+
+  require(tableDesc.tableType == CatalogTableType.VIRTUAL_VIEW)
+  require(tableDesc.viewText.isDefined)
+
+  private val tableIdentifier = tableDesc.identifier
+
+  if (allowExisting && replace) {
+    throw new AnalysisException(
+      "It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.")
+  }
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val analzyedPlan = sqlContext.executePlan(child).analyzed
+
+    require(tableDesc.schema == Nil || tableDesc.schema.length == analzyedPlan.output.length)
+    val sessionState = sqlContext.sessionState
+
+    if (sessionState.catalog.tableExists(tableIdentifier)) {
+      if (allowExisting) {
+        // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
+        // already exists.
+      } else if (replace) {
+        // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
+        sessionState.catalog.alterTable(prepareTable(sqlContext, analzyedPlan))
+      } else {
+        // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
+        // exists.
+        throw new AnalysisException(s"View $tableIdentifier already exists. " +
+          "If you want to update the view definition, please use ALTER VIEW AS or " +
+          "CREATE OR REPLACE VIEW AS")
+      }
+    } else {
+      // Create the view if it doesn't exist.
+      sessionState.catalog.createTable(
+        prepareTable(sqlContext, analzyedPlan), ignoreIfExists = false)
+    }
+
+    Seq.empty[Row]
+  }
+
+  private def prepareTable(sqlContext: SQLContext, analzyedPlan: LogicalPlan): CatalogTable = {
+    val expandedText = if (sqlContext.conf.canonicalView) {
+      try rebuildViewQueryString(sqlContext, analzyedPlan) catch {
+        case NonFatal(e) => wrapViewTextWithSelect(analzyedPlan)
+      }
+    } else {
+      wrapViewTextWithSelect(analzyedPlan)
+    }
+
+    val viewSchema = {
+      if (tableDesc.schema.isEmpty) {
+        analzyedPlan.output.map { a =>
+          CatalogColumn(a.name, a.dataType.simpleString)
+        }
+      } else {
+        analzyedPlan.output.zip(tableDesc.schema).map { case (a, col) =>
+          CatalogColumn(col.name, a.dataType.simpleString, nullable = true, col.comment)
+        }
+      }
+    }
+
+    tableDesc.copy(schema = viewSchema, viewText = Some(expandedText))
+  }
+
+  private def wrapViewTextWithSelect(analzyedPlan: LogicalPlan): String = {
+    // When user specified column names for view, we should create a project to do the renaming.
+    // When no column name specified, we still need to create a project to declare the columns
+    // we need, to make us more robust to top level `*`s.
+    val viewOutput = {
+      val columnNames = analzyedPlan.output.map(f => quote(f.name))
+      if (tableDesc.schema.isEmpty) {
+        columnNames.mkString(", ")
+      } else {
+        columnNames.zip(tableDesc.schema.map(f => quote(f.name))).map {
+          case (name, alias) => s"$name AS $alias"
+        }.mkString(", ")
+      }
+    }
+
+    val viewText = tableDesc.viewText.get
+    val viewName = quote(tableDesc.identifier.table)
+    s"SELECT $viewOutput FROM ($viewText) $viewName"
+  }
+
+  private def rebuildViewQueryString(sqlContext: SQLContext, analzyedPlan: LogicalPlan): String = {
+    val logicalPlan = if (tableDesc.schema.isEmpty) {
+      analzyedPlan
+    } else {
+      val projectList = analzyedPlan.output.zip(tableDesc.schema).map {
+        case (attr, col) => Alias(attr, col.name)()
+      }
+      sqlContext.executePlan(Project(projectList, analzyedPlan)).analyzed
+    }
+    new SQLBuilder(logicalPlan).toSQL
+  }
+
+  // escape backtick with double-backtick in column name and wrap it with backtick.
+  private def quote(name: String) = s"`${name.replaceAll("`", "``")}`"
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index df2b6beac640306e1b9798ca3d44097c809d3dc4..6ccff454b16a5b518a7d056564f71176d607c0d7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.parser.DataTypeParser
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand, HiveNativeCommand}
+import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewCommand, HiveNativeCommand}
 import org.apache.spark.sql.execution.datasources.{Partition => _, _}
 import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetDefaultSource, ParquetRelation}
 import org.apache.spark.sql.hive.orc.{DefaultSource => OrcDefaultSource}
@@ -629,22 +629,7 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
       case p: LogicalPlan if !p.childrenResolved => p
       case p: LogicalPlan if p.resolved => p
 
-      case CreateViewAsSelectLogicalCommand(table, child, allowExisting, replace, sql)
-          if conf.nativeView =>
-        if (allowExisting && replace) {
-          throw new AnalysisException(
-            "It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.")
-        }
-
-        val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
-
-        execution.CreateViewAsSelect(
-          table.copy(identifier = TableIdentifier(tblName, Some(dbName))),
-          child,
-          allowExisting,
-          replace)
-
-      case CreateViewAsSelectLogicalCommand(table, child, allowExisting, replace, sql) =>
+      case CreateViewCommand(table, child, allowExisting, replace, sql) if !conf.nativeView =>
         HiveNativeCommand(sql)
 
       case p @ CreateTableAsSelectLogicalPlan(table, child, allowExisting) =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 6f7e7bf45106ffe13b6bf726d608f95af875ecdd..ae719f86aa7e8f14c63c9f927f8d714146435cb5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -75,12 +75,6 @@ private[hive] trait HiveClient {
   /** Returns the metadata for the specified table or None if it doesn't exist. */
   def getTableOption(dbName: String, tableName: String): Option[CatalogTable]
 
-  /** Creates a view with the given metadata. */
-  def createView(view: CatalogTable): Unit
-
-  /** Updates the given view with new metadata. */
-  def alertView(view: CatalogTable): Unit
-
   /** Creates a table with the given metadata. */
   def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 703d991829c71bbd3285007afb6e9803e56c3450..6327431368fc539e6313ddd4b4edcf1dc069ab54 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -342,14 +342,6 @@ private[hive] class HiveClientImpl(
     }
   }
 
-  override def createView(view: CatalogTable): Unit = withHiveState {
-    client.createTable(toHiveViewTable(view))
-  }
-
-  override def alertView(view: CatalogTable): Unit = withHiveState {
-    client.alterTable(view.qualifiedName, toHiveViewTable(view))
-  }
-
   override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState {
     client.createTable(toHiveTable(table), ignoreIfExists)
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
deleted file mode 100644
index fa830a1a0e6c56cc576ae03217403b042c071d0b..0000000000000000000000000000000000000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution
-
-import scala.util.control.NonFatal
-
-import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
-import org.apache.spark.sql.catalyst.SQLBuilder
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable}
-import org.apache.spark.sql.catalyst.expressions.Alias
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.hive.{HiveMetastoreTypes, HiveSessionState}
-
-/**
- * Create Hive view on non-hive-compatible tables by specifying schema ourselves instead of
- * depending on Hive meta-store.
- */
-// TODO: Note that this class can NOT canonicalize the view SQL string entirely, which is different
-// from Hive and may not work for some cases like create view on self join.
-private[hive] case class CreateViewAsSelect(
-    tableDesc: CatalogTable,
-    child: LogicalPlan,
-    allowExisting: Boolean,
-    orReplace: Boolean) extends RunnableCommand {
-
-  private val childSchema = child.output
-
-  assert(tableDesc.schema == Nil || tableDesc.schema.length == childSchema.length)
-  assert(tableDesc.viewText.isDefined)
-
-  private val tableIdentifier = tableDesc.identifier
-
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
-
-    sessionState.catalog.tableExists(tableIdentifier) match {
-      case true if allowExisting =>
-        // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
-        // already exists.
-
-      case true if orReplace =>
-        // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
-        sessionState.metadataHive.alertView(prepareTable(sqlContext))
-
-      case true =>
-        // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
-        // exists.
-        throw new AnalysisException(s"View $tableIdentifier already exists. " +
-          "If you want to update the view definition, please use ALTER VIEW AS or " +
-          "CREATE OR REPLACE VIEW AS")
-
-      case false =>
-        sessionState.metadataHive.createView(prepareTable(sqlContext))
-    }
-
-    Seq.empty[Row]
-  }
-
-  private def prepareTable(sqlContext: SQLContext): CatalogTable = {
-    val expandedText = if (sqlContext.conf.canonicalView) {
-      try rebuildViewQueryString(sqlContext) catch {
-        case NonFatal(e) => wrapViewTextWithSelect
-      }
-    } else {
-      wrapViewTextWithSelect
-    }
-
-    val viewSchema = {
-      if (tableDesc.schema.isEmpty) {
-        childSchema.map { a =>
-          CatalogColumn(a.name, HiveMetastoreTypes.toMetastoreType(a.dataType))
-        }
-      } else {
-        childSchema.zip(tableDesc.schema).map { case (a, col) =>
-          CatalogColumn(
-            col.name,
-            HiveMetastoreTypes.toMetastoreType(a.dataType),
-            nullable = true,
-            col.comment)
-        }
-      }
-    }
-
-    tableDesc.copy(schema = viewSchema, viewText = Some(expandedText))
-  }
-
-  private def wrapViewTextWithSelect: String = {
-    // When user specified column names for view, we should create a project to do the renaming.
-    // When no column name specified, we still need to create a project to declare the columns
-    // we need, to make us more robust to top level `*`s.
-    val viewOutput = {
-      val columnNames = childSchema.map(f => quote(f.name))
-      if (tableDesc.schema.isEmpty) {
-        columnNames.mkString(", ")
-      } else {
-        columnNames.zip(tableDesc.schema.map(f => quote(f.name))).map {
-          case (name, alias) => s"$name AS $alias"
-        }.mkString(", ")
-      }
-    }
-
-    val viewText = tableDesc.viewText.get
-    val viewName = quote(tableDesc.identifier.table)
-    s"SELECT $viewOutput FROM ($viewText) $viewName"
-  }
-
-  private def rebuildViewQueryString(sqlContext: SQLContext): String = {
-    val logicalPlan = if (tableDesc.schema.isEmpty) {
-      child
-    } else {
-      val projectList = childSchema.zip(tableDesc.schema).map {
-        case (attr, col) => Alias(attr, col.name)()
-      }
-      sqlContext.executePlan(Project(projectList, child)).analyzed
-    }
-    new SQLBuilder(logicalPlan).toSQL
-  }
-
-  // escape backtick with double-backtick in column name and wrap it with backtick.
-  private def quote(name: String) = s"`${name.replaceAll("`", "``")}`"
-}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index f6e3a4bd2d4e2e077e835b0d40687079b6d28b22..e3204ff793eb8a90c39778f3e0e32de83dbd0ccb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.JsonTuple
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation}
-import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewAsSelectLogicalCommand, HiveNativeCommand, LoadData}
+import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewCommand, HiveNativeCommand, LoadData}
 import org.apache.spark.sql.hive.test.TestHive
 
 class HiveDDLCommandSuite extends PlanTest {
@@ -39,7 +39,7 @@ class HiveDDLCommandSuite extends PlanTest {
     parser.parsePlan(sql).collect {
       case CreateTable(desc, allowExisting) => (desc, allowExisting)
       case CreateTableAsSelectLogicalPlan(desc, _, allowExisting) => (desc, allowExisting)
-      case CreateViewAsSelectLogicalCommand(desc, _, allowExisting, _, _) => (desc, allowExisting)
+      case CreateViewCommand(desc, _, allowExisting, _, _) => (desc, allowExisting)
     }.head
   }
 
@@ -521,14 +521,13 @@ class HiveDDLCommandSuite extends PlanTest {
   test("create view - full") {
     val v1 =
       """
-        |CREATE OR REPLACE VIEW IF NOT EXISTS view1
+        |CREATE OR REPLACE VIEW view1
         |(col1, col3)
         |COMMENT 'BLABLA'
         |TBLPROPERTIES('prop1Key'="prop1Val")
         |AS SELECT * FROM tab1
       """.stripMargin
     val (desc, exists) = extractTableDesc(v1)
-    assert(exists)
     assert(desc.identifier.database.isEmpty)
     assert(desc.identifier.table == "view1")
     assert(desc.tableType == CatalogTableType.VIRTUAL_VIEW)