Commit a0ce845d authored by Wenchen Fan

[SPARK-19887][SQL] dynamic partition keys can be null or empty string

When a dynamic partition value is null or an empty string, we should write the data to a directory like `a=__HIVE_DEFAULT_PARTITION__`. When we read the data back, we should respect this special directory name and treat it as null.

This matches Impala's behavior; see https://issues.apache.org/jira/browse/IMPALA-252
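For illustration only, here is a minimal standalone sketch of the intended round-trip (not the actual Spark code, and it skips path escaping; `defaultPartitionName`, `partitionPathString`, and `partitionValueFromPath` are hypothetical names that mirror `ExternalCatalogUtils.DEFAULT_PARTITION_NAME` and the new `getPartitionPathString` helper): null or empty partition values map to the `__HIVE_DEFAULT_PARTITION__` directory on write, and that directory name maps back to null on read.

object PartitionPathSketch {
  // Assumed constant; mirrors ExternalCatalogUtils.DEFAULT_PARTITION_NAME in Spark.
  val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__"

  // Write side: null and "" both fall back to the default partition directory.
  def partitionPathString(col: String, value: String): String = {
    val part = if (value == null || value.isEmpty) defaultPartitionName else value
    s"$col=$part"
  }

  // Read side: the default partition directory is interpreted as a null value.
  def partitionValueFromPath(fragment: String): (String, Option[String]) = {
    val Array(col, raw) = fragment.split("=", 2)
    (col, if (raw == defaultPartitionName) None else Some(raw))
  }

  def main(args: Array[String]): Unit = {
    assert(partitionPathString("a", null) == "a=__HIVE_DEFAULT_PARTITION__")
    assert(partitionPathString("a", "") == "a=__HIVE_DEFAULT_PARTITION__")
    assert(partitionPathString("a", "1") == "a=1")
    assert(partitionValueFromPath("a=__HIVE_DEFAULT_PARTITION__") == ("a", None))
    assert(partitionValueFromPath("a=1") == ("a", Some("1")))
  }
}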



Tested with a new regression test.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17277 from cloud-fan/partition.

(cherry picked from commit dacc382f)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
parent 45457825
@@ -108,18 +108,21 @@ object ExternalCatalogUtils {
       partitionColumnNames: Seq[String],
       tablePath: Path): Path = {
     val partitionPathStrings = partitionColumnNames.map { col =>
-      val partitionValue = spec(col)
-      val partitionString = if (partitionValue == null) {
-        DEFAULT_PARTITION_NAME
-      } else {
-        escapePathName(partitionValue)
-      }
-      escapePathName(col) + "=" + partitionString
+      getPartitionPathString(col, spec(col))
     }
     partitionPathStrings.foldLeft(tablePath) { (totalPath, nextPartPath) =>
       new Path(totalPath, nextPartPath)
     }
   }
+
+  def getPartitionPathString(col: String, value: String): String = {
+    val partitionString = if (value == null || value.isEmpty) {
+      DEFAULT_PARTITION_NAME
+    } else {
+      escapePathName(value)
+    }
+    escapePathName(col) + "=" + partitionString
+  }
 }
 
 object CatalogUtils {
...
@@ -111,7 +111,12 @@ case class CatalogTablePartition(
    */
   def toRow(partitionSchema: StructType): InternalRow = {
     InternalRow.fromSeq(partitionSchema.map { field =>
-      Cast(Literal(spec(field.name)), field.dataType).eval()
+      val partValue = if (spec(field.name) == ExternalCatalogUtils.DEFAULT_PARTITION_NAME) {
+        null
+      } else {
+        spec(field.name)
+      }
+      Cast(Literal(partValue), field.dataType).eval()
     })
   }
 }
@@ -158,7 +163,7 @@ case class BucketSpec(
  * @param tracksPartitionsInCatalog whether this table's partition metadata is stored in the
  *                                  catalog. If false, it is inferred automatically based on file
  *                                  structure.
- * @param schemaPresevesCase Whether or not the schema resolved for this table is case-sensitive.
+ * @param schemaPreservesCase Whether or not the schema resolved for this table is case-sensitive.
  *                            When using a Hive Metastore, this flag is set to false if a case-
  *                            sensitive schema was unable to be read from the table properties.
  *                            Used to trigger case-sensitive schema inference at query time, when
...
@@ -319,7 +319,7 @@ case class FileSourceScanExec(
     val input = ctx.freshName("input")
     ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
     val exprRows = output.zipWithIndex.map{ case (a, i) =>
-      new BoundReference(i, a.dataType, a.nullable)
+      BoundReference(i, a.dataType, a.nullable)
     }
     val row = ctx.freshName("row")
     ctx.INPUT_ROW = row
...
@@ -285,14 +285,11 @@ object FileFormatWriter extends Logging {
     /** Expressions that given a partition key build a string like: col1=val/col2=val/... */
     private def partitionStringExpression: Seq[Expression] = {
       description.partitionColumns.zipWithIndex.flatMap { case (c, i) =>
-        val escaped = ScalaUDF(
-          ExternalCatalogUtils.escapePathName _,
-          StringType,
-          Seq(Cast(c, StringType)),
-          Seq(StringType))
-        val str = If(IsNull(c), Literal(ExternalCatalogUtils.DEFAULT_PARTITION_NAME), escaped)
-        val partitionName = Literal(c.name + "=") :: str :: Nil
-        if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
+        val partitionName = ScalaUDF(
+          ExternalCatalogUtils.getPartitionPathString _,
+          StringType,
+          Seq(Literal(c.name), Cast(c, StringType)))
+        if (i == 0) Seq(partitionName) else Seq(Literal(Path.SEPARATOR), partitionName)
       }
     }
...
@@ -118,7 +118,7 @@ object PartitioningUtils {
       //   "hdfs://host:9000/invalidPath"
       //   "hdfs://host:9000/path"
       // TODO: Selective case sensitivity.
-      val discoveredBasePaths = optDiscoveredBasePaths.flatMap(x => x).map(_.toString.toLowerCase())
+      val discoveredBasePaths = optDiscoveredBasePaths.flatten.map(_.toString.toLowerCase())
       assert(
         discoveredBasePaths.distinct.size == 1,
         "Conflicting directory structures detected. Suspicious paths:\b" +
...
@@ -968,8 +968,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val partColNameMap = buildLowerCasePartColNameMap(catalogTable).mapValues(escapePathName)
     val clientPartitionNames =
       client.getPartitionNames(catalogTable, partialSpec.map(lowerCasePartitionSpec))
-    clientPartitionNames.map { partName =>
-      val partSpec = PartitioningUtils.parsePathFragmentAsSeq(partName)
+    clientPartitionNames.map { partitionPath =>
+      val partSpec = PartitioningUtils.parsePathFragmentAsSeq(partitionPath)
       partSpec.map { case (partName, partValue) =>
         partColNameMap(partName.toLowerCase) + "=" + escapePathName(partValue)
       }.mkString("/")
...
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive
 import java.io.File
 
 import org.apache.spark.metrics.source.HiveCatalogMetrics
-import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
@@ -28,6 +28,7 @@ import org.apache.spark.util.Utils
 
 class PartitionProviderCompatibilitySuite
   extends QueryTest with TestHiveSingleton with SQLTestUtils {
+  import testImplicits._
 
   private def setupPartitionedDatasourceTable(tableName: String, dir: File): Unit = {
     spark.range(5).selectExpr("id as fieldOne", "id as partCol").write
@@ -294,6 +295,28 @@ class PartitionProviderCompatibilitySuite
         }
       }
     }
+
+    test(s"SPARK-19887 partition value is null - partition management $enabled") {
+      withTable("test") {
+        Seq((1, "p", 1), (2, null, 2)).toDF("a", "b", "c")
+          .write.partitionBy("b", "c").saveAsTable("test")
+        checkAnswer(spark.table("test"),
+          Row(1, "p", 1) :: Row(2, null, 2) :: Nil)
+
+        Seq((3, null: String, 3)).toDF("a", "b", "c")
+          .write.mode("append").partitionBy("b", "c").saveAsTable("test")
+        checkAnswer(spark.table("test"),
+          Row(1, "p", 1) :: Row(2, null, 2) :: Row(3, null, 3) :: Nil)
+        // make sure partition pruning also works.
+        checkAnswer(spark.table("test").filter($"b".isNotNull), Row(1, "p", 1))
+
+        // empty string is an invalid partition value and we treat it as null when read back.
+        Seq((4, "", 4)).toDF("a", "b", "c")
+          .write.mode("append").partitionBy("b", "c").saveAsTable("test")
+        checkAnswer(spark.table("test"),
+          Row(1, "p", 1) :: Row(2, null, 2) :: Row(3, null, 3) :: Row(4, null, 4) :: Nil)
+      }
+    }
   }
 
   /**
...