Commit 86ff8b10 authored by Michael Armbrust, committed by Reynold Xin

Generalize pattern for planning hash joins.

This will be helpful for [SPARK-1495](https://issues.apache.org/jira/browse/SPARK-1495) and other cases where we want to have custom hash join implementations but don't want to repeat the logic for finding the join keys.

Author: Michael Armbrust <michael@databricks.com>

Closes #418 from marmbrus/hashFilter and squashes the following commits:

d5cc79b [Michael Armbrust] Address @rxin 's comments.
366b6d9 [Michael Armbrust] style fixes
14560eb [Michael Armbrust] Generalize pattern for planning hash joins.
f4809c1 [Michael Armbrust] Move common functions to PredicateHelper.
parent cd12dd9b
@@ -17,10 +17,11 @@
package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.trees
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.analysis.UnresolvedException
import org.apache.spark.sql.catalyst.types.{BooleanType, StringType, TimestampType}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees
import org.apache.spark.sql.catalyst.types.BooleanType
object InterpretedPredicate {
def apply(expression: Expression): (Row => Boolean) = {
@@ -37,10 +38,26 @@ trait Predicate extends Expression {
}
trait PredicateHelper {
def splitConjunctivePredicates(condition: Expression): Seq[Expression] = condition match {
case And(cond1, cond2) => splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
case other => other :: Nil
protected def splitConjunctivePredicates(condition: Expression): Seq[Expression] = {
condition match {
case And(cond1, cond2) =>
splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
case other => other :: Nil
}
}
/**
* Returns true if `expr` can be evaluated using only the output of `plan`. This method
* can be used to determine when it is acceptable to move expression evaluation within a query
* plan.
*
* For example, consider a join between two relations R(a, b) and S(c, d).
*
* `canEvaluate(Equals(a,b), R)` returns `true` whereas `canEvaluate(Equals(a,c), R)` returns
* `false`.
*/
protected def canEvaluate(expr: Expression, plan: LogicalPlan): Boolean =
expr.references.subsetOf(plan.outputSet)
}
abstract class BinaryPredicate extends BinaryExpression with Predicate {
......
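The new canEvaluate check is the building block both join patterns below rely on. A minimal sketch of its behavior, using the R(a, b) and S(c, d) relations from the doc comment above; the object and method names are illustrative only, and `r` and `s` stand for any LogicalPlans whose outputs are exactly the attributes (a, b) and (c, d):

import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Equals, PredicateHelper}
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}

// Hypothetical demo object; it mixes in PredicateHelper to reach the protected helpers.
object CanEvaluateSketch extends PredicateHelper {
  def demo(a: Attribute, b: Attribute, c: Attribute, d: Attribute,
           r: LogicalPlan, s: LogicalPlan): Unit = {
    // Both attributes come from r's output, so the predicate can be evaluated against r alone.
    assert(canEvaluate(Equals(a, b), r))
    // c belongs to s, so this predicate cannot be evaluated using only r's output.
    assert(!canEvaluate(Equals(a, c), r))
    // An inner join outputs the attributes of both children, so it can evaluate the predicate.
    assert(canEvaluate(Equals(a, c), Join(r, s, Inner, None)))
    // splitConjunctivePredicates flattens nested ANDs into the individual conjuncts.
    assert(splitConjunctivePredicates(And(Equals(a, b), Equals(c, d))) ==
      Seq(Equals(a, b), Equals(c, d)))
  }
}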
@@ -19,7 +19,10 @@ package org.apache.spark.sql.catalyst.planning
import scala.annotation.tailrec
import org.apache.spark.sql.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
/**
@@ -101,6 +104,55 @@ object PhysicalOperation extends PredicateHelper {
}
}
/**
* A pattern that finds joins with equality conditions that can be evaluated using hashing
* techniques. For inner joins, any filters on top of the join operator are also matched.
*/
object HashFilteredJoin extends Logging with PredicateHelper {
/** (joinType, leftKeys, rightKeys, condition, leftChild, rightChild) */
type ReturnType =
(JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)
def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
// For inner joins, all predicates can be evaluated (i.e., both those in the ON
// clause and those in the WHERE clause).
case FilteredOperation(predicates, join @ Join(left, right, Inner, condition)) =>
logger.debug(s"Considering hash inner join on: ${predicates ++ condition}")
splitPredicates(predicates ++ condition, join)
case join @ Join(left, right, joinType, condition) =>
logger.debug(s"Considering hash join on: $condition")
splitPredicates(condition.toSeq, join)
case _ => None
}
// Find equi-join predicates that can be evaluated before the join, and thus can be used
// as join keys.
def splitPredicates(allPredicates: Seq[Expression], join: Join): Option[ReturnType] = {
val Join(left, right, joinType, _) = join
val (joinPredicates, otherPredicates) = allPredicates.partition {
case Equals(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
(canEvaluate(l, right) && canEvaluate(r, left)) => true
case _ => false
}
val joinKeys = joinPredicates.map {
case Equals(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
case Equals(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l)
}
// Do not consider this strategy if there are no join keys.
if (joinKeys.nonEmpty) {
val leftKeys = joinKeys.map(_._1)
val rightKeys = joinKeys.map(_._2)
Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
} else {
logger.debug(s"Avoiding hash join with no join keys.")
None
}
}
}
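Put together, the extractor turns a filtered inner join into hash-key pairs plus a residual condition. A minimal sketch (hypothetical object and method names; `r` and `s` again stand for plans producing exactly R(a, b) and S(c, d)):

import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Equals, GreaterThan}
import org.apache.spark.sql.catalyst.planning.HashFilteredJoin
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan}

object HashFilteredJoinSketch {
  def demo(a: Attribute, b: Attribute, c: Attribute, d: Attribute,
           r: LogicalPlan, s: LogicalPlan): Unit = {
    // A WHERE clause with one equi-join predicate and one non-equality predicate;
    // FilteredOperation breaks the conjunction apart before the keys are partitioned.
    val plan = Filter(And(Equals(a, c), GreaterThan(b, d)), Join(r, s, Inner, None))
    plan match {
      case HashFilteredJoin(Inner, leftKeys, rightKeys, otherPredicates, _, _) =>
        // Expected: Equals(a, c) becomes the key pair, a from the left side, c from the right.
        assert(leftKeys == Seq(a) && rightKeys == Seq(c))
        // Expected: GreaterThan(b, d) cannot drive hashing, so it is returned as the residual
        // condition to be applied on top of the join.
        assert(otherPredicates == Some(GreaterThan(b, d)))
      case _ =>
        sys.error("no usable equi-join keys found")
    }
  }
}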
/**
* A pattern that collects all adjacent unions and returns their children as a Seq.
*/
......
@@ -28,51 +28,16 @@ import org.apache.spark.sql.parquet._
private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
self: SQLContext#SparkPlanner =>
object HashJoin extends Strategy {
object HashJoin extends Strategy with PredicateHelper {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case FilteredOperation(predicates, logical.Join(left, right, Inner, condition)) =>
logger.debug(s"Considering join: ${predicates ++ condition}")
// Find equi-join predicates that can be evaluated before the join, and thus can be used
// as join keys. Note we can only mix in the conditions with other predicates because the
// match above ensures that this is an Inner join.
val (joinPredicates, otherPredicates) = (predicates ++ condition).partition {
case Equals(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
(canEvaluate(l, right) && canEvaluate(r, left)) => true
case _ => false
}
val joinKeys = joinPredicates.map {
case Equals(l,r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
case Equals(l,r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l)
}
// Do not consider this strategy if there are no join keys.
if (joinKeys.nonEmpty) {
val leftKeys = joinKeys.map(_._1)
val rightKeys = joinKeys.map(_._2)
val joinOp = execution.HashJoin(
leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))
// Make sure other conditions are met if present.
if (otherPredicates.nonEmpty) {
execution.Filter(combineConjunctivePredicates(otherPredicates), joinOp) :: Nil
} else {
joinOp :: Nil
}
} else {
logger.debug(s"Avoiding spark join with no join keys.")
Nil
}
// Find inner joins where at least some predicates can be evaluated by matching hash keys
// using the HashFilteredJoin pattern.
case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) =>
val hashJoin =
execution.HashJoin(leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))
condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
case _ => Nil
}
private def combineConjunctivePredicates(predicates: Seq[Expression]) =
predicates.reduceLeft(And)
/** Returns true if `expr` can be evaluated using only the output of `plan`. */
protected def canEvaluate(expr: Expression, plan: LogicalPlan): Boolean =
expr.references subsetOf plan.outputSet
}
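With key extraction factored out, the strategy only destructures the extractor's result, builds its operator, and applies any residual predicates on top. For SPARK-1495 and similar work, another strategy can reuse the same pattern. A minimal sketch, assuming it is declared inside SparkStrategies next to HashJoin so that Strategy, planLater, and the physical Filter are in scope; BroadcastHashJoin is a hypothetical operator and signature, not part of this commit:

object BroadcastHashJoinExample extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // Same extractor, different physical operator: the key-finding logic is not repeated.
    case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) =>
      val broadcastJoin = execution.BroadcastHashJoin( // hypothetical operator
        leftKeys, rightKeys, planLater(left), planLater(right))
      condition.map(Filter(_, broadcastJoin)).getOrElse(broadcastJoin) :: Nil
    case _ => Nil
  }
}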
object PartialAggregation extends Strategy {
......