diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 816e4af2df666a1d38a98323355f9a1521d5b0a5..15aed5f9b1bdf40607ca40395a6c6baffc4743e8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -229,9 +229,22 @@ class InMemoryCatalog(
     if (tableExists(db, table)) {
       val tableMeta = getTable(db, table)
       if (tableMeta.tableType == CatalogTableType.MANAGED) {
+        // Delete the data/directory for each partition
+        val locationAllParts = catalog(db).tables(table).partitions.values.toSeq.map(_.location)
+        locationAllParts.foreach { loc =>
+          val partitionPath = new Path(loc)
+          try {
+            val fs = partitionPath.getFileSystem(hadoopConfig)
+            fs.delete(partitionPath, true)
+          } catch {
+            case e: IOException =>
+              throw new SparkException(s"Unable to delete partition path $partitionPath", e)
+          }
+        }
         assert(tableMeta.storage.locationUri.isDefined,
           "Managed table should always have table location, as we will assign a default location " +
             "to it if it doesn't have one.")
+        // Delete the data/directory of the table
         val dir = new Path(tableMeta.location)
         try {
           val fs = dir.getFileSystem(hadoopConfig)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 742f90084042468aa7bb0b712f001d650f1e926c..176cccce65f0753dcb41f2e54388cd54359c8c58 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -324,7 +324,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val table = CatalogTable(
       identifier = TableIdentifier("tbl", Some("db1")),
       tableType = CatalogTableType.MANAGED,
-      storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+      storage = CatalogStorageFormat.empty,
       schema = new StructType()
         .add("col1", "int")
         .add("col2", "string")
@@ -346,6 +346,46 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     assert(new Path(partitionLocation) == defaultPartitionLocation)
   }
 
+  test("create/drop partitions in managed tables with location") {
+    val catalog = newBasicCatalog()
+    val table = CatalogTable(
+      identifier = TableIdentifier("tbl", Some("db1")),
+      tableType = CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat.empty,
+      schema = new StructType()
+        .add("col1", "int")
+        .add("col2", "string")
+        .add("partCol1", "int")
+        .add("partCol2", "string"),
+      provider = Some("hive"),
+      partitionColumnNames = Seq("partCol1", "partCol2"))
+    catalog.createTable(table, ignoreIfExists = false)
+
+    val newLocationPart1 = newUriForDatabase()
+    val newLocationPart2 = newUriForDatabase()
+
+    val partition1 =
+      CatalogTablePartition(Map("partCol1" -> "1", "partCol2" -> "2"),
+        storageFormat.copy(locationUri = Some(newLocationPart1)))
+    val partition2 =
+      CatalogTablePartition(Map("partCol1" -> "3", "partCol2" -> "4"),
+        storageFormat.copy(locationUri = Some(newLocationPart2)))
+    catalog.createPartitions("db1", "tbl", Seq(partition1), ignoreIfExists = false)
+    catalog.createPartitions("db1", "tbl", Seq(partition2), ignoreIfExists = false)
+
+    assert(exists(newLocationPart1))
+    assert(exists(newLocationPart2))
+
+    // the corresponding directory is dropped.
+    catalog.dropPartitions("db1", "tbl", Seq(partition1.spec),
+      ignoreIfNotExists = false, purge = false, retainData = false)
+    assert(!exists(newLocationPart1))
+
+    // all the remaining directories are dropped.
+    catalog.dropTable("db1", "tbl", ignoreIfNotExists = false, purge = false)
+    assert(!exists(newLocationPart2))
+  }
+
   test("list partition names") {
     val catalog = newBasicCatalog()
     val newPart = CatalogTablePartition(Map("a" -> "1", "b" -> "%="), storageFormat)
@@ -459,7 +499,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val table = CatalogTable(
       identifier = TableIdentifier("tbl", Some("db1")),
       tableType = CatalogTableType.MANAGED,
-      storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+      storage = CatalogStorageFormat.empty,
       schema = new StructType()
         .add("col1", "int")
         .add("col2", "string")
@@ -684,7 +724,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val table = CatalogTable(
       identifier = TableIdentifier("my_table", Some("db1")),
       tableType = CatalogTableType.MANAGED,
-      storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+      storage = CatalogStorageFormat.empty,
       schema = new StructType().add("a", "int").add("b", "string"),
       provider = Some("hive")
     )
@@ -717,7 +757,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val table = CatalogTable(
       identifier = TableIdentifier("tbl", Some("db1")),
       tableType = CatalogTableType.MANAGED,
-      storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+      storage = CatalogStorageFormat.empty,
       schema = new StructType()
         .add("col1", "int")
         .add("col2", "string")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 522158b6416739bec75dbad4ced3aaca654c87fb..59a29e884739a89a02a5a209d3c96c56f81250e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -400,13 +400,12 @@ case class AlterTableSerDePropertiesCommand(
 /**
  * Add Partition in ALTER TABLE: add the table partitions.
  *
- * 'partitionSpecsAndLocs': the syntax of ALTER VIEW is identical to ALTER TABLE,
- * EXCEPT that it is ILLEGAL to specify a LOCATION clause.
  * An error message will be issued if the partition exists, unless 'ifNotExists' is true.
  *
  * The syntax of this command is:
  * {{{
- *   ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec [LOCATION 'loc1']
+ *   ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec1 [LOCATION 'loc1']
+ *                                         PARTITION spec2 [LOCATION 'loc2']
  * }}}
  */
 case class AlterTableAddPartitionCommand(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index f313db641b15204cc9744243b032acafecd44eb3..8b3421953025919c32d7fa2099b63ceeed194d16 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -199,6 +199,52 @@ class HiveDDLSuite
     assert(e.message == "Found duplicate column(s) in table definition of `tbl`: a")
   }
 
+  test("add/drop partition with location - managed table") {
+    val tab = "tab_with_partitions"
+    withTempDir { tmpDir =>
+      val basePath = new File(tmpDir.getCanonicalPath)
+      val part1Path = new File(basePath + "/part1")
+      val part2Path = new File(basePath + "/part2")
+      val dirSet = part1Path :: part2Path :: Nil
+
+      // Before data insertion, all the directories are empty
+      assert(dirSet.forall(dir => dir.listFiles == null || dir.listFiles.isEmpty))
+
+      withTable(tab) {
+        sql(
+          s"""
+             |CREATE TABLE $tab (key INT, value STRING)
+             |PARTITIONED BY (ds STRING, hr STRING)
+           """.stripMargin)
+        sql(
+          s"""
+             |ALTER TABLE $tab ADD
+             |PARTITION (ds='2008-04-08', hr=11) LOCATION '$part1Path'
+             |PARTITION (ds='2008-04-08', hr=12) LOCATION '$part2Path'
+           """.stripMargin)
+        assert(dirSet.forall(dir => dir.listFiles == null || dir.listFiles.isEmpty))
+
+        sql(s"INSERT OVERWRITE TABLE $tab partition (ds='2008-04-08', hr=11) SELECT 1, 'a'")
+        sql(s"INSERT OVERWRITE TABLE $tab partition (ds='2008-04-08', hr=12) SELECT 2, 'b'")
+        // add partition will not delete the data
+        assert(dirSet.forall(dir => dir.listFiles.nonEmpty))
+        checkAnswer(
+          spark.table(tab),
+          Row(1, "a", "2008-04-08", "11") :: Row(2, "b", "2008-04-08", "12") :: Nil
+        )
+
+        sql(s"ALTER TABLE $tab DROP PARTITION (ds='2008-04-08', hr=11)")
+        // drop partition will delete the data
+        assert(part1Path.listFiles == null || part1Path.listFiles.isEmpty)
+        assert(part2Path.listFiles.nonEmpty)
+
+        sql(s"DROP TABLE $tab")
+        // drop table will delete the data of the managed table
+        assert(dirSet.forall(dir => dir.listFiles == null || dir.listFiles.isEmpty))
+      }
+    }
+  }
+
   test("add/drop partitions - external table") {
     val catalog = spark.sessionState.catalog
     withTempDir { tmpDir =>
@@ -257,9 +303,15 @@ class HiveDDLSuite
         // drop partition will not delete the data of external table
         assert(dirSet.forall(dir => dir.listFiles.nonEmpty))
 
-        sql(s"ALTER TABLE $externalTab ADD PARTITION (ds='2008-04-08', hr='12')")
+        sql(
+          s"""
+             |ALTER TABLE $externalTab ADD PARTITION (ds='2008-04-08', hr='12')
+             |PARTITION (ds='2008-04-08', hr=11)
+           """.stripMargin)
         assert(catalog.listPartitions(TableIdentifier(externalTab)).map(_.spec).toSet ==
-          Set(Map("ds" -> "2008-04-08", "hr" -> "12"), Map("ds" -> "2008-04-09", "hr" -> "11")))
+          Set(Map("ds" -> "2008-04-08", "hr" -> "11"),
+            Map("ds" -> "2008-04-08", "hr" -> "12"),
+            Map("ds" -> "2008-04-09", "hr" -> "11")))
 
         // add partition will not delete the data
         assert(dirSet.forall(dir => dir.listFiles.nonEmpty))
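
For reference, the behavioral core of the `InMemoryCatalog` change above is small: when a MANAGED table is dropped, every partition directory is deleted before the table directory itself, so partitions created with an explicit `LOCATION` outside the table root are not leaked. Below is a minimal, self-contained sketch of that deletion pattern against the Hadoop `FileSystem` API; the object and method names are invented for illustration, and it throws a plain `RuntimeException` where the patch throws `SparkException`.

```scala
import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Illustrative sketch only; not part of the patch.
object PartitionCleanupSketch {
  /** Recursively delete each partition directory, failing loudly on IO errors. */
  def deletePartitionDirs(partitionLocations: Seq[String], hadoopConfig: Configuration): Unit = {
    partitionLocations.foreach { loc =>
      val partitionPath = new Path(loc)
      try {
        // Resolve the FileSystem from the path itself, so non-default schemes (e.g. hdfs://) work.
        val fs = partitionPath.getFileSystem(hadoopConfig)
        // recursive = true: partition directories contain data files.
        fs.delete(partitionPath, true)
      } catch {
        case e: IOException =>
          // The patch wraps the IOException in a SparkException instead.
          throw new RuntimeException(s"Unable to delete partition path $partitionPath", e)
      }
    }
  }
}
```

Deleting the partition directories explicitly, rather than relying on the recursive delete of the table directory, is exactly what the new tests exercise: both the `ExternalCatalogSuite` and `HiveDDLSuite` cases create partitions whose `LOCATION` lies outside the table directory, which the old code left behind on `DROP TABLE`.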