[SPARK-21441][SQL] Incorrect Codegen in SortMergeJoinExec results failures in some cases
## What changes were proposed in this pull request? https://issues.apache.org/jira/projects/SPARK/issues/SPARK-21441 This issue can be reproduced by the following example: ``` val spark = SparkSession .builder() .appName("smj-codegen") .master("local") .config("spark.sql.autoBroadcastJoinThreshold", "1") .getOrCreate() val df1 = spark.createDataFrame(Seq((1, 1), (2, 2), (3, 3))).toDF("key", "int") val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"), (3, "3"))).toDF("key", "str") val df = df1.join(df2, df1("key") === df2("key")) .filter("int = 2 or reflect('java.lang.Integer', 'valueOf', str) = 1") .select("int") df.show() ``` To conclude, the issue happens when: (1) SortMergeJoin condition contains CodegenFallback expressions. (2) In PhysicalPlan tree, SortMergeJoin node is the child of root node, e.g., the Project in above example. This patch fixes the logic in `CollapseCodegenStages` rule. ## How was this patch tested? Unit test and manual verification in our cluster. Author: donnyzone <wellfengzhu@gmail.com> Closes #18656 from DonnyZone/Fix_SortMergeJoinExec.
Showing
- sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala 4 additions, 4 deletions...rg/apache/spark/sql/execution/WholeStageCodegenExec.scala
- sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala 22 additions, 0 deletions...g/apache/spark/sql/execution/WholeStageCodegenSuite.scala
Please register or sign in to comment