Skip to content
Snippets Groups Projects
Commit 540e9128 authored by Sameer Agarwal's avatar Sameer Agarwal Committed by Yin Huai
Browse files

[SPARK-17244] Catalyst should not pushdown non-deterministic join conditions

## What changes were proposed in this pull request?

Given that non-deterministic expressions can be stateful, pushing them down the query plan during the optimization phase can cause incorrect behavior. This patch fixes that issue by explicitly disabling that.

## How was this patch tested?

A new test in `FilterPushdownSuite` that checks catalyst behavior for both deterministic and non-deterministic join conditions.

Author: Sameer Agarwal <sameerag@cs.berkeley.edu>

Closes #14815 from sameeragarwal/constraint-inputfile.
parent f64a1ddd
No related branches found
No related tags found
No related merge requests found
......@@ -1379,18 +1379,25 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
*/
object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
/**
* Splits join condition expressions into three categories based on the attributes required
* to evaluate them.
* Splits join condition expressions or filter predicates (on a given join's output) into three
* categories based on the attributes required to evaluate them. Note that we explicitly exclude
* on-deterministic (i.e., stateful) condition expressions in canEvaluateInLeft or
* canEvaluateInRight to prevent pushing these predicates on either side of the join.
*
* @return (canEvaluateInLeft, canEvaluateInRight, haveToEvaluateInBoth)
*/
private def split(condition: Seq[Expression], left: LogicalPlan, right: LogicalPlan) = {
// Note: In order to ensure correctness, it's important to not change the relative ordering of
// any deterministic expression that follows a non-deterministic expression. To achieve this,
// we only consider pushing down those expressions that precede the first non-deterministic
// expression in the condition.
val (pushDownCandidates, containingNonDeterministic) = condition.span(_.deterministic)
val (leftEvaluateCondition, rest) =
condition.partition(_.references subsetOf left.outputSet)
pushDownCandidates.partition(_.references.subsetOf(left.outputSet))
val (rightEvaluateCondition, commonCondition) =
rest.partition(_.references subsetOf right.outputSet)
rest.partition(expr => expr.references.subsetOf(right.outputSet))
(leftEvaluateCondition, rightEvaluateCondition, commonCondition)
(leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ containingNonDeterministic)
}
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
......@@ -1441,7 +1448,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
}
// push down the join filter into sub query scanning if applicable
case f @ Join(left, right, joinType, joinCondition) =>
case j @ Join(left, right, joinType, joinCondition) =>
val (leftJoinConditions, rightJoinConditions, commonJoinCondition) =
split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)
......@@ -1471,7 +1478,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
Join(newLeft, newRight, LeftOuter, newJoinCond)
case FullOuter => f
case FullOuter => j
case NaturalJoin(_) => sys.error("Untransformed NaturalJoin node")
case UsingJoin(_, _) => sys.error("Untransformed Using join node")
}
......
......@@ -987,4 +987,18 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
}
test("join condition pushdown: deterministic and non-deterministic") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
// Verify that all conditions preceding the first non-deterministic condition are pushed down
// by the optimizer and others are not.
val originalQuery = x.join(y, condition = Some("x.a".attr === 5 && "y.a".attr === 5 &&
"x.a".attr === Rand(10) && "y.b".attr === 5))
val correctAnswer = x.where("x.a".attr === 5).join(y.where("y.a".attr === 5),
condition = Some("x.a".attr === Rand(10) && "y.b".attr === 5))
comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment