diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 2f2e060b38e1857b77b5e8a8da6a4fd2a80c2fe2..0e2cd39448bba64ae40eacdb1a7cdcab1383b42c 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -148,7 +148,7 @@ hiveNativeCommands
     | ROLLBACK WORK?
     | SHOW PARTITIONS tableIdentifier partitionSpec?
     | DFS .*?
-    | (CREATE | ALTER | DROP | SHOW | DESC | DESCRIBE | MSCK | LOAD) .*?
+    | (CREATE | ALTER | DROP | SHOW | DESC | DESCRIBE | LOAD) .*?
     ;
 
 unsupportedHiveNativeCommands
@@ -177,6 +177,7 @@ unsupportedHiveNativeCommands
     | kw1=UNLOCK kw2=DATABASE
     | kw1=CREATE kw2=TEMPORARY kw3=MACRO
     | kw1=DROP kw2=TEMPORARY kw3=MACRO
+    | kw1=MSCK kw2=REPAIR kw3=TABLE
     ;
 
 createTableHeader
@@ -651,7 +652,7 @@ nonReserved
     | AFTER | CASCADE | RESTRICT | BUCKETS | CLUSTERED | SORTED | PURGE | INPUTFORMAT | OUTPUTFORMAT
     | INPUTDRIVER | OUTPUTDRIVER | DBPROPERTIES | DFS | TRUNCATE | METADATA | REPLICATION | COMPUTE
     | STATISTICS | ANALYZE | PARTITIONED | EXTERNAL | DEFINED | RECORDWRITER
-    | REVOKE | GRANT | LOCK | UNLOCK | MSCK | EXPORT | IMPORT | LOAD | VALUES | COMMENT | ROLE
+    | REVOKE | GRANT | LOCK | UNLOCK | MSCK | REPAIR | EXPORT | IMPORT | LOAD | VALUES | COMMENT | ROLE
     | ROLES | COMPACTIONS | PRINCIPALS | TRANSACTIONS | INDEX | INDEXES | LOCKS | OPTION
     ;
 
@@ -867,6 +868,7 @@ GRANT: 'GRANT';
 LOCK: 'LOCK';
 UNLOCK: 'UNLOCK';
 MSCK: 'MSCK';
+REPAIR: 'REPAIR';
 EXPORT: 'EXPORT';
 IMPORT: 'IMPORT';
 LOAD: 'LOAD';
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
index 0009438b315804e71687480718fa8caab68cc147..0d9b0851fa7be8f47f67f19530fe5301c1aa0ad6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
@@ -281,31 +281,37 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
   test("drop partitions") {
     val catalog = newBasicCatalog()
     assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2)))
-    catalog.dropPartitions("db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false)
+    catalog.dropPartitions(
+      "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false)
     assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part2)))
     resetState()
     val catalog2 = newBasicCatalog()
     assert(catalogPartitionsEqual(catalog2, "db2", "tbl2", Seq(part1, part2)))
-    catalog2.dropPartitions("db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false)
+    catalog2.dropPartitions(
+      "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false)
     assert(catalog2.listPartitions("db2", "tbl2").isEmpty)
   }
 
   test("drop partitions when database/table does not exist") {
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
-      catalog.dropPartitions("does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false)
+      catalog.dropPartitions(
+        "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false)
     }
     intercept[AnalysisException] {
-      catalog.dropPartitions("db2", "does_not_exist", Seq(), ignoreIfNotExists = false)
+      catalog.dropPartitions(
+        "db2", "does_not_exist", Seq(), ignoreIfNotExists = false)
     }
   }
 
   test("drop partitions that do not exist") {
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
-      catalog.dropPartitions("db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false)
+      catalog.dropPartitions(
+        "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false)
     }
-    catalog.dropPartitions("db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true)
+    catalog.dropPartitions(
+      "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true)
   }
 
   test("get partition") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 862fc275adf086d85597d71d9d61b6f8812f679d..426273e1e3e6c136cd2755dca39bc82a59432b36 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -496,19 +496,25 @@ class SessionCatalogSuite extends SparkFunSuite {
     val sessionCatalog = new SessionCatalog(externalCatalog)
     assert(catalogPartitionsEqual(externalCatalog, "db2", "tbl2", Seq(part1, part2)))
     sessionCatalog.dropPartitions(
-      TableIdentifier("tbl2", Some("db2")), Seq(part1.spec), ignoreIfNotExists = false)
+      TableIdentifier("tbl2", Some("db2")),
+      Seq(part1.spec),
+      ignoreIfNotExists = false)
     assert(catalogPartitionsEqual(externalCatalog, "db2", "tbl2", Seq(part2)))
     // Drop partitions without explicitly specifying database
     sessionCatalog.setCurrentDatabase("db2")
     sessionCatalog.dropPartitions(
-      TableIdentifier("tbl2"), Seq(part2.spec), ignoreIfNotExists = false)
+      TableIdentifier("tbl2"),
+      Seq(part2.spec),
+      ignoreIfNotExists = false)
     assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty)
     // Drop multiple partitions at once
     sessionCatalog.createPartitions(
       TableIdentifier("tbl2", Some("db2")), Seq(part1, part2), ignoreIfExists = false)
     assert(catalogPartitionsEqual(externalCatalog, "db2", "tbl2", Seq(part1, part2)))
     sessionCatalog.dropPartitions(
-      TableIdentifier("tbl2", Some("db2")), Seq(part1.spec, part2.spec), ignoreIfNotExists = false)
+      TableIdentifier("tbl2", Some("db2")),
+      Seq(part1.spec, part2.spec),
+      ignoreIfNotExists = false)
     assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty)
   }
 
@@ -516,11 +522,15 @@ class SessionCatalogSuite extends SparkFunSuite {
     val catalog = new SessionCatalog(newBasicCatalog())
     intercept[AnalysisException] {
       catalog.dropPartitions(
-        TableIdentifier("tbl1", Some("does_not_exist")), Seq(), ignoreIfNotExists = false)
+        TableIdentifier("tbl1", Some("does_not_exist")),
+        Seq(),
+        ignoreIfNotExists = false)
     }
     intercept[AnalysisException] {
       catalog.dropPartitions(
-        TableIdentifier("does_not_exist", Some("db2")), Seq(), ignoreIfNotExists = false)
+        TableIdentifier("does_not_exist", Some("db2")),
+        Seq(),
+        ignoreIfNotExists = false)
     }
   }
 
@@ -528,10 +538,14 @@ class SessionCatalogSuite extends SparkFunSuite {
     val catalog = new SessionCatalog(newBasicCatalog())
     intercept[AnalysisException] {
       catalog.dropPartitions(
-        TableIdentifier("tbl2", Some("db2")), Seq(part3.spec), ignoreIfNotExists = false)
+        TableIdentifier("tbl2", Some("db2")),
+        Seq(part3.spec),
+        ignoreIfNotExists = false)
     }
     catalog.dropPartitions(
-      TableIdentifier("tbl2", Some("db2")), Seq(part3.spec), ignoreIfNotExists = true)
+      TableIdentifier("tbl2", Some("db2")),
+      Seq(part3.spec),
+      ignoreIfNotExists = true)
   }
 
   test("get partition") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 3da715cdb33091d7615f15d02b7f3448f2ff11d0..73d9640c35f491b5e1405eec0fd3864095b4f2d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -493,7 +493,9 @@ class SparkSqlAstBuilder extends AstBuilder {
    */
   override def visitAddTablePartition(
       ctx: AddTablePartitionContext): LogicalPlan = withOrigin(ctx) {
-    if (ctx.VIEW != null) throw new ParseException(s"Operation not allowed: partitioned views", ctx)
+    if (ctx.VIEW != null) {
+      throw new AnalysisException(s"Operation not allowed: partitioned views")
+    }
     // Create partition spec to location mapping.
     val specsAndLocs = if (ctx.partitionSpec.isEmpty) {
       ctx.partitionSpecLocation.asScala.map {
@@ -509,8 +511,7 @@ class SparkSqlAstBuilder extends AstBuilder {
     AlterTableAddPartition(
       visitTableIdentifier(ctx.tableIdentifier),
       specsAndLocs,
-      ctx.EXISTS != null)(
-      command(ctx))
+      ctx.EXISTS != null)
   }
 
   /**
@@ -523,11 +524,8 @@ class SparkSqlAstBuilder extends AstBuilder {
    */
   override def visitExchangeTablePartition(
       ctx: ExchangeTablePartitionContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableExchangePartition(
-      visitTableIdentifier(ctx.from),
-      visitTableIdentifier(ctx.to),
-      visitNonOptionalPartitionSpec(ctx.partitionSpec))(
-      command(ctx))
+    throw new AnalysisException(
+      "Operation not allowed: ALTER TABLE ... EXCHANGE PARTITION ...")
   }
 
   /**
@@ -543,8 +541,7 @@ class SparkSqlAstBuilder extends AstBuilder {
     AlterTableRenamePartition(
       visitTableIdentifier(ctx.tableIdentifier),
       visitNonOptionalPartitionSpec(ctx.from),
-      visitNonOptionalPartitionSpec(ctx.to))(
-      command(ctx))
+      visitNonOptionalPartitionSpec(ctx.to))
   }
 
   /**
@@ -561,13 +558,16 @@ class SparkSqlAstBuilder extends AstBuilder {
    */
   override def visitDropTablePartitions(
       ctx: DropTablePartitionsContext): LogicalPlan = withOrigin(ctx) {
-    if (ctx.VIEW != null) throw new ParseException(s"Operation not allowed: partitioned views", ctx)
+    if (ctx.VIEW != null) {
+      throw new AnalysisException(s"Operation not allowed: partitioned views")
+    }
+    if (ctx.PURGE != null) {
+      throw new AnalysisException(s"Operation not allowed: PURGE")
+    }
     AlterTableDropPartition(
       visitTableIdentifier(ctx.tableIdentifier),
       ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
-      ctx.EXISTS != null,
-      ctx.PURGE != null)(
-      command(ctx))
+      ctx.EXISTS != null)
   }
 
   /**
@@ -580,10 +580,8 @@ class SparkSqlAstBuilder extends AstBuilder {
    */
   override def visitArchiveTablePartition(
       ctx: ArchiveTablePartitionContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableArchivePartition(
-      visitTableIdentifier(ctx.tableIdentifier),
-      visitNonOptionalPartitionSpec(ctx.partitionSpec))(
-      command(ctx))
+    throw new AnalysisException(
+      "Operation not allowed: ALTER TABLE ... ARCHIVE PARTITION ...")
   }
 
   /**
@@ -596,10 +594,8 @@ class SparkSqlAstBuilder extends AstBuilder {
    */
   override def visitUnarchiveTablePartition(
       ctx: UnarchiveTablePartitionContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableUnarchivePartition(
-      visitTableIdentifier(ctx.tableIdentifier),
-      visitNonOptionalPartitionSpec(ctx.partitionSpec))(
-      command(ctx))
+    throw new AnalysisException(
+      "Operation not allowed: ALTER TABLE ... UNARCHIVE PARTITION ...")
   }
 
   /**
@@ -658,10 +654,7 @@ class SparkSqlAstBuilder extends AstBuilder {
    * }}}
    */
   override def visitTouchTable(ctx: TouchTableContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableTouch(
-      visitTableIdentifier(ctx.tableIdentifier),
-      Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))(
-      command(ctx))
+    throw new AnalysisException("Operation not allowed: ALTER TABLE ... TOUCH ...")
   }
 
   /**
@@ -673,11 +666,7 @@ class SparkSqlAstBuilder extends AstBuilder {
    * }}}
    */
   override def visitCompactTable(ctx: CompactTableContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableCompact(
-      visitTableIdentifier(ctx.tableIdentifier),
-      Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec),
-      string(ctx.STRING))(
-      command(ctx))
+    throw new AnalysisException("Operation not allowed: ALTER TABLE ... COMPACT ...")
   }
 
   /**
@@ -689,10 +678,7 @@ class SparkSqlAstBuilder extends AstBuilder {
    * }}}
   */
   override def visitConcatenateTable(ctx: ConcatenateTableContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableMerge(
-      visitTableIdentifier(ctx.tableIdentifier),
-      Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))(
-      command(ctx))
+    throw new AnalysisException("Operation not allowed: ALTER TABLE ... CONCATENATE")
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 8a37cf8f4cb047e49f39b653c6f3c0f035f9098b..c55b1a690e85bf02fadc2bdb168c0e133ac85d1c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -20,7 +20,8 @@ package org.apache.spark.sql.execution.command
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.types._
@@ -348,53 +349,94 @@ case class AlterTableSerDeProperties(
 }
 
 /**
- * Add Partition in ALTER TABLE/VIEW: add the table/view partitions.
+ * Add Partition in ALTER TABLE: add the table partitions.
+ *
  * 'partitionSpecsAndLocs': the syntax of ALTER VIEW is identical to ALTER TABLE,
  * EXCEPT that it is ILLEGAL to specify a LOCATION clause.
  * An error message will be issued if the partition exists, unless 'ifNotExists' is true.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec [LOCATION 'loc1']
+ * }}}
  */
 case class AlterTableAddPartition(
     tableName: TableIdentifier,
     partitionSpecsAndLocs: Seq[(TablePartitionSpec, Option[String])],
-    ifNotExists: Boolean)(sql: String)
-  extends NativeDDLCommand(sql) with Logging
+    ifNotExists: Boolean)
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val catalog = sqlContext.sessionState.catalog
+    val table = catalog.getTableMetadata(tableName)
+    if (DDLUtils.isDatasourceTable(table)) {
+      throw new AnalysisException(
+        "alter table add partition is not allowed for tables defined using the datasource API")
+    }
+    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
+      // inherit table storage format (possibly except for location)
+      CatalogTablePartition(spec, table.storage.copy(locationUri = location))
+    }
+    catalog.createPartitions(tableName, parts, ignoreIfExists = ifNotExists)
+    Seq.empty[Row]
+  }
+}
+
+/**
+ * Alter a table partition's spec.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table PARTITION spec1 RENAME TO PARTITION spec2;
+ * }}}
+ */
 case class AlterTableRenamePartition(
     tableName: TableIdentifier,
     oldPartition: TablePartitionSpec,
-    newPartition: TablePartitionSpec)(sql: String)
-  extends NativeDDLCommand(sql) with Logging
+    newPartition: TablePartitionSpec)
+  extends RunnableCommand {
 
-case class AlterTableExchangePartition(
-    fromTableName: TableIdentifier,
-    toTableName: TableIdentifier,
-    spec: TablePartitionSpec)(sql: String)
-  extends NativeDDLCommand(sql) with Logging
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    sqlContext.sessionState.catalog.renamePartitions(
+      tableName, Seq(oldPartition), Seq(newPartition))
+    Seq.empty[Row]
+  }
+
+}
 
 /**
- * Drop Partition in ALTER TABLE/VIEW: to drop a particular partition for a table/view.
+ * Drop Partition in ALTER TABLE: to drop a particular partition for a table.
+ *
  * This removes the data and metadata for this partition.
 * The data is actually moved to the .Trash/Current directory if Trash is configured,
 * unless 'purge' is true, but the metadata is completely lost.
 * An error message will be issued if the partition does not exist, unless 'ifExists' is true.
 * Note: purge is always false when the target is a view.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
+ * }}}
 */
 case class AlterTableDropPartition(
     tableName: TableIdentifier,
     specs: Seq[TablePartitionSpec],
-    ifExists: Boolean,
-    purge: Boolean)(sql: String)
-  extends NativeDDLCommand(sql) with Logging
+    ifExists: Boolean)
+  extends RunnableCommand {
 
-case class AlterTableArchivePartition(
-    tableName: TableIdentifier,
-    spec: TablePartitionSpec)(sql: String)
-  extends NativeDDLCommand(sql) with Logging
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val catalog = sqlContext.sessionState.catalog
+    val table = catalog.getTableMetadata(tableName)
+    if (DDLUtils.isDatasourceTable(table)) {
+      throw new AnalysisException(
+        "alter table drop partition is not allowed for tables defined using the datasource API")
+    }
+    catalog.dropPartitions(tableName, specs, ignoreIfNotExists = ifExists)
+    Seq.empty[Row]
+  }
 
-case class AlterTableUnarchivePartition(
-    tableName: TableIdentifier,
-    spec: TablePartitionSpec)(sql: String)
-  extends NativeDDLCommand(sql) with Logging
+}
 
 case class AlterTableSetFileFormat(
     tableName: TableIdentifier,
@@ -453,22 +495,6 @@ case class AlterTableSetLocation(
 
 }
 
-case class AlterTableTouch(
-    tableName: TableIdentifier,
-    partitionSpec: Option[TablePartitionSpec])(sql: String)
-  extends NativeDDLCommand(sql) with Logging
-
-case class AlterTableCompact(
-    tableName: TableIdentifier,
-    partitionSpec: Option[TablePartitionSpec],
-    compactType: String)(sql: String)
-  extends NativeDDLCommand(sql) with Logging
-
-case class AlterTableMerge(
-    tableName: TableIdentifier,
-    partitionSpec: Option[TablePartitionSpec])(sql: String)
-  extends NativeDDLCommand(sql) with Logging
-
 case class AlterTableChangeCol(
     tableName: TableIdentifier,
     partitionSpec: Option[TablePartitionSpec],
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index ac69518ddf689df30aea9ab0e42a3b3e005147d0..1c8dd6828673eb5bc153d180c1dd73f73aa4a9c9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.command
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
@@ -24,9 +25,17 @@ import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.execution.SparkSqlParser
 import org.apache.spark.sql.types._
 
+// TODO: merge this with DDLSuite (SPARK-14441)
 class DDLCommandSuite extends PlanTest {
   private val parser = SparkSqlParser
 
+  private def assertUnsupported(sql: String): Unit = {
+    val e = intercept[AnalysisException] {
+      parser.parsePlan(sql)
+    }
+    assert(e.getMessage.toLowerCase.contains("operation not allowed"))
+  }
+
   test("create database") {
     val sql =
       """
@@ -326,11 +335,11 @@ class DDLCommandSuite extends PlanTest {
       Seq(
         (Map("dt" -> "2008-08-08", "country" -> "us"), Some("location1")),
         (Map("dt" -> "2009-09-09", "country" -> "uk"), None)),
-      ifNotExists = true)(sql1)
+      ifNotExists = true)
     val expected2 = AlterTableAddPartition(
       TableIdentifier("table_name", None),
       Seq((Map("dt" -> "2008-08-08"), Some("loc"))),
-      ifNotExists = false)(sql2)
+      ifNotExists = false)
 
     comparePlans(parsed1, expected1)
     comparePlans(parsed2, expected2)
@@ -369,22 +378,16 @@ class DDLCommandSuite extends PlanTest {
     val expected = AlterTableRenamePartition(
       TableIdentifier("table_name", None),
       Map("dt" -> "2008-08-08", "country" -> "us"),
-      Map("dt" -> "2008-09-09", "country" -> "uk"))(sql)
+      Map("dt" -> "2008-09-09", "country" -> "uk"))
     comparePlans(parsed, expected)
   }
 
-  test("alter table: exchange partition") {
-    val sql =
+  test("alter table: exchange partition (not supported)") {
+    assertUnsupported(
       """
        |ALTER TABLE table_name_1 EXCHANGE PARTITION
        |(dt='2008-08-08', country='us') WITH TABLE table_name_2
-      """.stripMargin
-    val parsed = parser.parsePlan(sql)
-    val expected = AlterTableExchangePartition(
-      TableIdentifier("table_name_1", None),
-      TableIdentifier("table_name_2", None),
-      Map("dt" -> "2008-08-08", "country" -> "us"))(sql)
-    comparePlans(parsed, expected)
+      """.stripMargin)
   }
 
   // ALTER TABLE table_name DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE]
@@ -405,7 +408,10 @@ class DDLCommandSuite extends PlanTest {
     val sql2_view = sql2_table.replace("TABLE", "VIEW").replace("PURGE", "")
 
     val parsed1_table = parser.parsePlan(sql1_table)
-    val parsed2_table = parser.parsePlan(sql2_table)
+    val e = intercept[ParseException] {
+      parser.parsePlan(sql2_table)
+    }
+    assert(e.getMessage.contains("Operation not allowed"))
 
     intercept[ParseException] {
       parser.parsePlan(sql1_view)
@@ -420,36 +426,17 @@ class DDLCommandSuite extends PlanTest {
       Seq(
         Map("dt" -> "2008-08-08", "country" -> "us"),
         Map("dt" -> "2009-09-09", "country" -> "uk")),
-      ifExists = true,
-      purge = false)(sql1_table)
-    val expected2_table = AlterTableDropPartition(
-      tableIdent,
-      Seq(
-        Map("dt" -> "2008-08-08", "country" -> "us"),
-        Map("dt" -> "2009-09-09", "country" -> "uk")),
-      ifExists = false,
-      purge = true)(sql2_table)
+      ifExists = true)
 
     comparePlans(parsed1_table, expected1_table)
-    comparePlans(parsed2_table, expected2_table)
   }
 
-  test("alter table: archive partition") {
-    val sql = "ALTER TABLE table_name ARCHIVE PARTITION (dt='2008-08-08', country='us')"
-    val parsed = parser.parsePlan(sql)
-    val expected = AlterTableArchivePartition(
-      TableIdentifier("table_name", None),
-      Map("dt" -> "2008-08-08", "country" -> "us"))(sql)
-    comparePlans(parsed, expected)
+  test("alter table: archive partition (not supported)") {
+    assertUnsupported("ALTER TABLE table_name ARCHIVE PARTITION (dt='2008-08-08', country='us')")
   }
 
-  test("alter table: unarchive partition") {
-    val sql = "ALTER TABLE table_name UNARCHIVE PARTITION (dt='2008-08-08', country='us')"
-    val parsed = parser.parsePlan(sql)
-    val expected = AlterTableUnarchivePartition(
-      TableIdentifier("table_name", None),
-      Map("dt" -> "2008-08-08", "country" -> "us"))(sql)
-    comparePlans(parsed, expected)
+  test("alter table: unarchive partition (not supported)") {
+    assertUnsupported("ALTER TABLE table_name UNARCHIVE PARTITION (dt='2008-08-08', country='us')")
   }
 
   test("alter table: set file format") {
@@ -505,55 +492,24 @@ class DDLCommandSuite extends PlanTest {
     comparePlans(parsed2, expected2)
   }
 
-  test("alter table: touch") {
-    val sql1 = "ALTER TABLE table_name TOUCH"
-    val sql2 = "ALTER TABLE table_name TOUCH PARTITION (dt='2008-08-08', country='us')"
-    val parsed1 = parser.parsePlan(sql1)
-    val parsed2 = parser.parsePlan(sql2)
-    val tableIdent = TableIdentifier("table_name", None)
-    val expected1 = AlterTableTouch(
-      tableIdent,
-      None)(sql1)
-    val expected2 = AlterTableTouch(
-      tableIdent,
-      Some(Map("dt" -> "2008-08-08", "country" -> "us")))(sql2)
-    comparePlans(parsed1, expected1)
-    comparePlans(parsed2, expected2)
+  test("alter table: touch (not supported)") {
+    assertUnsupported("ALTER TABLE table_name TOUCH")
+    assertUnsupported("ALTER TABLE table_name TOUCH PARTITION (dt='2008-08-08', country='us')")
   }
 
-  test("alter table: compact") {
-    val sql1 = "ALTER TABLE table_name COMPACT 'compaction_type'"
-    val sql2 =
+  test("alter table: compact (not supported)") {
+    assertUnsupported("ALTER TABLE table_name COMPACT 'compaction_type'")
+    assertUnsupported(
       """
-        |ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us')
-        |COMPACT 'MAJOR'
-      """.stripMargin
-    val parsed1 = parser.parsePlan(sql1)
-    val parsed2 = parser.parsePlan(sql2)
-    val tableIdent = TableIdentifier("table_name", None)
-    val expected1 = AlterTableCompact(
-      tableIdent,
-      None,
-      "compaction_type")(sql1)
-    val expected2 = AlterTableCompact(
-      tableIdent,
-      Some(Map("dt" -> "2008-08-08", "country" -> "us")),
-      "MAJOR")(sql2)
-    comparePlans(parsed1, expected1)
-    comparePlans(parsed2, expected2)
+        |ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us')
+        |COMPACT 'MAJOR'
+      """.stripMargin)
   }
 
-  test("alter table: concatenate") {
-    val sql1 = "ALTER TABLE table_name CONCATENATE"
-    val sql2 = "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') CONCATENATE"
-    val parsed1 = parser.parsePlan(sql1)
-    val parsed2 = parser.parsePlan(sql2)
-    val tableIdent = TableIdentifier("table_name", None)
-    val expected1 = AlterTableMerge(tableIdent, None)(sql1)
-    val expected2 = AlterTableMerge(
-      tableIdent, Some(Map("dt" -> "2008-08-08", "country" -> "us")))(sql2)
-    comparePlans(parsed1, expected1)
-    comparePlans(parsed2, expected2)
+  test("alter table: concatenate (not supported)") {
+    assertUnsupported("ALTER TABLE table_name CONCATENATE")
+    assertUnsupported(
+      "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') CONCATENATE")
   }
 
   test("alter table: change column name/type/position/comment") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index c6479bf33e4d8453d604e0352d167a421d8bcf26..40a8b0e614b2e7a84faedcf7ed10fcc1e310f0f9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -58,6 +58,10 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     assert(e.getMessage.toLowerCase.contains("operation not allowed"))
   }
 
+  private def maybeWrapException[T](expectException: Boolean)(body: => T): Unit = {
+    if (expectException) intercept[AnalysisException] { body } else body
+  }
+
   private def createDatabase(catalog: SessionCatalog, name: String): Unit = {
     catalog.createDatabase(CatalogDatabase(name, "", "", Map()), ignoreIfExists = false)
   }
@@ -320,6 +324,62 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     assertUnsupported("ALTER TABLE dbx.tab1 NOT STORED AS DIRECTORIES")
   }
 
+  test("alter table: add partition") {
+    testAddPartitions(isDatasourceTable = false)
+  }
+
+  test("alter table: add partition (datasource table)") {
+    testAddPartitions(isDatasourceTable = true)
+  }
+
+  test("alter table: add partition is not supported for views") {
+    assertUnsupported("ALTER VIEW dbx.tab1 ADD IF NOT EXISTS PARTITION (b='2')")
+  }
+
+  test("alter table: drop partition") {
+    testDropPartitions(isDatasourceTable = false)
+  }
+
+  test("alter table: drop partition (datasource table)") {
+    testDropPartitions(isDatasourceTable = true)
+  }
+
+  test("alter table: drop partition is not supported for views") {
+    assertUnsupported("ALTER VIEW dbx.tab1 DROP IF EXISTS PARTITION (b='2')")
+  }
+
+  test("alter table: rename partition") {
+    val catalog = sqlContext.sessionState.catalog
+    val tableIdent = TableIdentifier("tab1", Some("dbx"))
+    val part1 = Map("a" -> "1")
+    val part2 = Map("b" -> "2")
+    val part3 = Map("c" -> "3")
+    createDatabase(catalog, "dbx")
+    createTable(catalog, tableIdent)
+    createTablePartition(catalog, part1, tableIdent)
+    createTablePartition(catalog, part2, tableIdent)
+    createTablePartition(catalog, part3, tableIdent)
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+      Set(part1, part2, part3))
+    sql("ALTER TABLE dbx.tab1 PARTITION (a='1') RENAME TO PARTITION (a='100')")
+    sql("ALTER TABLE dbx.tab1 PARTITION (b='2') RENAME TO PARTITION (b='200')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+      Set(Map("a" -> "100"), Map("b" -> "200"), part3))
+    // rename without explicitly specifying database
+    catalog.setCurrentDatabase("dbx")
+    sql("ALTER TABLE tab1 PARTITION (a='100') RENAME TO PARTITION (a='10')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+      Set(Map("a" -> "10"), Map("b" -> "200"), part3))
+    // table to alter does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE does_not_exist PARTITION (c='3') RENAME TO PARTITION (c='333')")
+    }
+    // partition to rename does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE tab1 PARTITION (x='300') RENAME TO PARTITION (x='333')")
+    }
+  }
+
   // TODO: ADD a testcase for Drop Database in Restric when we can create tables in SQLContext
 
   test("show tables") {
@@ -487,10 +547,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       assert(storageFormat.locationUri === Some(expected))
     }
   }
-    // Optionally expect AnalysisException
-    def maybeWrapException[T](expectException: Boolean)(body: => T): Unit = {
-      if (expectException) intercept[AnalysisException] { body } else body
-    }
     // set table location
     sql("ALTER TABLE dbx.tab1 SET LOCATION '/path/to/your/lovely/heart'")
     verifyLocation("/path/to/your/lovely/heart")
@@ -564,4 +620,102 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
   }
 
+  private def testAddPartitions(isDatasourceTable: Boolean): Unit = {
+    val catalog = sqlContext.sessionState.catalog
+    val tableIdent = TableIdentifier("tab1", Some("dbx"))
+    val part1 = Map("a" -> "1")
+    val part2 = Map("b" -> "2")
+    val part3 = Map("c" -> "3")
+    val part4 = Map("d" -> "4")
+    createDatabase(catalog, "dbx")
+    createTable(catalog, tableIdent)
+    createTablePartition(catalog, part1, tableIdent)
+    if (isDatasourceTable) {
+      convertToDatasourceTable(catalog, tableIdent)
+    }
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
+    maybeWrapException(isDatasourceTable) {
+      sql("ALTER TABLE dbx.tab1 ADD IF NOT EXISTS " +
+        "PARTITION (b='2') LOCATION 'paris' PARTITION (c='3')")
+    }
+    if (!isDatasourceTable) {
+      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
+      assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isEmpty)
+      assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Some("paris"))
+      assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isEmpty)
+    }
+    // add partitions without explicitly specifying database
+    catalog.setCurrentDatabase("dbx")
+    maybeWrapException(isDatasourceTable) {
+      sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (d='4')")
+    }
+    if (!isDatasourceTable) {
+      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+        Set(part1, part2, part3, part4))
+    }
+    // table to alter does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE does_not_exist ADD IF NOT EXISTS PARTITION (d='4')")
+    }
+    // partition to add already exists
+    intercept[AnalysisException] {
+      sql("ALTER TABLE tab1 ADD PARTITION (d='4')")
+    }
+    maybeWrapException(isDatasourceTable) {
+      sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (d='4')")
+    }
+    if (!isDatasourceTable) {
+      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+        Set(part1, part2, part3, part4))
+    }
+  }
+
+  private def testDropPartitions(isDatasourceTable: Boolean): Unit = {
+    val catalog = sqlContext.sessionState.catalog
+    val tableIdent = TableIdentifier("tab1", Some("dbx"))
+    val part1 = Map("a" -> "1")
+    val part2 = Map("b" -> "2")
+    val part3 = Map("c" -> "3")
+    val part4 = Map("d" -> "4")
+    createDatabase(catalog, "dbx")
+    createTable(catalog, tableIdent)
+    createTablePartition(catalog, part1, tableIdent)
+    createTablePartition(catalog, part2, tableIdent)
+    createTablePartition(catalog, part3, tableIdent)
+    createTablePartition(catalog, part4, tableIdent)
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+      Set(part1, part2, part3, part4))
+    if (isDatasourceTable) {
+      convertToDatasourceTable(catalog, tableIdent)
+    }
+    maybeWrapException(isDatasourceTable) {
+      sql("ALTER TABLE dbx.tab1 DROP IF EXISTS PARTITION (d='4'), PARTITION (c='3')")
+    }
+    if (!isDatasourceTable) {
+      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2))
+    }
+    // drop partitions without explicitly specifying database
+    catalog.setCurrentDatabase("dbx")
+    maybeWrapException(isDatasourceTable) {
+      sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (b='2')")
+    }
+    if (!isDatasourceTable) {
+      assert(catalog.listPartitions(tableIdent).map(_.spec) == Seq(part1))
+    }
+    // table to alter does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE does_not_exist DROP IF EXISTS PARTITION (b='2')")
+    }
+    // partition to drop does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE tab1 DROP PARTITION (x='300')")
+    }
+    maybeWrapException(isDatasourceTable) {
+      sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (x='300')")
+    }
+    if (!isDatasourceTable) {
+      assert(catalog.listPartitions(tableIdent).map(_.spec) == Seq(part1))
+    }
+  }
+
 }
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 9e3cb18d457cefacbab81bd17ccf6f55a5bf5198..f0eeda09dba5a81fb77294617ec7170c1e93eb10 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -376,7 +376,13 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
 
     // Create partitioned view is not supported
     "create_like_view",
-    "describe_formatted_view_partitioned"
+    "describe_formatted_view_partitioned",
+
+    // This uses CONCATENATE, which we don't support
+    "alter_merge_2",
+
+    // TOUCH is not supported
+    "touch"
   )
 
   /**
@@ -392,7 +398,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "alter2",
     "alter3",
     "alter5",
-    "alter_merge_2",
     "alter_partition_format_loc",
     "alter_partition_with_whitelist",
     "alter_rename_partition",
@@ -897,7 +902,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "timestamp_comparison",
     "timestamp_lazy",
     "timestamp_null",
-    "touch",
     "transform_ppr1",
     "transform_ppr2",
     "truncate_table",
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index a49ce33ba157bbaf2ad4d4745b08d353a50498f6..482f47428d33a8e5b3f10bad507303a7d9944657 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -219,26 +219,7 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat
       parts: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean): Unit = withClient {
     requireTableExists(db, table)
-    // Note: Unfortunately Hive does not currently support `ignoreIfNotExists` so we
-    // need to implement it here ourselves. This is currently somewhat expensive because
-    // we make multiple synchronous calls to Hive for each partition we want to drop.
-    val partsToDrop =
-      if (ignoreIfNotExists) {
-        parts.filter { spec =>
-          try {
-            getPartition(db, table, spec)
-            true
-          } catch {
-            // Filter out the partitions that do not actually exist
-            case _: AnalysisException => false
-          }
-        }
-      } else {
-        parts
-      }
-    if (partsToDrop.nonEmpty) {
-      client.dropPartitions(db, table, partsToDrop)
-    }
+    client.dropPartitions(db, table, parts, ignoreIfNotExists)
   }
 
   override def renamePartitions(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 94794b1572eecfb7341e1cdc4efb744a5cea82f3..6f7e7bf45106ffe13b6bf726d608f95af875ecdd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -120,16 +120,13 @@ private[hive] trait HiveClient {
       ignoreIfExists: Boolean): Unit
 
   /**
-   * Drop one or many partitions in the given table.
-   *
-   * Note: Unfortunately, Hive does not currently provide a way to ignore this call if the
-   * partitions do not already exist. The seemingly relevant flag `ifExists` in
-   * [[org.apache.hadoop.hive.metastore.PartitionDropOptions]] is not read anywhere.
+   * Drop one or many partitions in the given table, assuming they exist.
    */
   def dropPartitions(
       db: String,
       table: String,
-      specs: Seq[ExternalCatalog.TablePartitionSpec]): Unit
+      specs: Seq[ExternalCatalog.TablePartitionSpec],
+      ignoreIfNotExists: Boolean): Unit
 
   /**
    * Rename one or many existing table partitions, assuming they exist.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index a037671ef04f5717d3827335235ef6489f1b259b..39e26acd7fe9bd3b8467e1bfbb6764494a75d077 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.cli.CliSessionState
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
+import org.apache.hadoop.hive.metastore.{PartitionDropOptions, TableType => HiveTableType}
 import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Function => HiveFunction, FunctionType, PrincipalType, ResourceType, ResourceUri}
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
@@ -367,9 +367,25 @@ private[hive] class HiveClientImpl(
   override def dropPartitions(
       db: String,
       table: String,
-      specs: Seq[ExternalCatalog.TablePartitionSpec]): Unit = withHiveState {
+      specs: Seq[ExternalCatalog.TablePartitionSpec],
+      ignoreIfNotExists: Boolean): Unit = withHiveState {
     // TODO: figure out how to drop multiple partitions in one call
-    specs.foreach { s => client.dropPartition(db, table, s.values.toList.asJava, true) }
+    val hiveTable = client.getTable(db, table, true /* throw exception */)
+    specs.foreach { s =>
+      // The provided spec here can be a partial spec, i.e. it will match all partitions
+      // whose specs are supersets of this partial spec. E.g. If a table has partitions
+      // (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both.
+      val matchingParts = client.getPartitions(hiveTable, s.asJava).asScala
+      if (matchingParts.isEmpty && !ignoreIfNotExists) {
+        throw new AnalysisException(
+          s"partition to drop '$s' does not exist in table '$table' database '$db'")
+      }
+      matchingParts.foreach { hivePartition =>
+        val dropOptions = new PartitionDropOptions
+        dropOptions.ifExists = ignoreIfNotExists
+        client.dropPartition(db, table, hivePartition.getValues, dropOptions)
+      }
+    }
   }
 
   override def renamePartitions(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index a144da49976ec99ad8cdd02b91d75c702aa1a604..e8086aec327bf4445d5bec367e2c6cd6b7a7f320 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -41,6 +41,13 @@ class HiveDDLCommandSuite extends PlanTest {
     }.head
   }
 
+  private def assertUnsupported(sql: String): Unit = {
+    val e = intercept[ParseException] {
+      parser.parsePlan(sql)
+    }
+    assert(e.getMessage.toLowerCase.contains("unsupported"))
+  }
+
   test("Test CTAS #1") {
     val s1 =
       """CREATE EXTERNAL TABLE IF NOT EXISTS mydb.page_view
@@ -367,4 +374,9 @@ class HiveDDLCommandSuite extends PlanTest {
       parser.parsePlan(v1).isInstanceOf[HiveNativeCommand]
     }
   }
+
+  test("MSCK repair table (not supported)") {
+    assertUnsupported("MSCK REPAIR TABLE tab1")
+  }
+
 }
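
Usage sketch (not part of the patch itself): the statements below are drawn from the test cases in this diff and illustrate the user-visible behavior after the change; the database, table, and partition names (dbx.tab1, tab2, a/b/c/d/dt/country) are illustrative only. The first group is now planned as native RunnableCommands that go through the session catalog instead of being forwarded to Hive as opaque native commands; for tables defined through the data source API, ADD and DROP PARTITION still raise an AnalysisException, as exercised by the "(datasource table)" tests above:

    ALTER TABLE dbx.tab1 ADD IF NOT EXISTS PARTITION (b='2') LOCATION 'paris' PARTITION (c='3');
    ALTER TABLE dbx.tab1 PARTITION (a='1') RENAME TO PARTITION (a='100');
    ALTER TABLE dbx.tab1 DROP IF EXISTS PARTITION (d='4'), PARTITION (c='3');

The second group is now rejected up front: the ALTER TABLE variants fail with an AnalysisException whose message contains "Operation not allowed", and MSCK REPAIR TABLE is rejected by the parser as an unsupported Hive-native command:

    ALTER TABLE tab1 EXCHANGE PARTITION (dt='2008-08-08', country='us') WITH TABLE tab2;
    ALTER TABLE tab1 ARCHIVE PARTITION (dt='2008-08-08', country='us');
    ALTER TABLE tab1 UNARCHIVE PARTITION (dt='2008-08-08', country='us');
    ALTER TABLE tab1 TOUCH;
    ALTER TABLE tab1 COMPACT 'MAJOR';
    ALTER TABLE tab1 CONCATENATE;
    ALTER TABLE tab1 DROP PARTITION (b='2') PURGE;
    MSCK REPAIR TABLE tab1;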