Skip to content
Snippets Groups Projects
Commit 718b6bad authored by Reynold Xin's avatar Reynold Xin
Browse files

[SPARK-17274][SQL] Move join optimizer rules into a separate file

## What changes were proposed in this pull request?
As part of breaking Optimizer.scala apart, this patch moves various join rules into a single file.

## How was this patch tested?
This should be covered by existing tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #14846 from rxin/SPARK-17274.
parent 5aad4509
No related branches found
No related tags found
No related merge requests found
......@@ -799,112 +799,6 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
}
}
/**
* Reorder the joins and push all the conditions into join, so that the bottom ones have at least
* one condition.
*
* The order of joins will not be changed if all of them already have at least one condition.
*/
object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
/**
* Join a list of plans together and push down the conditions into them.
*
* The joined plan are picked from left to right, prefer those has at least one join condition.
*
* @param input a list of LogicalPlans to join.
* @param conditions a list of condition for join.
*/
@tailrec
def createOrderedJoin(input: Seq[LogicalPlan], conditions: Seq[Expression]): LogicalPlan = {
assert(input.size >= 2)
if (input.size == 2) {
val (joinConditions, others) = conditions.partition(
e => !SubqueryExpression.hasCorrelatedSubquery(e))
val join = Join(input(0), input(1), Inner, joinConditions.reduceLeftOption(And))
if (others.nonEmpty) {
Filter(others.reduceLeft(And), join)
} else {
join
}
} else {
val left :: rest = input.toList
// find out the first join that have at least one join condition
val conditionalJoin = rest.find { plan =>
val refs = left.outputSet ++ plan.outputSet
conditions.filterNot(canEvaluate(_, left)).filterNot(canEvaluate(_, plan))
.exists(_.references.subsetOf(refs))
}
// pick the next one if no condition left
val right = conditionalJoin.getOrElse(rest.head)
val joinedRefs = left.outputSet ++ right.outputSet
val (joinConditions, others) = conditions.partition(
e => e.references.subsetOf(joinedRefs) && !SubqueryExpression.hasCorrelatedSubquery(e))
val joined = Join(left, right, Inner, joinConditions.reduceLeftOption(And))
// should not have reference to same logical plan
createOrderedJoin(Seq(joined) ++ rest.filterNot(_ eq right), others)
}
}
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case j @ ExtractFiltersAndInnerJoins(input, conditions)
if input.size > 2 && conditions.nonEmpty =>
createOrderedJoin(input, conditions)
}
}
/**
* Elimination of outer joins, if the predicates can restrict the result sets so that
* all null-supplying rows are eliminated
*
* - full outer -> inner if both sides have such predicates
* - left outer -> inner if the right side has such predicates
* - right outer -> inner if the left side has such predicates
* - full outer -> left outer if only the left side has such predicates
* - full outer -> right outer if only the right side has such predicates
*
* This rule should be executed before pushing down the Filter
*/
object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
/**
* Returns whether the expression returns null or false when all inputs are nulls.
*/
private def canFilterOutNull(e: Expression): Boolean = {
if (!e.deterministic || SubqueryExpression.hasCorrelatedSubquery(e)) return false
val attributes = e.references.toSeq
val emptyRow = new GenericInternalRow(attributes.length)
val v = BindReferences.bindReference(e, attributes).eval(emptyRow)
v == null || v == false
}
private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
val conditions = splitConjunctivePredicates(filter.condition) ++ filter.constraints
val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet))
val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet))
val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)
join.joinType match {
case RightOuter if leftHasNonNullPredicate => Inner
case LeftOuter if rightHasNonNullPredicate => Inner
case FullOuter if leftHasNonNullPredicate && rightHasNonNullPredicate => Inner
case FullOuter if leftHasNonNullPredicate => LeftOuter
case FullOuter if rightHasNonNullPredicate => RightOuter
case o => o
}
}
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) =>
val newJoinType = buildNewJoinType(f, j)
if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
}
}
/**
* Pushes down [[Filter]] operators where the `condition` can be
* evaluated using only the attributes of the left or right side of a join. Other
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.optimizer
import scala.annotation.tailrec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
/**
* Reorder the joins and push all the conditions into join, so that the bottom ones have at least
* one condition.
*
* The order of joins will not be changed if all of them already have at least one condition.
*/
object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
/**
* Join a list of plans together and push down the conditions into them.
*
* The joined plan are picked from left to right, prefer those has at least one join condition.
*
* @param input a list of LogicalPlans to join.
* @param conditions a list of condition for join.
*/
@tailrec
def createOrderedJoin(input: Seq[LogicalPlan], conditions: Seq[Expression]): LogicalPlan = {
assert(input.size >= 2)
if (input.size == 2) {
val (joinConditions, others) = conditions.partition(
e => !SubqueryExpression.hasCorrelatedSubquery(e))
val join = Join(input(0), input(1), Inner, joinConditions.reduceLeftOption(And))
if (others.nonEmpty) {
Filter(others.reduceLeft(And), join)
} else {
join
}
} else {
val left :: rest = input.toList
// find out the first join that have at least one join condition
val conditionalJoin = rest.find { plan =>
val refs = left.outputSet ++ plan.outputSet
conditions.filterNot(canEvaluate(_, left)).filterNot(canEvaluate(_, plan))
.exists(_.references.subsetOf(refs))
}
// pick the next one if no condition left
val right = conditionalJoin.getOrElse(rest.head)
val joinedRefs = left.outputSet ++ right.outputSet
val (joinConditions, others) = conditions.partition(
e => e.references.subsetOf(joinedRefs) && !SubqueryExpression.hasCorrelatedSubquery(e))
val joined = Join(left, right, Inner, joinConditions.reduceLeftOption(And))
// should not have reference to same logical plan
createOrderedJoin(Seq(joined) ++ rest.filterNot(_ eq right), others)
}
}
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case j @ ExtractFiltersAndInnerJoins(input, conditions)
if input.size > 2 && conditions.nonEmpty =>
createOrderedJoin(input, conditions)
}
}
/**
* Elimination of outer joins, if the predicates can restrict the result sets so that
* all null-supplying rows are eliminated
*
* - full outer -> inner if both sides have such predicates
* - left outer -> inner if the right side has such predicates
* - right outer -> inner if the left side has such predicates
* - full outer -> left outer if only the left side has such predicates
* - full outer -> right outer if only the right side has such predicates
*
* This rule should be executed before pushing down the Filter
*/
object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
/**
* Returns whether the expression returns null or false when all inputs are nulls.
*/
private def canFilterOutNull(e: Expression): Boolean = {
if (!e.deterministic || SubqueryExpression.hasCorrelatedSubquery(e)) return false
val attributes = e.references.toSeq
val emptyRow = new GenericInternalRow(attributes.length)
val v = BindReferences.bindReference(e, attributes).eval(emptyRow)
v == null || v == false
}
private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
val conditions = splitConjunctivePredicates(filter.condition) ++ filter.constraints
val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet))
val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet))
val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)
join.joinType match {
case RightOuter if leftHasNonNullPredicate => Inner
case LeftOuter if rightHasNonNullPredicate => Inner
case FullOuter if leftHasNonNullPredicate && rightHasNonNullPredicate => Inner
case FullOuter if leftHasNonNullPredicate => LeftOuter
case FullOuter if rightHasNonNullPredicate => RightOuter
case o => o
}
}
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) =>
val newJoinType = buildNewJoinType(f, j)
if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment