Skip to content
Snippets Groups Projects
Commit 62b7f306 authored by Davies Liu's avatar Davies Liu Committed by Davies Liu
Browse files

[SPARK-14607] [SPARK-14484] [SQL] fix case-insensitive predicates in FileSourceStrategy

## What changes were proposed in this pull request?

When prune the partitions or push down predicates, case-sensitivity is not respected. In order to make it work with case-insensitive, this PR update the AttributeReference inside predicate to use the name from schema.

## How was this patch tested?

Add regression tests for case-insensitive.

Author: Davies Liu <davies@databricks.com>

Closes #12371 from davies/case_insensi.
parent fc3cd2f5
No related branches found
No related tags found
No related merge requests found
......@@ -64,18 +64,28 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
// - filters that need to be evaluated again after the scan
val filterSet = ExpressionSet(filters)
// The attribute name of predicate could be different than the one in schema in case of
// case insensitive, we should change them to match the one in schema, so we donot need to
// worry about case sensitivity anymore.
val normalizedFilters = filters.map { e =>
e transform {
case a: AttributeReference =>
a.withName(l.output.find(_.semanticEquals(a)).get.name)
}
}
val partitionColumns =
l.resolve(files.partitionSchema, files.sqlContext.sessionState.analyzer.resolver)
val partitionSet = AttributeSet(partitionColumns)
val partitionKeyFilters =
ExpressionSet(filters.filter(_.references.subsetOf(partitionSet)))
ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
val dataColumns =
l.resolve(files.dataSchema, files.sqlContext.sessionState.analyzer.resolver)
// Partition keys are not available in the statistics of the files.
val dataFilters = filters.filter(_.references.intersect(partitionSet).isEmpty)
val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)
// Predicates with both partition keys and attributes need to be evaluated after the scan.
val afterScanFilters = filterSet -- partitionKeyFilters
......
......@@ -593,10 +593,7 @@ class HDFSFileCatalog(
}
if (partitionPruningPredicates.nonEmpty) {
val predicate =
partitionPruningPredicates
.reduceOption(expressions.And)
.getOrElse(Literal(true))
val predicate = partitionPruningPredicates.reduce(expressions.And)
val boundPredicate = InterpretedPredicate.create(predicate.transform {
case a: AttributeReference =>
......
......@@ -196,6 +196,34 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
checkDataFilters(Set(IsNotNull("c1"), EqualTo("c1", 1)))
}
test("partitioned table - case insensitive") {
withSQLConf("spark.sql.caseSensitive" -> "false") {
val table =
createTable(
files = Seq(
"p1=1/file1" -> 10,
"p1=2/file2" -> 10))
// Only one file should be read.
checkScan(table.where("P1 = 1")) { partitions =>
assert(partitions.size == 1, "when checking partitions")
assert(partitions.head.files.size == 1, "when files in partition 1")
}
// We don't need to reevaluate filters that are only on partitions.
checkDataFilters(Set.empty)
// Only one file should be read.
checkScan(table.where("P1 = 1 AND C1 = 1 AND (P1 + C1) = 1")) { partitions =>
assert(partitions.size == 1, "when checking partitions")
assert(partitions.head.files.size == 1, "when checking files in partition 1")
assert(partitions.head.files.head.partitionValues.getInt(0) == 1,
"when checking partition values")
}
// Only the filters that do not contain the partition column should be pushed down
checkDataFilters(Set(IsNotNull("c1"), EqualTo("c1", 1)))
}
}
test("partitioned table - after scan filters") {
val table =
createTable(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment