Commit c8da5356 authored by wangzhenhua, committed by Wenchen Fan

[SPARK-20718][SQL] FileSourceScanExec with different filter orders should be the same after canonicalization

## What changes were proposed in this pull request?

Because `constraints` in `QueryPlan` is a set, the order in which filters are produced can vary from run to run. Usually this is fine, since canonicalization makes such plans compare equal. However, `FileSourceScanExec` keeps its data filters and partition filters as sequences, and their order is not canonicalized. As a result, `sameResult` returns different answers for semantically identical plans whose filters merely appear in different orders. This changes downstream decisions such as whether `ReuseExchange` fires, and thus leads to unstable performance.
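To make the failure mode concrete, here is a minimal sketch (assuming a Spark 2.2-era Catalyst on the classpath; the object name, attribute names, and literals are illustrative) showing that `Seq` comparison is positional, while a single canonicalized `And` tree is order-insensitive:

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.IntegerType

object FilterOrderDemo {
  def main(args: Array[String]): Unit = {
    val a = AttributeReference("a", IntegerType)()
    val c = AttributeReference("c", IntegerType)()
    // The same two conjuncts, in different orders.
    val f1: Seq[Expression] = Seq(GreaterThan(a, Literal(1)), LessThan(c, Literal(9)))
    val f2: Seq[Expression] = Seq(LessThan(c, Literal(9)), GreaterThan(a, Literal(1)))

    // Seq equality is positional, so a field-by-field plan comparison
    // sees two "different" scans here.
    assert(f1 != f2)

    // Folding the conjuncts into one And and canonicalizing it makes the
    // comparison order-insensitive: canonicalization reorders commutative
    // operators deterministically.
    assert(f1.reduce(And).canonicalized == f2.reduce(And).canonicalized)
  }
}
```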

## How was this patch tested?

Added a new test for `FileSourceScanExec.sameResult`.

Author: wangzhenhua <wangzhenhua@huawei.com>

Closes #17959 from wzhfy/canonicalizeFileSourceScanExec.
parent 2b36eb69
@@ -38,7 +38,7 @@ import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
-trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
+trait DataSourceScanExec extends LeafExecNode with CodegenSupport with PredicateHelper {
   val relation: BaseRelation
   val metastoreTableIdentifier: Option[TableIdentifier]
@@ -519,8 +519,18 @@ case class FileSourceScanExec(
       relation,
       output.map(QueryPlan.normalizeExprId(_, output)),
       requiredSchema,
-      partitionFilters.map(QueryPlan.normalizeExprId(_, output)),
-      dataFilters.map(QueryPlan.normalizeExprId(_, output)),
+      canonicalizeFilters(partitionFilters, output),
+      canonicalizeFilters(dataFilters, output),
       None)
   }
+
+  private def canonicalizeFilters(filters: Seq[Expression], output: Seq[Attribute])
+    : Seq[Expression] = {
+    if (filters.nonEmpty) {
+      val normalizedFilters = QueryPlan.normalizeExprId(filters.reduce(And), output)
+      splitConjunctivePredicates(normalizedFilters)
+    } else {
+      Nil
+    }
+  }
 }
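For reference, a hedged, self-contained sketch of what `canonicalizeFilters` achieves. It uses `Expression.canonicalized` directly instead of `QueryPlan.normalizeExprId` (which additionally rewrites exprIds against `output`), so the demo object and its `canonicalOrder` helper are illustrative, not the patch itself. Note that mixing in `PredicateHelper` (the trait change in the first hunk) is what supplies `splitConjunctivePredicates`, the inverse of `reduce(And)`:

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.IntegerType

object CanonicalizeFiltersDemo extends PredicateHelper {
  // Mirrors canonicalizeFilters above, minus the exprId normalization:
  // fold into one And, canonicalize (which orders commutative children
  // deterministically), then split back into a stable sequence.
  def canonicalOrder(filters: Seq[Expression]): Seq[Expression] =
    if (filters.nonEmpty) {
      splitConjunctivePredicates(filters.reduce(And).canonicalized)
    } else {
      Nil
    }

  def main(args: Array[String]): Unit = {
    val a = AttributeReference("a", IntegerType)()
    val c = AttributeReference("c", IntegerType)()
    val p1 = GreaterThan(a, Literal(1))
    val p2 = LessThan(c, Literal(9))
    // Both input orders collapse to the same canonical sequence.
    assert(canonicalOrder(Seq(p1, p2)) == canonicalOrder(Seq(p2, p1)))
  }
}
```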
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution

import org.apache.spark.sql.{DataFrame, QueryTest}
import org.apache.spark.sql.test.SharedSQLContext

/**
 * Tests for the sameResult function for [[SparkPlan]]s.
 */
class SameResultSuite extends QueryTest with SharedSQLContext {

  test("FileSourceScanExec: different orders of data filters and partition filters") {
    withTempPath { path =>
      val tmpDir = path.getCanonicalPath
      spark.range(10)
        .selectExpr("id as a", "id + 1 as b", "id + 2 as c", "id + 3 as d")
        .write
        .partitionBy("a", "b")
        .parquet(tmpDir)
      val df = spark.read.parquet(tmpDir)
      // partition filters: a > 1 AND b < 9
      // data filters: c > 1 AND d < 9
      val plan1 = getFileSourceScanExec(df.where("a > 1 AND b < 9 AND c > 1 AND d < 9"))
      val plan2 = getFileSourceScanExec(df.where("b < 9 AND a > 1 AND d < 9 AND c > 1"))
      assert(plan1.sameResult(plan2))
    }
  }

  private def getFileSourceScanExec(df: DataFrame): FileSourceScanExec = {
    df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
      .asInstanceOf[FileSourceScanExec]
  }
}
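To run just this suite, an invocation along the lines of `build/sbt "sql/testOnly *SameResultSuite"` should work with Spark's standard sbt build (the exact command depends on the Spark version's build setup).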