diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 2d664d3ee691bd368b19a407a9b949851f7f9c85..c9ba6700998c3369e2f591bace3ff669a33a8e4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -367,6 +367,14 @@ private[spark] object SQLConf {
           "possible, or you may get wrong result.",
     isPublic = false)
 
+  val CANONICAL_NATIVE_VIEW = booleanConf("spark.sql.nativeView.canonical",
+    defaultValue = Some(true),
+    doc = "When this option and spark.sql.nativeView are both true, Spark SQL tries to handle " +
+          "CREATE VIEW statement using SQL query string generated from view definition logical " +
+          "plan.  If the logical plan doesn't have a SQL representation, we fallback to the " +
+          "original native view implementation.",
+    isPublic = false)
+
   val COLUMN_NAME_OF_CORRUPT_RECORD = stringConf("spark.sql.columnNameOfCorruptRecord",
     defaultValue = Some("_corrupt_record"),
     doc = "The name of internal column for storing raw/un-parsed JSON records that fail to parse.")
@@ -550,6 +558,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with ParserCon
 
   private[spark] def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED)
 
+  private[spark] def canonicalView: Boolean = getConf(CANONICAL_NATIVE_VIEW)
+
   def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
 
   private[spark] def subexpressionEliminationEnabled: Boolean =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 5f73d71d4510a1560ac074a9de7aa95f7cdaadf8..d48143762cac0b9d8393dc4ea1660437c1c6068e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -154,9 +154,23 @@ private[sql] trait SQLTestUtils
     }
   }
 
+  /**
+   * Drops views `viewNames` after calling `f`.
+   */
+  protected def withView(viewNames: String*)(f: => Unit): Unit = {
+    try f finally {
+      viewNames.foreach { name =>
+        sqlContext.sql(s"DROP VIEW IF EXISTS $name")
+      }
+    }
+  }
+
   /**
-   * Creates a temporary database and switches current database to it before executing `f`.  This
-   * database is dropped after `f` returns.
+   * Creates a temporary database, passes its name to `f`, and drops the database after `f`
+   * returns.
+   *
+   * Note that this method doesn't switch the current database to the temporary one before
+   * executing `f`.  Use `activateDatabase` to switch explicitly when needed.
    */
   protected def withTempDatabase(f: String => Unit): Unit = {
     val dbName = s"db_${UUID.randomUUID().toString.replace('-', '_')}"
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 80e45d5162801d29bad312e322f748282d25dcd2..a9c0e9ab7caef140d2a38186988cd7c55bc0b437 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -579,25 +579,26 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
       case p: LogicalPlan if !p.childrenResolved => p
       case p: LogicalPlan if p.resolved => p
 
-      case CreateViewAsSelect(table, child, allowExisting, replace, sql) =>
-        if (conf.nativeView) {
-          if (allowExisting && replace) {
-            throw new AnalysisException(
-              "It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.")
-          }
+      case CreateViewAsSelect(table, child, allowExisting, replace, _) if conf.nativeView =>
+        if (allowExisting && replace) {
+          throw new AnalysisException(
+            "It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.")
+        }
 
-          val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
+        val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
 
-          execution.CreateViewAsSelect(
-            table.copy(
-              specifiedDatabase = Some(dbName),
-              name = tblName),
-            child.output,
-            allowExisting,
-            replace)
-        } else {
-          HiveNativeCommand(sql)
-        }
+        execution.CreateViewAsSelect(
+          table.copy(
+            specifiedDatabase = Some(dbName),
+            name = tblName),
+          child,
+          allowExisting,
+          replace)
+
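+      // Falls back to Hive's native `CREATE VIEW` command when `spark.sql.nativeView` is
+      // disabled.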
+      case CreateViewAsSelect(_, _, _, _, sql) =>
+        HiveNativeCommand(sql)
 
       case p @ CreateTableAsSelect(table, child, allowExisting) =>
         val schema = if (table.schema.nonEmpty) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
index 6e288afbb4d2d80af8b88443ffad5292235072be..31bda56e8a163ab6b41eeb03c84ba4e808c96660 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
@@ -19,9 +19,10 @@ package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
+import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, SQLBuilder}
 import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable}
 
 /**
@@ -32,10 +33,14 @@ import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable}
 // from Hive and may not work for some cases like create view on self join.
 private[hive] case class CreateViewAsSelect(
     tableDesc: HiveTable,
-    childSchema: Seq[Attribute],
+    child: LogicalPlan,
     allowExisting: Boolean,
     orReplace: Boolean) extends RunnableCommand {
 
+  // Keeps the whole analyzed child plan instead of only its output schema, so that a canonical
+  // SQL string can be regenerated from it when `spark.sql.nativeView.canonical` is enabled.
+  private val childSchema = child.output
+
   assert(tableDesc.schema == Nil || tableDesc.schema.length == childSchema.length)
   assert(tableDesc.viewText.isDefined)
 
@@ -44,55 +49,87 @@
   override def run(sqlContext: SQLContext): Seq[Row] = {
     val hiveContext = sqlContext.asInstanceOf[HiveContext]
 
-    if (hiveContext.catalog.tableExists(tableIdentifier)) {
-      if (allowExisting) {
-        // view already exists, will do nothing, to keep consistent with Hive
-      } else if (orReplace) {
-        hiveContext.catalog.client.alertView(prepareTable())
-      } else {
+    hiveContext.catalog.tableExists(tableIdentifier) match {
+      case true if allowExisting =>
+        // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`.  Does nothing when the target
+        // view already exists, which is consistent with Hive.
+
+      case true if orReplace =>
+        // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
+        hiveContext.catalog.client.alertView(prepareTable(sqlContext))
+
+      case true =>
+        // Handles `CREATE VIEW v0 AS SELECT ...`.  Throws an exception when the target view
+        // already exists.
         throw new AnalysisException(s"View $tableIdentifier already exists. " +
           "If you want to update the view definition, please use ALTER VIEW AS or " +
           "CREATE OR REPLACE VIEW AS")
-      }
-    } else {
-      hiveContext.catalog.client.createView(prepareTable())
+
+      case false =>
+        hiveContext.catalog.client.createView(prepareTable(sqlContext))
     }
 
     Seq.empty[Row]
   }
 
-  private def prepareTable(): HiveTable = {
-    // setup column types according to the schema of child.
-    val schema = if (tableDesc.schema == Nil) {
-      childSchema.map { attr =>
-        HiveColumn(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), null)
-      }
+  private def prepareTable(sqlContext: SQLContext): HiveTable = {
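+    // Tries to regenerate a canonical SQL string from the analyzed plan first.  If the plan has
+    // no SQL representation, falls back to wrapping the original view text with a SELECT.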
+    val expandedText = if (sqlContext.conf.canonicalView) {
+      rebuildViewQueryString(sqlContext).getOrElse(wrapViewTextWithSelect)
     } else {
-      childSchema.zip(tableDesc.schema).map { case (attr, col) =>
-        HiveColumn(col.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), col.comment)
+      wrapViewTextWithSelect
+    }
+
+    val viewSchema = {
+      if (tableDesc.schema.isEmpty) {
+        childSchema.map { attr =>
+          HiveColumn(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), null)
+        }
+      } else {
+        childSchema.zip(tableDesc.schema).map { case (attr, col) =>
+          HiveColumn(col.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), col.comment)
+        }
       }
     }
 
-    val columnNames = childSchema.map(f => verbose(f.name))
+    tableDesc.copy(schema = viewSchema, viewText = Some(expandedText))
+  }
 
+  private def wrapViewTextWithSelect: String = {
-    // When user specified column names for view, we should create a project to do the renaming.
-    // When no column name specified, we still need to create a project to declare the columns
-    // we need, to make us more robust to top level `*`s.
+    // When the user specifies column names for the view, we create a projection to do the
+    // renaming.  When no column names are specified, we still create a projection to declare
+    // the columns we need, to make the view more robust against top-level `*`s.
-    val projectList = if (tableDesc.schema == Nil) {
-      columnNames.mkString(", ")
-    } else {
-      columnNames.zip(tableDesc.schema.map(f => verbose(f.name))).map {
-        case (name, alias) => s"$name AS $alias"
-      }.mkString(", ")
+    val viewOutput = {
+      val columnNames = childSchema.map(f => quote(f.name))
+      if (tableDesc.schema.isEmpty) {
+        columnNames.mkString(", ")
+      } else {
+        columnNames.zip(tableDesc.schema.map(f => quote(f.name))).map {
+          case (name, alias) => s"$name AS $alias"
+        }.mkString(", ")
+      }
     }
 
-    val viewName = verbose(tableDesc.name)
-
-    val expandedText = s"SELECT $projectList FROM (${tableDesc.viewText.get}) $viewName"
+    val viewText = tableDesc.viewText.get
+    val viewName = quote(tableDesc.name)
+    s"SELECT $viewOutput FROM ($viewText) $viewName"
+  }
 
-    tableDesc.copy(schema = schema, viewText = Some(expandedText))
+  private def rebuildViewQueryString(sqlContext: SQLContext): Option[String] = {
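+    // If the user specified view column names, wraps the child plan in a Project that renames
+    // the output columns accordingly before converting the plan back to SQL.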
+    val logicalPlan = if (tableDesc.schema.isEmpty) {
+      child
+    } else {
+      val projectList = childSchema.zip(tableDesc.schema).map {
+        case (attr, col) => Alias(attr, col.name)()
+      }
+      sqlContext.executePlan(Project(projectList, child)).analyzed
+    }
+    new SQLBuilder(logicalPlan, sqlContext).toSQL
   }
 
-  // escape backtick with double-backtick in column name and wrap it with backtick.
+  // Escapes backticks in the column name by doubling them, then wraps the name in backticks.
-  private def verbose(name: String) = s"`${name.replaceAll("`", "``")}`"
+  private def quote(name: String) = s"`${name.replaceAll("`", "``")}`"
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
index 261a4746f4287adbddad11e110e26621c699e7e7..1f731db26f3875a2dfb7a80cd8bbf5f35cf79772 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
@@ -147,7 +147,7 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
 
   // TODO Enable this
   // Query plans transformed by DistinctAggregationRewriter are not recognized yet
-  ignore("distinct and non-distinct aggregation") {
+  ignore("multi-distinct columns") {
     checkHiveQl("SELECT a, COUNT(DISTINCT b), COUNT(DISTINCT c), SUM(d) FROM t2 GROUP BY a")
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 683008960aa286fbc70594b0e6c33263284a6717..9e53d8a81e75370dfa277c4b9b95fc1e2dd38c02 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1319,67 +1319,122 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
-  test("correctly handle CREATE OR REPLACE VIEW") {
-    withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
-      withTable("jt", "jt2") {
-        sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
-        sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt")
-        checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
+  Seq(true, false).foreach { enabled =>
+    val prefix = (if (enabled) "With" else "Without") + " canonical native view:"
+    test(s"$prefix correctly handle CREATE OR REPLACE VIEW") {
+      withSQLConf(
+        SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
+        withTable("jt", "jt2") {
+          sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+          sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt")
+          checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
+
+          val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+          df.write.format("json").saveAsTable("jt2")
+          sql("CREATE OR REPLACE VIEW testView AS SELECT * FROM jt2")
+          // make sure the view has been changed.
+          checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
+
+          sql("DROP VIEW testView")
+
+          val e = intercept[AnalysisException] {
+            sql("CREATE OR REPLACE VIEW IF NOT EXISTS testView AS SELECT id FROM jt")
+          }
+          assert(e.message.contains("not allowed to define a view"))
+        }
+      }
+    }
 
-        val df = (1 until 10).map(i => i -> i).toDF("i", "j")
-        df.write.format("json").saveAsTable("jt2")
-        sql("CREATE OR REPLACE VIEW testView AS SELECT * FROM jt2")
-        // make sure the view has been changed.
-        checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
+    test(s"$prefix correctly handle ALTER VIEW") {
+      withSQLConf(
+        SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
+        withTable("jt", "jt2") {
+          withView("testView") {
+            sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+            sql("CREATE VIEW testView AS SELECT id FROM jt")
+
+            val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+            df.write.format("json").saveAsTable("jt2")
+            sql("ALTER VIEW testView AS SELECT * FROM jt2")
+            // make sure the view has been changed.
+            checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
+          }
+        }
+      }
+    }
 
-        sql("DROP VIEW testView")
+    test(s"$prefix create hive view for json table") {
+      // JSON tables are not Hive-compatible; make sure the new flag fixes this.
+      withSQLConf(
+        SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
+        withTable("jt") {
+          withView("testView") {
+            sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+            sql("CREATE VIEW testView AS SELECT id FROM jt")
+            checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
+          }
+        }
+      }
+    }
 
-        val e = intercept[AnalysisException] {
-          sql("CREATE OR REPLACE VIEW IF NOT EXISTS testView AS SELECT id FROM jt")
+    test(s"$prefix create hive view for partitioned parquet table") {
+      // Partitioned Parquet tables are not Hive-compatible; make sure the new flag fixes this.
+      withSQLConf(
+        SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
+        withTable("parTable") {
+          withView("testView") {
+            val df = Seq(1 -> "a").toDF("i", "j")
+            df.write.format("parquet").partitionBy("i").saveAsTable("parTable")
+            sql("CREATE VIEW testView AS SELECT i, j FROM parTable")
+            checkAnswer(sql("SELECT * FROM testView"), Row(1, "a"))
+          }
         }
-        assert(e.message.contains("not allowed to define a view"))
       }
     }
   }
 
-  test("correctly handle ALTER VIEW") {
-    withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
-      withTable("jt", "jt2") {
-        sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
-        sql("CREATE VIEW testView AS SELECT id FROM jt")
-
-        val df = (1 until 10).map(i => i -> i).toDF("i", "j")
-        df.write.format("json").saveAsTable("jt2")
-        sql("ALTER VIEW testView AS SELECT * FROM jt2")
-        // make sure the view has been changed.
-        checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
-
-        sql("DROP VIEW testView")
+  test("CTE within view") {
+    withSQLConf(
+      SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") {
+      withView("cte_view") {
+        sql("CREATE VIEW cte_view AS WITH w AS (SELECT 1 AS n) SELECT n FROM w")
+        checkAnswer(sql("SELECT * FROM cte_view"), Row(1))
       }
     }
   }
 
-  test("create hive view for json table") {
-    // json table is not hive-compatible, make sure the new flag fix it.
-    withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
-      withTable("jt") {
-        sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
-        sql("CREATE VIEW testView AS SELECT id FROM jt")
-        checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
-        sql("DROP VIEW testView")
+  test("Using view after switching current database") {
+    withSQLConf(
+      SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") {
+      withView("v") {
+        sql("CREATE VIEW v AS SELECT * FROM src")
+        withTempDatabase { db =>
+          activateDatabase(db) {
+            // Should look up table `src` in database `default`.
+            checkAnswer(sql("SELECT * FROM default.v"), sql("SELECT * FROM default.src"))
+
+            // The new `src` table shouldn't be scanned.
+            sql("CREATE TABLE src(key INT, value STRING)")
+            checkAnswer(sql("SELECT * FROM default.v"), sql("SELECT * FROM default.src"))
+          }
+        }
       }
     }
   }
 
-  test("create hive view for partitioned parquet table") {
-    // partitioned parquet table is not hive-compatible, make sure the new flag fix it.
-    withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
-      withTable("parTable") {
-        val df = Seq(1 -> "a").toDF("i", "j")
-        df.write.format("parquet").partitionBy("i").saveAsTable("parTable")
-        sql("CREATE VIEW testView AS SELECT i, j FROM parTable")
-        checkAnswer(sql("SELECT * FROM testView"), Row(1, "a"))
-        sql("DROP VIEW testView")
+  test("Using view after adding more columns") {
+    withSQLConf(
+      SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") {
+      withTable("add_col") {
+        sqlContext.range(10).write.saveAsTable("add_col")
+        withView("v") {
+          sql("CREATE VIEW v AS SELECT * FROM add_col")
+          sqlContext.range(10).select('id, 'id as 'a).write.mode("overwrite").saveAsTable("add_col")
+          checkAnswer(sql("SELECT * FROM v"), sqlContext.range(10))
+        }
       }
     }
   }