diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 2adccdd7bf61d5b8ca961d77f134a23f3b7eda75..80d32822f58ce0bdad6e52b80666ccd12c0cb538 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -223,25 +223,6 @@ case class CatalogTable(
     )
   }
 
-  /**
-   * Insert/Update the view query output column names in `properties`.
-   */
-  def withQueryColumnNames(columns: Seq[String]): CatalogTable = {
-    val props = new mutable.HashMap[String, String]
-    if (columns.nonEmpty) {
-      props.put(VIEW_QUERY_OUTPUT_NUM_COLUMNS, columns.length.toString)
-      columns.zipWithIndex.foreach { case (colName, index) =>
-        props.put(s"$VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX$index", colName)
-      }
-    }
-
-    // We can't use `filterKeys` here, as the map returned by `filterKeys` is not serializable,
-    // while `CatalogTable` should be serializable.
-    copy(properties = properties.filterNot { case (key, _) =>
-      key.startsWith(VIEW_QUERY_OUTPUT_PREFIX)
-    } ++ props)
-  }
-
   /** Syntactic sugar to update a field in `storage`. */
   def withNewStorage(
       locationUri: Option[String] = storage.locationUri,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 154141bf83c7d17bda23fcb77708426c394b2802..3da4bcfe9363cc45df104719a3c95f0108731ed9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -17,10 +17,10 @@
 
 package org.apache.spark.sql.execution.command
 
-import scala.util.control.NonFatal
+import scala.collection.mutable
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions.Alias
@@ -64,9 +64,9 @@ object PersistedView extends ViewType
 
 
 /**
- * Create or replace a view with given query plan. This command will convert the query plan to
- * canonicalized SQL string, and store it as view text in metastore, if we need to create a
- * permanent view.
+ * Create or replace a view with given query plan. This command will generate some view-specific
+ * properties(e.g. view default database, view query output column names) and store them as
+ * properties in metastore, if we need to create a permanent view.
  *
  * @param name the name of this view.
  * @param userSpecifiedColumns the output column names and optional comments specified by users,
@@ -75,8 +75,8 @@ object PersistedView extends ViewType
  * @param properties the properties of this view.
  * @param originalText the original SQL text of this view, can be None if this view is created via
  *                     Dataset API.
- * @param child the logical plan that represents the view; this is used to generate a canonicalized
- *              version of the SQL that can be saved in the catalog.
+ * @param child the logical plan that represents the view; this is used to generate the logical
+ *              plan for temporary view and the view schema.
  * @param allowExisting if true, and if the view already exists, noop; if false, and if the view
  *                already exists, throws analysis exception.
  * @param replace if true, and if the view already exists, updates it; if false, and if the view
@@ -95,6 +95,8 @@ case class CreateViewCommand(
     viewType: ViewType)
   extends RunnableCommand {
 
+  import ViewHelper._
+
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
 
   if (viewType == PersistedView) {
@@ -137,22 +139,12 @@ case class CreateViewCommand(
     // This should be called after `qe.assertAnalyzed()` (i.e., `child` can be resolved)
     verifyTemporaryObjectsNotExists(sparkSession)
 
-    val aliasedPlan = if (userSpecifiedColumns.isEmpty) {
-      analyzedPlan
-    } else {
-      val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
-        case (attr, (colName, None)) => Alias(attr, colName)()
-        case (attr, (colName, Some(colComment))) =>
-          val meta = new MetadataBuilder().putString("comment", colComment).build()
-          Alias(attr, colName)(explicitMetadata = Some(meta))
-      }
-      sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
-    }
-
     val catalog = sparkSession.sessionState.catalog
     if (viewType == LocalTempView) {
+      val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
       catalog.createTempView(name.table, aliasedPlan, overrideIfExists = replace)
     } else if (viewType == GlobalTempView) {
+      val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
       catalog.createGlobalTempView(name.table, aliasedPlan, overrideIfExists = replace)
     } else if (catalog.tableExists(name)) {
       val tableMetadata = catalog.getTableMetadata(name)
@@ -163,7 +155,7 @@ case class CreateViewCommand(
         throw new AnalysisException(s"$name is not a view")
       } else if (replace) {
         // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
-        catalog.alterTable(prepareTable(sparkSession, aliasedPlan))
+        catalog.alterTable(prepareTable(sparkSession, analyzedPlan))
       } else {
         // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
         // exists.
@@ -173,7 +165,7 @@ case class CreateViewCommand(
       }
     } else {
       // Create the view if it doesn't exist.
-      catalog.createTable(prepareTable(sparkSession, aliasedPlan), ignoreIfExists = false)
+      catalog.createTable(prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false)
     }
     Seq.empty[Row]
   }
@@ -207,29 +199,44 @@ case class CreateViewCommand(
   }
 
   /**
-   * Returns a [[CatalogTable]] that can be used to save in the catalog. This comment canonicalize
-   * SQL based on the analyzed plan, and also creates the proper schema for the view.
+   * If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns,
+   * else return the analyzed plan directly.
    */
-  private def prepareTable(sparkSession: SparkSession, aliasedPlan: LogicalPlan): CatalogTable = {
-    val viewSQL: String = new SQLBuilder(aliasedPlan).toSQL
-
-    // Validate the view SQL - make sure we can parse it and analyze it.
-    // If we cannot analyze the generated query, there is probably a bug in SQL generation.
-    try {
-      sparkSession.sql(viewSQL).queryExecution.assertAnalyzed()
-    } catch {
-      case NonFatal(e) =>
-        throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewSQL", e)
+  private def aliasPlan(session: SparkSession, analyzedPlan: LogicalPlan): LogicalPlan = {
+    if (userSpecifiedColumns.isEmpty) {
+      analyzedPlan
+    } else {
+      val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
+        case (attr, (colName, None)) => Alias(attr, colName)()
+        case (attr, (colName, Some(colComment))) =>
+          val meta = new MetadataBuilder().putString("comment", colComment).build()
+          Alias(attr, colName)(explicitMetadata = Some(meta))
+      }
+      session.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
     }
+  }
+
+  /**
+   * Returns a [[CatalogTable]] that can be used to save in the catalog. Generate the view-specific
+   * properties(e.g. view default database, view query output column names) and store them as
+   * properties in the CatalogTable, and also creates the proper schema for the view.
+   */
+  private def prepareTable(session: SparkSession, analyzedPlan: LogicalPlan): CatalogTable = {
+    if (originalText.isEmpty) {
+      throw new AnalysisException(
+        "It is not allowed to create a persisted view from the Dataset API")
+    }
+
+    val newProperties = generateViewProperties(properties, session, analyzedPlan)
 
     CatalogTable(
       identifier = name,
       tableType = CatalogTableType.VIEW,
       storage = CatalogStorageFormat.empty,
-      schema = aliasedPlan.schema,
-      properties = properties,
+      schema = aliasPlan(session, analyzedPlan).schema,
+      properties = newProperties,
       viewOriginalText = originalText,
-      viewText = Some(viewSQL),
+      viewText = originalText,
       comment = comment
     )
   }
@@ -244,14 +251,16 @@ case class CreateViewCommand(
  * @param name the name of this view.
  * @param originalText the original SQL text of this view. Note that we can only alter a view by
  *                     SQL API, which means we always have originalText.
- * @param query the logical plan that represents the view; this is used to generate a canonicalized
- *              version of the SQL that can be saved in the catalog.
+ * @param query the logical plan that represents the view; this is used to generate the new view
+ *              schema.
  */
 case class AlterViewAsCommand(
     name: TableIdentifier,
     originalText: String,
     query: LogicalPlan) extends RunnableCommand {
 
+  import ViewHelper._
+
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
 
   override def run(session: SparkSession): Seq[Row] = {
@@ -275,21 +284,80 @@ case class AlterViewAsCommand(
       throw new AnalysisException(s"${viewMeta.identifier} is not a view.")
     }
 
-    val viewSQL: String = new SQLBuilder(analyzedPlan).toSQL
-    // Validate the view SQL - make sure we can parse it and analyze it.
-    // If we cannot analyze the generated query, there is probably a bug in SQL generation.
-    try {
-      session.sql(viewSQL).queryExecution.assertAnalyzed()
-    } catch {
-      case NonFatal(e) =>
-        throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewSQL", e)
-    }
+    val newProperties = generateViewProperties(viewMeta.properties, session, analyzedPlan)
 
     val updatedViewMeta = viewMeta.copy(
       schema = analyzedPlan.schema,
+      properties = newProperties,
       viewOriginalText = Some(originalText),
-      viewText = Some(viewSQL))
+      viewText = Some(originalText))
 
     session.sessionState.catalog.alterTable(updatedViewMeta)
   }
 }
+
+object ViewHelper {
+
+  import CatalogTable._
+
+  /**
+   * Generate the view default database in `properties`.
+   */
+  private def generateViewDefaultDatabase(databaseName: String): Map[String, String] = {
+    Map(VIEW_DEFAULT_DATABASE -> databaseName)
+  }
+
+  /**
+   * Generate the view query output column names in `properties`.
+   */
+  private def generateQueryColumnNames(columns: Seq[String]): Map[String, String] = {
+    val props = new mutable.HashMap[String, String]
+    if (columns.nonEmpty) {
+      props.put(VIEW_QUERY_OUTPUT_NUM_COLUMNS, columns.length.toString)
+      columns.zipWithIndex.foreach { case (colName, index) =>
+        props.put(s"$VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX$index", colName)
+      }
+    }
+    props.toMap
+  }
+
+  /**
+   * Remove the view query output column names in `properties`.
+   */
+  private def removeQueryColumnNames(properties: Map[String, String]): Map[String, String] = {
+    // We can't use `filterKeys` here, as the map returned by `filterKeys` is not serializable,
+    // while `CatalogTable` should be serializable.
+    properties.filterNot { case (key, _) =>
+      key.startsWith(VIEW_QUERY_OUTPUT_PREFIX)
+    }
+  }
+
+  /**
+   * Generate the view properties in CatalogTable, including:
+   * 1. view default database that is used to provide the default database name on view resolution.
+   * 2. the output column names of the query that creates a view, this is used to map the output of
+   *    the view child to the view output during view resolution.
+   *
+   * @param properties the `properties` in CatalogTable.
+   * @param session the spark session.
+   * @param analyzedPlan the analyzed logical plan that represents the child of a view.
+   * @return new view properties including view default database and query column names properties.
+   */
+  def generateViewProperties(
+      properties: Map[String, String],
+      session: SparkSession,
+      analyzedPlan: LogicalPlan): Map[String, String] = {
+    // Generate the query column names, throw an AnalysisException if there exists duplicate column
+    // names.
+    val queryOutput = analyzedPlan.schema.fieldNames
+    assert(queryOutput.distinct.size == queryOutput.size,
+      s"The view output ${queryOutput.mkString("(", ",", ")")} contains duplicate column name.")
+
+    // Generate the view default database name.
+    val viewDefaultDatabase = session.sessionState.catalog.getCurrentDatabase
+
+    removeQueryColumnNames(properties) ++
+      generateViewDefaultDatabase(viewDefaultDatabase) ++
+      generateQueryColumnNames(queryOutput)
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 605dec4a1ef909b5dd61d54d779b6419cd8ef6e0..10607b8dc2c21be0e5d273f2ab9b5ed90e5e25a7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2501,11 +2501,15 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("should be able to resolve a persistent view") {
-    withTable("t1") {
+    withTable("t1", "t2") {
       withView("v1") {
         sql("CREATE TABLE `t1` USING parquet AS SELECT * FROM VALUES(1, 1) AS t1(a, b)")
-        sql("CREATE VIEW `v1` AS SELECT * FROM t1")
-        checkAnswer(spark.table("v1"), Row(1, 1))
+        sql("CREATE TABLE `t2` USING parquet AS SELECT * FROM VALUES('a', 2, 1.0) AS t2(d, e, f)")
+        sql("CREATE VIEW `v1`(x, y) AS SELECT * FROM t1")
+        checkAnswer(spark.table("v1").orderBy("x"), Row(1, 1))
+
+        sql("ALTER VIEW `v1` AS SELECT * FROM t2")
+        checkAnswer(spark.table("v1").orderBy("f"), Row("a", 2, 1.0))
       }
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 882a1841246803b843b49bd7d5d0747f40b2a7ba..edef30823b55c20a8c653d0dff050533329d8ba3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -381,28 +381,30 @@ class HiveDDLSuite
       spark.range(10).write.saveAsTable(tabName)
       val viewName = "view1"
       withView(viewName) {
-        val catalog = spark.sessionState.catalog
+        def checkProperties(expected: Map[String, String]): Boolean = {
+          val properties = spark.sessionState.catalog.getTableMetadata(TableIdentifier(viewName))
+            .properties
+          properties.filterNot { case (key, value) =>
+            Seq("transient_lastDdlTime", CatalogTable.VIEW_DEFAULT_DATABASE).contains(key) ||
+              key.startsWith(CatalogTable.VIEW_QUERY_OUTPUT_PREFIX)
+          } == expected
+        }
         sql(s"CREATE VIEW $viewName AS SELECT * FROM $tabName")
 
-        assert(catalog.getTableMetadata(TableIdentifier(viewName))
-          .properties.filter(_._1 != "transient_lastDdlTime") == Map())
+        checkProperties(Map())
         sql(s"ALTER VIEW $viewName SET TBLPROPERTIES ('p' = 'an')")
-        assert(catalog.getTableMetadata(TableIdentifier(viewName))
-          .properties.filter(_._1 != "transient_lastDdlTime") == Map("p" -> "an"))
+        checkProperties(Map("p" -> "an"))
 
         // no exception or message will be issued if we set it again
         sql(s"ALTER VIEW $viewName SET TBLPROPERTIES ('p' = 'an')")
-        assert(catalog.getTableMetadata(TableIdentifier(viewName))
-          .properties.filter(_._1 != "transient_lastDdlTime") == Map("p" -> "an"))
+        checkProperties(Map("p" -> "an"))
 
         // the value will be updated if we set the same key to a different value
         sql(s"ALTER VIEW $viewName SET TBLPROPERTIES ('p' = 'b')")
-        assert(catalog.getTableMetadata(TableIdentifier(viewName))
-          .properties.filter(_._1 != "transient_lastDdlTime") == Map("p" -> "b"))
+        checkProperties(Map("p" -> "b"))
 
         sql(s"ALTER VIEW $viewName UNSET TBLPROPERTIES ('p')")
-        assert(catalog.getTableMetadata(TableIdentifier(viewName))
-          .properties.filter(_._1 != "transient_lastDdlTime") == Map())
+        checkProperties(Map())
 
         val message = intercept[AnalysisException] {
           sql(s"ALTER VIEW $viewName UNSET TBLPROPERTIES ('p')")
@@ -655,10 +657,7 @@ class HiveDDLSuite
           Seq(
             Row("# View Information", "", ""),
             Row("View Original Text:", "SELECT * FROM tbl", ""),
-            Row("View Expanded Text:",
-              "SELECT `gen_attr_0` AS `a` FROM (SELECT `gen_attr_0` FROM " +
-              "(SELECT `a` AS `gen_attr_0` FROM `default`.`tbl`) AS gen_subquery_0) AS tbl",
-              "")
+            Row("View Expanded Text:", "SELECT * FROM tbl", "")
           )
         ))
       }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
index 9bc078dbb02283c5dcbfcf40561392284d043ae8..2658e2c91f235425b7fafbaa7011c1a1953d66d4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
@@ -222,13 +222,15 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   }
 
   test("correctly parse CREATE VIEW statement") {
-    sql(
-      """CREATE VIEW IF NOT EXISTS
-        |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
-        |TBLPROPERTIES ('a' = 'b')
-        |AS SELECT * FROM jt""".stripMargin)
-    checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
-    sql("DROP VIEW testView")
+    withView("testView") {
+      sql(
+        """CREATE VIEW IF NOT EXISTS
+          |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
+          |TBLPROPERTIES ('a' = 'b')
+          |AS SELECT * FROM jt
+          |""".stripMargin)
+      checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
+    }
   }
 
   test("correctly parse CREATE TEMPORARY VIEW statement") {