diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 8ffc55668ae901e2a959c079377bff9d70871fb6..c68c8f80f87ad7544b0d92b9add274c869bbd88f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -25,6 +25,7 @@ import org.antlr.v4.runtime.tree.TerminalNode
 
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.parser._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
@@ -939,6 +940,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
 
     selectQuery match {
       case Some(q) =>
+        // Just use whatever is projected in the select statement as our schema
+        if (schema.nonEmpty) {
+          throw operationNotAllowed(
+            "Schema may not be specified in a Create Table As Select (CTAS) statement",
+            ctx)
+        }
         // Hive does not allow to use a CTAS statement to create a partitioned table.
         if (tableDesc.partitionColumnNames.nonEmpty) {
           val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " +
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala
index 4315197e125f4eb1706ac6b31d11442585f5f3b0..0827b04252bc4e9fe0d58eb11c2eda25e2d80803 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala
@@ -744,24 +744,14 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
 
   test("SPARK-14933 - select parquet table") {
     withTable("parquet_t") {
-      sql(
-        """
-          |create table parquet_t (c1 int, c2 string)
-          |stored as parquet select 1, 'abc'
-        """.stripMargin)
-
+      sql("create table parquet_t stored as parquet as select 1 as c1, 'abc' as c2")
       checkHiveQl("select * from parquet_t")
     }
   }
 
   test("SPARK-14933 - select orc table") {
     withTable("orc_t") {
-      sql(
-        """
-          |create table orc_t (c1 int, c2 string)
-          |stored as orc select 1, 'abc'
-        """.stripMargin)
-
+      sql("create table orc_t stored as orc as select 1 as c1, 'abc' as c2")
       checkHiveQl("select * from orc_t")
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index ba9fe54db86ee40d0ab3ff1d1f538cbb7df8b571..867aadb5f556938b631c5701fe61fba3521aa01c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -51,12 +51,6 @@ class HiveDDLCommandSuite extends PlanTest {
   test("Test CTAS #1") {
     val s1 =
       """CREATE EXTERNAL TABLE IF NOT EXISTS mydb.page_view
-        |(viewTime INT,
-        |userid BIGINT,
-        |page_url STRING,
-        |referrer_url STRING,
-        |ip STRING COMMENT 'IP Address of the User',
-        |country STRING COMMENT 'country of origination')
         |COMMENT 'This is the staging page view table'
         |STORED AS RCFILE
         |LOCATION '/user/external/page_view'
@@ -69,13 +63,7 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(desc.identifier.table == "page_view")
     assert(desc.tableType == CatalogTableType.EXTERNAL)
     assert(desc.storage.locationUri == Some("/user/external/page_view"))
-    assert(desc.schema ==
-      CatalogColumn("viewtime", "int") ::
-      CatalogColumn("userid", "bigint") ::
-      CatalogColumn("page_url", "string") ::
-      CatalogColumn("referrer_url", "string") ::
-      CatalogColumn("ip", "string", comment = Some("IP Address of the User")) ::
-      CatalogColumn("country", "string", comment = Some("country of origination")) :: Nil)
+    assert(desc.schema.isEmpty) // will be populated later when the table is actually created
     assert(desc.comment == Some("This is the staging page view table"))
     // TODO will be SQLText
     assert(desc.viewText.isEmpty)
@@ -91,12 +79,6 @@ class HiveDDLCommandSuite extends PlanTest {
   test("Test CTAS #2") {
     val s2 =
       """CREATE EXTERNAL TABLE IF NOT EXISTS mydb.page_view
-        |(viewTime INT,
-        |userid BIGINT,
-        |page_url STRING,
-        |referrer_url STRING,
-        |ip STRING COMMENT 'IP Address of the User',
-        |country STRING COMMENT 'country of origination')
         |COMMENT 'This is the staging page view table'
         |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
         | STORED AS
@@ -112,13 +94,7 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(desc.identifier.table == "page_view")
     assert(desc.tableType == CatalogTableType.EXTERNAL)
     assert(desc.storage.locationUri == Some("/user/external/page_view"))
-    assert(desc.schema ==
-      CatalogColumn("viewtime", "int") ::
-      CatalogColumn("userid", "bigint") ::
-      CatalogColumn("page_url", "string") ::
-      CatalogColumn("referrer_url", "string") ::
-      CatalogColumn("ip", "string", comment = Some("IP Address of the User")) ::
-      CatalogColumn("country", "string", comment = Some("country of origination")) :: Nil)
+    assert(desc.schema.isEmpty) // will be populated later when the table is actually created
     // TODO will be SQLText
     assert(desc.comment == Some("This is the staging page view table"))
     assert(desc.viewText.isEmpty)
@@ -190,6 +166,11 @@ class HiveDDLCommandSuite extends PlanTest {
       " AS SELECT key, value FROM (SELECT 1 as key, 2 as value) tmp")
   }
 
+  test("CTAS statement with schema") {
+    assertUnsupported(s"CREATE TABLE ctas1 (age INT, name STRING) AS SELECT * FROM src")
+    assertUnsupported(s"CREATE TABLE ctas1 (age INT, name STRING) AS SELECT 1, 'hello'")
+  }
+
   test("unsupported operations") {
     intercept[ParseException] {
       parser.parsePlan(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index ffeed63695dfb29b65ec978d62598c850034e952..0f56b2c0d1f4356b5dc52fbec219a38fb3acbbb6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -435,10 +435,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
     }
 
     // Non-External parquet pointing to /tmp/...
-
-    sql("CREATE TABLE parquet_tmp(c1 int, c2 int) " +
-      " STORED AS parquet " +
-      " AS SELECT 1, 2")
+    sql("CREATE TABLE parquet_tmp STORED AS parquet AS SELECT 1, 2")
 
     val answer4 =
       sql("SELECT input_file_name() as file FROM parquet_tmp").head().getString(0)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 499819f32b43409de5845270a5ca39366e5af100..8244ff4ce05525f65b5caab909faa6492ee2833e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -608,17 +608,15 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   test("specifying the column list for CTAS") {
     Seq((1, "111111"), (2, "222222")).toDF("key", "value").createOrReplaceTempView("mytable1")
 
-    sql("create table gen__tmp(a int, b string) as select key, value from mytable1")
+    sql("create table gen__tmp as select key as a, value as b from mytable1")
     checkAnswer(
       sql("SELECT a, b from gen__tmp"),
       sql("select key, value from mytable1").collect())
     sql("DROP TABLE gen__tmp")
 
-    sql("create table gen__tmp(a double, b double) as select key, value from mytable1")
-    checkAnswer(
-      sql("SELECT a, b from gen__tmp"),
-      sql("select cast(key as double), cast(value as double) from mytable1").collect())
-    sql("DROP TABLE gen__tmp")
+    intercept[AnalysisException] {
+      sql("create table gen__tmp(a int, b string) as select key, value from mytable1")
+    }
 
     sql("drop table mytable1")
   }
@@ -1225,8 +1223,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   test("SPARK-10741: Sort on Aggregate using parquet") {
     withTable("test10741") {
       withTempTable("src") {
-        Seq("a" -> 5, "a" -> 9, "b" -> 6).toDF().createOrReplaceTempView("src")
-        sql("CREATE TABLE test10741(c1 STRING, c2 INT) STORED AS PARQUET AS SELECT * FROM src")
+        Seq("a" -> 5, "a" -> 9, "b" -> 6).toDF("c1", "c2").createOrReplaceTempView("src")
+        sql("CREATE TABLE test10741 STORED AS PARQUET AS SELECT * FROM src")
       }
 
       checkAnswer(sql(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
index 72db3618e08709f0d3d4278dfbbe6ede8f098636..39846f145c42c829f784bf775b1e47065fb3c6e6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
@@ -308,10 +308,7 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   test("SPARK-14933 - create view from hive parquet tabale") {
     withTable("t_part") {
       withView("v_part") {
-        spark.sql(
-          """create table t_part (c1 int, c2 int)
-            |stored as parquet as select 1 as a, 2 as b
-          """.stripMargin)
+        spark.sql("create table t_part stored as parquet as select 1 as a, 2 as b")
         spark.sql("create view v_part as select * from t_part")
         checkAnswer(
           sql("select * from t_part"),
@@ -323,10 +320,7 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   test("SPARK-14933 - create view from hive orc tabale") {
     withTable("t_orc") {
       withView("v_orc") {
-        spark.sql(
-          """create table t_orc (c1 int, c2 int)
-            |stored as orc as select 1 as a, 2 as b
-          """.stripMargin)
+        spark.sql("create table t_orc stored as orc as select 1 as a, 2 as b")
         spark.sql("create view v_orc as select * from t_orc")
         checkAnswer(
           sql("select * from t_orc"),