Commit 24bea000 authored by Cheng Lian, committed by Yin Huai

[SPARK-14954] [SQL] Add PARTITION BY and BUCKET BY clause for data source CTAS syntax

Currently, persisted partitioned and/or bucketed data source tables can be created through the Dataset API but not through SQL DDL. This PR implements the following syntax to add partitioning and bucketing support to the SQL DDL:

```
CREATE TABLE <table-name>
USING <provider> [OPTIONS (<key1> <value1>, <key2> <value2>, ...)]
[PARTITIONED BY (col1, col2, ...)]
[CLUSTERED BY (col1, col2, ...) [SORTED BY (col1, col2, ...)] INTO <n> BUCKETS]
AS SELECT ...
```
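
As a concrete illustration (not taken from the patch itself), the new DDL path and the pre-existing Dataset API path it mirrors would look roughly like this; `spark` is assumed to be a `SparkSession`, and the table and column names are made up:

```
// New SQL DDL path added by this PR: partitioned, bucketed, sorted CTAS.
spark.sql(
  """CREATE TABLE t USING PARQUET
    |PARTITIONED BY (a)
    |CLUSTERED BY (b) SORTED BY (c) INTO 2 BUCKETS
    |AS SELECT 1 AS a, 2 AS b, 3 AS c
  """.stripMargin)

// Pre-existing Dataset API path that already supported this.
spark.sql("SELECT 1 AS a, 2 AS b, 3 AS c")
  .write
  .format("parquet")
  .partitionBy("a")
  .bucketBy(2, "b")
  .sortBy("c")
  .saveAsTable("t2")
```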

Test cases are added in `MetastoreDataSourcesSuite` to check the newly added syntax.

Author: Cheng Lian <lian@databricks.com>
Author: Yin Huai <yhuai@databricks.com>

Closes #12734 from liancheng/spark-14954.
```
@@ -47,7 +47,9 @@ statement
     | createTableHeader ('(' colTypeList ')')? tableProvider
         (OPTIONS tablePropertyList)?                                #createTableUsing
     | createTableHeader tableProvider
-        (OPTIONS tablePropertyList)? AS? query                      #createTableUsing
+        (OPTIONS tablePropertyList)?
+        (PARTITIONED BY partitionColumnNames=identifierList)?
+        bucketSpec? AS? query                                       #createTableUsing
     | createTableHeader ('(' columns=colTypeList ')')?
         (COMMENT STRING)?
         (PARTITIONED BY '(' partitionColumns=colTypeList ')')?
```
```
@@ -289,6 +289,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     }
     val options = Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty)
     val provider = ctx.tableProvider.qualifiedName.getText
+    val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)
     if (ctx.query != null) {
       // Get the backing query.
@@ -302,9 +303,16 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       } else {
         SaveMode.ErrorIfExists
       }
-      CreateTableUsingAsSelect(table, provider, temp, Array.empty, None, mode, options, query)
+      val partitionColumnNames =
+        Option(ctx.partitionColumnNames)
+          .map(visitIdentifierList(_).toArray)
+          .getOrElse(Array.empty[String])
+      CreateTableUsingAsSelect(
+        table, provider, temp, partitionColumnNames, bucketSpec, mode, options, query)
     } else {
-      val struct = Option(ctx.colTypeList).map(createStructType)
+      val struct = Option(ctx.colTypeList()).map(createStructType)
       CreateTableUsing(table, struct, provider, temp, options, ifNotExists, managedIfNoPath = false)
     }
   }
```
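
One detail worth calling out in the builder change above: ANTLR-generated context accessors return `null` when an optional subrule (here `partitionColumnNames` and `bucketSpec`) did not match, so wrapping them in `Option(...)` is what turns an absent clause into `None` or an empty array. A minimal standalone sketch of the pattern (illustrative only, not Spark code):

```
// `raw` stands in for an ANTLR context accessor result, which is null
// when the optional clause was not present in the parsed statement.
def toPartitionCols(raw: Array[String]): Array[String] =
  Option(raw).getOrElse(Array.empty[String])

// Absent clause: null becomes an empty column list.
assert(toPartitionCols(null).isEmpty)
// Present clause: columns pass through unchanged.
assert(toPartitionCols(Array("a", "b")).sameElements(Array("a", "b")))
```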
```
@@ -940,4 +940,97 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         .schema.forall { c => DataTypeParser.parse(c.dataType) == ArrayType(StringType) })
     }
   }
+
+  test("CTAS: persisted partitioned data source table") {
+    withTempDir { dir =>
+      withTable("t") {
+        val path = dir.getCanonicalPath
+        sql(
+          s"""CREATE TABLE t USING PARQUET
+             |OPTIONS (PATH '$path')
+             |PARTITIONED BY (a)
+             |AS SELECT 1 AS a, 2 AS b
+           """.stripMargin
+        )
+
+        val metastoreTable = sharedState.externalCatalog.getTable("default", "t")
+        assert(metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt === 1)
+        assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numBuckets"))
+        assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numBucketCols"))
+        assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numSortCols"))
+
+        checkAnswer(table("t"), Row(2, 1))
+      }
+    }
+  }
+
+  test("CTAS: persisted bucketed data source table") {
+    withTempDir { dir =>
+      withTable("t") {
+        val path = dir.getCanonicalPath
+        sql(
+          s"""CREATE TABLE t USING PARQUET
+             |OPTIONS (PATH '$path')
+             |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS
+             |AS SELECT 1 AS a, 2 AS b
+           """.stripMargin
+        )
+
+        val metastoreTable = sharedState.externalCatalog.getTable("default", "t")
+        assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numPartCols"))
+        assert(metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt === 2)
+        assert(metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt === 1)
+        assert(metastoreTable.properties("spark.sql.sources.schema.numSortCols").toInt === 1)
+
+        checkAnswer(table("t"), Row(1, 2))
+      }
+
+      withTable("t") {
+        val path = dir.getCanonicalPath
+        sql(
+          s"""CREATE TABLE t USING PARQUET
+             |OPTIONS (PATH '$path')
+             |CLUSTERED BY (a) INTO 2 BUCKETS
+             |AS SELECT 1 AS a, 2 AS b
+           """.stripMargin
+        )
+
+        val metastoreTable = sharedState.externalCatalog.getTable("default", "t")
+        assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numPartCols"))
+        assert(metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt === 2)
+        assert(metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt === 1)
+        assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numSortCols"))
+
+        checkAnswer(table("t"), Row(1, 2))
+      }
+    }
+  }
+
+  test("CTAS: persisted partitioned bucketed data source table") {
+    withTempDir { dir =>
+      withTable("t") {
+        val path = dir.getCanonicalPath
+        sql(
+          s"""CREATE TABLE t USING PARQUET
+             |OPTIONS (PATH '$path')
+             |PARTITIONED BY (a)
+             |CLUSTERED BY (b) SORTED BY (c) INTO 2 BUCKETS
+             |AS SELECT 1 AS a, 2 AS b, 3 AS c
+           """.stripMargin
+        )
+
+        val metastoreTable = sharedState.externalCatalog.getTable("default", "t")
+        assert(metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt === 1)
+        assert(metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt === 2)
+        assert(metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt === 1)
+        assert(metastoreTable.properties("spark.sql.sources.schema.numSortCols").toInt === 1)
+
+        checkAnswer(table("t"), Row(2, 3, 1))
+      }
+    }
+  }
 }
```
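
A note on the expected rows in the tests above: for partitioned data source tables, Spark moves the partition columns to the end of the table schema, which is why `SELECT 1 AS a, 2 AS b` partitioned by `a` reads back as `Row(2, 1)`, and the three-column variant as `Row(2, 3, 1)`. A sketch of observing the reordering directly, assuming the same session helpers the suite uses:

```
// After the partitioned CTAS, column `a` is last in the read-back schema,
// so the row order is (b, a) rather than (a, b).
assert(table("t").columns.sameElements(Array("b", "a")))
```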