From 67bd8e3c217a80c3117a6e3853aa60fe13d08c91 Mon Sep 17 00:00:00 2001 From: Yin Huai <huai@cse.ohio-state.edu> Date: Sat, 2 Aug 2014 13:16:41 -0700 Subject: [PATCH] [SQL] Set outputPartitioning of BroadcastHashJoin correctly. I think we will not generate the plan triggering this bug at this moment. But, let me explain it... Right now, we are using `left.outputPartitioning` as the `outputPartitioning` of a `BroadcastHashJoin`. We may have a wrong physical plan for cases like... ```sql SELECT l.key, count(*) FROM (SELECT key, count(*) as cnt FROM src GROUP BY key) l // This is buildPlan JOIN r // This is the streamedPlan ON (l.cnt = r.value) GROUP BY l.key ``` Let's say we have a `BroadcastHashJoin` on `l` and `r`. For this case, we will pick `l`'s `outputPartitioning` for the `outputPartitioning`of the `BroadcastHashJoin` on `l` and `r`. Also, because the last `GROUP BY` is using `l.key` as the key, we will not introduce an `Exchange` for this aggregation. However, `r`'s outputPartitioning may not match the required distribution of the last `GROUP BY` and we fail to group data correctly. JIRA is being reindexed. I will create a JIRA ticket once it is back online. Author: Yin Huai <huai@cse.ohio-state.edu> Closes #1735 from yhuai/BroadcastHashJoin and squashes the following commits: 96d9cb3 [Yin Huai] Set outputPartitioning correctly. --- .../src/main/scala/org/apache/spark/sql/execution/joins.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala index cc138c7499..51bb615307 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala @@ -405,8 +405,7 @@ case class BroadcastHashJoin( left: SparkPlan, right: SparkPlan) extends BinaryNode with HashJoin { - - override def outputPartitioning: Partitioning = left.outputPartitioning + override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning override def requiredChildDistribution = UnspecifiedDistribution :: UnspecifiedDistribution :: Nil -- GitLab