Commit 20a89478 authored by Wenchen Fan, committed by Yin Huai

[SPARK-14346][SQL][FOLLOW-UP] add tests for CREATE TABLE USING with partition and bucket

## What changes were proposed in this pull request?

https://github.com/apache/spark/pull/12781 introduced PARTITIONED BY, CLUSTERED BY, and SORTED BY keywords to CREATE TABLE USING. This PR adds tests to make sure those keywords are handled correctly.

This PR also fixes a mistake: when partition or bucket info exists, the table should be created as a non-Hive-compatible table.
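For reference, the syntax exercised by the new tests looks like the following. This is a minimal sketch run against a `SparkSession` named `spark`; the table names `pt` and `bt` are illustrative and not part of the patch.

```scala
// Partitioned data source table: PARTITIONED BY comes after USING <provider>.
spark.sql("CREATE TABLE pt(a INT, b STRING) USING parquet PARTITIONED BY (a)")

// Bucketed and sorted data source table.
spark.sql(
  "CREATE TABLE bt(a INT, b STRING) USING parquet " +
    "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS")
```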

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #13144 from cloud-fan/add-test.
parent c0c3ec35
@@ -399,8 +399,8 @@ object CreateDataSourceTableUtils extends Logging {
           "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive."
         (None, message)

-      case (Some(serde), relation: HadoopFsRelation)
-          if relation.location.paths.length == 1 && relation.partitionSchema.isEmpty =>
+      case (Some(serde), relation: HadoopFsRelation) if relation.location.paths.length == 1 &&
+          relation.partitionSchema.isEmpty && relation.bucketSpec.isEmpty =>
         val hiveTable = newHiveCompatibleMetastoreTable(relation, serde)
         val message =
           s"Persisting data source relation $qualifiedTableName with a single input path " +
@@ -415,6 +415,13 @@ object CreateDataSourceTableUtils extends Logging {
           "Input path(s): " + relation.location.paths.mkString("\n", "\n", "")
         (None, message)

+      case (Some(serde), relation: HadoopFsRelation) if relation.bucketSpec.nonEmpty =>
+        val message =
+          s"Persisting bucketed data source relation $qualifiedTableName into " +
+            "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
+            "Input path(s): " + relation.location.paths.mkString("\n", "\n", "")
+        (None, message)
+
       case (Some(serde), relation: HadoopFsRelation) =>
         val message =
           s"Persisting data source relation $qualifiedTableName with multiple input paths into " +
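In effect, the change above narrows the Hive-compatible path: a data source table is persisted in Hive-compatible form only when it has a single input path, no partition columns, and no bucket spec. Below is a condensed sketch of the guard, paraphrased from the pattern match above; the helper name `isHiveCompatible` and the import path are assumptions, not code from the patch.

```scala
import org.apache.spark.sql.execution.datasources.HadoopFsRelation

// Paraphrase of the updated pattern guards: all three conditions must hold for the
// table to be stored in Hive-compatible form; otherwise it is persisted in the
// Spark SQL specific (non-Hive-compatible) format.
def isHiveCompatible(relation: HadoopFsRelation): Boolean =
  relation.location.paths.length == 1 &&
    relation.partitionSchema.isEmpty &&
    relation.bucketSpec.isEmpty
```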
@@ -24,7 +24,9 @@ import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.execution.SparkSqlParser
+import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsing}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

 // TODO: merge this with DDLSuite (SPARK-14441)
 class DDLCommandSuite extends PlanTest {
@@ -238,6 +240,57 @@ class DDLCommandSuite extends PlanTest {
     }
   }

+  test("create table using - with partitioned by") {
+    val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet PARTITIONED BY (a)"
+    val expected = CreateTableUsing(
+      TableIdentifier("my_tab"),
+      Some(new StructType().add("a", IntegerType).add("b", StringType)),
+      "parquet",
+      false,
+      Map.empty,
+      null,
+      None,
+      false,
+      true)
+
+    parser.parsePlan(query) match {
+      case ct: CreateTableUsing =>
+        // We can't compare array in `CreateTableUsing` directly, so here we compare
+        // `partitionColumns` ahead, and make `partitionColumns` null before plan comparison.
+        assert(Seq("a") == ct.partitionColumns.toSeq)
+        comparePlans(ct.copy(partitionColumns = null), expected)
+      case other =>
+        fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," +
+          s"got ${other.getClass.getName}: $query")
+    }
+  }
+
+  test("create table using - with bucket") {
+    val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " +
+      "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS"
+    val expected = CreateTableUsing(
+      TableIdentifier("my_tab"),
+      Some(new StructType().add("a", IntegerType).add("b", StringType)),
+      "parquet",
+      false,
+      Map.empty,
+      null,
+      Some(BucketSpec(5, Seq("a"), Seq("b"))),
+      false,
+      true)
+
+    parser.parsePlan(query) match {
+      case ct: CreateTableUsing =>
+        // `Array.empty == Array.empty` returns false, here we set `partitionColumns` to null
+        // before plan comparison.
+        assert(ct.partitionColumns.isEmpty)
+        comparePlans(ct.copy(partitionColumns = null), expected)
+      case other =>
+        fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," +
+          s"got ${other.getClass.getName}: $query")
+    }
+  }
+
   // ALTER TABLE table_name RENAME TO new_table_name;
   // ALTER VIEW view_name RENAME TO new_view_name;
   test("alter table/view: rename table/view") {
@@ -29,8 +29,10 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFor
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{IntegerType, StructType}

 class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   private val escapedIdentifier = "`(.+)`".r
@@ -350,6 +352,48 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     assert(catalog.getTableMetadata(tableIdent1) === expectedTable)
   }

+  test("create table using") {
+    val catalog = spark.sessionState.catalog
+    withTable("tbl") {
+      sql("CREATE TABLE tbl(a INT, b INT) USING parquet")
+      val table = catalog.getTableMetadata(TableIdentifier("tbl"))
+      assert(table.tableType == CatalogTableType.MANAGED)
+      assert(table.schema == Seq(CatalogColumn("a", "int"), CatalogColumn("b", "int")))
+      assert(table.properties("spark.sql.sources.provider") == "parquet")
+    }
+  }
+
+  test("create table using - with partitioned by") {
+    val catalog = spark.sessionState.catalog
+    withTable("tbl") {
+      sql("CREATE TABLE tbl(a INT, b INT) USING parquet PARTITIONED BY (a)")
+      val table = catalog.getTableMetadata(TableIdentifier("tbl"))
+      assert(table.tableType == CatalogTableType.MANAGED)
+      assert(table.schema.isEmpty) // partitioned datasource table is not hive-compatible
+      assert(table.properties("spark.sql.sources.provider") == "parquet")
+      assert(DDLUtils.getSchemaFromTableProperties(table) ==
+        Some(new StructType().add("a", IntegerType).add("b", IntegerType)))
+      assert(DDLUtils.getPartitionColumnsFromTableProperties(table) ==
+        Seq("a"))
+    }
+  }
+
+  test("create table using - with bucket") {
+    val catalog = spark.sessionState.catalog
+    withTable("tbl") {
+      sql("CREATE TABLE tbl(a INT, b INT) USING parquet " +
+        "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS")
+      val table = catalog.getTableMetadata(TableIdentifier("tbl"))
+      assert(table.tableType == CatalogTableType.MANAGED)
+      assert(table.schema.isEmpty) // partitioned datasource table is not hive-compatible
+      assert(table.properties("spark.sql.sources.provider") == "parquet")
+      assert(DDLUtils.getSchemaFromTableProperties(table) ==
+        Some(new StructType().add("a", IntegerType).add("b", IntegerType)))
+      assert(DDLUtils.getBucketSpecFromTableProperties(table) ==
+        Some(BucketSpec(5, Seq("a"), Seq("b"))))
+    }
+  }
+
   test("alter table: rename") {
     val catalog = spark.sessionState.catalog
     val tableIdent1 = TableIdentifier("tab1", Some("dbx"))