    [SPARK-15888] [SQL] fix Python UDF with aggregate · 5389013a
    Davies Liu authored
    ## What changes were proposed in this pull request?
    
    After the ExtractPythonUDF rule was moved into the physical plan, Python UDFs no longer worked on top of an aggregate: a UDF that consumes aggregate results can't be evaluated before the aggregate and has to be evaluated after it. This PR adds another rule that extracts this kind of Python UDF from a logical Aggregate and creates a Project on top of the Aggregate to evaluate it.
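
The rewrite described above can be sketched on a toy plan tree. This is only an illustration, not the Catalyst rule itself: the classes `Attr`, `AggCall`, `PyUDF`, `Alias`, `Aggregate` and `Project`, the function `extract_udf_from_aggregate`, and the `agg0`/`agg1` names are hypothetical stand-ins, and the sketch assumes the simplest strategy of aliasing every argument of the outer UDF inside the Aggregate.

```python
from dataclasses import dataclass


# Toy expression and plan nodes (hypothetical, not Spark's Catalyst classes).
@dataclass(frozen=True)
class Attr:
    name: str


@dataclass(frozen=True)
class AggCall:
    func: str
    child: object


@dataclass(frozen=True)
class PyUDF:
    name: str
    args: tuple


@dataclass(frozen=True)
class Alias:
    child: object
    name: str


@dataclass
class Aggregate:
    grouping: list
    outputs: list  # list of Alias


@dataclass
class Project:
    outputs: list
    child: object


def contains_agg(expr):
    """True if an aggregate call appears anywhere inside expr."""
    if isinstance(expr, AggCall):
        return True
    if isinstance(expr, PyUDF):
        return any(contains_agg(a) for a in expr.args)
    return False


def extract_udf_from_aggregate(agg):
    """Move UDFs that consume aggregate results into a Project above the Aggregate."""
    agg_outputs, project_outputs = [], []
    counter = 0
    changed = False
    for out in agg.outputs:
        expr = out.child
        if isinstance(expr, PyUDF) and contains_agg(expr):
            # Each argument stays in the Aggregate under a fresh alias;
            # the UDF itself is re-expressed over those aliases.
            new_args = []
            for arg in expr.args:
                name = f"agg{counter}"
                counter += 1
                agg_outputs.append(Alias(arg, name))
                new_args.append(Attr(name))
            project_outputs.append(Alias(PyUDF(expr.name, tuple(new_args)), out.name))
            changed = True
        else:
            agg_outputs.append(out)
            project_outputs.append(Attr(out.name))
    if not changed:
        return agg
    return Project(project_outputs, Aggregate(agg.grouping, agg_outputs))


# A plan shaped like the one in this PR: add(copy(key), sum(strlen(value))) AS t
copy_key = PyUDF("copy", (Attr("key"),))
plan = Aggregate(
    grouping=[copy_key],
    outputs=[Alias(PyUDF("add", (copy_key, AggCall("sum", PyUDF("strlen", (Attr("value"),))))), "t")],
)
rewritten = extract_udf_from_aggregate(plan)
```

After the rewrite, the outer `add` UDF sits in a `Project` and reads only the aliased outputs of the `Aggregate`, while the inner UDFs remain below the aggregation where they can be evaluated first.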
    
    ## How was this patch tested?
    
    Added regression tests. The plan of the added test query looks like this:
    ```
    == Parsed Logical Plan ==
    'Project [<lambda>('k, 's) AS t#26]
    +- Aggregate [<lambda>(key#5L)], [<lambda>(key#5L) AS k#17, sum(cast(<lambda>(value#6) as bigint)) AS s#22L]
       +- LogicalRDD [key#5L, value#6]
    
    == Analyzed Logical Plan ==
    t: int
    Project [<lambda>(k#17, s#22L) AS t#26]
    +- Aggregate [<lambda>(key#5L)], [<lambda>(key#5L) AS k#17, sum(cast(<lambda>(value#6) as bigint)) AS s#22L]
       +- LogicalRDD [key#5L, value#6]
    
    == Optimized Logical Plan ==
    Project [<lambda>(agg#29, agg#30L) AS t#26]
    +- Aggregate [<lambda>(key#5L)], [<lambda>(key#5L) AS agg#29, sum(cast(<lambda>(value#6) as bigint)) AS agg#30L]
       +- LogicalRDD [key#5L, value#6]
    
    == Physical Plan ==
    *Project [pythonUDF0#37 AS t#26]
    +- BatchEvalPython [<lambda>(agg#29, agg#30L)], [agg#29, agg#30L, pythonUDF0#37]
       +- *HashAggregate(key=[<lambda>(key#5L)#31], functions=[sum(cast(<lambda>(value#6) as bigint))], output=[agg#29,agg#30L])
          +- Exchange hashpartitioning(<lambda>(key#5L)#31, 200)
             +- *HashAggregate(key=[pythonUDF0#34 AS <lambda>(key#5L)#31], functions=[partial_sum(cast(pythonUDF1#35 as bigint))], output=[<lambda>(key#5L)#31,sum#33L])
                +- BatchEvalPython [<lambda>(key#5L), <lambda>(value#6)], [key#5L, value#6, pythonUDF0#34, pythonUDF1#35]
                   +- Scan ExistingRDD[key#5L,value#6]
    ```
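
To make the evaluation order in the physical plan easier to follow, here is a plain-Python emulation of its three stages. It does not use Spark at all: the lambdas `copy_key`, `str_len` and `add`, and the sample rows, are made up for illustration and are not the data or UDFs of the actual regression test.

```python
from collections import defaultdict

# Hypothetical stand-ins for the three Python lambdas that appear in the plan.
copy_key = lambda k: k
str_len = lambda v: len(v)
add = lambda k, s: int(k + s)

# Made-up (key, value) rows.
rows = [(1, "a"), (2, "bb"), (1, "ccc")]

# Stage 1 (lower BatchEvalPython): UDFs over plain columns run before the aggregate.
evaluated = [(copy_key(k), str_len(v)) for k, v in rows]

# Stage 2 (HashAggregate): group by the first UDF result and sum the second.
sums = defaultdict(int)
for k, n in evaluated:
    sums[k] += n

# Stage 3 (upper BatchEvalPython + Project): the UDF that consumes the
# aggregate results can only run once the sums exist.
result = sorted(add(k, s) for k, s in sums.items())
print(result)  # [4, 5]
```

The point of the sketch is only the ordering: `add` needs the finished sums, so it cannot be folded into the stage that runs before the aggregation.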
    
    Author: Davies Liu <davies@databricks.com>
    
    Closes #13682 from davies/fix_py_udf.