diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index bf21c9d5248a8316a9f6beee61ac1b7547ffc1d0..c8d0f4e3c5bbd530168191341ea0fe0b27b355cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -143,10 +143,7 @@ class SparkSqlAstBuilder extends AstBuilder {
   override def visitExplain(ctx: ExplainContext): LogicalPlan = withOrigin(ctx) {
     val options = ctx.explainOption.asScala
     if (options.exists(_.FORMATTED != null)) {
-      logWarning("EXPLAIN FORMATTED option is ignored.")
-    }
-    if (options.exists(_.LOGICAL != null)) {
-      logWarning("EXPLAIN LOGICAL option is ignored.")
+      logWarning("Unsupported operation: EXPLAIN FORMATTED option")
     }
 
     // Create the explain comment.
@@ -206,7 +203,8 @@ class SparkSqlAstBuilder extends AstBuilder {
   override def visitCreateTableUsing(ctx: CreateTableUsingContext): LogicalPlan = withOrigin(ctx) {
     val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
     if (external) {
-      logWarning("EXTERNAL option is not supported.")
+      throw new ParseException("Unsupported operation: EXTERNAL option", ctx)
     }
     val options = Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty)
     val provider = ctx.tableProvider.qualifiedName.getText
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 8e63b6987650153bf9a309473f044062ad9a1fb4..b1c1fd095107554d638b941cb14fb78d4cecd5ca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -665,7 +665,7 @@ class DDLCommandSuite extends PlanTest {
     comparePlans(parsed2, expected2)
   }
 
-  test("commands only available in HiveContext") {
+  test("unsupported operations") {
     intercept[ParseException] {
       parser.parsePlan("DROP TABLE D1.T1")
     }
@@ -682,6 +682,15 @@ class DDLCommandSuite extends PlanTest {
           |TBLPROPERTIES('prop1Key '= "prop1Val", ' `prop2Key` '= "prop2Val")
         """.stripMargin)
     }
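+    // EXTERNAL option in CREATE TABLE ... USING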
+    intercept[ParseException] {
+      parser.parsePlan(
+        """
+          |CREATE EXTERNAL TABLE oneToTenDef
+          |USING org.apache.spark.sql.sources
+          |OPTIONS (from '1', to '10')
+        """.stripMargin)
+    }
     intercept[ParseException] {
       parser.parsePlan("SELECT TRANSFORM (key, value) USING 'cat' AS (tKey, tValue) FROM testData")
     }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
index ab69d3502e938fb757c52d7c579191cde6d8b8c3..657edb493aa264ef90bfd0b0e7263d409d8addf3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
@@ -162,14 +162,16 @@ class HiveSqlAstBuilder extends SparkSqlAstBuilder {
 
       // Unsupported clauses.
       if (temp) {
-        logWarning("TEMPORARY clause is ignored.")
+        throw new ParseException(s"Unsupported operation: TEMPORARY clause.", ctx)
       }
       if (ctx.bucketSpec != null) {
         // TODO add this - we need cluster columns in the CatalogTable for this to work.
-        logWarning("CLUSTERED BY ... [ORDERED BY ...] INTO ... BUCKETS clause is ignored.")
+        throw new ParseException("Unsupported operation: " +
+          "CLUSTERED BY ... [ORDERED BY ...] INTO ... BUCKETS clause.", ctx)
       }
       if (ctx.skewSpec != null) {
-        logWarning("SKEWED BY ... ON ... [STORED AS DIRECTORIES] clause is ignored.")
+        throw new ParseException("Operation not allowed: " +
+          "SKEWED BY ... ON ... [STORED AS DIRECTORIES] clause.", ctx)
       }
 
       // Create the schema.
@@ -230,7 +232,7 @@ class HiveSqlAstBuilder extends SparkSqlAstBuilder {
       throw new ParseException(s"Operation not allowed: partitioned views", ctx)
     } else {
       if (ctx.STRING != null) {
-        logWarning("COMMENT clause is ignored.")
+        throw new ParseException("Unsupported operation: COMMENT clause", ctx)
       }
       val identifiers = Option(ctx.identifierCommentList).toSeq.flatMap(_.identifierComment.asScala)
       val schema = identifiers.map { ic =>
@@ -296,7 +298,8 @@ class HiveSqlAstBuilder extends SparkSqlAstBuilder {
       recordReader: Token,
       schemaLess: Boolean): HiveScriptIOSchema = {
     if (recordWriter != null || recordReader != null) {
-      logWarning("Used defined record reader/writer classes are currently ignored.")
+      throw new ParseException(
+        "Unsupported operation: Used defined record reader/writer classes.", ctx)
     }
 
     // Decode and input/output format.
@@ -370,7 +373,8 @@ class HiveSqlAstBuilder extends SparkSqlAstBuilder {
       ctx: TableFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
     import ctx._
     if (inDriver != null || outDriver != null) {
-      logWarning("INPUTDRIVER ... OUTPUTDRIVER ... clauses are ignored.")
+      throw new ParseException(
+        s"Operation not allowed: INPUTDRIVER ... OUTPUTDRIVER ... clauses", ctx)
     }
     EmptyStorageFormat.copy(
       inputFormat = Option(string(inFmt)),
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index c5f01da4fabbb30a3c820d23abed4a2cbd198f14..12a582c10a68cb94d346905999d2a021f758caba 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -180,6 +180,71 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(desc.properties == Map(("tbl_p1" -> "p11"), ("tbl_p2" -> "p22")))
   }
 
+  test("unsupported operations") {
+    intercept[ParseException] {
+      parser.parsePlan(
+        """
+          |CREATE TEMPORARY TABLE ctas2
+          |ROW FORMAT SERDE "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"
+          |WITH SERDEPROPERTIES("serde_p1"="p1","serde_p2"="p2")
+          |STORED AS RCFile
+          |TBLPROPERTIES("tbl_p1"="p11", "tbl_p2"="p22")
+          |AS SELECT key, value FROM src ORDER BY key, value
+        """.stripMargin)
+    }
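+    // INPUTDRIVER ... OUTPUTDRIVER ... clauses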
+    intercept[ParseException] {
+      parser.parsePlan(
+        """CREATE TABLE ctas2
+          |STORED AS
+          |INPUTFORMAT "org.apache.hadoop.mapred.TextInputFormat"
+          |OUTPUTFORMAT "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"
+          |INPUTDRIVER "org.apache.hadoop.hive.howl.rcfile.RCFileInputDriver"
+          |OUTPUTDRIVER "org.apache.hadoop.hive.howl.rcfile.RCFileOutputDriver"
+          |AS SELECT key, value FROM src ORDER BY key, value
+        """.stripMargin)
+    }
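+    // CLUSTERED BY ... INTO ... BUCKETS clause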
+    intercept[ParseException] {
+      parser.parsePlan(
+        """
+          |CREATE TABLE user_info_bucketed(user_id BIGINT, firstname STRING, lastname STRING)
+          |CLUSTERED BY(user_id) INTO 256 BUCKETS
+          |AS SELECT key, value FROM src ORDER BY key, value
+        """.stripMargin)
+    }
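+    // SKEWED BY ... ON ... clause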
+    intercept[ParseException] {
+      parser.parsePlan(
+        """
+          |CREATE TABLE user_info_bucketed(user_id BIGINT, firstname STRING, lastname STRING)
+          |SKEWED BY (key) ON (1,5,6)
+          |AS SELECT key, value FROM src ORDER BY key, value
+        """.stripMargin)
+    }
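+    // RECORDREADER/RECORDWRITER clauses in TRANSFORM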
+    intercept[ParseException] {
+      parser.parsePlan(
+        """
+          |SELECT TRANSFORM (key, value) USING 'cat' AS (tKey, tValue)
+          |ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe'
+          |RECORDREADER 'org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesRecordReader'
+          |FROM testData
+        """.stripMargin)
+    }
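+    // COMMENT clause in CREATE VIEW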
+    intercept[ParseException] {
+      parser.parsePlan(
+        """
+          |CREATE OR REPLACE VIEW IF NOT EXISTS view1 (col1, col3)
+          |COMMENT 'blabla'
+          |TBLPROPERTIES('prop1Key'="prop1Val")
+          |AS SELECT * FROM tab1
+        """.stripMargin)
+    }
+  }
+
   test("Invalid interval term should throw AnalysisException") {
     def assertError(sql: String, errorMessage: String): Unit = {
       val e = intercept[AnalysisException] {
@@ -277,7 +342,6 @@
       """
         |CREATE OR REPLACE VIEW IF NOT EXISTS view1
         |(col1, col3)
-        |COMMENT 'I cannot spell'
         |TBLPROPERTIES('prop1Key'="prop1Val")
         |AS SELECT * FROM tab1
       """.stripMargin
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
index 4c3f45052249628a989260751fce57f02e7bd4fa..8de2bdcfc020259f877ea322f2bf6e9a9c9a2ed2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
@@ -26,7 +26,7 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
     super.beforeAll()
     sql(
       """
-        |CREATE EXTERNAL TABLE parquet_tab1 (c1 INT, c2 STRING)
+        |CREATE TABLE parquet_tab1 (c1 INT, c2 STRING)
         |USING org.apache.spark.sql.parquet.DefaultSource
       """.stripMargin)
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index d7ec85c15d23993e2406e1a1bbbdfd6047864e79..f3796a9966239940fc4da17ad3de68cfbedb872b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1491,7 +1491,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         sql(
           """CREATE VIEW IF NOT EXISTS
             |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
-            |COMMENT 'blabla'
             |TBLPROPERTIES ('a' = 'b')
             |AS SELECT * FROM jt""".stripMargin)
         checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))