    [SPARK-16052][SQL] Improve `CollapseRepartition` optimizer for Repartition/RepartitionBy · dff73bfa
    Dongjoon Hyun authored
    ## What changes were proposed in this pull request?
    
    This PR improves `CollapseRepartition` so that it collapses adjacent combinations of **Repartition** and **RepartitionBy**, removing the redundant inner operator. It also adds a test suite for this optimizer rule.
    
    **Target Scenario**
    ```scala
    scala> val dsView1 = spark.range(8).repartition(8, $"id")
    scala> dsView1.createOrReplaceTempView("dsView1")
    scala> sql("select id from dsView1 distribute by id").explain(true)
    ```
    
    **Before**
    ```scala
    scala> sql("select id from dsView1 distribute by id").explain(true)
    == Parsed Logical Plan ==
    'RepartitionByExpression ['id]
    +- 'Project ['id]
       +- 'UnresolvedRelation `dsView1`
    
    == Analyzed Logical Plan ==
    id: bigint
    RepartitionByExpression [id#0L]
    +- Project [id#0L]
       +- SubqueryAlias dsview1
          +- RepartitionByExpression [id#0L], 8
             +- Range (0, 8, splits=8)
    
    == Optimized Logical Plan ==
    RepartitionByExpression [id#0L]
    +- RepartitionByExpression [id#0L], 8
       +- Range (0, 8, splits=8)
    
    == Physical Plan ==
    Exchange hashpartitioning(id#0L, 200)
    +- Exchange hashpartitioning(id#0L, 8)
       +- *Range (0, 8, splits=8)
    ```
    
    **After**
    ```scala
    scala> sql("select id from dsView1 distribute by id").explain(true)
    == Parsed Logical Plan ==
    'RepartitionByExpression ['id]
    +- 'Project ['id]
       +- 'UnresolvedRelation `dsView1`
    
    == Analyzed Logical Plan ==
    id: bigint
    RepartitionByExpression [id#0L]
    +- Project [id#0L]
       +- SubqueryAlias dsview1
          +- RepartitionByExpression [id#0L], 8
             +- Range (0, 8, splits=8)
    
    == Optimized Logical Plan ==
    RepartitionByExpression [id#0L]
    +- Range (0, 8, splits=8)
    
    == Physical Plan ==
    Exchange hashpartitioning(id#0L, 200)
    +- *Range (0, 8, splits=8)
    ```
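
    The difference between the two optimized plans can be illustrated with a minimal sketch of the collapsing idea. This is a toy model, not Spark's Catalyst code: `Plan`, `RangeScan`, `Repartition`, `RepartitionBy`, and `CollapseSketch` are hypothetical names, and the sketch assumes every repartition operator performs a full shuffle, so only the outermost of two adjacent operators determines the final distribution.

    ```scala
    // Toy logical plan nodes (hypothetical; these are NOT Spark's Catalyst classes).
    sealed trait Plan
    case class RangeScan(start: Long, end: Long) extends Plan
    case class Repartition(numPartitions: Int, child: Plan) extends Plan
    case class RepartitionBy(exprs: Seq[String], numPartitions: Option[Int], child: Plan) extends Plan

    object CollapseSketch {
      // If `p` is a repartition operator, return its child; otherwise None.
      private def childOfRepartition(p: Plan): Option[Plan] = p match {
        case Repartition(_, c)      => Some(c)
        case RepartitionBy(_, _, c) => Some(c)
        case _                      => None
      }

      // Bottom-up rewrite: collapse the child first, then drop the child
      // if it is itself a repartition operator (its shuffle is redundant
      // under the assumption stated above).
      def collapse(plan: Plan): Plan = plan match {
        case Repartition(n, child) =>
          val c = collapse(child)
          Repartition(n, childOfRepartition(c).getOrElse(c))
        case RepartitionBy(exprs, n, child) =>
          val c = collapse(child)
          RepartitionBy(exprs, n, childOfRepartition(c).getOrElse(c))
        case leaf => leaf
      }
    }

    // Mirrors the target scenario: an outer "distribute by id" over
    // an inner repartition(8, id) over a range.
    val before = RepartitionBy(Seq("id"), None,
      RepartitionBy(Seq("id"), Some(8), RangeScan(0, 8)))
    val after = CollapseSketch.collapse(before)
    ```

    In this model `after` keeps only the outer `RepartitionBy` directly above the range, which corresponds to the single `Exchange` in the physical plan above.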
    
    ## How was this patch tested?
    
    Passes the Jenkins tests (including a new test suite).
    
    Author: Dongjoon Hyun <dongjoon@apache.org>
    
    Closes #13765 from dongjoon-hyun/SPARK-16052.