Commit 9cde7d5f authored by Davies Liu, committed by Davies Liu

[SPARK-12032] [SQL] Re-order inner joins to do join with conditions first

Currently, the order of joins is exactly the same as in the SQL query, so some conditions may not be pushed down to the correct join; those joins then become cross products and are extremely slow.

This patch tries to re-order the inner joins (which are common in SQL queries), picking first the joins that have self-contained conditions and delaying those that do not have any.
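
For example, in the following three-table query (written with the catalyst test DSL, mirroring the new test suite below), no condition connects x and y, so the original left-to-right order makes the bottom join x JOIN y a cross product:

  x.join(y).join(z)
    .where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr))

After reordering, x is joined with z first (on x.b = z.b) and y last (on y.d = z.a), so every join has at least one condition.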

After this patch, the TPCDS queries Q64/Q65 can run hundreds of times faster.

cc marmbrus nongli

Author: Davies Liu <davies@databricks.com>

Closes #10073 from davies/reorder_joins.
parent 6fd9e70e
@@ -18,14 +18,12 @@
package org.apache.spark.sql.catalyst.optimizer

import scala.collection.immutable.HashSet

import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, EliminateSubQueries}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.plans.Inner
-import org.apache.spark.sql.catalyst.plans.FullOuter
-import org.apache.spark.sql.catalyst.plans.LeftOuter
-import org.apache.spark.sql.catalyst.plans.RightOuter
-import org.apache.spark.sql.catalyst.plans.LeftSemi
+import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
+import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.types._
@@ -44,6 +42,7 @@ object DefaultOptimizer extends Optimizer {
      // Operator push down
      SetOperationPushDown,
      SamplePushDown,
+     ReorderJoin,
      PushPredicateThroughJoin,
      PushPredicateThroughProject,
      PushPredicateThroughGenerate,
@@ -711,6 +710,53 @@ object PushPredicateThroughAggregate extends Rule[LogicalPlan] with PredicateHelper
  }
}
/**
 * Reorder the joins and push all the conditions into the joins, so that the bottom ones have at
 * least one condition.
 *
 * The order of joins will not be changed if all of them already have at least one condition.
 */
object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {

  /**
   * Join a list of plans together and push down the conditions into them.
   *
   * The plans to join are picked from left to right, preferring those that have at least one
   * join condition.
   *
   * @param input a list of LogicalPlans to join.
   * @param conditions a list of conditions used for the joins.
   */
  def createOrderedJoin(input: Seq[LogicalPlan], conditions: Seq[Expression]): LogicalPlan = {
    assert(input.size >= 2)
    if (input.size == 2) {
      Join(input(0), input(1), Inner, conditions.reduceLeftOption(And))
    } else {
      val left :: rest = input.toList
      // find the first plan that shares at least one join condition with `left`
      val conditionalJoin = rest.find { plan =>
        val refs = left.outputSet ++ plan.outputSet
        conditions.filterNot(canEvaluate(_, left)).filterNot(canEvaluate(_, plan))
          .exists(_.references.subsetOf(refs))
      }
      // if no plan shares a condition with `left`, fall back to the next one in order
      val right = conditionalJoin.getOrElse(rest.head)
      val joinedRefs = left.outputSet ++ right.outputSet
      val (joinConditions, others) = conditions.partition(_.references.subsetOf(joinedRefs))
      val joined = Join(left, right, Inner, joinConditions.reduceLeftOption(And))
      // compare with `eq`: the same logical plan may appear more than once in `input`
      createOrderedJoin(Seq(joined) ++ rest.filterNot(_ eq right), others)
    }
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case j @ ExtractFiltersAndInnerJoins(input, conditions)
        if input.size > 2 && conditions.nonEmpty =>
      createOrderedJoin(input, conditions)
  }
}
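
As an illustrative trace (assuming the x, y and z relations defined in the new JoinOrderSuite at the end of this commit), applying the rule alone already produces the reordered plan:

  // x.join(y).join(z) with conditions x.b = z.b and y.d = z.a:
  //  1. left = x; no condition connects x and y, but x.b = z.b connects
  //     x and z, so the bottom join becomes x JOIN z ON x.b = z.b.
  //  2. input is now Seq(x JOIN z, y); the remaining y.d = z.a references
  //     only the joined output plus y, so it lands on the top join.
  val query = x.join(y).join(z)
    .where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr))
  val reordered = ReorderJoin(query.analyze)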
/**
 * Pushes down [[Filter]] operators where the `condition` can be
 * evaluated using only the attributes of the left or right side of a join. Other
...
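
For context, a sketch of what PushPredicateThroughJoin (which now runs right after ReorderJoin) does with the conditions that ReorderJoin leaves on a join: a predicate referencing only one side of an inner join is moved below the join (using the x and y relations from the test suite below):

  // before optimization          // after predicate pushdown
  x.join(y)                       x.where("x.b".attr === 1)
    .where("x.b".attr === 1)        .join(y)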
@@ -21,7 +21,6 @@ import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.trees.TreeNodeRef
/**
 * A pattern that matches any number of project or filter operations on top of another relational
...
@@ -132,6 +131,45 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
  }
}
/**
 * A pattern that collects the filter and inner joins.
 *
 *          Filter
 *            |
 *        inner Join
 *          /    \            ---->      (Seq(plan0, plan1, plan2), conditions)
 *      Filter   plan2
 *        |
 *  inner join
 *      /    \
 *   plan0    plan1
 *
 * Note: This pattern currently only works for left-deep trees.
 */
object ExtractFiltersAndInnerJoins extends PredicateHelper {

  // flatten all inner joins that are next to each other
  def flattenJoin(plan: LogicalPlan): (Seq[LogicalPlan], Seq[Expression]) = plan match {
    case Join(left, right, Inner, cond) =>
      val (plans, conditions) = flattenJoin(left)
      (plans ++ Seq(right), conditions ++ cond.toSeq)
    case Filter(filterCondition, j @ Join(left, right, Inner, joinCondition)) =>
      val (plans, conditions) = flattenJoin(j)
      (plans, conditions ++ splitConjunctivePredicates(filterCondition))
    case _ => (Seq(plan), Seq())
  }

  def unapply(plan: LogicalPlan): Option[(Seq[LogicalPlan], Seq[Expression])] = plan match {
    case f @ Filter(filterCondition, j @ Join(_, _, Inner, _)) =>
      Some(flattenJoin(f))
    case j @ Join(_, _, Inner, _) =>
      Some(flattenJoin(j))
    case _ => None
  }
}
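
For a quick sense of what the pattern returns (mirroring the cases exercised by the new JoinOrderSuite, with its x and y relations):

  // a bare inner join flattens to its inputs, with no conditions yet
  ExtractFiltersAndInnerJoins.unapply(x.join(y))
  // => Some((Seq(x, y), Seq()))

  // a Filter on top contributes its split conjuncts as join conditions
  ExtractFiltersAndInnerJoins.unapply(x.join(y).where("x.b".attr === "y.d".attr))
  // => Some((Seq(x, y), Seq(x.b = y.d)))

Plans that are not inner joins (or filters directly on top of one) do not match, so the pattern returns None for them.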
/**
 * A pattern that collects all adjacent unions and returns their children as a Seq.
 */
...
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
class JoinOrderSuite extends PlanTest {

  object Optimize extends RuleExecutor[LogicalPlan] {
    val batches =
      Batch("Subqueries", Once,
        EliminateSubQueries) ::
      Batch("Filter Pushdown", Once,
        CombineFilters,
        PushPredicateThroughProject,
        BooleanSimplification,
        ReorderJoin,
        PushPredicateThroughJoin,
        PushPredicateThroughGenerate,
        PushPredicateThroughAggregate,
        ColumnPruning,
        ProjectCollapsing) :: Nil
  }

  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
  val testRelation1 = LocalRelation('d.int)

  test("extract filters and joins") {
    val x = testRelation.subquery('x)
    val y = testRelation1.subquery('y)
    val z = testRelation.subquery('z)

    def testExtract(plan: LogicalPlan, expected: Option[(Seq[LogicalPlan], Seq[Expression])]) {
      assert(ExtractFiltersAndInnerJoins.unapply(plan) === expected)
    }

    testExtract(x, None)
    testExtract(x.where("x.b".attr === 1), None)
    testExtract(x.join(y), Some(Seq(x, y), Seq()))
    testExtract(x.join(y, condition = Some("x.b".attr === "y.d".attr)),
      Some(Seq(x, y), Seq("x.b".attr === "y.d".attr)))
    testExtract(x.join(y).where("x.b".attr === "y.d".attr),
      Some(Seq(x, y), Seq("x.b".attr === "y.d".attr)))
    testExtract(x.join(y).join(z), Some(Seq(x, y, z), Seq()))
    testExtract(x.join(y).where("x.b".attr === "y.d".attr).join(z),
      Some(Seq(x, y, z), Seq("x.b".attr === "y.d".attr)))
    testExtract(x.join(y).join(x.join(z)), Some(Seq(x, y, x.join(z)), Seq()))
    testExtract(x.join(y).join(x.join(z)).where("x.b".attr === "y.d".attr),
      Some(Seq(x, y, x.join(z)), Seq("x.b".attr === "y.d".attr)))
  }

  test("reorder inner joins") {
    val x = testRelation.subquery('x)
    val y = testRelation1.subquery('y)
    val z = testRelation.subquery('z)

    val originalQuery = {
      x.join(y).join(z)
        .where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr))
    }

    val optimized = Optimize.execute(originalQuery.analyze)
    val correctAnswer =
      x.join(z, condition = Some("x.b".attr === "z.b".attr))
        .join(y, condition = Some("y.d".attr === "z.a".attr))
        .analyze

    comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer))
  }
}