diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index d3086fc91e3ec0068156ef0f9115f6593e114c2d..3de8aa02766dc8b2d841683da56db771481167bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -20,7 +20,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, AstBuilder, ParseException}
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
 import org.apache.spark.sql.execution.command.{DescribeCommand => _, _}
@@ -474,9 +474,13 @@ class SparkSqlAstBuilder extends AstBuilder {
    *   ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec [LOCATION 'loc1']
    *   ALTER VIEW view ADD [IF NOT EXISTS] PARTITION spec
    * }}}
+   *
+   * ALTER VIEW ... ADD PARTITION ... is not supported because the concept of partitioning
+   * applies only to physical tables.
    */
   override def visitAddTablePartition(
       ctx: AddTablePartitionContext): LogicalPlan = withOrigin(ctx) {
+    if (ctx.VIEW != null) throw new ParseException("Operation not allowed: partitioned views", ctx)
     // Create partition spec to location mapping.
     val specsAndLocs = if (ctx.partitionSpec.isEmpty) {
       ctx.partitionSpecLocation.asScala.map {
@@ -538,9 +542,13 @@ class SparkSqlAstBuilder extends AstBuilder {
    *   ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
    *   ALTER VIEW view DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...];
    * }}}
+   *
+   * ALTER VIEW ... DROP PARTITION ... is not supported because the concept of partitioning
+   * applies only to physical tables.
    */
   override def visitDropTablePartitions(
       ctx: DropTablePartitionsContext): LogicalPlan = withOrigin(ctx) {
+    if (ctx.VIEW != null) throw new ParseException("Operation not allowed: partitioned views", ctx)
     AlterTableDropPartition(
       visitTableIdentifier(ctx.tableIdentifier),
       ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
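
Taken together, the two guards above turn ALTER VIEW ... ADD/DROP PARTITION into parse-time
failures instead of letting them reach the analyzer as AlterTableAddPartition or
AlterTableDropPartition plans. A minimal sketch of the new behavior (hypothetical snippet;
`parser` stands for any parser backed by SparkSqlAstBuilder, and `intercept` is ScalaTest's,
as in DDLCommandSuite below):

    import org.apache.spark.sql.catalyst.parser.ParseException

    // Both statements now fail while parsing; the ParseException carries the
    // offending text and position through the `ctx` passed to its constructor.
    intercept[ParseException] {
      parser.parsePlan("ALTER VIEW v ADD IF NOT EXISTS PARTITION (ds = '2008-08-08')")
    }
    intercept[ParseException] {
      parser.parsePlan("ALTER VIEW v DROP IF EXISTS PARTITION (ds = '2008-08-08')")
    }
    // Error message in both cases: "Operation not allowed: partitioned views"
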
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 618c9a58a677a90f82747d48ef20690567eeea39..46dcadd6900bfd31d68ce597a66c726aaa6077b9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -351,22 +351,12 @@ class DDLCommandSuite extends PlanTest {
       |(col1=NULL, cOL2='f', col3=5, COL4=true)
     """.stripMargin
 
-    val parsed1 = parser.parsePlan(sql1)
-    val parsed2 = parser.parsePlan(sql2)
-
-    val expected1 = AlterTableAddPartition(
-      TableIdentifier("view_name", None),
-      Seq(
-        (Map("dt" -> "2008-08-08", "country" -> "us"), None),
-        (Map("dt" -> "2009-09-09", "country" -> "uk"), None)),
-      ifNotExists = true)(sql1)
-    val expected2 = AlterTableAddPartition(
-      TableIdentifier("view_name", None),
-      Seq((Map("col1" -> "NULL", "col2" -> "f", "col3" -> "5", "col4" -> "true"), None)),
-      ifNotExists = false)(sql2)
-
-    comparePlans(parsed1, expected1)
-    comparePlans(parsed2, expected2)
+    intercept[ParseException] {
+      parser.parsePlan(sql1)
+    }
+    intercept[ParseException] {
+      parser.parsePlan(sql2)
+    }
   }
 
   test("alter table: rename partition") {
@@ -416,8 +406,13 @@ class DDLCommandSuite extends PlanTest {
 
     val parsed1_table = parser.parsePlan(sql1_table)
     val parsed2_table = parser.parsePlan(sql2_table)
-    val parsed1_view = parser.parsePlan(sql1_view)
-    val parsed2_view = parser.parsePlan(sql2_view)
+
+    intercept[ParseException] {
+      parser.parsePlan(sql1_view)
+    }
+    intercept[ParseException] {
+      parser.parsePlan(sql2_view)
+    }
 
     val tableIdent = TableIdentifier("table_name", None)
     val expected1_table = AlterTableDropPartition(
@@ -435,25 +430,8 @@ class DDLCommandSuite extends PlanTest {
       ifExists = false,
       purge = true)(sql2_table)
 
-    val expected1_view = AlterTableDropPartition(
-      tableIdent,
-      Seq(
-        Map("dt" -> "2008-08-08", "country" -> "us"),
-        Map("dt" -> "2009-09-09", "country" -> "uk")),
-      ifExists = true,
-      purge = false)(sql1_view)
-    val expected2_view = AlterTableDropPartition(
-      tableIdent,
-      Seq(
-        Map("dt" -> "2008-08-08", "country" -> "us"),
-        Map("dt" -> "2009-09-09", "country" -> "uk")),
-      ifExists = false,
-      purge = false)(sql2_table)
-
     comparePlans(parsed1_table, expected1_table)
     comparePlans(parsed2_table, expected2_table)
-    comparePlans(parsed1_view, expected1_view)
-    comparePlans(parsed2_view, expected2_view)
   }
 
   test("alter table: archive partition") {
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index b01f556f0addebb08d67f53a83f2dd24236dd2e5..9e3cb18d457cefacbab81bd17ccf6f55a5bf5198 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -372,7 +372,11 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "alter_index",
 
     // Macro commands are not supported
-    "macro"
+    "macro",
+
+    // Creating partitioned views is not supported
+    "create_like_view",
+    "describe_formatted_view_partitioned"
   )
 
   /**
@@ -482,7 +486,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "cp_mj_rc",
     "create_insert_outputformat",
     "create_like_tbl_props",
-    "create_like_view",
     "create_nested_type",
     "create_skewed_table1",
     "create_struct_table",
@@ -507,7 +510,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "default_partition_name",
     "delimiter",
     "desc_non_existent_tbl",
-    "describe_formatted_view_partitioned",
     "diff_part_input_formats",
     "disable_file_format_check",
     "disallow_incompatible_type_change_off",
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
index c6c0b2ca59dff852a0e78f05b938bf6baad07a31..ab69d3502e938fb757c52d7c579191cde6d8b8c3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
@@ -215,11 +215,19 @@ class HiveSqlAstBuilder extends SparkSqlAstBuilder {
 
   /**
    * Create or replace a view. This creates a [[CreateViewAsSelect]] command.
+   *
+   * For example:
+   * {{{
+   *   CREATE [OR REPLACE] VIEW [IF NOT EXISTS] [db_name.]view_name
+   *   [(column_name [COMMENT column_comment], ...) ]
+   *   [COMMENT view_comment]
+   *   [TBLPROPERTIES (property_name = property_value, ...)]
+   *   AS SELECT ...;
+   * }}}
    */
   override def visitCreateView(ctx: CreateViewContext): LogicalPlan = withOrigin(ctx) {
-    // Pass a partitioned view on to hive.
     if (ctx.identifierList != null) {
-      HiveNativeCommand(command(ctx))
+      throw new ParseException(s"Operation not allowed: partitioned views", ctx)
     } else {
       if (ctx.STRING != null) {
         logWarning("COMMENT clause is ignored.")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
similarity index 82%
rename from sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
rename to sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index a8a0d6b8de36452ac49461066b30b45ad7da8dcf..b4e5d4adf172883dcd15dfab2a1164ebde5e492d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -26,16 +26,18 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans
 import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan
 import org.apache.spark.sql.catalyst.expressions.JsonTuple
+import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation}
 import org.apache.spark.sql.hive.execution.HiveSqlParser
 
-class HiveQlSuite extends PlanTest {
+class HiveDDLCommandSuite extends PlanTest {
   val parser = HiveSqlParser
 
   private def extractTableDesc(sql: String): (CatalogTable, Boolean) = {
     parser.parsePlan(sql).collect {
-      case CreateTableAsSelect(desc, child, allowExisting) => (desc, allowExisting)
+      case CreateTableAsSelect(desc, _, allowExisting) => (desc, allowExisting)
+      case CreateViewAsSelect(desc, _, allowExisting, _, _) => (desc, allowExisting)
     }.head
   }
 
@@ -251,4 +253,56 @@ class HiveQlSuite extends PlanTest {
         |LATERAL VIEW explode(`gen``tab1`.`gen``col1`) `gen``tab2` AS `gen``col2`
       """.stripMargin)
   }
+
+  test("create view -- basic") {
+    val v1 = "CREATE VIEW view1 AS SELECT * FROM tab1"
+    val (desc, exists) = extractTableDesc(v1)
+    assert(!exists)
+    assert(desc.identifier.database.isEmpty)
+    assert(desc.identifier.table == "view1")
+    assert(desc.tableType == CatalogTableType.VIRTUAL_VIEW)
+    assert(desc.storage.locationUri.isEmpty)
+    assert(desc.schema == Seq.empty[CatalogColumn])
+    assert(desc.viewText.contains("SELECT * FROM tab1"))
+    assert(desc.viewOriginalText.contains("SELECT * FROM tab1"))
+    assert(desc.storage.serdeProperties == Map())
+    assert(desc.storage.inputFormat.isEmpty)
+    assert(desc.storage.outputFormat.isEmpty)
+    assert(desc.storage.serde.isEmpty)
+    assert(desc.properties == Map())
+  }
+
+  test("create view - full") {
+    val v1 =
+      """
+        |CREATE OR REPLACE VIEW IF NOT EXISTS view1
+        |(col1, col3)
+        |COMMENT 'I cannot spell'
+        |TBLPROPERTIES('prop1Key'="prop1Val")
+        |AS SELECT * FROM tab1
+      """.stripMargin
+    val (desc, exists) = extractTableDesc(v1)
+    assert(exists)
+    assert(desc.identifier.database.isEmpty)
+    assert(desc.identifier.table == "view1")
+    assert(desc.tableType == CatalogTableType.VIRTUAL_VIEW)
+    assert(desc.storage.locationUri.isEmpty)
+    assert(desc.schema ==
+      CatalogColumn("col1", null, nullable = true, None) ::
+        CatalogColumn("col3", null, nullable = true, None) :: Nil)
+    assert(desc.viewText.contains("SELECT * FROM tab1"))
+    assert(desc.viewOriginalText.contains("SELECT * FROM tab1"))
+    assert(desc.storage.serdeProperties == Map())
+    assert(desc.storage.inputFormat.isEmpty)
+    assert(desc.storage.outputFormat.isEmpty)
+    assert(desc.storage.serde.isEmpty)
+    assert(desc.properties == Map("prop1Key" -> "prop1Val"))
+  }
+
+  test("create view -- partitioned view") {
+    val v1 = "CREATE VIEW view1 partitioned on (ds, hr) as select * from srcpart"
+    intercept[ParseException] {
+      parser.parsePlan(v1)
+    }
+  }
 }
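
For contrast, the same partition DDL remains valid against physical tables (sketch mirroring
the retained DDLCommandSuite expectations above; the identifiers are illustrative):

    // Still parses into an AlterTableDropPartition plan, unchanged by this patch.
    parser.parsePlan(
      "ALTER TABLE table_name DROP IF EXISTS PARTITION (dt = '2008-08-08', country = 'us')")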