Skip to content
Snippets Groups Projects
Commit 9061e777 authored by Yash Datta's avatar Yash Datta Committed by Michael Armbrust
Browse files

[SPARK-11878][SQL] Eliminate distribute by in case group by is present with...

[SPARK-11878][SQL] Eliminate distribute by in case group by is present with exactly the same grouping expressi

For queries like :
select <> from table group by a distribute by a
we can eliminate distribute by ; since group by will anyways do a hash partitioning
Also applicable when user uses Dataframe API

Author: Yash Datta <Yash.Datta@guavus.com>

Closes #9858 from saucam/eliminatedistribute.
parent 94c202c7
No related branches found
No related tags found
No related merge requests found
...@@ -467,6 +467,12 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[ ...@@ -467,6 +467,12 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
} }
def apply(plan: SparkPlan): SparkPlan = plan.transformUp { def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
case operator @ Exchange(partitioning, child, _) =>
child.children match {
case Exchange(childPartitioning, baseChild, _)::Nil =>
if (childPartitioning.guarantees(partitioning)) child else operator
case _ => operator
}
case operator: SparkPlan => ensureDistributionAndOrdering(operator) case operator: SparkPlan => ensureDistributionAndOrdering(operator)
} }
} }
...@@ -417,6 +417,45 @@ class PlannerSuite extends SharedSQLContext { ...@@ -417,6 +417,45 @@ class PlannerSuite extends SharedSQLContext {
} }
} }
test("EnsureRequirements eliminates Exchange if child has Exchange with same partitioning") {
val distribution = ClusteredDistribution(Literal(1) :: Nil)
val finalPartitioning = HashPartitioning(Literal(1) :: Nil, 5)
val childPartitioning = HashPartitioning(Literal(2) :: Nil, 5)
assert(!childPartitioning.satisfies(distribution))
val inputPlan = Exchange(finalPartitioning,
DummySparkPlan(
children = DummySparkPlan(outputPartitioning = childPartitioning) :: Nil,
requiredChildDistribution = Seq(distribution),
requiredChildOrdering = Seq(Seq.empty)),
None)
val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan)
assertDistributionRequirementsAreSatisfied(outputPlan)
if (outputPlan.collect { case e: Exchange => true }.size == 2) {
fail(s"Topmost Exchange should have been eliminated:\n$outputPlan")
}
}
test("EnsureRequirements does not eliminate Exchange with different partitioning") {
val distribution = ClusteredDistribution(Literal(1) :: Nil)
// Number of partitions differ
val finalPartitioning = HashPartitioning(Literal(1) :: Nil, 8)
val childPartitioning = HashPartitioning(Literal(2) :: Nil, 5)
assert(!childPartitioning.satisfies(distribution))
val inputPlan = Exchange(finalPartitioning,
DummySparkPlan(
children = DummySparkPlan(outputPartitioning = childPartitioning) :: Nil,
requiredChildDistribution = Seq(distribution),
requiredChildOrdering = Seq(Seq.empty)),
None)
val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan)
assertDistributionRequirementsAreSatisfied(outputPlan)
if (outputPlan.collect { case e: Exchange => true }.size == 1) {
fail(s"Topmost Exchange should not have been eliminated:\n$outputPlan")
}
}
// --------------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------------
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment