Skip to content
Snippets Groups Projects
Commit 9dc3ef6e authored by Eric Liang's avatar Eric Liang Committed by Wenchen Fan
Browse files

[SPARK-18635][SQL] Partition name/values not escaped correctly in some cases


## What changes were proposed in this pull request?

Due to confusion between URI vs paths, in certain cases we escape partition values too many times, which causes some Hive client operations to fail or write data to the wrong location. This PR fixes at least some of these cases.

To my understanding this is how values, filesystem paths, and URIs interact.
- Hive stores raw (unescaped) partition values that are returned to you directly when you call listPartitions.
- Internally, we convert these raw values to filesystem paths via `ExternalCatalogUtils.[un]escapePathName`.
- In some circumstances we store URIs instead of filesystem paths. When a path is converted to a URI via `path.toURI`, the escaped partition values are further URI-encoded. This means that to get a path back from a URI, you must call `new Path(new URI(uriTxt))` in order to decode the URI-encoded string.
- In `CatalogStorageFormat` we store URIs as strings. This makes it easy to forget to URI-decode the value before converting it into a path.
- Finally, the Hive client itself uses mostly Paths for representing locations, and only URIs occasionally.

In the future we should probably clean this up, perhaps by dropping use of URIs when unnecessary. We should also try fixing escaping for partition names as well as values, though names are unlikely to contain special characters.

cc mallman cloud-fan yhuai

## How was this patch tested?

Unit tests.

Author: Eric Liang <ekl@databricks.com>

Closes #16071 from ericl/spark-18635.

(cherry picked from commit 88f559f2)
Signed-off-by: default avatarWenchen Fan <wenchen@databricks.com>
parent e8d8e350
No related branches found
No related tags found
No related merge requests found
......@@ -44,6 +44,9 @@ case class CatalogFunction(
* Storage format, used to describe how a partition or a table is stored.
*/
case class CatalogStorageFormat(
// TODO(ekl) consider storing this field as java.net.URI for type safety. Note that this must
// be converted to/from a hadoop Path object using new Path(new URI(locationUri)) and
// path.toUri respectively before use as a filesystem path due to URI char escaping.
locationUri: Option[String],
inputFormat: Option[String],
outputFormat: Option[String],
......
......@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive
import java.io.IOException
import java.net.URI
import java.util
import scala.util.control.NonFatal
......@@ -833,10 +834,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// However, Hive metastore is not case preserving and will generate wrong partition location
// with lower cased partition column names. Here we set the default partition location
// manually to avoid this problem.
val partitionPath = p.storage.locationUri.map(new Path(_)).getOrElse {
val partitionPath = p.storage.locationUri.map(uri => new Path(new URI(uri))).getOrElse {
ExternalCatalogUtils.generatePartitionPath(p.spec, partitionColumnNames, tablePath)
}
p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toString)))
p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri.toString)))
}
val lowerCasedParts = partsWithLocation.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
client.createPartitions(db, table, lowerCasedParts, ignoreIfExists)
......
......@@ -268,7 +268,8 @@ private[client] class Shim_v0_12 extends Shim with Logging {
ignoreIfExists: Boolean): Unit = {
val table = hive.getTable(database, tableName)
parts.foreach { s =>
val location = s.storage.locationUri.map(new Path(table.getPath, _)).orNull
val location = s.storage.locationUri.map(
uri => new Path(table.getPath, new Path(new URI(uri)))).orNull
val params = if (s.parameters.nonEmpty) s.parameters.asJava else null
val spec = s.spec.asJava
if (hive.getPartition(table, spec, false) != null && ignoreIfExists) {
......@@ -463,7 +464,8 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
ignoreIfExists: Boolean): Unit = {
val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists)
parts.zipWithIndex.foreach { case (s, i) =>
addPartitionDesc.addPartition(s.spec.asJava, s.storage.locationUri.orNull)
addPartitionDesc.addPartition(
s.spec.asJava, s.storage.locationUri.map(u => new Path(new URI(u)).toString).orNull)
if (s.parameters.nonEmpty) {
addPartitionDesc.getPartition(i).setPartParams(s.parameters.asJava)
}
......
......@@ -205,6 +205,60 @@ class PartitionProviderCompatibilitySuite
}
}
}
test(s"SPARK-18635 special chars in partition values - partition management $enabled") {
withTable("test") {
spark.range(10)
.selectExpr("id", "id as A", "'%' as B")
.write.partitionBy("A", "B").mode("overwrite")
.saveAsTable("test")
assert(spark.sql("select * from test").count() == 10)
assert(spark.sql("select * from test where B = '%'").count() == 10)
assert(spark.sql("select * from test where B = '$'").count() == 0)
spark.range(10)
.selectExpr("id", "id as A", "'=' as B")
.write.mode("append").insertInto("test")
spark.sql("insert into test partition (A, B) select id, id, '%=' from range(10)")
assert(spark.sql("select * from test").count() == 30)
assert(spark.sql("select * from test where B = '%'").count() == 10)
assert(spark.sql("select * from test where B = '='").count() == 10)
assert(spark.sql("select * from test where B = '%='").count() == 10)
// show partitions sanity check
val parts = spark.sql("show partitions test").collect().map(_.get(0)).toSeq
assert(parts.length == 30)
assert(parts.contains("A=0/B=%25"))
assert(parts.contains("A=0/B=%3D"))
assert(parts.contains("A=0/B=%25%3D"))
// drop partition sanity check
spark.sql("alter table test drop partition (A=1, B='%')")
assert(spark.sql("select * from test").count() == 29) // 1 file in dropped partition
withTempDir { dir =>
// custom locations sanity check
spark.sql(s"""
|alter table test partition (A=0, B='%')
|set location '${dir.getAbsolutePath}'""".stripMargin)
assert(spark.sql("select * from test").count() == 28) // moved to empty dir
// rename partition sanity check
spark.sql(s"""
|alter table test partition (A=5, B='%')
|rename to partition (A=100, B='%')""".stripMargin)
assert(spark.sql("select * from test where a = 5 and b = '%'").count() == 0)
assert(spark.sql("select * from test where a = 100 and b = '%'").count() == 1)
// try with A=0 which has a custom location
spark.sql("insert into test partition (A=0, B='%') select 1")
spark.sql(s"""
|alter table test partition (A=0, B='%')
|rename to partition (A=101, B='%')""".stripMargin)
assert(spark.sql("select * from test where a = 0 and b = '%'").count() == 0)
assert(spark.sql("select * from test where a = 101 and b = '%'").count() == 1)
}
}
}
}
/**
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment