From 6e7934ed41fe284a4f151cc0cfb51b18b5c89bc2 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN <ueshin@happy-camper.st>
Date: Wed, 21 May 2014 15:37:47 -0700
Subject: [PATCH] [SPARK-1889] [SQL] Apply splitConjunctivePredicates to join
 condition while finding join keys.


When tables are equi-joined by multiple keys, `HashJoin` should be used, but `CartesianProduct` and then `Filter` are used instead.
The join keys are paired by an `And` expression, so we need to apply `splitConjunctivePredicates` to the join condition while finding join keys.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #836 from ueshin/issues/SPARK-1889 and squashes the following commits:

fe1c387 [Takuya UESHIN] Apply splitConjunctivePredicates to join condition while finding join keys.

(cherry picked from commit bb88875ad52e8209c25e8350af1fe4b7159086ae)
Signed-off-by: Reynold Xin <rxin@apache.org>
---
 .../sql/catalyst/planning/patterns.scala      | 11 ++++++-----
 .../spark/sql/execution/PlannerSuite.scala    | 19 ++++++++++++++++++-
 2 files changed, 24 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 0e3a8a6bd3..4544b32958 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -129,11 +129,12 @@ object HashFilteredJoin extends Logging with PredicateHelper {
   // as join keys.
   def splitPredicates(allPredicates: Seq[Expression], join: Join): Option[ReturnType] = {
     val Join(left, right, joinType, _) = join
-    val (joinPredicates, otherPredicates) = allPredicates.partition {
-      case Equals(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
-        (canEvaluate(l, right) && canEvaluate(r, left)) => true
-      case _ => false
-    }
+    val (joinPredicates, otherPredicates) =
+      allPredicates.flatMap(splitConjunctivePredicates).partition {
+        case Equals(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
+          (canEvaluate(l, right) && canEvaluate(r, left)) => true
+        case _ => false
+      }
 
     val joinKeys = joinPredicates.map {
       case Equals(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index e24c74a7a5..c563d63627 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -21,7 +21,7 @@ import org.scalatest.FunSuite
 
 import org.apache.spark.sql.TestData._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.execution
 import org.apache.spark.sql.test.TestSQLContext._
 import org.apache.spark.sql.test.TestSQLContext.planner._
@@ -57,4 +57,21 @@ class PlannerSuite extends FunSuite {
     val planned = PartialAggregation(query)
     assert(planned.isEmpty)
   }
+
+  test("equi-join is hash-join") {
+    val x = testData2.as('x)
+    val y = testData2.as('y)
+    val join = x.join(y, Inner, Some("x.a".attr === "y.a".attr)).queryExecution.analyzed
+    val planned = planner.HashJoin(join)
+    assert(planned.size === 1)
+  }
+
+  test("multiple-key equi-join is hash-join") {
+    val x = testData2.as('x)
+    val y = testData2.as('y)
+    val join = x.join(y, Inner,
+      Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr)).queryExecution.analyzed
+    val planned = planner.HashJoin(join)
+    assert(planned.size === 1)
+  }
 }
-- 
GitLab