Skip to content
Snippets Groups Projects
Commit bb88875a authored by Takuya UESHIN's avatar Takuya UESHIN Committed by Reynold Xin
Browse files

[SPARK-1889] [SQL] Apply splitConjunctivePredicates to join condition while finding join ke...

...ys.

When tables are equi-joined by multiple-keys `HashJoin` should be used, but `CartesianProduct` and then `Filter` are used.
The join keys are paired by `And` expression so we need to apply `splitConjunctivePredicates` to join condition while finding join keys.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #836 from ueshin/issues/SPARK-1889 and squashes the following commits:

fe1c387 [Takuya UESHIN] Apply splitConjunctivePredicates to join condition while finding join keys.
parent f18fd05b
No related branches found
No related tags found
No related merge requests found
...@@ -129,11 +129,12 @@ object HashFilteredJoin extends Logging with PredicateHelper { ...@@ -129,11 +129,12 @@ object HashFilteredJoin extends Logging with PredicateHelper {
// as join keys. // as join keys.
def splitPredicates(allPredicates: Seq[Expression], join: Join): Option[ReturnType] = { def splitPredicates(allPredicates: Seq[Expression], join: Join): Option[ReturnType] = {
val Join(left, right, joinType, _) = join val Join(left, right, joinType, _) = join
val (joinPredicates, otherPredicates) = allPredicates.partition { val (joinPredicates, otherPredicates) =
case Equals(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) || allPredicates.flatMap(splitConjunctivePredicates).partition {
(canEvaluate(l, right) && canEvaluate(r, left)) => true case Equals(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
case _ => false (canEvaluate(l, right) && canEvaluate(r, left)) => true
} case _ => false
}
val joinKeys = joinPredicates.map { val joinKeys = joinPredicates.map {
case Equals(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r) case Equals(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
......
...@@ -21,7 +21,7 @@ import org.scalatest.FunSuite ...@@ -21,7 +21,7 @@ import org.scalatest.FunSuite
import org.apache.spark.sql.TestData._ import org.apache.spark.sql.TestData._
import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.execution import org.apache.spark.sql.execution
import org.apache.spark.sql.test.TestSQLContext._ import org.apache.spark.sql.test.TestSQLContext._
import org.apache.spark.sql.test.TestSQLContext.planner._ import org.apache.spark.sql.test.TestSQLContext.planner._
...@@ -57,4 +57,21 @@ class PlannerSuite extends FunSuite { ...@@ -57,4 +57,21 @@ class PlannerSuite extends FunSuite {
val planned = PartialAggregation(query) val planned = PartialAggregation(query)
assert(planned.isEmpty) assert(planned.isEmpty)
} }
test("equi-join is hash-join") {
val x = testData2.as('x)
val y = testData2.as('y)
val join = x.join(y, Inner, Some("x.a".attr === "y.a".attr)).queryExecution.analyzed
val planned = planner.HashJoin(join)
assert(planned.size === 1)
}
test("multiple-key equi-join is hash-join") {
val x = testData2.as('x)
val y = testData2.as('y)
val join = x.join(y, Inner,
Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr)).queryExecution.analyzed
val planned = planner.HashJoin(join)
assert(planned.size === 1)
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment