diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index fa4ccf42b55c5d0a3aeed5acd82381560ed39844..1316d90fa471ef98972890b26836c2cfbd481e73 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -344,6 +344,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         table, provider, partitionColumnNames, bucketSpec, mode, options, query)
     } else {
       val struct = Option(ctx.colTypeList()).map(createStructType)
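+      // Bucket columns must come from an explicitly specified schema; fail fast otherwise.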
+      if (struct.isEmpty && bucketSpec.nonEmpty) {
+        throw new ParseException(
+          "Expected explicit specification of table schema when using CLUSTERED BY clause.", ctx)
+      }
+
       CreateTableUsing(
         table,
         struct,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
index 6008d73717f779c9191f098a70a447918bca471f..961d035b768702ea3d97f457b9064581ea280ae4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import org.apache.spark.sql.AnalysisException
+
 /**
  * A container for bucketing information.
  * Bucketing is a technology for decomposing data sets into more manageable parts, and the number
@@ -29,7 +31,12 @@ package org.apache.spark.sql.execution.datasources
 private[sql] case class BucketSpec(
     numBuckets: Int,
     bucketColumnNames: Seq[String],
-    sortColumnNames: Seq[String])
+    sortColumnNames: Seq[String]) {
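+  // Reject a non-positive bucket count as early as possible, at construction time.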
+  if (numBuckets <= 0) {
+    throw new AnalysisException(s"Expected positive number of buckets, but got `$numBuckets`.")
+  }
+}
 
 private[sql] object BucketingUtils {
   // The file name of bucketed data should have 3 parts:
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 169250d9bb1c27e3db2e54f5d2c3b4f978846adb..28f625b1cdce2e405bf41e73ffe10fae72c8177a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFor
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.internal.SQLConf
@@ -1264,6 +1265,30 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
   }
 
+  test("create table using CLUSTERED BY without schema specification") {
+    import testImplicits._
+    withTempPath { tempDir =>
+      withTable("jsonTable") {
+        (("a", "b") :: Nil).toDF().write.json(tempDir.getCanonicalPath)
+
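+        // Parsing should fail: CLUSTERED BY is used without an explicit schema.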
+        val e = intercept[ParseException] {
+          sql(
+            s"""
+               |CREATE TABLE jsonTable
+               |USING org.apache.spark.sql.json
+               |OPTIONS (
+               |  path '${tempDir.getCanonicalPath}'
+               |)
+               |CLUSTERED BY (inexistentColumnA) SORTED BY (inexistentColumnB) INTO 2 BUCKETS
+             """.stripMargin)
+        }.getMessage
+        assert(e.contains(
+          "Expected explicit specification of table schema when using CLUSTERED BY clause"))
+      }
+    }
+  }
+
   test("create table with datasource properties (not allowed)") {
     assertUnsupported("CREATE TABLE my_tab TBLPROPERTIES ('spark.sql.sources.me'='anything')")
     assertUnsupported("CREATE TABLE my_tab ROW FORMAT SERDE 'serde' " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index 251a25665a421ae8638aeafd582e4b8f94c38682..5ab585faa4acbdc059e88cf331fd00c8b1348cef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.SparkException
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.command.DDLUtils
@@ -204,7 +205,7 @@ class CreateTableAsSelectSuite
     }
   }
 
-  test("create table using as select - with bucket") {
+  test("create table using as select - with non-zero buckets") {
     val catalog = spark.sessionState.catalog
     withTable("t") {
       sql(
@@ -217,7 +218,24 @@
       )
       val table = catalog.getTableMetadata(TableIdentifier("t"))
       assert(DDLUtils.getBucketSpecFromTableProperties(table) ==
-        Some(BucketSpec(5, Seq("a"), Seq("b"))))
+        Option(BucketSpec(5, Seq("a"), Seq("b"))))
+    }
+  }
+
+  test("create table using as select - with zero buckets") {
+    withTable("t") {
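+      // BucketSpec should reject a bucket count of 0 when it is constructed.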
+      val e = intercept[AnalysisException] {
+        sql(
+          s"""
+             |CREATE TABLE t USING PARQUET
+             |OPTIONS (PATH '${path.toString}')
+             |CLUSTERED BY (a) SORTED BY (b) INTO 0 BUCKETS
+             |AS SELECT 1 AS a, 2 AS b
+           """.stripMargin
+        )
+      }.getMessage
+      assert(e.contains("Expected positive number of buckets, but got `0`"))
     }
   }
 }