Skip to content
Snippets Groups Projects
Commit 957a8ab3 authored by Eric Liang's avatar Eric Liang Committed by Reynold Xin
Browse files

[SPARK-16818] Exchange reuse incorrectly reuses scans over different sets of partitions

## What changes were proposed in this pull request?

This fixes a bug wherethe file scan operator does not take into account partition pruning in its implementation of `sameResult()`. As a result, executions may be incorrect on self-joins over the same base file relation.

The patch here is minimal, but we should reconsider relying on `metadata` for implementing sameResult() in the future, as string representations may not be uniquely identifying.

cc rxin

## How was this patch tested?

Unit tests.

Author: Eric Liang <ekl@databricks.com>

Closes #14425 from ericl/spark-16818.
parent a6290e51
No related branches found
No related tags found
No related merge requests found
......@@ -130,7 +130,9 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
createNonBucketedReadRDD(readFile, selectedPartitions, fsRelation)
}
// These metadata values make scan plans uniquely identifiable for equality checking.
val meta = Map(
"PartitionFilters" -> partitionKeyFilters.mkString("[", ", ", "]"),
"Format" -> fsRelation.fileFormat.toString,
"ReadSchema" -> prunedDataSchema.simpleString,
PUSHED_FILTERS -> pushedDownFilters.mkString("[", ", ", "]"),
......
......@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, PredicateHelper}
import org.apache.spark.sql.catalyst.util
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.{DataSourceScanExec, SparkPlan}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
......@@ -408,6 +408,39 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
}
}
test("[SPARK-16818] partition pruned file scans implement sameResult correctly") {
withTempPath { path =>
val tempDir = path.getCanonicalPath
spark.range(100)
.selectExpr("id", "id as b")
.write
.partitionBy("id")
.parquet(tempDir)
val df = spark.read.parquet(tempDir)
def getPlan(df: DataFrame): SparkPlan = {
df.queryExecution.executedPlan
}
assert(getPlan(df.where("id = 2")).sameResult(getPlan(df.where("id = 2"))))
assert(!getPlan(df.where("id = 2")).sameResult(getPlan(df.where("id = 3"))))
}
}
test("[SPARK-16818] exchange reuse respects differences in partition pruning") {
spark.conf.set("spark.sql.exchange.reuse", true)
withTempPath { path =>
val tempDir = path.getCanonicalPath
spark.range(10)
.selectExpr("id % 2 as a", "id % 3 as b", "id as c")
.write
.partitionBy("a")
.parquet(tempDir)
val df = spark.read.parquet(tempDir)
val df1 = df.where("a = 0").groupBy("b").agg("c" -> "sum")
val df2 = df.where("a = 1").groupBy("b").agg("c" -> "sum")
checkAnswer(df1.join(df2, "b"), Row(0, 6, 12) :: Row(1, 4, 8) :: Row(2, 10, 5) :: Nil)
}
}
// Helpers for checking the arguments passed to the FileFormat.
protected val checkPartitionSchema =
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment