diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 32aa4713ebdbb75be2eebe663a020fa68fd1dc87..67491302a984854e4e4fa9f03729153cf98cb18a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -130,7 +130,9 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { createNonBucketedReadRDD(readFile, selectedPartitions, fsRelation) } + // These metadata values make scan plans uniquely identifiable for equality checking. val meta = Map( + "PartitionFilters" -> partitionKeyFilters.mkString("[", ", ", "]"), "Format" -> fsRelation.fileFormat.toString, "ReadSchema" -> prunedDataSchema.simpleString, PUSHED_FILTERS -> pushedDownFilters.mkString("[", ", ", "]"), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index 2f551b1a017c9321c9e9ad3b399a049b567ef956..18246500f7acf4b119455f71821368139fecc666 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, PredicateHelper} import org.apache.spark.sql.catalyst.util -import org.apache.spark.sql.execution.DataSourceScanExec +import org.apache.spark.sql.execution.{DataSourceScanExec, SparkPlan} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ @@ -408,6 +408,39 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi } } + test("[SPARK-16818] partition pruned file scans implement sameResult correctly") { + withTempPath { path => + val tempDir = path.getCanonicalPath + spark.range(100) + .selectExpr("id", "id as b") + .write + .partitionBy("id") + .parquet(tempDir) + val df = spark.read.parquet(tempDir) + def getPlan(df: DataFrame): SparkPlan = { + df.queryExecution.executedPlan + } + assert(getPlan(df.where("id = 2")).sameResult(getPlan(df.where("id = 2")))) + assert(!getPlan(df.where("id = 2")).sameResult(getPlan(df.where("id = 3")))) + } + } + + test("[SPARK-16818] exchange reuse respects differences in partition pruning") { + spark.conf.set("spark.sql.exchange.reuse", true) + withTempPath { path => + val tempDir = path.getCanonicalPath + spark.range(10) + .selectExpr("id % 2 as a", "id % 3 as b", "id as c") + .write + .partitionBy("a") + .parquet(tempDir) + val df = spark.read.parquet(tempDir) + val df1 = df.where("a = 0").groupBy("b").agg("c" -> "sum") + val df2 = df.where("a = 1").groupBy("b").agg("c" -> "sum") + checkAnswer(df1.join(df2, "b"), Row(0, 6, 12) :: Row(1, 4, 8) :: Row(2, 10, 5) :: Nil) + } + } + // Helpers for checking the arguments passed to the FileFormat. protected val checkPartitionSchema =