diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index d8bc86727e466309b056ab74c4c592c22f4938c5..d2a1af08009144bcd7fb2fbc0ccbd595dede8698 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -44,6 +44,9 @@ case class CatalogFunction(
  * Storage format, used to describe how a partition or a table is stored.
  */
 case class CatalogStorageFormat(
+    // TODO(ekl) consider storing this field as java.net.URI for type safety. Note that this must
+    // be converted to/from a hadoop Path object using new Path(new URI(locationUri)) and
+    // path.toUri respectively before use as a filesystem path due to URI char escaping.
     locationUri: Option[String],
     inputFormat: Option[String],
     outputFormat: Option[String],
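Aside (illustration only, not part of the patch): the round trip the TODO above describes matters because Hadoop's Path(String) constructor takes the string literally, while Path(URI) decodes percent-escapes first. A minimal sketch, using a hypothetical warehouse location:

    import java.net.URI
    import org.apache.hadoop.fs.Path

    // Hypothetical catalog entry: the directory on disk is named "B=%", which
    // is stored URI-escaped in locationUri as "B=%25".
    val locationUri = "file:/warehouse/test/A=0/B=%25"

    // Decoding first yields the intended directory name "B=%".
    val asPath = new Path(new URI(locationUri))

    // Taking the string literally names a directory "B=%25" instead, and a
    // later path.toUri escapes once more ("B=%2525") -- the double-escaping bug.
    val wrong = new Path(locationUri)

    // Round-tripping restores the stored, escaped form.
    val stored = asPath.toUri.toString  // "file:/warehouse/test/A=0/B=%25"
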
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index fd9dc320638723db4662e0a032d97d8c6fc92e53..1a9943bc31058cbf9e3a9ff91f651dcaa553bc31 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive
 
 import java.io.IOException
+import java.net.URI
 import java.util
 
 import scala.util.control.NonFatal
@@ -833,10 +834,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       // However, Hive metastore is not case preserving and will generate wrong partition location
       // with lower cased partition column names. Here we set the default partition location
       // manually to avoid this problem.
-      val partitionPath = p.storage.locationUri.map(new Path(_)).getOrElse {
+      val partitionPath = p.storage.locationUri.map(uri => new Path(new URI(uri))).getOrElse {
         ExternalCatalogUtils.generatePartitionPath(p.spec, partitionColumnNames, tablePath)
       }
-      p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toString)))
+      p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri.toString)))
     }
     val lowerCasedParts = partsWithLocation.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
     client.createPartitions(db, table, lowerCasedParts, ignoreIfExists)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 3d9642dd1463d4e4f1902bae6db4f7ee491c1643..e561706facf0333bf03f05e7adb9062f0061a340 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -268,7 +268,8 @@ private[client] class Shim_v0_12 extends Shim with Logging {
       ignoreIfExists: Boolean): Unit = {
     val table = hive.getTable(database, tableName)
     parts.foreach { s =>
-      val location = s.storage.locationUri.map(new Path(table.getPath, _)).orNull
+      val location = s.storage.locationUri.map(
+        uri => new Path(table.getPath, new Path(new URI(uri)))).orNull
       val params = if (s.parameters.nonEmpty) s.parameters.asJava else null
       val spec = s.spec.asJava
       if (hive.getPartition(table, spec, false) != null && ignoreIfExists) {
@@ -463,7 +464,8 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
       ignoreIfExists: Boolean): Unit = {
     val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists)
     parts.zipWithIndex.foreach { case (s, i) =>
-      addPartitionDesc.addPartition(s.spec.asJava, s.storage.locationUri.orNull)
+      addPartitionDesc.addPartition(
+        s.spec.asJava, s.storage.locationUri.map(u => new Path(new URI(u)).toString).orNull)
       if (s.parameters.nonEmpty) {
         addPartitionDesc.getPartition(i).setPartParams(s.parameters.asJava)
       }
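For reference, the partition directory names asserted in the test below come from Hive-style escaping of special characters in partition values; the ExternalCatalogUtils object already used in the HiveExternalCatalog hunk above exposes the helpers. A rough sketch of the expected behavior (outputs shown as comments are assumptions consistent with the test's expectations):

    import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils

    // '%' and '=' cannot appear raw in a partition directory name, so values
    // are percent-escaped when the partition path is generated...
    ExternalCatalogUtils.escapePathName("%")   // "%25"
    ExternalCatalogUtils.escapePathName("=")   // "%3D"
    ExternalCatalogUtils.escapePathName("%=")  // "%25%3D"

    // ...and unescaped when the on-disk name is read back.
    ExternalCatalogUtils.unescapePathName("%25%3D")  // "%="
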
= spark.sql("show partitions test").collect().map(_.get(0)).toSeq + assert(parts.length == 30) + assert(parts.contains("A=0/B=%25")) + assert(parts.contains("A=0/B=%3D")) + assert(parts.contains("A=0/B=%25%3D")) + + // drop partition sanity check + spark.sql("alter table test drop partition (A=1, B='%')") + assert(spark.sql("select * from test").count() == 29) // 1 file in dropped partition + + withTempDir { dir => + // custom locations sanity check + spark.sql(s""" + |alter table test partition (A=0, B='%') + |set location '${dir.getAbsolutePath}'""".stripMargin) + assert(spark.sql("select * from test").count() == 28) // moved to empty dir + + // rename partition sanity check + spark.sql(s""" + |alter table test partition (A=5, B='%') + |rename to partition (A=100, B='%')""".stripMargin) + assert(spark.sql("select * from test where a = 5 and b = '%'").count() == 0) + assert(spark.sql("select * from test where a = 100 and b = '%'").count() == 1) + + // try with A=0 which has a custom location + spark.sql("insert into test partition (A=0, B='%') select 1") + spark.sql(s""" + |alter table test partition (A=0, B='%') + |rename to partition (A=101, B='%')""".stripMargin) + assert(spark.sql("select * from test where a = 0 and b = '%'").count() == 0) + assert(spark.sql("select * from test where a = 101 and b = '%'").count() == 1) + } + } + } } /**