Commit 67bd8e3c authored by Yin Huai, committed by Michael Armbrust

[SQL] Set outputPartitioning of BroadcastHashJoin correctly.

I do not think we currently generate a plan that triggers this bug, but let me explain it anyway.

Right now, we are using `left.outputPartitioning` as the `outputPartitioning` of a `BroadcastHashJoin`. This can produce a wrong physical plan for a query like:
```sql
SELECT l.key, count(*)
FROM (SELECT key, count(*) as cnt
      FROM src
      GROUP BY key) l -- This is buildPlan
JOIN r -- This is the streamedPlan
ON (l.cnt = r.value)
GROUP BY l.key
```
Let's say we have a `BroadcastHashJoin` on `l` and `r`. In this case, we pick `l`'s `outputPartitioning` as the `outputPartitioning` of the `BroadcastHashJoin`. Because the last `GROUP BY` uses `l.key` as its key, that partitioning appears to satisfy the aggregation, so we do not introduce an `Exchange` for it. However, the join's output is laid out like the streamed side `r`, and `r`'s `outputPartitioning` may not match the required distribution of the last `GROUP BY`, so we fail to group the data correctly.
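
To make the failure mode concrete, here is a minimal sketch in plain Scala collections. It is a toy model, not Spark's API: the row types `L` and `R`, the helpers `broadcastHashJoin` and `countPerPartition`, and the sample data are all hypothetical. It only illustrates that each output partition of a broadcast join is derived from one streamed partition, so an aggregation that skips the `Exchange` can see the same `l.key` in several partitions.

```scala
// Hypothetical toy model (not Spark's API): a partitioned relation is a
// Vector of partitions, and each partition is a Vector of rows.
object BroadcastJoinSketch {
  final case class L(key: String, cnt: Int) // build-side row (l)
  final case class R(value: Int)            // streamed-side row (r)

  // The whole build side is collected into one hash table and probed once per
  // streamed partition. Output partition i depends only on streamed
  // partition i, so the output inherits the streamed side's layout.
  def broadcastHashJoin(
      build: Vector[Vector[L]],
      streamed: Vector[Vector[R]]): Vector[Vector[(L, R)]] = {
    val table: Map[Int, Vector[L]] = build.flatten.groupBy(_.cnt)
    streamed.map { part =>
      for {
        r <- part
        l <- table.getOrElse(r.value, Vector.empty)
      } yield (l, r)
    }
  }

  // Aggregation without an Exchange: counts rows per key inside each
  // partition. This is only a complete answer if every key lives in exactly
  // one partition.
  def countPerPartition(
      joined: Vector[Vector[(L, R)]]): Vector[Map[String, Int]] =
    joined.map(_.groupBy(_._1.key).map { case (k, rows) => k -> rows.size })

  // l is partitioned by key: "a" in partition 0, "b" in partition 1.
  val l: Vector[Vector[L]] = Vector(Vector(L("a", 2)), Vector(L("b", 3)))
  // r has an unrelated layout: value 2 appears in both partitions.
  val r: Vector[Vector[R]] = Vector(Vector(R(2), R(3)), Vector(R(2)))

  val joined: Vector[Vector[(L, R)]] = broadcastHashJoin(l, r)
  val partial: Vector[Map[String, Int]] = countPerPartition(joined)
}
```

In this toy data, key `"a"` ends up in both output partitions, so each partition reports only a partial count for it. Reporting the streamed side's partitioning, as this commit does, lets the planner see that the aggregation's required distribution is not met and that an `Exchange` is needed.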

JIRA is being reindexed. I will create a JIRA ticket once it is back online.

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #1735 from yhuai/BroadcastHashJoin and squashes the following commits:

96d9cb3 [Yin Huai] Set outputPartitioning correctly.
parent 3f67382e
@@ -405,8 +405,7 @@ case class BroadcastHashJoin(
left: SparkPlan,
right: SparkPlan) extends BinaryNode with HashJoin {
- override def outputPartitioning: Partitioning = left.outputPartitioning
+ override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning
override def requiredChildDistribution =
UnspecifiedDistribution :: UnspecifiedDistribution :: Nil
......