Commit 50a99126 authored by Wenchen Fan, committed by gatorsmile

[SPARK-19359][SQL] renaming partition should not leave useless directories

## What changes were proposed in this pull request?

The Hive metastore is not case-preserving and stores partition column names in lower case. If Spark SQL creates a table with upper-case partition column names using `HiveExternalCatalog`, then when we rename a partition, it first calls `HiveClient.renamePartition`, which creates a new lower-case partition path; Spark SQL then renames that lower-case path to the expected upper-case one.
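
To make the sequence concrete, here is a hedged sketch (table and column names are illustrative; run it e.g. in `spark-shell`, where `spark.implicits._` is in scope):

```scala
// Spark writes partition directories with case-preserved column names:
Seq((1, 2)).toDF("id", "A").write.partitionBy("A").saveAsTable("t")
// on disk: <warehouse>/t/A=2

// The metastore stores the partition column lower-cased as `a`, so
// `HiveClient.renamePartition` first moves the data to a lower-case path,
// e.g. <warehouse>/t/a=4, and `HiveExternalCatalog` must then rename that
// path back to the case-preserved <warehouse>/t/A=4.
spark.sql("alter table t partition(A=2) rename to partition(A=4)")
```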

However, when we rename a nested path, different file systems behave differently. E.g. on Jenkins, renaming `a=1/b=2` to `A=1/B=2` succeeds but leaves behind an empty directory `a=1`; on macOS, the rename does not work as expected and results in `a=1/B=2`.
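
A minimal sketch of the problematic nested rename, assuming a local Hadoop `FileSystem` (paths are illustrative):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.getLocal(new Configuration())
fs.mkdirs(new Path("/tmp/table/a=1/b=2"))
// Renaming a nested path in a single call is file-system dependent: it may
// leave an empty `/tmp/table/a=1` behind, or (on a case-insensitive file
// system such as macOS's default) end up as `/tmp/table/a=1/B=2`.
fs.rename(new Path("/tmp/table/a=1/b=2"), new Path("/tmp/table/A=1/B=2"))
```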

This PR changes `HiveExternalCatalog` to rename the partition directory recursively, starting from the first partition column, which is the approach most compatible across different file systems.
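
In other words, the rename is split into one flat rename per partition column. A simplified sketch of the strategy (the helper name here is hypothetical; the real implementation is `renamePartitionDirectory` in the diff below):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Rename one partition-column level at a time, so no file system is ever
// asked to rename a nested path in a single call. Escaping and null handling
// are omitted; the real code uses getPartitionPathString for that.
def renameLevelByLevel(
    fs: FileSystem,
    tablePath: Path,
    partCols: Seq[String],
    newSpec: Map[String, String]): Path = {
  var current = tablePath
  partCols.foreach { col =>
    val expected = new Path(current, s"$col=${newSpec(col)}")
    // Hive created the directory with a lower-case column name.
    val actual = new Path(current, s"${col.toLowerCase}=${newSpec(col)}")
    // Skip levels that already have the right casing (or need no rename).
    if (!fs.exists(expected)) {
      fs.rename(actual, expected)
    }
    current = expected
  }
  current
}
```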

## How was this patch tested?

A new regression test.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16837 from cloud-fan/partition.
parent 64cae22f
@@ -108,18 +108,21 @@ object ExternalCatalogUtils {
       partitionColumnNames: Seq[String],
       tablePath: Path): Path = {
     val partitionPathStrings = partitionColumnNames.map { col =>
-      val partitionValue = spec(col)
-      val partitionString = if (partitionValue == null) {
-        DEFAULT_PARTITION_NAME
-      } else {
-        escapePathName(partitionValue)
-      }
-      escapePathName(col) + "=" + partitionString
+      getPartitionPathString(col, spec(col))
     }
     partitionPathStrings.foldLeft(tablePath) { (totalPath, nextPartPath) =>
       new Path(totalPath, nextPartPath)
     }
   }
+
+  def getPartitionPathString(col: String, value: String): String = {
+    val partitionString = if (value == null) {
+      DEFAULT_PARTITION_NAME
+    } else {
+      escapePathName(value)
+    }
+    escapePathName(col) + "=" + partitionString
+  }
 }
 
 object CatalogUtils {
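
For reference, the expected behavior of the extracted helper, sketched from the surrounding code (assuming Hive's `__HIVE_DEFAULT_PARTITION__` as `DEFAULT_PARTITION_NAME`):

```scala
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.getPartitionPathString

getPartitionPathString("A", "1")        // "A=1"
getPartitionPathString("A", null)       // "A=__HIVE_DEFAULT_PARTITION__"
getPartitionPathString("ts", "2017/02") // "ts=2017%2F02" -- '/' is escaped
```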
@@ -892,21 +892,58 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val hasUpperCasePartitionColumn = partitionColumnNames.exists(col => col.toLowerCase != col)
     if (tableMeta.tableType == MANAGED && hasUpperCasePartitionColumn) {
       val tablePath = new Path(tableMeta.location)
+      val fs = tablePath.getFileSystem(hadoopConf)
       val newParts = newSpecs.map { spec =>
+        val rightPath = renamePartitionDirectory(fs, tablePath, partitionColumnNames, spec)
         val partition = client.getPartition(db, table, lowerCasePartitionSpec(spec))
-        val wrongPath = new Path(partition.location)
-        val rightPath = ExternalCatalogUtils.generatePartitionPath(
-          spec, partitionColumnNames, tablePath)
+        partition.copy(storage = partition.storage.copy(locationUri = Some(rightPath.toString)))
+      }
+      alterPartitions(db, table, newParts)
+    }
+  }
+
+  /**
+   * Rename the partition directory w.r.t. the actual partition columns.
+   *
+   * It will recursively rename the partition directory from the first partition column, to be most
+   * compatible with different file systems. E.g. in some file systems, renaming `a=1/b=2` to
+   * `A=1/B=2` will result in `a=1/B=2`, while in some other file systems the renaming works, but
+   * will leave an empty directory `a=1`.
+   */
+  private def renamePartitionDirectory(
+      fs: FileSystem,
+      tablePath: Path,
+      partCols: Seq[String],
+      newSpec: TablePartitionSpec): Path = {
+    import ExternalCatalogUtils.getPartitionPathString
+
+    var currentFullPath = tablePath
+    partCols.foreach { col =>
+      val partValue = newSpec(col)
+      val expectedPartitionString = getPartitionPathString(col, partValue)
+      val expectedPartitionPath = new Path(currentFullPath, expectedPartitionString)
+
+      if (fs.exists(expectedPartitionPath)) {
+        // It is possible that some parental partition directories already exist or don't need to
+        // be renamed. E.g. if the partition columns are `a` and `B`, we don't need to rename
+        // `/table_path/a=1`. Or we already have a partition directory `A=1/B=2`, and we rename
+        // another partition to `A=1/B=3`, then we will have `A=1/B=2` and `a=1/b=3`, and we should
+        // just move `a=1/b=3` into `A=1` with new name `B=3`.
+      } else {
+        val actualPartitionString = getPartitionPathString(col.toLowerCase, partValue)
+        val actualPartitionPath = new Path(currentFullPath, actualPartitionString)
+
         try {
-          tablePath.getFileSystem(hadoopConf).rename(wrongPath, rightPath)
+          fs.rename(actualPartitionPath, expectedPartitionPath)
         } catch {
-          case e: IOException => throw new SparkException(
-            s"Unable to rename partition path from $wrongPath to $rightPath", e)
+          case e: IOException =>
+            throw new SparkException("Unable to rename partition path from " +
+              s"$actualPartitionPath to $expectedPartitionPath", e)
         }
-        partition.copy(storage = partition.storage.copy(locationUri = Some(rightPath.toString)))
       }
-      alterPartitions(db, table, newParts)
+      currentFullPath = expectedPartitionPath
     }
+    currentFullPath
   }
 
   override def alterPartitions(
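
A hedged trace of the `fs.exists` branch above, for partition columns `A` and `B`, where `A=1/B=2` already exists and another partition is renamed to `A=1, B=3`:

```scala
// level A: expected `<table>/A=1` already exists -> no rename, just descend
// level B: expected `<table>/A=1/B=3` is missing
//          -> rename the lower-case `b=3` directory to `B=3` under `A=1`
// Result: `A=1/B=2` and `A=1/B=3` end up under one case-preserved parent,
// with no stray lower-case directory left behind.
```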
@@ -19,8 +19,11 @@ package org.apache.spark.sql.hive
 
 import java.io.File
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
@@ -481,4 +484,30 @@ class PartitionProviderCompatibilitySuite
       assert(spark.sql("show partitions test").count() == 5)
     }
   }
+
+  test("SPARK-19359: renaming partition should not leave useless directories") {
+    withTable("t", "t1") {
+      Seq((1, 2, 3)).toDF("id", "A", "B").write.partitionBy("A", "B").saveAsTable("t")
+      spark.sql("alter table t partition(A=2, B=3) rename to partition(A=4, B=5)")
+
+      var table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+      var tablePath = new Path(table.location)
+      val fs = tablePath.getFileSystem(spark.sessionState.newHadoopConf())
+      // the `A=2` directory is still there; we follow this behavior from Hive.
+      assert(fs.listStatus(tablePath)
+        .filterNot(_.getPath.toString.contains("A=2")).count(_.isDirectory) == 1)
+      assert(fs.listStatus(new Path(tablePath, "A=4")).count(_.isDirectory) == 1)
+
+      Seq((1, 2, 3, 4)).toDF("id", "A", "b", "C").write.partitionBy("A", "b", "C").saveAsTable("t1")
+      spark.sql("alter table t1 partition(A=2, b=3, C=4) rename to partition(A=4, b=5, C=6)")
+
+      table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+      tablePath = new Path(table.location)
+      // the `A=2` directory is still there; we follow this behavior from Hive.
+      assert(fs.listStatus(tablePath)
+        .filterNot(_.getPath.toString.contains("A=2")).count(_.isDirectory) == 1)
+      assert(fs.listStatus(new Path(tablePath, "A=4")).count(_.isDirectory) == 1)
+      assert(fs.listStatus(new Path(new Path(tablePath, "A=4"), "b=5")).count(_.isDirectory) == 1)
+    }
+  }
 }
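
For orientation, the on-disk layout the first half of the test expects after the rename (warehouse path is illustrative):

```scala
// <warehouse>/t/A=2/      -- empty leftover directory, kept to match Hive
// <warehouse>/t/A=4/B=5/  -- the renamed partition with preserved casing
// Before this patch, a stray lower-case `a=4` (or a half-renamed `a=4/B=5`)
// could also be left behind, depending on the file system.
```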