diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index bd05855f0a19d4387dd84b00275987561d36054b..4531fe4a0ebafaaa9165c058d787019832341bed 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -71,11 +71,7 @@ statement
     | createTableHeader ('(' colTypeList ')')? tableProvider
         (OPTIONS tablePropertyList)?
         (PARTITIONED BY partitionColumnNames=identifierList)?
-        bucketSpec?                                                    #createTableUsing
-    | createTableHeader tableProvider
-        (OPTIONS tablePropertyList)?
-        (PARTITIONED BY partitionColumnNames=identifierList)?
-        bucketSpec? AS? query                                          #createTableUsing
+        bucketSpec? (AS? query)?                                       #createTableUsing
     | createTableHeader ('(' columns=colTypeList ')')?
         (COMMENT STRING)?
         (PARTITIONED BY '(' partitionColumns=colTypeList ')')?
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index df509a56792e60c489e264a6c9018b49afc77001..0300bfe1ece390d643aed5012744d327a13391ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -322,7 +322,20 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create a [[CreateTable]] logical plan.
+   * Create a data source table, returning a [[CreateTable]] logical plan.
+   *
+   * Expected format:
+   * {{{
+   *   CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
+   *   USING table_provider
+   *   [OPTIONS table_property_list]
+   *   [PARTITIONED BY (col_name, col_name, ...)]
+   *   [CLUSTERED BY (col_name, col_name, ...)
+   *    [SORTED BY (col_name [ASC|DESC], ...)]
+   *    INTO num_buckets BUCKETS
+   *   ]
+   *   [AS select_statement];
+   * }}}
    */
   override def visitCreateTableUsing(ctx: CreateTableUsingContext): LogicalPlan = withOrigin(ctx) {
     val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
@@ -371,6 +384,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         operationNotAllowed("CREATE TEMPORARY TABLE ... USING ... AS query", ctx)
       }
 
+      // Don't allow explicit specification of schema for CTAS
+      if (schema.nonEmpty) {
+        operationNotAllowed(
+          "Schema may not be specified in a Create Table As Select (CTAS) statement",
+          ctx)
+      }
       CreateTable(tableDesc, mode, Some(query))
     } else {
       if (temp) {
@@ -1052,7 +1071,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
             "CTAS statement."
           operationNotAllowed(errorMessage, ctx)
         }
-        // Just use whatever is projected in the select statement as our schema
+
+        // Don't allow explicit specification of schema for CTAS.
         if (schema.nonEmpty) {
           operationNotAllowed(
             "Schema may not be specified in a Create Table As Select (CTAS) statement",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index 5cc9467395adc1bb4f293a97de767b61dbb01796..61939fe5ef5b564741dea14acc8aaa0239e4f38a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -249,4 +249,13 @@ class CreateTableAsSelectSuite
       }
     }
   }
+
+  test("specifying the column list for CTAS") {
+    withTable("t") {
+      val e = intercept[ParseException] {
+        sql("CREATE TABLE t (a int, b int) USING parquet AS SELECT 1, 2")
+      }.getMessage
+      assert(e.contains("Schema may not be specified in a Create Table As Select (CTAS)"))
+    }
+  }
 }