From 86ff8b10270bbe2579cdb1dc2297a9f4e145973e Mon Sep 17 00:00:00 2001
From: Michael Armbrust <michael@databricks.com>
Date: Thu, 24 Apr 2014 21:42:33 -0700
Subject: [PATCH] Generalize pattern for planning hash joins.

This will be helpful for
[SPARK-1495](https://issues.apache.org/jira/browse/SPARK-1495) and other
cases where we want to have custom hash join implementations but don't
want to repeat the logic for finding the join keys.

Author: Michael Armbrust <michael@databricks.com>

Closes #418 from marmbrus/hashFilter and squashes the following commits:

d5cc79b [Michael Armbrust] Address @rxin 's comments.
366b6d9 [Michael Armbrust] style fixes
14560eb [Michael Armbrust] Generalize pattern for planning hash joins.
f4809c1 [Michael Armbrust] Move common functions to PredicateHelper.
---
 .../sql/catalyst/expressions/predicates.scala | 29 ++++++++---
 .../sql/catalyst/planning/patterns.scala      | 52 +++++++++++++++++++
 .../spark/sql/execution/SparkStrategies.scala | 49 +++--------------
 3 files changed, 82 insertions(+), 48 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index da5b2cf5b0..82c7af6844 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -17,10 +17,11 @@

 package org.apache.spark.sql.catalyst.expressions

-import org.apache.spark.sql.catalyst.trees
-import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.analysis.UnresolvedException
-import org.apache.spark.sql.catalyst.types.{BooleanType, StringType, TimestampType}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.trees
+import org.apache.spark.sql.catalyst.types.BooleanType
+

 object InterpretedPredicate {
   def apply(expression: Expression): (Row => Boolean) = {
@@ -37,10 +38,26 @@ trait Predicate extends Expression {
 }

 trait PredicateHelper {
-  def splitConjunctivePredicates(condition: Expression): Seq[Expression] = condition match {
-    case And(cond1, cond2) => splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
-    case other => other :: Nil
+  protected def splitConjunctivePredicates(condition: Expression): Seq[Expression] = {
+    condition match {
+      case And(cond1, cond2) =>
+        splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
+      case other => other :: Nil
+    }
   }
+
+  /**
+   * Returns true if `expr` can be evaluated using only the output of `plan`. This method
+   * can be used to determine when it is acceptable to move expression evaluation within a query
+   * plan.
+   *
+   * For example, consider a join between two relations R(a, b) and S(c, d).
+   *
+   * `canEvaluate(Equals(a,b), R)` returns `true` whereas `canEvaluate(Equals(a,c), R)` returns
+   * `false`.
+   */
+  protected def canEvaluate(expr: Expression, plan: LogicalPlan): Boolean =
+    expr.references.subsetOf(plan.outputSet)
 }

 abstract class BinaryPredicate extends BinaryExpression with Predicate {
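The two `PredicateHelper` methods above are the core of the refactoring, so their semantics are worth seeing in isolation. The following self-contained sketch mirrors them with toy `Expr`/`Plan` stand-ins; all names here are illustrative, not the Catalyst API:

```scala
object PredicateHelperSketch extends App {
  // Toy stand-ins for Catalyst's Expression and LogicalPlan.
  sealed trait Expr { def references: Set[String] }
  case class Attr(name: String) extends Expr { def references: Set[String] = Set(name) }
  case class Equals(left: Expr, right: Expr) extends Expr {
    def references: Set[String] = left.references ++ right.references
  }
  case class And(left: Expr, right: Expr) extends Expr {
    def references: Set[String] = left.references ++ right.references
  }
  case class Plan(outputSet: Set[String])

  // Mirrors splitConjunctivePredicates: flatten a tree of Ands into its conjuncts.
  def splitConjunctivePredicates(condition: Expr): Seq[Expr] = condition match {
    case And(c1, c2) => splitConjunctivePredicates(c1) ++ splitConjunctivePredicates(c2)
    case other => other :: Nil
  }

  // Mirrors canEvaluate: a predicate can be evaluated by a plan iff every
  // attribute it references appears in that plan's output.
  def canEvaluate(expr: Expr, plan: Plan): Boolean = expr.references.subsetOf(plan.outputSet)

  val r = Plan(Set("a", "b")) // R(a, b); c and d come from the other relation S(c, d)
  val cond = And(Equals(Attr("a"), Attr("b")), Equals(Attr("a"), Attr("c")))

  println(splitConjunctivePredicates(cond))             // the two conjuncts, flattened
  println(canEvaluate(Equals(Attr("a"), Attr("b")), r)) // true: {a, b} is within R's output
  println(canEvaluate(Equals(Attr("a"), Attr("c")), r)) // false: c is not produced by R
}
```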
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 6dd816aa91..0e3a8a6bd3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -19,7 +19,10 @@ package org.apache.spark.sql.catalyst.planning

 import scala.annotation.tailrec

+import org.apache.spark.sql.Logging
+
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._

 /**
@@ -101,6 +104,55 @@ object PhysicalOperation extends PredicateHelper {
   }
 }

+/**
+ * A pattern that finds joins with equality conditions that can be evaluated using hashing
+ * techniques. For inner joins, any filters on top of the join operator are also matched.
+ */
+object HashFilteredJoin extends Logging with PredicateHelper {
+  /** (joinType, leftKeys, rightKeys, condition, leftChild, rightChild) */
+  type ReturnType =
+    (JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)
+
+  def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
+    // For an inner join, all predicates can be considered, i.e., those in both the ON
+    // clause and the WHERE clause.
+    case FilteredOperation(predicates, join @ Join(left, right, Inner, condition)) =>
+      logger.debug(s"Considering hash inner join on: ${predicates ++ condition}")
+      splitPredicates(predicates ++ condition, join)
+    case join @ Join(left, right, joinType, condition) =>
+      logger.debug(s"Considering hash join on: $condition")
+      splitPredicates(condition.toSeq, join)
+    case _ => None
+  }
+
+  // Find equi-join predicates that can be evaluated before the join, and thus can be used
+  // as join keys.
+  def splitPredicates(allPredicates: Seq[Expression], join: Join): Option[ReturnType] = {
+    val Join(left, right, joinType, _) = join
+    val (joinPredicates, otherPredicates) = allPredicates.partition {
+      case Equals(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
+        (canEvaluate(l, right) && canEvaluate(r, left)) => true
+      case _ => false
+    }
+
+    val joinKeys = joinPredicates.map {
+      case Equals(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
+      case Equals(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l)
+    }
+
+    // Do not consider this strategy if there are no join keys.
+    if (joinKeys.nonEmpty) {
+      val leftKeys = joinKeys.map(_._1)
+      val rightKeys = joinKeys.map(_._2)
+
+      Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
+    } else {
+      logger.debug(s"Avoiding hash join with no join keys.")
+      None
+    }
+  }
+}
+
 /**
  * A pattern that collects all adjacent unions and returns their children as a Seq.
  */
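The extracted `HashFilteredJoin` pattern does all of the key discovery: it partitions the candidate predicates into equality predicates whose two sides come from opposite children (the hash keys) and a residual condition. Here is a toy re-implementation of that partitioning, using the same stand-in types as the sketch above (assumed names, not Catalyst's):

```scala
object HashFilteredJoinSketch extends App {
  sealed trait Expr { def references: Set[String] }
  case class Attr(name: String) extends Expr { def references: Set[String] = Set(name) }
  case class Equals(left: Expr, right: Expr) extends Expr {
    def references: Set[String] = left.references ++ right.references
  }
  case class Plan(outputSet: Set[String])

  def canEvaluate(expr: Expr, plan: Plan): Boolean = expr.references.subsetOf(plan.outputSet)

  /** Returns (leftKeys, rightKeys, residual predicates), or None without equi-join keys. */
  def splitPredicates(preds: Seq[Expr], left: Plan, right: Plan)
      : Option[(Seq[Expr], Seq[Expr], Seq[Expr])] = {
    // Keep only equalities whose two sides are evaluable on opposite children.
    val (joinPredicates, otherPredicates) = preds.partition {
      case Equals(l, r) =>
        (canEvaluate(l, left) && canEvaluate(r, right)) ||
        (canEvaluate(l, right) && canEvaluate(r, left))
      case _ => false
    }
    // Orient every key pair as (leftKey, rightKey), flipping sides when needed.
    val joinKeys = joinPredicates.collect {
      case Equals(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
      case Equals(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l)
    }
    // As in the pattern above: no equi-join keys means no hash join.
    if (joinKeys.nonEmpty) Some((joinKeys.map(_._1), joinKeys.map(_._2), otherPredicates))
    else None
  }

  val r = Plan(Set("a", "b")) // R(a, b)
  val s = Plan(Set("c", "d")) // S(c, d)
  val preds = Seq(
    Equals(Attr("a"), Attr("c")),  // equi-join key, already oriented (left, right)
    Equals(Attr("d"), Attr("b")),  // equi-join key, flipped to (b, d)
    Equals(Attr("a"), Attr("b")))  // references only R: stays a residual filter
  println(splitPredicates(preds, r, s))
  // leftKeys = [a, b], rightKeys = [c, d], residual = [a = b]
}
```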
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 500fde1971..f763106da4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -28,51 +28,16 @@ import org.apache.spark.sql.parquet._
 private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   self: SQLContext#SparkPlanner =>

-  object HashJoin extends Strategy {
+  object HashJoin extends Strategy with PredicateHelper {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case FilteredOperation(predicates, logical.Join(left, right, Inner, condition)) =>
-        logger.debug(s"Considering join: ${predicates ++ condition}")
-        // Find equi-join predicates that can be evaluated before the join, and thus can be used
-        // as join keys. Note we can only mix in the conditions with other predicates because the
-        // match above ensures that this is and Inner join.
-        val (joinPredicates, otherPredicates) = (predicates ++ condition).partition {
-          case Equals(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
-            (canEvaluate(l, right) && canEvaluate(r, left)) => true
-          case _ => false
-        }
-
-        val joinKeys = joinPredicates.map {
-          case Equals(l,r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
-          case Equals(l,r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l)
-        }
-
-        // Do not consider this strategy if there are no join keys.
-        if (joinKeys.nonEmpty) {
-          val leftKeys = joinKeys.map(_._1)
-          val rightKeys = joinKeys.map(_._2)
-
-          val joinOp = execution.HashJoin(
-            leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))
-
-          // Make sure other conditions are met if present.
-          if (otherPredicates.nonEmpty) {
-            execution.Filter(combineConjunctivePredicates(otherPredicates), joinOp) :: Nil
-          } else {
-            joinOp :: Nil
-          }
-        } else {
-          logger.debug(s"Avoiding spark join with no join keys.")
-          Nil
-        }
+      // Find inner joins where at least some predicates can be evaluated by matching hash keys
+      // using the HashFilteredJoin pattern.
+      case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) =>
+        val hashJoin =
+          execution.HashJoin(leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))
+        condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
       case _ => Nil
     }
-
-    private def combineConjunctivePredicates(predicates: Seq[Expression]) =
-      predicates.reduceLeft(And)
-
-    /** Returns true if `expr` can be evaluated using only the output of `plan`. */
-    protected def canEvaluate(expr: Expression, plan: LogicalPlan): Boolean =
-      expr.references subsetOf plan.outputSet
   }

   object PartialAggregation extends Strategy {
--
GitLab
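With the pattern in place, the strategy reduces to choosing physical operators: build a `HashJoin` and wrap it in a `Filter` only when a residual condition remains. A minimal sketch of that `condition.map(Filter(_, hashJoin)).getOrElse(hashJoin)` idiom, with toy physical operators standing in for Spark's execution classes (illustrative names only):

```scala
object HashJoinStrategySketch extends App {
  // Toy stand-ins for execution.HashJoin and execution.Filter.
  sealed trait PhysicalPlan
  case class HashJoin(leftKeys: Seq[String], rightKeys: Seq[String]) extends PhysicalPlan
  case class Filter(condition: String, child: PhysicalPlan) extends PhysicalPlan

  // Wrap the join in a Filter only if the pattern left a residual condition.
  def plan(leftKeys: Seq[String], rightKeys: Seq[String],
      condition: Option[String]): PhysicalPlan = {
    val hashJoin = HashJoin(leftKeys, rightKeys)
    condition.map(Filter(_, hashJoin)).getOrElse(hashJoin)
  }

  println(plan(Seq("a"), Seq("c"), Some("b > 1"))) // Filter(b > 1,HashJoin(List(a),List(c)))
  println(plan(Seq("a"), Seq("c"), None))          // HashJoin(List(a),List(c))
}
```

The Option-based wrapping replaces the earlier `if (otherPredicates.nonEmpty)` branch and the now-redundant `combineConjunctivePredicates` helper, since `HashFilteredJoin` already delivers the residual predicates pre-combined with `reduceOption(And)`.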