Skip to content
Snippets Groups Projects
Commit 095862a3 authored by Tejas Patil's avatar Tejas Patil Committed by Herman van Hovell
Browse files

[SPARK-17271][SQL] Planner adds un-necessary Sort even if child ordering is...

[SPARK-17271][SQL] Planner adds un-necessary Sort even if child ordering is semantically same as required ordering

## What changes were proposed in this pull request?

Jira : https://issues.apache.org/jira/browse/SPARK-17271

Planner is adding un-needed SORT operation due to bug in the way comparison for `SortOrder` is done at https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L253
`SortOrder` needs to be compared semantically because `Expression` within two `SortOrder` can be "semantically equal" but not literally equal objects.

eg. In case of `sql("SELECT * FROM table1 a JOIN table2 b ON a.col1=b.col1")`

Expression in required SortOrder:
```
      AttributeReference(
        name = "col1",
        dataType = LongType,
        nullable = false
      ) (exprId = exprId,
        qualifier = Some("a")
      )
```

Expression in child SortOrder:
```
      AttributeReference(
        name = "col1",
        dataType = LongType,
        nullable = false
      ) (exprId = exprId)
```

Notice that the output column has a qualifier but the child attribute does not but the inherent expression is the same and hence in this case we can say that the child satisfies the required sort order.

This PR includes following changes:
- Added a `semanticEquals` method to `SortOrder` so that it can compare underlying child expressions semantically (and not using default Object.equals)
- Fixed `EnsureRequirements` to use semantic comparison of SortOrder

## How was this patch tested?

- Added a test case to `PlannerSuite`. Ran rest tests in `PlannerSuite`

Author: Tejas Patil <tejasp@fb.com>

Closes #14841 from tejasapatil/SPARK-17271_sort_order_equals_bug.
parent e07baf14
No related branches found
No related tags found
No related merge requests found
......@@ -61,6 +61,9 @@ case class SortOrder(child: Expression, direction: SortDirection)
override def sql: String = child.sql + " " + direction.sql
def isAscending: Boolean = direction == Ascending
def semanticEquals(other: SortOrder): Boolean =
(direction == other.direction) && child.semanticEquals(other.child)
}
/**
......
......@@ -250,7 +250,16 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) =>
if (requiredOrdering.nonEmpty) {
// If child.outputOrdering is [a, b] and requiredOrdering is [a], we do not need to sort.
if (requiredOrdering != child.outputOrdering.take(requiredOrdering.length)) {
val orderingMatched = if (requiredOrdering.length > child.outputOrdering.length) {
false
} else {
requiredOrdering.zip(child.outputOrdering).forall {
case (requiredOrder, childOutputOrder) =>
requiredOrder.semanticEquals(childOutputOrder)
}
}
if (!orderingMatched) {
SortExec(requiredOrdering, global = false, child = child)
} else {
child
......
......@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{execution, DataFrame, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Literal, SortOrder}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition}
import org.apache.spark.sql.catalyst.plans.physical._
......@@ -444,6 +444,44 @@ class PlannerSuite extends SharedSQLContext {
}
}
test("EnsureRequirements skips sort when required ordering is semantically equal to " +
"existing ordering") {
val exprId: ExprId = NamedExpression.newExprId
val attribute1 =
AttributeReference(
name = "col1",
dataType = LongType,
nullable = false
) (exprId = exprId,
qualifier = Some("col1_qualifier")
)
val attribute2 =
AttributeReference(
name = "col1",
dataType = LongType,
nullable = false
) (exprId = exprId)
val orderingA1 = SortOrder(attribute1, Ascending)
val orderingA2 = SortOrder(attribute2, Ascending)
assert(orderingA1 != orderingA2, s"$orderingA1 should NOT equal to $orderingA2")
assert(orderingA1.semanticEquals(orderingA2),
s"$orderingA1 should be semantically equal to $orderingA2")
val inputPlan = DummySparkPlan(
children = DummySparkPlan(outputOrdering = Seq(orderingA1)) :: Nil,
requiredChildOrdering = Seq(Seq(orderingA2)),
requiredChildDistribution = Seq(UnspecifiedDistribution)
)
val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
assertDistributionRequirementsAreSatisfied(outputPlan)
if (outputPlan.collect { case s: SortExec => true }.nonEmpty) {
fail(s"No sorts should have been added:\n$outputPlan")
}
}
// This is a regression test for SPARK-11135
test("EnsureRequirements adds sort when required ordering isn't a prefix of existing ordering") {
val orderingA = SortOrder(Literal(1), Ascending)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment