Commit cdcccd7b authored by KaiXinXiaoLei, committed by Wenchen Fan

[SPARK-23405] Generate additional constraints for Join's children

## What changes were proposed in this pull request?

I ran this SQL: `select ls.cs_order_number from ls left semi join catalog_sales cs on ls.cs_order_number = cs.cs_order_number`. The `ls` table is small, with a single row. The `catalog_sales` table is large, with 10 billion rows. The task hangs. I found that `cs_order_number` has many null values in the `catalog_sales` table. I think these null values should be filtered out in the logical plan. Before this patch, the plan is:

>== Optimized Logical Plan ==
>Join LeftSemi, (cs_order_number#1 = cs_order_number#22)
>:- Project cs_order_number#1
>:  +- Filter isnotnull(cs_order_number#1)
>:     +- MetastoreRelation 100t, ls
>+- Project cs_order_number#22
>   +- MetastoreRelation 100t, catalog_sales

With this patch, the plan becomes:
>== Optimized Logical Plan ==
>Join LeftSemi, (cs_order_number#1 = cs_order_number#22)
>:- Project cs_order_number#1
>:  +- Filter isnotnull(cs_order_number#1)
>:     +- MetastoreRelation 100t, ls
>+- Project cs_order_number#22
>   **+- Filter isnotnull(cs_order_number#22)**
>      +- MetastoreRelation 100t, catalog_sales
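
Why the extra `Filter isnotnull(cs_order_number#22)` does not change the result can be illustrated with a small sketch in plain Scala, without Spark. It assumes SQL's three-valued equality, where a comparison involving NULL never evaluates to true. `None` stands in for NULL, and `sqlEquals`, `leftSemi`, and the sample rows are hypothetical names and data made up for illustration, not Spark APIs.

```scala
object NullSafeJoinSketch {
  // SQL-style equality: None models NULL, and comparing with NULL yields
  // "unknown" (None here), which a join condition treats as not matching.
  def sqlEquals(l: Option[Long], r: Option[Long]): Option[Boolean] =
    for (a <- l; b <- r) yield a == b

  // Left-semi join on a single key: keep each left row for which at least
  // one right row makes the equality condition evaluate to true.
  def leftSemi(left: Seq[Option[Long]], right: Seq[Option[Long]]): Seq[Option[Long]] =
    left.filter(l => right.exists(r => sqlEquals(l, r).contains(true)))

  // Hypothetical data: one row on the small side, mostly NULL keys on the big side.
  val ls: Seq[Option[Long]] = Seq(Some(1L))
  val catalogSales: Seq[Option[Long]] = Seq(None, None, Some(1L), None, Some(2L))

  // Joining against the raw right side versus the right side with NULL keys removed.
  val unfiltered: Seq[Option[Long]] = leftSemi(ls, catalogSales)
  val filtered: Seq[Option[Long]] = leftSemi(ls, catalogSales.filter(_.isDefined))
}
```

Both joins return the same rows, so dropping NULL keys before the join only reduces the amount of data the join has to process.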

## How was this patch tested?

A unit test, "SPARK-23405: left-semi equal-join should filter out null join keys on both sides", was added to `InferFiltersFromConstraintsSuite`.


Author: KaiXinXiaoLei <584620569@qq.com>
Author: hanghang <584620569@qq.com>

Closes #20670 from KaiXinXiaoLei/Spark-23405.
parent ff148018
Branches master
@@ -661,7 +661,7 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
     case join @ Join(left, right, joinType, conditionOpt) =>
       // Only consider constraints that can be pushed down completely to either the left or the
       // right child
-      val constraints = join.constraints.filter { c =>
+      val constraints = join.allConstraints.filter { c =>
         c.references.subsetOf(left.outputSet) || c.references.subsetOf(right.outputSet)
       }
       // Remove those constraints that are already enforced by either the left or the right child
......
@@ -23,25 +23,28 @@ import org.apache.spark.sql.catalyst.expressions._
 trait QueryPlanConstraints { self: LogicalPlan =>

   /**
-   * An [[ExpressionSet]] that contains invariants about the rows output by this operator. For
-   * example, if this set contains the expression `a = 2` then that expression is guaranteed to
-   * evaluate to `true` for all rows produced.
+   * An [[ExpressionSet]] that contains an additional set of constraints, such as equality
+   * constraints and `isNotNull` constraints, etc.
    */
-  lazy val constraints: ExpressionSet = {
+  lazy val allConstraints: ExpressionSet = {
     if (conf.constraintPropagationEnabled) {
-      ExpressionSet(
-        validConstraints
-          .union(inferAdditionalConstraints(validConstraints))
-          .union(constructIsNotNullConstraints(validConstraints))
-          .filter { c =>
-            c.references.nonEmpty && c.references.subsetOf(outputSet) && c.deterministic
-          }
-      )
+      ExpressionSet(validConstraints
+        .union(inferAdditionalConstraints(validConstraints))
+        .union(constructIsNotNullConstraints(validConstraints)))
     } else {
       ExpressionSet(Set.empty)
     }
   }

+  /**
+   * An [[ExpressionSet]] that contains invariants about the rows output by this operator. For
+   * example, if this set contains the expression `a = 2` then that expression is guaranteed to
+   * evaluate to `true` for all rows produced.
+   */
+  lazy val constraints: ExpressionSet = ExpressionSet(allConstraints.filter { c =>
+    c.references.nonEmpty && c.references.subsetOf(outputSet) && c.deterministic
+  })
+
   /**
    * This method can be overridden by any child class of QueryPlan to specify a set of constraints
    * based on the given operator's constraint propagation logic. These constraints are then
......
@@ -192,4 +192,16 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
     comparePlans(Optimize.execute(original.analyze), correct.analyze)
   }
+
+  test("SPARK-23405: left-semi equal-join should filter out null join keys on both sides") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+    val condition = Some("x.a".attr === "y.a".attr)
+    val originalQuery = x.join(y, LeftSemi, condition).analyze
+    val left = x.where(IsNotNull('a))
+    val right = y.where(IsNotNull('a))
+    val correctAnswer = left.join(right, LeftSemi, condition).analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, correctAnswer)
+  }
 }
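
The switch from `join.constraints` to `join.allConstraints` in `InferFiltersFromConstraints` can be illustrated with a simplified model in plain Scala. This is a sketch under stated assumptions, not Catalyst code: `Constraint` is a hypothetical stand-in for an expression together with the attributes it references, attributes are plain strings, and the model assumes that a left-semi join outputs only the left child's attributes, so any constraint that mentions a right-side attribute is dropped by the `outputSet` filter.

```scala
// Hypothetical stand-in for a Catalyst expression and the attributes it references.
final case class Constraint(description: String, references: Set[String])

object ConstraintFilteringSketch {
  val leftOutput: Set[String] = Set("x.a")
  val rightOutput: Set[String] = Set("y.a")
  // Assumption of this model: a left-semi join exposes only the left child's columns.
  val joinOutput: Set[String] = leftOutput

  // Models `allConstraints`: the join condition plus the inferred isNotNull constraints.
  val allConstraints: Set[Constraint] = Set(
    Constraint("x.a = y.a", Set("x.a", "y.a")),
    Constraint("isnotnull(x.a)", Set("x.a")),
    Constraint("isnotnull(y.a)", Set("y.a")))

  // Models `constraints`: only what can be stated in terms of the join's own output.
  val constraints: Set[Constraint] =
    allConstraints.filter(c => c.references.nonEmpty && c.references.subsetOf(joinOutput))

  // Models the rule's selection of constraints that fit entirely on one child.
  def pushable(cs: Set[Constraint]): Set[String] =
    cs.filter { c =>
      c.references.subsetOf(leftOutput) || c.references.subsetOf(rightOutput)
    }.map(_.description)

  val before: Set[String] = pushable(constraints)    // only the left-side constraint survives
  val after: Set[String] = pushable(allConstraints)  // both isNotNull constraints survive
}
```

In this model, starting from `constraints` loses `isnotnull(y.a)` before the rule ever sees it, while starting from `allConstraints` keeps it available for the right child, which matches the extra filter the new test expects on both sides.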