From 7d2ed8cc030f3d84fea47fded072c320c3d87ca7 Mon Sep 17 00:00:00 2001
From: Andrew Or <andrew@databricks.com>
Date: Wed, 13 Apr 2016 11:08:34 -0700
Subject: [PATCH] [SPARK-14388][SQL] Implement CREATE TABLE

## What changes were proposed in this pull request?

This patch implements the `CREATE TABLE` command using the `SessionCatalog`. Previously we handled only `CTAS` and `CREATE TABLE ... USING`. This requires us to refactor `CatalogTable` to accept various fields (e.g. bucket and sort columns) and pass them to Hive.
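
For illustration (table, column, and property names below are made up), assuming a Hive-enabled `SQLContext` is in scope, a statement like the following is now parsed into the new `CreateTable` command and executed through the `SessionCatalog`, instead of falling back to `HiveNativeCommand`:

```scala
// Illustrative sketch only: table, column, and property names are invented.
// A Hive-style CREATE TABLE without AS SELECT and without USING, which previously
// fell back to HiveNativeCommand, now goes through the new CreateTable command.
sqlContext.sql(
  """CREATE EXTERNAL TABLE IF NOT EXISTS dbx.my_table (id INT, name STRING)
    |COMMENT 'example table'
    |PARTITIONED BY (month INT)
    |STORED AS TEXTFILE
    |LOCATION '/path/to/data'
    |TBLPROPERTIES ('k1'='v1')
  """.stripMargin)
```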

WIP: I haven't fully verified that this actually works yet, but I believe it does.

## How was this patch tested?

Tests will come in a future commit.

Author: Andrew Or <andrew@databricks.com>
Author: Yin Huai <yhuai@databricks.com>

Closes #12271 from andrewor14/create-table-ddl.
---
 .../spark/sql/catalyst/parser/SqlBase.g4      |   3 +-
 .../sql/catalyst/catalog/interface.scala      |  26 +-
 .../catalyst/catalog/CatalogTestCases.scala   |   8 +-
 .../spark/sql/execution/SparkSqlParser.scala  |  25 +-
 .../spark/sql/execution/command/ddl.scala     |  23 --
 .../spark/sql/execution/command/tables.scala  |  80 +++++++
 .../execution/command/DDLCommandSuite.scala   |  20 +-
 .../sql/execution/command/DDLSuite.scala      |   2 -
 .../sql/hive/thriftserver/CliSuite.scala      |   8 +-
 .../execution/HiveCompatibilitySuite.scala    | 128 +++++-----
 .../spark/sql/hive/HiveMetastoreCatalog.scala |  16 +-
 .../sql/hive/client/HiveClientImpl.scala      |  44 +++-
 .../sql/hive/execution/HiveSqlParser.scala    | 223 +++++++++++-------
 .../spark/sql/hive/HiveDDLCommandSuite.scala  | 217 +++++++++++++++--
 .../sql/hive/HiveMetastoreCatalogSuite.scala  |   4 +-
 .../sql/hive/InsertIntoHiveTableSuite.scala   |   2 +-
 .../sql/hive/execution/PruningSuite.scala     |   2 +-
 .../sql/hive/execution/SQLQuerySuite.scala    |   2 +-
 18 files changed, 571 insertions(+), 262 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 0e2cd39448..a937ad1eb7 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -272,8 +272,7 @@ createFileFormat
     ;
 
 fileFormat
-    : INPUTFORMAT inFmt=STRING OUTPUTFORMAT outFmt=STRING (SERDE serdeCls=STRING)?
-      (INPUTDRIVER inDriver=STRING OUTPUTDRIVER outDriver=STRING)?                         #tableFileFormat
+    : INPUTFORMAT inFmt=STRING OUTPUTFORMAT outFmt=STRING (SERDE serdeCls=STRING)?         #tableFileFormat
     | identifier                                                                           #genericFileFormat
     ;
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 4ef59316ce..ad989a97e4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -220,14 +220,30 @@ case class CatalogTable(
     tableType: CatalogTableType,
     storage: CatalogStorageFormat,
     schema: Seq[CatalogColumn],
-    partitionColumns: Seq[CatalogColumn] = Seq.empty,
-    sortColumns: Seq[CatalogColumn] = Seq.empty,
-    numBuckets: Int = 0,
+    partitionColumnNames: Seq[String] = Seq.empty,
+    sortColumnNames: Seq[String] = Seq.empty,
+    bucketColumnNames: Seq[String] = Seq.empty,
+    numBuckets: Int = -1,
     createTime: Long = System.currentTimeMillis,
-    lastAccessTime: Long = System.currentTimeMillis,
+    lastAccessTime: Long = -1,
     properties: Map[String, String] = Map.empty,
     viewOriginalText: Option[String] = None,
-    viewText: Option[String] = None) {
+    viewText: Option[String] = None,
+    comment: Option[String] = None) {
+
+  // Verify that the provided columns are part of the schema
+  private val colNames = schema.map(_.name).toSet
+  private def requireSubsetOfSchema(cols: Seq[String], colType: String): Unit = {
+    require(cols.toSet.subsetOf(colNames), s"$colType columns (${cols.mkString(", ")}) " +
+      s"must be a subset of schema (${colNames.mkString(", ")}) in table '$identifier'")
+  }
+  requireSubsetOfSchema(partitionColumnNames, "partition")
+  requireSubsetOfSchema(sortColumnNames, "sort")
+  requireSubsetOfSchema(bucketColumnNames, "bucket")
+
+  /** Columns this table is partitioned by. */
+  def partitionColumns: Seq[CatalogColumn] =
+    schema.filter { c => partitionColumnNames.contains(c.name) }
 
   /** Return the database this table was specified to belong to, assuming it exists. */
   def database: String = identifier.database.getOrElse {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
index 0d9b0851fa..f961fe3292 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
@@ -553,8 +553,12 @@ abstract class CatalogTestUtils {
       identifier = TableIdentifier(name, database),
       tableType = CatalogTableType.EXTERNAL_TABLE,
       storage = storageFormat,
-      schema = Seq(CatalogColumn("col1", "int"), CatalogColumn("col2", "string")),
-      partitionColumns = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string")))
+      schema = Seq(
+        CatalogColumn("col1", "int"),
+        CatalogColumn("col2", "string"),
+        CatalogColumn("a", "int"),
+        CatalogColumn("b", "string")),
+      partitionColumnNames = Seq("a", "b"))
   }
 
   def newFunc(name: String, database: Option[String] = None): CatalogFunction = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 73d9640c35..af92cecee5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -179,7 +179,9 @@ class SparkSqlAstBuilder extends AstBuilder {
     }
   }
 
-  /** Type to keep track of a table header. */
+  /**
+   * Type to keep track of a table header: (identifier, isTemporary, ifNotExists, isExternal).
+   */
   type TableHeader = (TableIdentifier, Boolean, Boolean, Boolean)
 
   /**
@@ -616,10 +618,7 @@ class SparkSqlAstBuilder extends AstBuilder {
       case s: GenericFileFormatContext =>
         (Seq.empty[String], Option(s.identifier.getText))
       case s: TableFileFormatContext =>
-        val elements = Seq(s.inFmt, s.outFmt) ++
-          Option(s.serdeCls).toSeq ++
-          Option(s.inDriver).toSeq ++
-          Option(s.outDriver).toSeq
+        val elements = Seq(s.inFmt, s.outFmt) ++ Option(s.serdeCls).toSeq
         (elements.map(string), None)
     }
     AlterTableSetFileFormat(
@@ -773,22 +772,6 @@ class SparkSqlAstBuilder extends AstBuilder {
         .map(_.identifier.getText))
   }
 
-  /**
-   * Create a skew specification. This contains three components:
-   * - The Skewed Columns
-   * - Values for which are skewed. The size of each entry must match the number of skewed columns.
-   * - A store in directory flag.
-   */
-  override def visitSkewSpec(
-      ctx: SkewSpecContext): (Seq[String], Seq[Seq[String]], Boolean) = withOrigin(ctx) {
-    val skewedValues = if (ctx.constantList != null) {
-      Seq(visitConstantList(ctx.constantList))
-    } else {
-      visitNestedConstantList(ctx.nestedConstantList)
-    }
-    (visitIdentifierList(ctx.identifierList), skewedValues, ctx.DIRECTORIES != null)
-  }
-
   /**
    * Convert a nested constants list into a sequence of string sequences.
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 5137bd11d8..234099ad15 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -224,29 +224,6 @@ case class DropTable(
   }
 }
 
-/**
- * A command that renames a table/view.
- *
- * The syntax of this command is:
- * {{{
- *   ALTER TABLE table1 RENAME TO table2;
- *   ALTER VIEW view1 RENAME TO view2;
- * }}}
- */
-case class AlterTableRename(
-    oldName: TableIdentifier,
-    newName: TableIdentifier)
-  extends RunnableCommand {
-
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val catalog = sqlContext.sessionState.catalog
-    catalog.invalidateTable(oldName)
-    catalog.renameTable(oldName, newName)
-    Seq.empty[Row]
-  }
-
-}
-
 /**
  * A command that sets table/view properties.
  *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
new file mode 100644
index 0000000000..9c6030502d
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+
+
+// TODO: move the rest of the table commands from ddl.scala to this file
+
+/**
+ * A command to create a table.
+ *
+ * Note: This is currently used only for creating Hive tables.
+ * This is not intended for temporary tables.
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ *   CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
+ *   [(col1 data_type [COMMENT col_comment], ...)]
+ *   [COMMENT table_comment]
+ *   [PARTITIONED BY (col3 data_type [COMMENT col_comment], ...)]
+ *   [CLUSTERED BY (col1, ...) [SORTED BY (col1 [ASC|DESC], ...)] INTO num_buckets BUCKETS]
+ *   [SKEWED BY (col1, col2, ...) ON ((col_value, col_value, ...), ...)
+ *    [STORED AS DIRECTORIES]]
+ *   [ROW FORMAT row_format]
+ *   [STORED AS file_format | STORED BY storage_handler_class [WITH SERDEPROPERTIES (...)]]
+ *   [LOCATION path]
+ *   [TBLPROPERTIES (property_name=property_value, ...)]
+ *   [AS select_statement];
+ * }}}
+ */
+case class CreateTable(table: CatalogTable, ifNotExists: Boolean) extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    sqlContext.sessionState.catalog.createTable(table, ifNotExists)
+    Seq.empty[Row]
+  }
+
+}
+
+
+/**
+ * A command that renames a table/view.
+ *
+ * The syntax of this command is:
+ * {{{
+ *    ALTER TABLE table1 RENAME TO table2;
+ *    ALTER VIEW view1 RENAME TO view2;
+ * }}}
+ */
+case class AlterTableRename(
+    oldName: TableIdentifier,
+    newName: TableIdentifier)
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val catalog = sqlContext.sessionState.catalog
+    catalog.invalidateTable(oldName)
+    catalog.renameTable(oldName, newName)
+    Seq.empty[Row]
+  }
+
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 1c8dd68286..6e6475ee29 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -440,37 +440,25 @@ class DDLCommandSuite extends PlanTest {
   }
 
   test("alter table: set file format") {
-    val sql1 =
-      """
-       |ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test'
-       |OUTPUTFORMAT 'test' SERDE 'test' INPUTDRIVER 'test' OUTPUTDRIVER 'test'
-      """.stripMargin
-    val sql2 = "ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' " +
+    val sql1 = "ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' " +
       "OUTPUTFORMAT 'test' SERDE 'test'"
-    val sql3 = "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') " +
+    val sql2 = "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') " +
       "SET FILEFORMAT PARQUET"
     val parsed1 = parser.parsePlan(sql1)
     val parsed2 = parser.parsePlan(sql2)
-    val parsed3 = parser.parsePlan(sql3)
     val tableIdent = TableIdentifier("table_name", None)
     val expected1 = AlterTableSetFileFormat(
       tableIdent,
       None,
-      List("test", "test", "test", "test", "test"),
+      List("test", "test", "test"),
       None)(sql1)
     val expected2 = AlterTableSetFileFormat(
-      tableIdent,
-      None,
-      List("test", "test", "test"),
-      None)(sql2)
-    val expected3 = AlterTableSetFileFormat(
       tableIdent,
       Some(Map("dt" -> "2008-08-08", "country" -> "us")),
       Seq(),
-      Some("PARQUET"))(sql3)
+      Some("PARQUET"))(sql2)
     comparePlans(parsed1, expected1)
     comparePlans(parsed2, expected2)
-    comparePlans(parsed3, expected3)
   }
 
   test("alter table: set location") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 40a8b0e614..9ffffa0bdd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -380,8 +380,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
   }
 
-  // TODO: ADD a testcase for Drop Database in Restric when we can create tables in SQLContext
-
   test("show tables") {
     withTempTable("show1a", "show2b") {
       sql(
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index bfc3d195ff..eb49eabcb1 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -162,7 +162,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
 
     runCliWithin(3.minute)(
       "CREATE TABLE hive_test(key INT, val STRING);"
-        -> "OK",
+        -> "",
       "SHOW TABLES;"
         -> "hive_test",
       s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE hive_test;"
@@ -187,7 +187,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
       "USE hive_test_db;"
         -> "",
       "CREATE TABLE hive_test(key INT, val STRING);"
-        -> "OK",
+        -> "",
       "SHOW TABLES;"
         -> "hive_test"
     )
@@ -210,9 +210,9 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
       """CREATE TABLE t1(key string, val string)
         |ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe';
       """.stripMargin
-        -> "OK",
+        -> "",
       "CREATE TABLE sourceTable (key INT, val STRING);"
-        -> "OK",
+        -> "",
       s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE sourceTable;"
         -> "OK",
       "INSERT INTO TABLE t1 SELECT key, val FROM sourceTable;"
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index f0eeda09db..a45d180464 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -366,10 +366,76 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "sort_merge_join_desc_6",
     "sort_merge_join_desc_7",
 
+    // These tests try to create a table with bucketed columns, which we don't support
+    "auto_join32",
+    "auto_join_filters",
+    "auto_smb_mapjoin_14",
+    "ct_case_insensitive",
+    "explain_rearrange",
+    "groupby_sort_10",
+    "groupby_sort_2",
+    "groupby_sort_3",
+    "groupby_sort_4",
+    "groupby_sort_5",
+    "groupby_sort_7",
+    "groupby_sort_8",
+    "groupby_sort_9",
+    "groupby_sort_test_1",
+    "inputddl4",
+    "join_filters",
+    "join_nulls",
+    "join_nullsafe",
+    "load_dyn_part2",
+    "orc_empty_files",
+    "reduce_deduplicate",
+    "smb_mapjoin9",
+    "smb_mapjoin_1",
+    "smb_mapjoin_10",
+    "smb_mapjoin_13",
+    "smb_mapjoin_14",
+    "smb_mapjoin_15",
+    "smb_mapjoin_16",
+    "smb_mapjoin_17",
+    "smb_mapjoin_2",
+    "smb_mapjoin_21",
+    "smb_mapjoin_25",
+    "smb_mapjoin_3",
+    "smb_mapjoin_4",
+    "smb_mapjoin_5",
+    "smb_mapjoin_6",
+    "smb_mapjoin_7",
+    "smb_mapjoin_8",
+    "sort_merge_join_desc_1",
+    "sort_merge_join_desc_2",
+    "sort_merge_join_desc_3",
+    "sort_merge_join_desc_4",
+
+    // These tests try to create a table with skewed columns, which we don't support
+    "create_skewed_table1",
+    "skewjoinopt13",
+    "skewjoinopt18",
+    "skewjoinopt9",
+
     // Index commands are not supported
     "drop_index",
     "drop_index_removes_partition_dirs",
     "alter_index",
+    "auto_sortmerge_join_1",
+    "auto_sortmerge_join_10",
+    "auto_sortmerge_join_11",
+    "auto_sortmerge_join_12",
+    "auto_sortmerge_join_13",
+    "auto_sortmerge_join_14",
+    "auto_sortmerge_join_15",
+    "auto_sortmerge_join_16",
+    "auto_sortmerge_join_2",
+    "auto_sortmerge_join_3",
+    "auto_sortmerge_join_4",
+    "auto_sortmerge_join_5",
+    "auto_sortmerge_join_6",
+    "auto_sortmerge_join_7",
+    "auto_sortmerge_join_8",
+    "auto_sortmerge_join_9",
 
     // Macro commands are not supported
     "macro",
@@ -435,33 +501,14 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "auto_join3",
     "auto_join30",
     "auto_join31",
-    "auto_join32",
     "auto_join4",
     "auto_join5",
     "auto_join6",
     "auto_join7",
     "auto_join8",
     "auto_join9",
-    "auto_join_filters",
     "auto_join_nulls",
     "auto_join_reordering_values",
-    "auto_smb_mapjoin_14",
-    "auto_sortmerge_join_1",
-    "auto_sortmerge_join_10",
-    "auto_sortmerge_join_11",
-    "auto_sortmerge_join_12",
-    "auto_sortmerge_join_13",
-    "auto_sortmerge_join_14",
-    "auto_sortmerge_join_15",
-    "auto_sortmerge_join_16",
-    "auto_sortmerge_join_2",
-    "auto_sortmerge_join_3",
-    "auto_sortmerge_join_4",
-    "auto_sortmerge_join_5",
-    "auto_sortmerge_join_6",
-    "auto_sortmerge_join_7",
-    "auto_sortmerge_join_8",
-    "auto_sortmerge_join_9",
     "binary_constant",
     "binarysortable_1",
     "cast1",
@@ -492,13 +539,11 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "create_insert_outputformat",
     "create_like_tbl_props",
     "create_nested_type",
-    "create_skewed_table1",
     "create_struct_table",
     "create_view_translate",
     "cross_join",
     "cross_product_check_1",
     "cross_product_check_2",
-    "ct_case_insensitive",
     "database_drop",
     "database_location",
     "database_properties",
@@ -534,7 +579,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "escape_distributeby1",
     "escape_orderby1",
     "escape_sortby1",
-    "explain_rearrange",
     "fileformat_mix",
     "fileformat_sequencefile",
     "fileformat_text",
@@ -589,16 +633,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "groupby_neg_float",
     "groupby_ppd",
     "groupby_ppr",
-    "groupby_sort_10",
-    "groupby_sort_2",
-    "groupby_sort_3",
-    "groupby_sort_4",
-    "groupby_sort_5",
     "groupby_sort_6",
-    "groupby_sort_7",
-    "groupby_sort_8",
-    "groupby_sort_9",
-    "groupby_sort_test_1",
     "having",
     "implicit_cast1",
     "index_serde",
@@ -653,7 +688,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "inputddl1",
     "inputddl2",
     "inputddl3",
-    "inputddl4",
     "inputddl6",
     "inputddl7",
     "inputddl8",
@@ -709,11 +743,8 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "join_array",
     "join_casesensitive",
     "join_empty",
-    "join_filters",
     "join_hive_626",
     "join_map_ppr",
-    "join_nulls",
-    "join_nullsafe",
     "join_rc",
     "join_reorder2",
     "join_reorder3",
@@ -737,7 +768,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "load_dyn_part13",
     "load_dyn_part14",
     "load_dyn_part14_win",
-    "load_dyn_part2",
     "load_dyn_part3",
     "load_dyn_part4",
     "load_dyn_part5",
@@ -790,7 +820,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "nullscript",
     "optional_outer",
     "orc_dictionary_threshold",
-    "orc_empty_files",
     "order",
     "order2",
     "outer_join_ppr",
@@ -846,7 +875,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "rcfile_null_value",
     "rcfile_toleratecorruptions",
     "rcfile_union",
-    "reduce_deduplicate",
     "reduce_deduplicate_exclude_gby",
     "reduce_deduplicate_exclude_join",
     "reduce_deduplicate_extended",
@@ -867,31 +895,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "show_functions",
     "show_partitions",
     "show_tblproperties",
-    "skewjoinopt13",
-    "skewjoinopt18",
-    "skewjoinopt9",
-    "smb_mapjoin9",
-    "smb_mapjoin_1",
-    "smb_mapjoin_10",
-    "smb_mapjoin_13",
-    "smb_mapjoin_14",
-    "smb_mapjoin_15",
-    "smb_mapjoin_16",
-    "smb_mapjoin_17",
-    "smb_mapjoin_2",
-    "smb_mapjoin_21",
-    "smb_mapjoin_25",
-    "smb_mapjoin_3",
-    "smb_mapjoin_4",
-    "smb_mapjoin_5",
-    "smb_mapjoin_6",
-    "smb_mapjoin_7",
-    "smb_mapjoin_8",
     "sort",
-    "sort_merge_join_desc_1",
-    "sort_merge_join_desc_2",
-    "sort_merge_join_desc_3",
-    "sort_merge_join_desc_4",
     "stats0",
     "stats_aggregator_error_1",
     "stats_empty_partition",
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 14f331961e..ccc8345d73 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -91,7 +91,7 @@ private[hive] object HiveSerDe {
       "textfile" ->
         HiveSerDe(
           inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"),
-          outputFormat = Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat")),
+          outputFormat = Option("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
 
       "avro" ->
         HiveSerDe(
@@ -905,8 +905,13 @@ private[hive] case class MetastoreRelation(
 
     val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
     tTable.setSd(sd)
-    sd.setCols(table.schema.map(toHiveColumn).asJava)
-    tTable.setPartitionKeys(table.partitionColumns.map(toHiveColumn).asJava)
+
+    // Note: In Hive the schema and partition columns must be disjoint sets
+    val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
+      table.partitionColumnNames.contains(c.getName)
+    }
+    sd.setCols(schema.asJava)
+    tTable.setPartitionKeys(partCols.asJava)
 
     table.storage.locationUri.foreach(sd.setLocation)
     table.storage.inputFormat.foreach(sd.setInputFormat)
@@ -1013,7 +1018,10 @@ private[hive] case class MetastoreRelation(
   val partitionKeys = table.partitionColumns.map(_.toAttribute)
 
   /** Non-partitionKey attributes */
-  val attributes = table.schema.map(_.toAttribute)
+  // TODO: just make this hold the schema itself, not just non-partition columns
+  val attributes = table.schema
+    .filter { c => !table.partitionColumnNames.contains(c.name) }
+    .map(_.toAttribute)
 
   val output = attributes ++ partitionKeys
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 39e26acd7f..2a1fff92b5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -299,6 +299,10 @@ private[hive] class HiveClientImpl(
       tableName: String): Option[CatalogTable] = withHiveState {
     logDebug(s"Looking up $dbName.$tableName")
     Option(client.getTable(dbName, tableName, false)).map { h =>
+      // Note: Hive separates partition columns and the schema, but for us the
+      // partition columns are part of the schema
+      val partCols = h.getPartCols.asScala.map(fromHiveColumn)
+      val schema = h.getCols.asScala.map(fromHiveColumn) ++ partCols
       CatalogTable(
         identifier = TableIdentifier(h.getTableName, Option(h.getDbName)),
         tableType = h.getTableType match {
@@ -307,9 +311,10 @@ private[hive] class HiveClientImpl(
           case HiveTableType.INDEX_TABLE => CatalogTableType.INDEX_TABLE
           case HiveTableType.VIRTUAL_VIEW => CatalogTableType.VIRTUAL_VIEW
         },
-        schema = h.getCols.asScala.map(fromHiveColumn),
-        partitionColumns = h.getPartCols.asScala.map(fromHiveColumn),
-        sortColumns = Seq(),
+        schema = schema,
+        partitionColumnNames = partCols.map(_.name),
+        sortColumnNames = Seq(), // TODO: populate this
+        bucketColumnNames = h.getBucketCols.asScala,
         numBuckets = h.getNumBuckets,
         createTime = h.getTTable.getCreateTime.toLong * 1000,
         lastAccessTime = h.getLastAccessTime.toLong * 1000,
@@ -675,24 +680,37 @@ private[hive] class HiveClientImpl(
 
   private def toHiveTable(table: CatalogTable): HiveTable = {
     val hiveTable = new HiveTable(table.database, table.identifier.table)
-    // For EXTERNAL_TABLE/MANAGED_TABLE, we also need to set EXTERNAL field in
-    // the table properties accodringly. Otherwise, if EXTERNAL_TABLE is the table type
-    // but EXTERNAL field is not set, Hive metastore will change the type to
-    // MANAGED_TABLE (see
-    // metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java#L1095-L1105)
+    // For EXTERNAL_TABLE, we also need to set the EXTERNAL field in the table properties.
+    // Otherwise, the Hive metastore will change the table to a MANAGED_TABLE.
+    // (metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java#L1095-L1105)
     hiveTable.setTableType(table.tableType match {
       case CatalogTableType.EXTERNAL_TABLE =>
         hiveTable.setProperty("EXTERNAL", "TRUE")
         HiveTableType.EXTERNAL_TABLE
       case CatalogTableType.MANAGED_TABLE =>
-        hiveTable.setProperty("EXTERNAL", "FALSE")
         HiveTableType.MANAGED_TABLE
       case CatalogTableType.INDEX_TABLE => HiveTableType.INDEX_TABLE
       case CatalogTableType.VIRTUAL_VIEW => HiveTableType.VIRTUAL_VIEW
     })
-    hiveTable.setFields(table.schema.map(toHiveColumn).asJava)
-    hiveTable.setPartCols(table.partitionColumns.map(toHiveColumn).asJava)
+    // Note: In Hive the schema and partition columns must be disjoint sets
+    val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
+      table.partitionColumnNames.contains(c.getName)
+    }
+    if (table.schema.isEmpty) {
+      // This is a hack to preserve existing behavior. Before Spark 2.0, we did not
+      // set a default serde here (this was done in Hive), so if the user provided an
+      // empty schema, Hive would automatically populate it with a single field
+      // "col". However, after SPARK-14388, we set the default serde to
+      // LazySimpleSerDe, so this implicit behavior no longer happens. Therefore,
+      // we need to do it in Spark ourselves.
+      hiveTable.setFields(
+        Seq(new FieldSchema("col", "array<string>", "from deserializer")).asJava)
+    } else {
+      hiveTable.setFields(schema.asJava)
+    }
+    hiveTable.setPartCols(partCols.asJava)
     // TODO: set sort columns here too
+    hiveTable.setBucketCols(table.bucketColumnNames.asJava)
     hiveTable.setOwner(conf.getUser)
     hiveTable.setNumBuckets(table.numBuckets)
     hiveTable.setCreateTime((table.createTime / 1000).toInt)
@@ -700,9 +718,11 @@ private[hive] class HiveClientImpl(
     table.storage.locationUri.foreach { loc => shim.setDataLocation(hiveTable, loc) }
     table.storage.inputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass)
     table.storage.outputFormat.map(toOutputFormat).foreach(hiveTable.setOutputFormatClass)
-    table.storage.serde.foreach(hiveTable.setSerializationLib)
+    hiveTable.setSerializationLib(
+      table.storage.serde.getOrElse("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
     table.storage.serdeProperties.foreach { case (k, v) => hiveTable.setSerdeParam(k, v) }
     table.properties.foreach { case (k, v) => hiveTable.setProperty(k, v) }
+    table.comment.foreach { c => hiveTable.setProperty("comment", c) }
     table.viewOriginalText.foreach { t => hiveTable.setViewOriginalText(t) }
     table.viewText.foreach { t => hiveTable.setViewExpandedText(t) }
     hiveTable
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
index 7a435117e7..b14db7fe71 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.UnresolvedGenerator
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
@@ -33,8 +34,9 @@ import org.apache.spark.sql.catalyst.parser._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkSqlAstBuilder
+import org.apache.spark.sql.execution.command.CreateTable
 import org.apache.spark.sql.hive.{CreateTableAsSelect => CTAS, CreateViewAsSelect => CreateView}
-import org.apache.spark.sql.hive.{HiveGenericUDTF, HiveSerDe}
+import org.apache.spark.sql.hive.{HiveGenericUDTF, HiveMetastoreTypes, HiveSerDe}
 import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
 
 /**
@@ -121,84 +123,116 @@ class HiveSqlAstBuilder extends SparkSqlAstBuilder {
   }
 
   /**
-   * Create a [[CatalogStorageFormat]]. This is part of the [[CreateTableAsSelect]] command.
+   * Create a [[CatalogStorageFormat]] for creating tables.
    */
   override def visitCreateFileFormat(
       ctx: CreateFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
-    if (ctx.storageHandler == null) {
-      typedVisit[CatalogStorageFormat](ctx.fileFormat)
-    } else {
-      visitStorageHandler(ctx.storageHandler)
+    (ctx.fileFormat, ctx.storageHandler) match {
+      // Expected format: INPUTFORMAT input_format OUTPUTFORMAT output_format
+      case (c: TableFileFormatContext, null) =>
+        visitTableFileFormat(c)
+      // Expected format: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO
+      case (c: GenericFileFormatContext, null) =>
+        visitGenericFileFormat(c)
+      case (null, storageHandler) =>
+        throw new ParseException("Operation not allowed: ... STORED BY storage_handler ...", ctx)
+      case _ =>
+        throw new ParseException("expected either STORED AS or STORED BY, not both", ctx)
     }
   }
 
   /**
-   * Create a [[CreateTableAsSelect]] command.
+   * Create a table, returning either a [[CreateTable]] or a [[CreateTableAsSelect]].
+   *
+   * This is not used to create datasource tables, which are handled through
+   * "CREATE TABLE ... USING ...".
+   *
+   * Note: temporary tables, bucketing, skewed columns, and storage handlers (STORED BY)
+   * are currently not supported.
+   *
+   * Expected format:
+   * {{{
+   *   CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
+   *   [(col1 data_type [COMMENT col_comment], ...)]
+   *   [COMMENT table_comment]
+   *   [PARTITIONED BY (col3 data_type [COMMENT col_comment], ...)]
+   *   [CLUSTERED BY (col1, ...) [SORTED BY (col1 [ASC|DESC], ...)] INTO num_buckets BUCKETS]
+   *   [SKEWED BY (col1, col2, ...) ON ((col_value, col_value, ...), ...) [STORED AS DIRECTORIES]]
+   *   [ROW FORMAT row_format]
+   *   [STORED AS file_format | STORED BY storage_handler_class [WITH SERDEPROPERTIES (...)]]
+   *   [LOCATION path]
+   *   [TBLPROPERTIES (property_name=property_value, ...)]
+   *   [AS select_statement];
+   * }}}
    */
-  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
-    if (ctx.query == null) {
-      HiveNativeCommand(command(ctx))
+  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
+    val (name, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
+    // TODO: implement temporary tables
+    if (temp) {
+      throw new ParseException(
+        "CREATE TEMPORARY TABLE is not supported yet. " +
+        "Please use registerTempTable as an alternative.", ctx)
+    }
+    if (ctx.skewSpec != null) {
+      throw new ParseException("Operation not allowed: CREATE TABLE ... SKEWED BY ...", ctx)
+    }
+    if (ctx.bucketSpec != null) {
+      throw new ParseException("Operation not allowed: CREATE TABLE ... CLUSTERED BY ...", ctx)
+    }
+    val tableType = if (external) {
+      CatalogTableType.EXTERNAL_TABLE
     } else {
-      // Get the table header.
-      val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
-      val tableType = if (external) {
-        CatalogTableType.EXTERNAL_TABLE
-      } else {
-        CatalogTableType.MANAGED_TABLE
-      }
-
-      // Unsupported clauses.
-      if (temp) {
-        throw new ParseException(s"Unsupported operation: TEMPORARY clause.", ctx)
-      }
-      if (ctx.bucketSpec != null) {
-        // TODO add this - we need cluster columns in the CatalogTable for this to work.
-        throw new ParseException("Unsupported operation: " +
-          "CLUSTERED BY ... [ORDERED BY ...] INTO ... BUCKETS clause.", ctx)
-      }
-      if (ctx.skewSpec != null) {
-        throw new ParseException("Operation not allowed: " +
-          "SKEWED BY ... ON ... [STORED AS DIRECTORIES] clause.", ctx)
-      }
-
-      // Create the schema.
-      val schema = Option(ctx.columns).toSeq.flatMap(visitCatalogColumns(_, _.toLowerCase))
-
-      // Get the column by which the table is partitioned.
-      val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns(_))
-
-      // Create the storage.
-      def format(fmt: ParserRuleContext): CatalogStorageFormat = {
-        Option(fmt).map(typedVisit[CatalogStorageFormat]).getOrElse(EmptyStorageFormat)
-      }
-      // Default storage.
+      CatalogTableType.MANAGED_TABLE
+    }
+    val comment = Option(ctx.STRING).map(string)
+    val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns)
+    val cols = Option(ctx.columns).toSeq.flatMap(visitCatalogColumns)
+    val properties = Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty)
+    val selectQuery = Option(ctx.query).map(plan)
+
+    // Note: Hive requires partition columns to be distinct from the data columns, so we
+    // need to include the partition columns here explicitly
+    val schema = cols ++ partitionCols
+
+    // Storage format
+    val defaultStorage: CatalogStorageFormat = {
       val defaultStorageType = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT)
-      val hiveSerDe = HiveSerDe.sourceToSerDe(defaultStorageType, hiveConf).getOrElse {
-        HiveSerDe(
-          inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"),
-          outputFormat = Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
-      }
-      // Defined storage.
-      val fileStorage = format(ctx.createFileFormat)
-      val rowStorage = format(ctx.rowFormat)
-      val storage = CatalogStorageFormat(
-        Option(ctx.locationSpec).map(visitLocationSpec),
-        fileStorage.inputFormat.orElse(hiveSerDe.inputFormat),
-        fileStorage.outputFormat.orElse(hiveSerDe.outputFormat),
-        rowStorage.serde.orElse(hiveSerDe.serde).orElse(fileStorage.serde),
-        rowStorage.serdeProperties ++ fileStorage.serdeProperties
-      )
+      val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType, hiveConf)
+      CatalogStorageFormat(
+        locationUri = None,
+        inputFormat = defaultHiveSerde.flatMap(_.inputFormat)
+          .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")),
+        outputFormat = defaultHiveSerde.flatMap(_.outputFormat)
+          .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
+        // Note: Keep this unspecified because we use the presence of the serde to decide
+        // whether to convert a table created by CTAS to a datasource table.
+        serde = None,
+        serdeProperties = Map())
+    }
+    val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
+      .getOrElse(EmptyStorageFormat)
+    val rowStorage = Option(ctx.rowFormat).map(visitRowFormat).getOrElse(EmptyStorageFormat)
+    val location = Option(ctx.locationSpec).map(visitLocationSpec)
+    val storage = CatalogStorageFormat(
+      locationUri = location,
+      inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat),
+      outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat),
+      serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde),
+      serdeProperties = rowStorage.serdeProperties ++ fileStorage.serdeProperties)
+
+    // TODO support the sql text - have a proper location for this!
+    val tableDesc = CatalogTable(
+      identifier = name,
+      tableType = tableType,
+      storage = storage,
+      schema = schema,
+      partitionColumnNames = partitionCols.map(_.name),
+      properties = properties,
+      comment = comment)
 
-      val tableDesc = CatalogTable(
-        identifier = table,
-        tableType = tableType,
-        schema = schema,
-        partitionColumns = partitionCols,
-        storage = storage,
-        properties = Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty),
-        // TODO support the sql text - have a proper location for this!
-        viewText = Option(ctx.STRING).map(string))
-      CTAS(tableDesc, plan(ctx.query), ifNotExists)
+    selectQuery match {
+      case Some(q) => CTAS(tableDesc, q, ifNotExists)
+      case None => CreateTable(tableDesc, ifNotExists)
     }
   }
 
@@ -353,25 +387,19 @@ class HiveSqlAstBuilder extends SparkSqlAstBuilder {
   private val EmptyStorageFormat = CatalogStorageFormat(None, None, None, None, Map.empty)
 
   /**
-   * Create a [[CatalogStorageFormat]]. The INPUTDRIVER and OUTPUTDRIVER clauses are currently
-   * ignored.
+   * Create a [[CatalogStorageFormat]].
    */
   override def visitTableFileFormat(
       ctx: TableFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
-    import ctx._
-    if (inDriver != null || outDriver != null) {
-      throw new ParseException(
-        s"Operation not allowed: INPUTDRIVER ... OUTPUTDRIVER ... clauses", ctx)
-    }
     EmptyStorageFormat.copy(
-      inputFormat = Option(string(inFmt)),
-      outputFormat = Option(string(outFmt)),
-      serde = Option(serdeCls).map(string)
+      inputFormat = Option(string(ctx.inFmt)),
+      outputFormat = Option(string(ctx.outFmt)),
+      serde = Option(ctx.serdeCls).map(string)
     )
   }
 
   /**
-   * Resolve a [[HiveSerDe]] based on the format name given.
+   * Resolve a [[HiveSerDe]] based on the name given and return it as a [[CatalogStorageFormat]].
    */
   override def visitGenericFileFormat(
       ctx: GenericFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
@@ -388,11 +416,28 @@ class HiveSqlAstBuilder extends SparkSqlAstBuilder {
   }
 
   /**
-   * Storage Handlers are currently not supported in the statements we support (CTAS).
+   * Create a [[CatalogStorageFormat]] from a ROW FORMAT clause, used for creating tables.
+   *
+   * Example format:
+   * {{{
+   *   SERDE serde_name [WITH SERDEPROPERTIES (k1=v1, k2=v2, ...)]
+   * }}}
+   *
+   * OR
+   *
+   * {{{
+   *   DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]]
+   *   [COLLECTION ITEMS TERMINATED BY char]
+   *   [MAP KEYS TERMINATED BY char]
+   *   [LINES TERMINATED BY char]
+   *   [NULL DEFINED AS char]
+   * }}}
    */
-  override def visitStorageHandler(
-      ctx: StorageHandlerContext): CatalogStorageFormat = withOrigin(ctx) {
-    throw new ParseException("Storage Handlers are currently unsupported.", ctx)
+  private def visitRowFormat(ctx: RowFormatContext): CatalogStorageFormat = withOrigin(ctx) {
+    ctx match {
+      case serde: RowFormatSerdeContext => visitRowFormatSerde(serde)
+      case delimited: RowFormatDelimitedContext => visitRowFormatDelimited(delimited)
+    }
   }
 
   /**
@@ -435,13 +480,15 @@ class HiveSqlAstBuilder extends SparkSqlAstBuilder {
   /**
    * Create a sequence of [[CatalogColumn]]s from a column list
    */
-  private def visitCatalogColumns(
-      ctx: ColTypeListContext,
-      formatter: String => String = identity): Seq[CatalogColumn] = withOrigin(ctx) {
+  private def visitCatalogColumns(ctx: ColTypeListContext): Seq[CatalogColumn] = withOrigin(ctx) {
     ctx.colType.asScala.map { col =>
       CatalogColumn(
-        formatter(col.identifier.getText),
-        col.dataType.getText.toLowerCase, // TODO validate this?
+        col.identifier.getText.toLowerCase,
+        // Note: for types like "STRUCT<myFirstName: STRING, myLastName: STRING>" we can't
+        // just convert the whole type string to lower case; otherwise, the struct field
+        // names would lose their case. Instead, we rely on our parser to get the proper
+        // case before passing it to Hive.
+        CatalystSqlParser.parseDataType(col.dataType.getText).simpleString,
         nullable = true,
         Option(col.STRING).map(string))
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index e8086aec32..68d3ea6ed9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.JsonTuple
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation}
+import org.apache.spark.sql.execution.command.CreateTable
 import org.apache.spark.sql.hive.execution.{HiveNativeCommand, HiveSqlParser}
 
 class HiveDDLCommandSuite extends PlanTest {
@@ -36,6 +37,7 @@ class HiveDDLCommandSuite extends PlanTest {
 
   private def extractTableDesc(sql: String): (CatalogTable, Boolean) = {
     parser.parsePlan(sql).collect {
+      case CreateTable(desc, allowExisting) => (desc, allowExisting)
       case CreateTableAsSelect(desc, _, allowExisting) => (desc, allowExisting)
       case CreateViewAsSelect(desc, _, allowExisting, _, _) => (desc, allowExisting)
     }.head
@@ -76,9 +78,12 @@ class HiveDDLCommandSuite extends PlanTest {
       CatalogColumn("page_url", "string") ::
       CatalogColumn("referrer_url", "string") ::
       CatalogColumn("ip", "string", comment = Some("IP Address of the User")) ::
-      CatalogColumn("country", "string", comment = Some("country of origination")) :: Nil)
+      CatalogColumn("country", "string", comment = Some("country of origination")) ::
+      CatalogColumn("dt", "string", comment = Some("date type")) ::
+      CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
+    assert(desc.comment == Some("This is the staging page view table"))
     // TODO will be SQLText
-    assert(desc.viewText == Option("This is the staging page view table"))
+    assert(desc.viewText.isEmpty)
     assert(desc.viewOriginalText.isEmpty)
     assert(desc.partitionColumns ==
       CatalogColumn("dt", "string", comment = Some("date type")) ::
@@ -123,9 +128,12 @@ class HiveDDLCommandSuite extends PlanTest {
       CatalogColumn("page_url", "string") ::
       CatalogColumn("referrer_url", "string") ::
       CatalogColumn("ip", "string", comment = Some("IP Address of the User")) ::
-      CatalogColumn("country", "string", comment = Some("country of origination")) :: Nil)
+      CatalogColumn("country", "string", comment = Some("country of origination")) ::
+      CatalogColumn("dt", "string", comment = Some("date type")) ::
+      CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
     // TODO will be SQLText
-    assert(desc.viewText == Option("This is the staging page view table"))
+    assert(desc.comment == Some("This is the staging page view table"))
+    assert(desc.viewText.isEmpty)
     assert(desc.viewOriginalText.isEmpty)
     assert(desc.partitionColumns ==
       CatalogColumn("dt", "string", comment = Some("date type")) ::
@@ -151,7 +159,7 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(desc.storage.serdeProperties == Map())
     assert(desc.storage.inputFormat == Some("org.apache.hadoop.mapred.TextInputFormat"))
     assert(desc.storage.outputFormat ==
-      Some("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
+      Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
     assert(desc.storage.serde.isEmpty)
     assert(desc.properties == Map())
   }
@@ -203,17 +211,6 @@ class HiveDDLCommandSuite extends PlanTest {
           |AS SELECT key, value FROM src ORDER BY key, value
         """.stripMargin)
     }
-    intercept[ParseException] {
-      parser.parsePlan(
-        """CREATE TABLE ctas2
-          |STORED AS
-          |INPUTFORMAT "org.apache.hadoop.mapred.TextInputFormat"
-          |OUTPUTFORMAT "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"
-          |INPUTDRIVER "org.apache.hadoop.hive.howl.rcfile.RCFileInputDriver"
-          |OUTPUTDRIVER "org.apache.hadoop.hive.howl.rcfile.RCFileOutputDriver"
-          |AS SELECT key, value FROM src ORDER BY key, value
-        """.stripMargin)
-    }
     intercept[ParseException] {
       parser.parsePlan(
         """
@@ -324,6 +321,194 @@ class HiveDDLCommandSuite extends PlanTest {
       """.stripMargin)
   }
 
+  test("create table - basic") {
+    val query = "CREATE TABLE my_table (id int, name string)"
+    val (desc, allowExisting) = extractTableDesc(query)
+    assert(!allowExisting)
+    assert(desc.identifier.database.isEmpty)
+    assert(desc.identifier.table == "my_table")
+    assert(desc.tableType == CatalogTableType.MANAGED_TABLE)
+    assert(desc.schema == Seq(CatalogColumn("id", "int"), CatalogColumn("name", "string")))
+    assert(desc.partitionColumnNames.isEmpty)
+    assert(desc.sortColumnNames.isEmpty)
+    assert(desc.bucketColumnNames.isEmpty)
+    assert(desc.numBuckets == -1)
+    assert(desc.viewText.isEmpty)
+    assert(desc.viewOriginalText.isEmpty)
+    assert(desc.storage.locationUri.isEmpty)
+    assert(desc.storage.inputFormat ==
+      Some("org.apache.hadoop.mapred.TextInputFormat"))
+    assert(desc.storage.outputFormat ==
+      Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
+    assert(desc.storage.serde.isEmpty)
+    assert(desc.storage.serdeProperties.isEmpty)
+    assert(desc.properties.isEmpty)
+    assert(desc.comment.isEmpty)
+  }
+
+  test("create table - with database name") {
+    val query = "CREATE TABLE dbx.my_table (id int, name string)"
+    val (desc, _) = extractTableDesc(query)
+    assert(desc.identifier.database == Some("dbx"))
+    assert(desc.identifier.table == "my_table")
+  }
+
+  test("create table - temporary") {
+    val query = "CREATE TEMPORARY TABLE tab1 (id int, name string)"
+    val e = intercept[ParseException] { parser.parsePlan(query) }
+    assert(e.message.contains("registerTempTable"))
+  }
+
+  test("create table - external") {
+    val query = "CREATE EXTERNAL TABLE tab1 (id int, name string)"
+    val (desc, _) = extractTableDesc(query)
+    assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE)
+  }
+
+  test("create table - if not exists") {
+    val query = "CREATE TABLE IF NOT EXISTS tab1 (id int, name string)"
+    val (_, allowExisting) = extractTableDesc(query)
+    assert(allowExisting)
+  }
+
+  test("create table - comment") {
+    val query = "CREATE TABLE my_table (id int, name string) COMMENT 'its hot as hell below'"
+    val (desc, _) = extractTableDesc(query)
+    assert(desc.comment == Some("its hot as hell below"))
+  }
+
+  test("create table - partitioned columns") {
+    val query = "CREATE TABLE my_table (id int, name string) PARTITIONED BY (month int)"
+    val (desc, _) = extractTableDesc(query)
+    assert(desc.schema == Seq(
+      CatalogColumn("id", "int"),
+      CatalogColumn("name", "string"),
+      CatalogColumn("month", "int")))
+    assert(desc.partitionColumnNames == Seq("month"))
+  }
+
+  test("create table - clustered by") {
+    val baseQuery = "CREATE TABLE my_table (id int, name string) CLUSTERED BY(id)"
+    val query1 = s"$baseQuery INTO 10 BUCKETS"
+    val query2 = s"$baseQuery SORTED BY(id) INTO 10 BUCKETS"
+    val e1 = intercept[ParseException] { parser.parsePlan(query1) }
+    val e2 = intercept[ParseException] { parser.parsePlan(query2) }
+    assert(e1.getMessage.contains("Operation not allowed"))
+    assert(e2.getMessage.contains("Operation not allowed"))
+  }
+
+  test("create table - skewed by") {
+    val baseQuery = "CREATE TABLE my_table (id int, name string) SKEWED BY"
+    val query1 = s"$baseQuery(id) ON (1, 10, 100)"
+    val query2 = s"$baseQuery(id, name) ON ((1, 'x'), (2, 'y'), (3, 'z'))"
+    val query3 = s"$baseQuery(id, name) ON ((1, 'x'), (2, 'y'), (3, 'z')) STORED AS DIRECTORIES"
+    val e1 = intercept[ParseException] { parser.parsePlan(query1) }
+    val e2 = intercept[ParseException] { parser.parsePlan(query2) }
+    val e3 = intercept[ParseException] { parser.parsePlan(query3) }
+    assert(e1.getMessage.contains("Operation not allowed"))
+    assert(e2.getMessage.contains("Operation not allowed"))
+    assert(e3.getMessage.contains("Operation not allowed"))
+  }
+
+  test("create table - row format") {
+    val baseQuery = "CREATE TABLE my_table (id int, name string) ROW FORMAT"
+    val query1 = s"$baseQuery SERDE 'org.apache.poof.serde.Baff'"
+    val query2 = s"$baseQuery SERDE 'org.apache.poof.serde.Baff' WITH SERDEPROPERTIES ('k1'='v1')"
+    val query3 =
+      s"""
+        |$baseQuery DELIMITED FIELDS TERMINATED BY 'x' ESCAPED BY 'y'
+        |COLLECTION ITEMS TERMINATED BY 'a'
+        |MAP KEYS TERMINATED BY 'b'
+        |LINES TERMINATED BY '\n'
+        |NULL DEFINED AS 'c'
+      """.stripMargin
+    val (desc1, _) = extractTableDesc(query1)
+    val (desc2, _) = extractTableDesc(query2)
+    val (desc3, _) = extractTableDesc(query3)
+    assert(desc1.storage.serde == Some("org.apache.poof.serde.Baff"))
+    assert(desc1.storage.serdeProperties.isEmpty)
+    assert(desc2.storage.serde == Some("org.apache.poof.serde.Baff"))
+    assert(desc2.storage.serdeProperties == Map("k1" -> "v1"))
+    assert(desc3.storage.serdeProperties == Map(
+      "field.delim" -> "x",
+      "escape.delim" -> "y",
+      "serialization.format" -> "x",
+      "line.delim" -> "\n",
+      "colelction.delim" -> "a", // yes, it's a typo from Hive :)
+      "mapkey.delim" -> "b"))
+  }
+
+  test("create table - file format") {
+    val baseQuery = "CREATE TABLE my_table (id int, name string) STORED AS"
+    val query1 = s"$baseQuery INPUTFORMAT 'winput' OUTPUTFORMAT 'wowput'"
+    val query2 = s"$baseQuery ORC"
+    val (desc1, _) = extractTableDesc(query1)
+    val (desc2, _) = extractTableDesc(query2)
+    assert(desc1.storage.inputFormat == Some("winput"))
+    assert(desc1.storage.outputFormat == Some("wowput"))
+    assert(desc1.storage.serde.isEmpty)
+    assert(desc2.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"))
+    assert(desc2.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
+    assert(desc2.storage.serde == Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+  }
+
+  test("create table - storage handler") {
+    val baseQuery = "CREATE TABLE my_table (id int, name string) STORED BY"
+    val query1 = s"$baseQuery 'org.papachi.StorageHandler'"
+    val query2 = s"$baseQuery 'org.mamachi.StorageHandler' WITH SERDEPROPERTIES ('k1'='v1')"
+    val e1 = intercept[ParseException] { parser.parsePlan(query1) }
+    val e2 = intercept[ParseException] { parser.parsePlan(query2) }
+    assert(e1.getMessage.contains("Operation not allowed"))
+    assert(e2.getMessage.contains("Operation not allowed"))
+  }
+
+  test("create table - location") {
+    val query = "CREATE TABLE my_table (id int, name string) LOCATION '/path/to/mars'"
+    val (desc, _) = extractTableDesc(query)
+    assert(desc.storage.locationUri == Some("/path/to/mars"))
+  }
+
+  test("create table - properties") {
+    val query = "CREATE TABLE my_table (id int, name string) TBLPROPERTIES ('k1'='v1', 'k2'='v2')"
+    val (desc, _) = extractTableDesc(query)
+    assert(desc.properties == Map("k1" -> "v1", "k2" -> "v2"))
+  }
+
+  test("create table - everything!") {
+    val query =
+      """
+        |CREATE EXTERNAL TABLE IF NOT EXISTS dbx.my_table (id int, name string)
+        |COMMENT 'no comment'
+        |PARTITIONED BY (month int)
+        |ROW FORMAT SERDE 'org.apache.poof.serde.Baff' WITH SERDEPROPERTIES ('k1'='v1')
+        |STORED AS INPUTFORMAT 'winput' OUTPUTFORMAT 'wowput'
+        |LOCATION '/path/to/mercury'
+        |TBLPROPERTIES ('k1'='v1', 'k2'='v2')
+      """.stripMargin
+    val (desc, allowExisting) = extractTableDesc(query)
+    assert(allowExisting)
+    assert(desc.identifier.database == Some("dbx"))
+    assert(desc.identifier.table == "my_table")
+    assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE)
+    assert(desc.schema == Seq(
+      CatalogColumn("id", "int"),
+      CatalogColumn("name", "string"),
+      CatalogColumn("month", "int")))
+    assert(desc.partitionColumnNames == Seq("month"))
+    assert(desc.sortColumnNames.isEmpty)
+    assert(desc.bucketColumnNames.isEmpty)
+    assert(desc.numBuckets == -1)
+    assert(desc.viewText.isEmpty)
+    assert(desc.viewOriginalText.isEmpty)
+    assert(desc.storage.locationUri == Some("/path/to/mercury"))
+    assert(desc.storage.inputFormat == Some("winput"))
+    assert(desc.storage.outputFormat == Some("wowput"))
+    assert(desc.storage.serde == Some("org.apache.poof.serde.Baff"))
+    assert(desc.storage.serdeProperties == Map("k1" -> "v1"))
+    assert(desc.properties == Map("k1" -> "v1", "k2" -> "v2"))
+    assert(desc.comment == Some("no comment"))
+  }
+
   test("create view -- basic") {
     val v1 = "CREATE VIEW view1 AS SELECT * FROM tab1"
     val (desc, exists) = extractTableDesc(v1)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index ada8621d07..8648834f0d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -88,7 +88,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
         assert(hiveTable.storage.outputFormat === Some(outputFormat))
         assert(hiveTable.storage.serde === Some(serde))
 
-        assert(hiveTable.partitionColumns.isEmpty)
+        assert(hiveTable.partitionColumnNames.isEmpty)
         assert(hiveTable.tableType === CatalogTableType.MANAGED_TABLE)
 
         val columns = hiveTable.schema
@@ -151,7 +151,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
           assert(hiveTable.storage.outputFormat === Some(outputFormat))
           assert(hiveTable.storage.serde === Some(serde))
 
-          assert(hiveTable.partitionColumns.isEmpty)
+          assert(hiveTable.partitionColumnNames.isEmpty)
           assert(hiveTable.tableType === CatalogTableType.EXTERNAL_TABLE)
 
           val columns = hiveTable.schema
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 40e9c9362c..4db95636e7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -81,7 +81,7 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
   test("Double create fails when allowExisting = false") {
     sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
 
-    intercept[QueryExecutionException] {
+    intercept[AnalysisException] {
       sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
index 37c01792d9..97cb9d9720 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
@@ -149,7 +149,7 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
       val (actualScannedColumns, actualPartValues) = plan.collect {
         case p @ HiveTableScan(columns, relation, _) =>
           val columnNames = columns.map(_.name)
-          val partValues = if (relation.table.partitionColumns.nonEmpty) {
+          val partValues = if (relation.table.partitionColumnNames.nonEmpty) {
             p.prunePartitions(relation.getHiveQlPartitions()).map(_.getValues)
           } else {
             Seq.empty
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 7eaf19dfe9..5ce16be4dc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -360,7 +360,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       var message = intercept[AnalysisException] {
         sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
       }.getMessage
-      assert(message.contains("ctas1 already exists"))
+      assert(message.contains("already exists"))
       checkRelation("ctas1", true)
       sql("DROP TABLE ctas1")
 
-- 
GitLab