Commit 6e7934ed authored by Takuya UESHIN, committed by Reynold Xin

[SPARK-1889] [SQL] Apply splitConjunctivePredicates to join condition while finding join keys.

When tables are equi-joined on multiple keys, `HashJoin` should be used, but currently `CartesianProduct` followed by `Filter` is used instead.
The join keys are combined into a single `And` expression, so we need to apply `splitConjunctivePredicates` to the join condition while finding join keys.
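
For illustration, the sketch below uses toy expression classes (`Attr`, `Equals`, and `And` are simplified stand-ins here, not Spark's actual Catalyst types) to show what the splitting does: a multi-key join condition arrives as a single `And` node, which by itself never matches an `Equals` pattern, so without splitting it first the planner finds no join keys and falls back to `CartesianProduct` plus `Filter`.

```scala
// Toy expression tree, standing in for Catalyst expressions.
sealed trait Expr
case class Attr(name: String) extends Expr
case class Equals(left: Expr, right: Expr) extends Expr
case class And(left: Expr, right: Expr) extends Expr

// Modeled on PredicateHelper.splitConjunctivePredicates: recursively flatten
// And nodes into their conjuncts; any other expression is a single predicate.
def splitConjunctivePredicates(condition: Expr): Seq[Expr] = condition match {
  case And(l, r) => splitConjunctivePredicates(l) ++ splitConjunctivePredicates(r)
  case other     => other :: Nil
}

// x.a = y.a AND x.b = y.b arrives as one And node.
val condition = And(
  Equals(Attr("x.a"), Attr("y.a")),
  Equals(Attr("x.b"), Attr("y.b")))

// Before splitting, a `case Equals(l, r)` pattern sees no equality predicates
// at all; after splitting, both equalities are visible as separate predicates.
assert(splitConjunctivePredicates(condition) == Seq(
  Equals(Attr("x.a"), Attr("y.a")),
  Equals(Attr("x.b"), Attr("y.b"))))
```

Once split, each `Equals` conjunct can be checked against the left and right relations and paired up as a join key, which is what the `allPredicates.flatMap(splitConjunctivePredicates)` step in the change below enables.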

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #836 from ueshin/issues/SPARK-1889 and squashes the following commits:

fe1c387 [Takuya UESHIN] Apply splitConjunctivePredicates to join condition while finding join keys.

(cherry picked from commit bb88875a)
Signed-off-by: Reynold Xin <rxin@apache.org>
parent 30d1df5e
@@ -129,11 +129,12 @@ object HashFilteredJoin extends Logging with PredicateHelper {
   // as join keys.
   def splitPredicates(allPredicates: Seq[Expression], join: Join): Option[ReturnType] = {
     val Join(left, right, joinType, _) = join
-    val (joinPredicates, otherPredicates) = allPredicates.partition {
-      case Equals(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
-        (canEvaluate(l, right) && canEvaluate(r, left)) => true
-      case _ => false
-    }
+    val (joinPredicates, otherPredicates) =
+      allPredicates.flatMap(splitConjunctivePredicates).partition {
+        case Equals(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
+          (canEvaluate(l, right) && canEvaluate(r, left)) => true
+        case _ => false
+      }
     val joinKeys = joinPredicates.map {
       case Equals(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
@@ -21,7 +21,7 @@ import org.scalatest.FunSuite
 
 import org.apache.spark.sql.TestData._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.execution
 import org.apache.spark.sql.test.TestSQLContext._
 import org.apache.spark.sql.test.TestSQLContext.planner._
@@ -57,4 +57,21 @@ class PlannerSuite extends FunSuite {
     val planned = PartialAggregation(query)
     assert(planned.isEmpty)
   }
+
+  test("equi-join is hash-join") {
+    val x = testData2.as('x)
+    val y = testData2.as('y)
+    val join = x.join(y, Inner, Some("x.a".attr === "y.a".attr)).queryExecution.analyzed
+    val planned = planner.HashJoin(join)
+    assert(planned.size === 1)
+  }
+
+  test("multiple-key equi-join is hash-join") {
+    val x = testData2.as('x)
+    val y = testData2.as('y)
+    val join = x.join(y, Inner,
+      Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr)).queryExecution.analyzed
+    val planned = planner.HashJoin(join)
+    assert(planned.size === 1)
+  }
 }