Skip to content
Snippets Groups Projects
Commit bbfd6b5d authored by Xiao Li's avatar Xiao Li Committed by Wenchen Fan
Browse files

[SPARK-21647][SQL] Fix SortMergeJoin when using CROSS

### What changes were proposed in this pull request?
author: BoleynSu
closes https://github.com/apache/spark/pull/18836

```Scala
val df = Seq((1, 1)).toDF("i", "j")
df.createOrReplaceTempView("T")
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
  sql("select * from (select a.i from T a cross join T t where t.i = a.i) as t1 " +
    "cross join T t2 where t2.i = t1.i").explain(true)
}
```
The above code could cause the following exception:
```
SortMergeJoinExec should not take Cross as the JoinType
java.lang.IllegalArgumentException: SortMergeJoinExec should not take Cross as the JoinType
	at org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputOrdering(SortMergeJoinExec.scala:100)
```

Our SortMergeJoinExec supports CROSS. We should not hit such an exception. This PR is to fix the issue.

### How was this patch tested?
Modified the two existing test cases.

Author: Xiao Li <gatorsmile@gmail.com>
Author: Boleyn Su <boleyn.su@gmail.com>

Closes #18863 from gatorsmile/pr-18836.
parent 8b69b17f
No related branches found
No related tags found
No related merge requests found
......@@ -82,7 +82,7 @@ case class SortMergeJoinExec(
override def outputOrdering: Seq[SortOrder] = joinType match {
// For inner join, orders of both sides keys should be kept.
case Inner =>
case _: InnerLike =>
val leftKeyOrdering = getKeyOrdering(leftKeys, left.outputOrdering)
val rightKeyOrdering = getKeyOrdering(rightKeys, right.outputOrdering)
leftKeyOrdering.zip(rightKeyOrdering).map { case (lKey, rKey) =>
......
......@@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{execution, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.columnar.InMemoryRelation
......@@ -513,26 +513,30 @@ class PlannerSuite extends SharedSQLContext {
}
test("EnsureRequirements skips sort when either side of join keys is required after inner SMJ") {
val innerSmj = SortMergeJoinExec(exprA :: Nil, exprB :: Nil, Inner, None, planA, planB)
// Both left and right keys should be sorted after the SMJ.
Seq(orderingA, orderingB).foreach { ordering =>
assertSortRequirementsAreSatisfied(
childPlan = innerSmj,
requiredOrdering = Seq(ordering),
shouldHaveSort = false)
Seq(Inner, Cross).foreach { joinType =>
val innerSmj = SortMergeJoinExec(exprA :: Nil, exprB :: Nil, joinType, None, planA, planB)
// Both left and right keys should be sorted after the SMJ.
Seq(orderingA, orderingB).foreach { ordering =>
assertSortRequirementsAreSatisfied(
childPlan = innerSmj,
requiredOrdering = Seq(ordering),
shouldHaveSort = false)
}
}
}
test("EnsureRequirements skips sort when key order of a parent SMJ is propagated from its " +
"child SMJ") {
val childSmj = SortMergeJoinExec(exprA :: Nil, exprB :: Nil, Inner, None, planA, planB)
val parentSmj = SortMergeJoinExec(exprB :: Nil, exprC :: Nil, Inner, None, childSmj, planC)
// After the second SMJ, exprA, exprB and exprC should all be sorted.
Seq(orderingA, orderingB, orderingC).foreach { ordering =>
assertSortRequirementsAreSatisfied(
childPlan = parentSmj,
requiredOrdering = Seq(ordering),
shouldHaveSort = false)
Seq(Inner, Cross).foreach { joinType =>
val childSmj = SortMergeJoinExec(exprA :: Nil, exprB :: Nil, joinType, None, planA, planB)
val parentSmj = SortMergeJoinExec(exprB :: Nil, exprC :: Nil, joinType, None, childSmj, planC)
// After the second SMJ, exprA, exprB and exprC should all be sorted.
Seq(orderingA, orderingB, orderingC).foreach { ordering =>
assertSortRequirementsAreSatisfied(
childPlan = parentSmj,
requiredOrdering = Seq(ordering),
shouldHaveSort = false)
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment