[SPARK-13523] [SQL] Reuse exchanges in a query
## What changes were proposed in this pull request?

It is possible for a query to contain common parts, for example in a self join, and it would be good to avoid recomputing the duplicated parts, saving CPU and memory (broadcast or cache). An Exchange materializes the underlying RDD by shuffle or collect, so it is a good point at which to detect duplicates and reuse them. Duplicated exchanges are exchanges that generate exactly the same result inside a query.

In order to find duplicated exchanges, we need to be able to compare two SparkPlans and check whether they produce the same result. We already have this for LogicalPlan, so it is moved into QueryPlan to make it available for SparkPlan as well.

Once the duplicated exchanges are found, all of them are replaced with the same SparkPlan object (wrapped in a ReusedExchange for explain), so the plan tree becomes a DAG. Since the planner rules only work with trees, this rule should be the last one in the entire planning phase.

After the rule, the plan looks like:

```
WholeStageCodegen
:  +- Project [id#0L]
:     +- BroadcastHashJoin [id#0L], [id#2L], Inner, BuildRight, None
:        :- Project [id#0L]
:        :  +- BroadcastHashJoin [id#0L], [id#1L], Inner, BuildRight, None
:        :     :- Range 0, 1, 4, 1024, [id#0L]
:        :     +- INPUT
:        +- INPUT
:- BroadcastExchange HashedRelationBroadcastMode(true,List(id#1L),List(id#1L))
:  +- WholeStageCodegen
:     :  +- Range 0, 1, 4, 1024, [id#1L]
+- ReusedExchange [id#2L], BroadcastExchange HashedRelationBroadcastMode(true,List(id#1L),List(id#1L))
```

For a three-way SortMergeJoin:

```
== Physical Plan ==
WholeStageCodegen
:  +- Project [id#0L]
:     +- SortMergeJoin [id#0L], [id#4L], None
:        :- INPUT
:        +- INPUT
:- WholeStageCodegen
:  :  +- Project [id#0L]
:  :     +- SortMergeJoin [id#0L], [id#3L], None
:  :        :- INPUT
:  :        +- INPUT
:  :- WholeStageCodegen
:  :  :  +- Sort [id#0L ASC], false, 0
:  :  :     +- INPUT
:  :  +- Exchange hashpartitioning(id#0L, 200), None
:  :     +- WholeStageCodegen
:  :        :  +- Range 0, 1, 4, 33554432, [id#0L]
:  +- WholeStageCodegen
:     :  +- Sort [id#3L ASC], false, 0
:     :     +- INPUT
:     +- ReusedExchange [id#3L], Exchange hashpartitioning(id#0L, 200), None
+- WholeStageCodegen
   :  +- Sort [id#4L ASC], false, 0
   :     +- INPUT
   +- ReusedExchange [id#4L], Exchange hashpartitioning(id#0L, 200), None
```

If execute()/executeBroadcast() on the same ShuffleExchange or BroadcastExchange is called by different parents, the exchange should cache the RDD/Broadcast and return the same one to all parents.

## How was this patch tested?

Added some unit tests for this. Also ran manual tests on TPCDS queries Q59 and Q64, where we can see that some exchanges are reused (this requires a change in PhysicalRDD for sameResult, done in #11514).

Author: Davies Liu <davies@databricks.com>

Closes #11403 from davies/dedup.
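To give a feel for how such a rule can work, here is a minimal, self-contained sketch of the reuse idea. The `Plan`, `Scan`, `Exchange`, `ReusedExchange` and `Join` types and the `canonical` method below are simplified stand-ins invented for illustration, not the real Spark SQL classes or the rule added in Exchange.scala by this PR; only the shape of the algorithm (group exchanges by a sameResult-style canonical form and replace later duplicates with a reference to the first occurrence) mirrors the description above.

```scala
import scala.collection.mutable

// Simplified stand-ins for physical plan nodes (NOT the real Spark SQL classes).
sealed trait Plan {
  def output: Seq[String]                 // attribute names produced by this node
  def children: Seq[Plan]
  def canonical: String                   // stand-in for sameResult: ignores attribute ids
  def withChildren(newChildren: Seq[Plan]): Plan
}

case class Scan(table: String, output: Seq[String]) extends Plan {
  def children: Seq[Plan] = Nil
  def canonical: String = s"Scan($table)"
  def withChildren(c: Seq[Plan]): Plan = this
}

case class Exchange(partitioning: String, child: Plan, output: Seq[String]) extends Plan {
  def children: Seq[Plan] = Seq(child)
  def canonical: String = s"Exchange($partitioning, ${child.canonical})"
  def withChildren(c: Seq[Plan]): Plan = copy(child = c.head)
}

// Wrapper kept only so that explain output can show an exchange is reused;
// at runtime it would delegate to the original exchange it points at.
case class ReusedExchange(output: Seq[String], exchange: Exchange) extends Plan {
  def children: Seq[Plan] = Nil           // the shared exchange is not a separate subtree
  def canonical: String = exchange.canonical
  def withChildren(c: Seq[Plan]): Plan = this
}

case class Join(left: Plan, right: Plan) extends Plan {
  def output: Seq[String] = left.output ++ right.output
  def children: Seq[Plan] = Seq(left, right)
  def canonical: String = s"Join(${left.canonical}, ${right.canonical})"
  def withChildren(c: Seq[Plan]): Plan = copy(left = c.head, right = c(1))
}

// Replace duplicated exchanges with a ReusedExchange pointing at the first
// occurrence, so the plan tree becomes a DAG. Because every other planner rule
// assumes a tree, this must be the last rule applied.
object ReuseExchange {
  def apply(plan: Plan): Plan = {
    val seen = mutable.Map.empty[String, Exchange]
    def transform(p: Plan): Plan =
      p.withChildren(p.children.map(transform)) match {
        case e: Exchange =>
          seen.get(e.canonical) match {
            case Some(first) => ReusedExchange(e.output, first) // reuse, keep e's own output
            case None        => seen(e.canonical) = e; e
          }
        case other => other
      }
    transform(plan)
  }
}

object Demo extends App {
  // Self join: both sides shuffle the same table the same way, so the second
  // exchange is replaced by a ReusedExchange referencing the first one.
  val left  = Exchange("hashpartitioning(id, 200)", Scan("t", Seq("id#0")), Seq("id#0"))
  val right = Exchange("hashpartitioning(id, 200)", Scan("t", Seq("id#1")), Seq("id#1"))
  println(ReuseExchange(Join(left, right)))
}
```

The actual rule in this PR additionally checks a new SQLConf flag so the optimization can be disabled, and the exchanges cache their materialized RDD/Broadcast so that different parents calling execute()/executeBroadcast() get the same object back.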
Showing 16 changed files with 403 additions and 90 deletions
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala (62 additions, 1 deletion)
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala (1 addition, 54 deletions)
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/broadcastMode.scala (9 additions, 0 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala (19 additions, 3 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala (4 additions, 0 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala (3 additions, 0 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala (7 additions, 3 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala (92 additions, 0 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala (18 additions, 11 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala (14 additions, 1 deletion)
- sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala (13 additions, 7 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala (6 additions, 0 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala (3 additions, 3 deletions)
- sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala (36 additions, 2 deletions)
- sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala (70 additions, 2 deletions)
- sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala (46 additions, 3 deletions)