From 20a89478e168cb6901ef89f4cb6aa79193ed244a Mon Sep 17 00:00:00 2001
From: Wenchen Fan <wenchen@databricks.com>
Date: Tue, 17 May 2016 10:12:51 -0700
Subject: [PATCH] [SPARK-14346][SQL][FOLLOW-UP] add tests for CREATE TABLE
 USING with partition and bucket

## What changes were proposed in this pull request?

https://github.com/apache/spark/pull/12781 introduced the PARTITIONED BY, CLUSTERED BY, and SORTED BY keywords for CREATE TABLE USING. This PR adds tests to make sure those keywords are handled correctly.

This PR also fixes a mistake: we should create a non-Hive-compatible table if partition or bucket info exists.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #13144 from cloud-fan/add-test.
---
 .../command/createDataSourceTables.scala      | 11 +++-
 .../execution/command/DDLCommandSuite.scala   | 53 +++++++++++++++++++
 .../sql/execution/command/DDLSuite.scala      | 44 +++++++++++++++
 3 files changed, 106 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 7d3c52570f..70e5108d93 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -399,8 +399,8 @@ object CreateDataSourceTableUtils extends Logging {
           "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive."
         (None, message)
 
-      case (Some(serde), relation: HadoopFsRelation)
-        if relation.location.paths.length == 1 && relation.partitionSchema.isEmpty =>
+      case (Some(serde), relation: HadoopFsRelation) if relation.location.paths.length == 1 &&
+        relation.partitionSchema.isEmpty && relation.bucketSpec.isEmpty =>
         val hiveTable = newHiveCompatibleMetastoreTable(relation, serde)
         val message =
           s"Persisting data source relation $qualifiedTableName with a single input path " +
@@ -415,6 +415,13 @@ object CreateDataSourceTableUtils extends Logging {
             "Input path(s): " + relation.location.paths.mkString("\n", "\n", "")
         (None, message)
 
+      case (Some(serde), relation: HadoopFsRelation) if relation.bucketSpec.nonEmpty =>
+        val message =
+          s"Persisting bucketed data source relation $qualifiedTableName into " +
+            "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
+            "Input path(s): " + relation.location.paths.mkString("\n", "\n", "")
+        (None, message)
+
       case (Some(serde), relation: HadoopFsRelation) =>
         val message =
           s"Persisting data source relation $qualifiedTableName with multiple input paths into " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 13df4493e2..897170ea57 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -24,7 +24,9 @@ import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.execution.SparkSqlParser
+import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsing}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
 
 // TODO: merge this with DDLSuite (SPARK-14441)
 class DDLCommandSuite extends PlanTest {
@@ -238,6 +240,57 @@ class DDLCommandSuite extends PlanTest {
     }
   }
 
+  test("create table using - with partitioned by") {
+    val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet PARTITIONED BY (a)"
+    val expected = CreateTableUsing(
+      TableIdentifier("my_tab"),
+      Some(new StructType().add("a", IntegerType).add("b", StringType)),
+      "parquet",
+      false,
+      Map.empty,
+      null,
+      None,
+      false,
+      true)
+
+    parser.parsePlan(query) match {
+      case ct: CreateTableUsing =>
+        // We can't compare arrays in `CreateTableUsing` directly, so here we compare
+        // `partitionColumns` first, and set `partitionColumns` to null before the plan comparison.
+        assert(Seq("a") == ct.partitionColumns.toSeq)
+        comparePlans(ct.copy(partitionColumns = null), expected)
+      case other =>
+        fail(s"Expected to parse ${classOf[CreateTableUsing].getClass.getName} from query," +
+          s"got ${other.getClass.getName}: $query")
+    }
+  }
+
+  test("create table using - with bucket") {
+    val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " +
+      "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS"
+    val expected = CreateTableUsing(
+      TableIdentifier("my_tab"),
+      Some(new StructType().add("a", IntegerType).add("b", StringType)),
+      "parquet",
+      false,
+      Map.empty,
+      null,
+      Some(BucketSpec(5, Seq("a"), Seq("b"))),
+      false,
+      true)
+
+    parser.parsePlan(query) match {
+      case ct: CreateTableUsing =>
+        // `Array.empty == Array.empty` returns false, so here we set `partitionColumns` to null
+        // before the plan comparison.
+        assert(ct.partitionColumns.isEmpty)
+        comparePlans(ct.copy(partitionColumns = null), expected)
+      case other =>
+        fail(s"Expected to parse ${classOf[CreateTableUsing].getClass.getName} from query," +
+          s"got ${other.getClass.getName}: $query")
+    }
+  }
+
   // ALTER TABLE table_name RENAME TO new_table_name;
   // ALTER VIEW view_name RENAME TO new_view_name;
   test("alter table/view: rename table/view") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 82123bec88..d72dc092e2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -29,8 +29,10 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFor
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{IntegerType, StructType}
 
 class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   private val escapedIdentifier = "`(.+)`".r
@@ -350,6 +352,48 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     assert(catalog.getTableMetadata(tableIdent1) === expectedTable)
   }
 
+  test("create table using") {
+    val catalog = spark.sessionState.catalog
+    withTable("tbl") {
+      sql("CREATE TABLE tbl(a INT, b INT) USING parquet")
+      val table = catalog.getTableMetadata(TableIdentifier("tbl"))
+      assert(table.tableType == CatalogTableType.MANAGED)
+      assert(table.schema == Seq(CatalogColumn("a", "int"), CatalogColumn("b", "int")))
+      assert(table.properties("spark.sql.sources.provider") == "parquet")
+    }
+  }
+
+  test("create table using - with partitioned by") {
+    val catalog = spark.sessionState.catalog
+    withTable("tbl") {
+      sql("CREATE TABLE tbl(a INT, b INT) USING parquet PARTITIONED BY (a)")
+      val table = catalog.getTableMetadata(TableIdentifier("tbl"))
+      assert(table.tableType == CatalogTableType.MANAGED)
+      assert(table.schema.isEmpty) // partitioned datasource table is not hive-compatible
+      assert(table.properties("spark.sql.sources.provider") == "parquet")
+      assert(DDLUtils.getSchemaFromTableProperties(table) ==
+        Some(new StructType().add("a", IntegerType).add("b", IntegerType)))
+      assert(DDLUtils.getPartitionColumnsFromTableProperties(table) ==
+        Seq("a"))
+    }
+  }
+
+  test("create table using - with bucket") {
+    val catalog = spark.sessionState.catalog
+    withTable("tbl") {
+      sql("CREATE TABLE tbl(a INT, b INT) USING parquet " +
+        "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS")
+      val table = catalog.getTableMetadata(TableIdentifier("tbl"))
+      assert(table.tableType == CatalogTableType.MANAGED)
+      assert(table.schema.isEmpty) // bucketed datasource table is not hive-compatible
+      assert(table.properties("spark.sql.sources.provider") == "parquet")
+      assert(DDLUtils.getSchemaFromTableProperties(table) ==
+        Some(new StructType().add("a", IntegerType).add("b", IntegerType)))
+      assert(DDLUtils.getBucketSpecFromTableProperties(table) ==
+        Some(BucketSpec(5, Seq("a"), Seq("b"))))
+    }
+  }
+
   test("alter table: rename") {
     val catalog = spark.sessionState.catalog
     val tableIdent1 = TableIdentifier("tab1", Some("dbx"))
-- 
GitLab
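For reference, the two new DDLSuite tests cover PARTITIONED BY and CLUSTERED BY separately, but the partition and bucket clauses can also appear together in one CREATE TABLE USING statement. Below is a minimal sketch of how such a combined case could be checked with the same helpers; it is not part of this commit, and it assumes it would live inside DDLSuite (package org.apache.spark.sql.execution.command on a build containing this change) so that `sql`, `withTable`, `spark`, and the `DDLUtils` helpers used by the new tests are in scope. The test name and the combined PARTITIONED BY + CLUSTERED BY statement are illustrative assumptions.

```scala
// Hypothetical companion test, not part of this commit. Assumes it is added to DDLSuite,
// so `sql`, `withTable`, `spark`, TableIdentifier, CatalogTableType, BucketSpec, and the
// DDLUtils helpers used above are all available, and that this build accepts PARTITIONED BY
// combined with CLUSTERED BY in CREATE TABLE USING.
test("create table using - with partitioned by and bucket (sketch)") {
  val catalog = spark.sessionState.catalog
  withTable("tbl") {
    sql("CREATE TABLE tbl(a INT, b INT) USING parquet PARTITIONED BY (a) " +
      "CLUSTERED BY (b) SORTED BY (b) INTO 5 BUCKETS")
    val table = catalog.getTableMetadata(TableIdentifier("tbl"))
    assert(table.tableType == CatalogTableType.MANAGED)
    // Partition or bucket info forces the Spark-specific (non-Hive-compatible) format:
    // the catalog-visible schema is empty and the real metadata lives in table properties.
    assert(table.schema.isEmpty)
    assert(table.properties("spark.sql.sources.provider") == "parquet")
    assert(DDLUtils.getSchemaFromTableProperties(table) ==
      Some(new StructType().add("a", IntegerType).add("b", IntegerType)))
    assert(DDLUtils.getPartitionColumnsFromTableProperties(table) == Seq("a"))
    assert(DDLUtils.getBucketSpecFromTableProperties(table) ==
      Some(BucketSpec(5, Seq("b"), Seq("b"))))
  }
}
```

This mirrors the assertions of the two new tests and exercises both the partition and bucket metadata round-trips through table properties at once.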