Commit 94f14b52 authored by gatorsmile, committed by Wenchen Fan

[SPARK-16556][SPARK-16559][SQL] Fix Two Bugs in Bucket Specification

### What changes were proposed in this pull request?

**Issue 1: Bucket Specification Silently Ignored When Creating a Table Using Schema Inference**

When creating a data source table without an explicit schema or a `SELECT` clause, we silently ignore the bucket specification (`CLUSTERED BY ... SORTED BY ...`) in [the code](https://github.com/apache/spark/blob/ce3b98bae28af72299722f56e4e4ef831f471ec0/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala#L339-L354).

For example,
```SQL
CREATE TABLE jsonTable
USING org.apache.spark.sql.json
OPTIONS (
  path '${tempDir.getCanonicalPath}'
)
CLUSTERED BY (inexistentColumnA) SORTED BY (inexistentColumnB) INTO 2 BUCKETS
```

This PR detects the misuse and raises a `ParseException` with an informative error message, as shown below.
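For illustration, the failure now surfaces roughly like this (a sketch; the path is a placeholder, and the message text comes from the new check in `SparkSqlAstBuilder`):

```scala
// Sketch of the post-patch behavior: the schema-less statement above now
// fails at parse time instead of silently dropping the bucket spec.
// '/tmp/jsonTable' is a placeholder path, not from the original PR.
spark.sql(
  """
    |CREATE TABLE jsonTable
    |USING org.apache.spark.sql.json
    |OPTIONS (path '/tmp/jsonTable')
    |CLUSTERED BY (inexistentColumnA) SORTED BY (inexistentColumnB) INTO 2 BUCKETS
  """.stripMargin)
// org.apache.spark.sql.catalyst.parser.ParseException:
// Expected explicit specification of table schema when using CLUSTERED BY clause.
```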

**Issue 2: Run-time `java.lang.ArithmeticException` When the Number of Buckets Is Set to Zero**

For example,
```SQL
CREATE TABLE t USING PARQUET
OPTIONS (PATH '${path.toString}')
CLUSTERED BY (a) SORTED BY (b) INTO 0 BUCKETS
AS SELECT 1 AS a, 2 AS b
```
The resulting exception is:
```
ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 1.0 (TID 2)
java.lang.ArithmeticException: / by zero
```

This PR rejects the misuse up front with an `AnalysisException` carrying an appropriate error message.
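The underlying cause: at write time each row's bucket id is computed with a hash-modulo over the bucket count, so zero buckets means a modulo by zero. A minimal sketch of that arithmetic (illustrative only, not Spark's actual code, which uses its internal hash expressions):

```scala
// Illustrative only: bucket assignment is essentially hash(bucketColumns) mod
// numBuckets, normalized to be non-negative. With numBuckets == 0 the modulo
// throws the ArithmeticException seen above.
def bucketIdFor(rowHash: Int, numBuckets: Int): Int = {
  val mod = rowHash % numBuckets
  if (mod < 0) mod + numBuckets else mod
}

bucketIdFor(42, 2)  // 0 -- a valid bucket id in [0, 2)
bucketIdFor(42, 0)  // java.lang.ArithmeticException: / by zero
```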

### How was this patch tested?
Added test cases in `DDLSuite` and `CreateTableAsSelectSuite`.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #14210 from gatorsmile/createTableWithoutSchema.
In `SparkSqlAstBuilder`:

```diff
@@ -344,6 +344,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         table, provider, partitionColumnNames, bucketSpec, mode, options, query)
     } else {
       val struct = Option(ctx.colTypeList()).map(createStructType)
+      if (struct.isEmpty && bucketSpec.nonEmpty) {
+        throw new ParseException(
+          "Expected explicit specification of table schema when using CLUSTERED BY clause.", ctx)
+      }
       CreateTableUsing(
         table,
         struct,
```
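With an explicit column list, the same shape of statement passes the new check, since the bucket specification can then refer to declared columns. A sketch (table name, columns, and path are assumptions for illustration, not from the PR):

```scala
// Sketch: an explicit schema satisfies the parse-time check added above.
// All identifiers and the path are illustrative assumptions.
spark.sql(
  """
    |CREATE TABLE jsonTable (a STRING, b STRING)
    |USING org.apache.spark.sql.json
    |OPTIONS (path '/tmp/jsonTable')
    |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS
  """.stripMargin)
```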
In the `BucketSpec` definition (`org.apache.spark.sql.execution.datasources`):

```diff
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import org.apache.spark.sql.AnalysisException
+
 /**
  * A container for bucketing information.
  * Bucketing is a technology for decomposing data sets into more manageable parts, and the number
@@ -29,7 +31,11 @@ package org.apache.spark.sql.execution.datasources
 private[sql] case class BucketSpec(
     numBuckets: Int,
     bucketColumnNames: Seq[String],
-    sortColumnNames: Seq[String])
+    sortColumnNames: Seq[String]) {
+  if (numBuckets <= 0) {
+    throw new AnalysisException(s"Expected positive number of buckets, but got `$numBuckets`.")
+  }
+}
 
 private[sql] object BucketingUtils {
   // The file name of bucketed data should have 3 parts:
```
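Putting the guard in the `BucketSpec` constructor makes every construction site fail fast, whichever command builds the spec. A minimal sketch of the new behavior (note `BucketSpec` is `private[sql]`, so this only compiles inside Spark's `sql` package):

```scala
import org.apache.spark.sql.execution.datasources.BucketSpec

BucketSpec(5, Seq("a"), Seq("b"))  // fine: 5 buckets, clustered by `a`, sorted by `b`
BucketSpec(0, Seq("a"), Seq("b"))  // throws AnalysisException:
                                   // Expected positive number of buckets, but got `0`.
```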
In `DDLSuite`:

```diff
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFor
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.internal.SQLConf
@@ -1264,6 +1265,29 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
   }
 
+  test("create table using CLUSTERED BY without schema specification") {
+    import testImplicits._
+    withTempPath { tempDir =>
+      withTable("jsonTable") {
+        (("a", "b") :: Nil).toDF().write.json(tempDir.getCanonicalPath)
+        val e = intercept[ParseException] {
+          sql(
+            s"""
+               |CREATE TABLE jsonTable
+               |USING org.apache.spark.sql.json
+               |OPTIONS (
+               |  path '${tempDir.getCanonicalPath}'
+               |)
+               |CLUSTERED BY (inexistentColumnA) SORTED BY (inexistentColumnB) INTO 2 BUCKETS
+             """.stripMargin)
+        }.getMessage
+        assert(e.contains(
+          "Expected explicit specification of table schema when using CLUSTERED BY clause"))
+      }
+    }
+  }
+
   test("create table with datasource properties (not allowed)") {
     assertUnsupported("CREATE TABLE my_tab TBLPROPERTIES ('spark.sql.sources.me'='anything')")
     assertUnsupported("CREATE TABLE my_tab ROW FORMAT SERDE 'serde' " +
```
In `CreateTableAsSelectSuite`:

```diff
@@ -22,6 +22,7 @@ import java.io.File
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.SparkException
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.command.DDLUtils
@@ -204,7 +205,7 @@ class CreateTableAsSelectSuite
     }
   }
 
-  test("create table using as select - with bucket") {
+  test("create table using as select - with non-zero buckets") {
     val catalog = spark.sessionState.catalog
     withTable("t") {
       sql(
@@ -217,7 +218,23 @@ class CreateTableAsSelectSuite
       )
       val table = catalog.getTableMetadata(TableIdentifier("t"))
       assert(DDLUtils.getBucketSpecFromTableProperties(table) ==
-        Some(BucketSpec(5, Seq("a"), Seq("b"))))
+        Option(BucketSpec(5, Seq("a"), Seq("b"))))
     }
   }
 
+  test("create table using as select - with zero buckets") {
+    withTable("t") {
+      val e = intercept[AnalysisException] {
+        sql(
+          s"""
+             |CREATE TABLE t USING PARQUET
+             |OPTIONS (PATH '${path.toString}')
+             |CLUSTERED BY (a) SORTED BY (b) INTO 0 BUCKETS
+             |AS SELECT 1 AS a, 2 AS b
+           """.stripMargin
+        )
+      }.getMessage
+      assert(e.contains("Expected positive number of buckets, but got `0`"))
+    }
+  }
 }
```