From dff73bfa5e08c4c065584cfa9655a7839d28ad49 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun <dongjoon@apache.org> Date: Fri, 8 Jul 2016 16:44:53 +0800 Subject: [PATCH] [SPARK-16052][SQL] Improve `CollapseRepartition` optimizer for Repartition/RepartitionBy ## What changes were proposed in this pull request? This PR improves `CollapseRepartition` to optimize the adjacent combinations of **Repartition** and **RepartitionBy**. Also, this PR adds a testsuite for this optimizer. **Target Scenario** ```scala scala> val dsView1 = spark.range(8).repartition(8, $"id") scala> dsView1.createOrReplaceTempView("dsView1") scala> sql("select id from dsView1 distribute by id").explain(true) ``` **Before** ```scala scala> sql("select id from dsView1 distribute by id").explain(true) == Parsed Logical Plan == 'RepartitionByExpression ['id] +- 'Project ['id] +- 'UnresolvedRelation `dsView1` == Analyzed Logical Plan == id: bigint RepartitionByExpression [id#0L] +- Project [id#0L] +- SubqueryAlias dsview1 +- RepartitionByExpression [id#0L], 8 +- Range (0, 8, splits=8) == Optimized Logical Plan == RepartitionByExpression [id#0L] +- RepartitionByExpression [id#0L], 8 +- Range (0, 8, splits=8) == Physical Plan == Exchange hashpartitioning(id#0L, 200) +- Exchange hashpartitioning(id#0L, 8) +- *Range (0, 8, splits=8) ``` **After** ```scala scala> sql("select id from dsView1 distribute by id").explain(true) == Parsed Logical Plan == 'RepartitionByExpression ['id] +- 'Project ['id] +- 'UnresolvedRelation `dsView1` == Analyzed Logical Plan == id: bigint RepartitionByExpression [id#0L] +- Project [id#0L] +- SubqueryAlias dsview1 +- RepartitionByExpression [id#0L], 8 +- Range (0, 8, splits=8) == Optimized Logical Plan == RepartitionByExpression [id#0L] +- Range (0, 8, splits=8) == Physical Plan == Exchange hashpartitioning(id#0L, 200) +- *Range (0, 8, splits=8) ``` ## How was this patch tested? Pass the Jenkins tests (including a new testsuite). 
Author: Dongjoon Hyun <dongjoon@apache.org> Closes #13765 from dongjoon-hyun/SPARK-16052. --- python/pyspark/sql/dataframe.py | 4 +- .../spark/sql/catalyst/dsl/package.scala | 7 +- .../sql/catalyst/optimizer/Optimizer.scala | 17 +++- .../optimizer/CollapseRepartitionSuite.scala | 78 +++++++++++++++++++ .../sql/catalyst/parser/PlanParserSuite.scala | 6 +- 5 files changed, 104 insertions(+), 8 deletions(-) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseRepartitionSuite.scala diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index a0ac7a9342..dd670a9b3d 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -464,10 +464,10 @@ class DataFrame(object): +---+-----+ |age| name| +---+-----+ - | 5| Bob| - | 5| Bob| | 2|Alice| + | 5| Bob| | 2|Alice| + | 5| Bob| +---+-----+ >>> data.rdd.getNumPartitions() 7 diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 84c9cc8c8e..5181dcc786 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -370,8 +370,11 @@ package object dsl { case plan => SubqueryAlias(alias, plan) } - def distribute(exprs: Expression*): LogicalPlan = - RepartitionByExpression(exprs, logicalPlan) + def repartition(num: Integer): LogicalPlan = + Repartition(num, shuffle = true, logicalPlan) + + def distribute(exprs: Expression*)(n: Int = -1): LogicalPlan = + RepartitionByExpression(exprs, logicalPlan, numPartitions = if (n < 0) None else Some(n)) def analyze: LogicalPlan = EliminateSubqueryAliases(analysis.SimpleAnalyzer.execute(logicalPlan)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala 
index 03d15eabdd..368e9a5396 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -556,12 +556,27 @@ object CollapseProject extends Rule[LogicalPlan] { } /** - * Combines adjacent [[Repartition]] operators by keeping only the last one. + * Combines adjacent [[Repartition]] and [[RepartitionByExpression]] operator combinations + * by keeping only one of them. + * 1. For adjacent [[Repartition]]s, collapse into the last [[Repartition]]. + * 2. For adjacent [[RepartitionByExpression]]s, collapse into the last [[RepartitionByExpression]]. + * 3. For a combination of [[Repartition]] and [[RepartitionByExpression]], collapse as a single + * [[RepartitionByExpression]] with the expression and the last number of partitions. */ object CollapseRepartition extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { + // Case 1 case Repartition(numPartitions, shuffle, Repartition(_, _, child)) => Repartition(numPartitions, shuffle, child) + // Case 2 + case RepartitionByExpression(exprs, RepartitionByExpression(_, child, _), numPartitions) => + RepartitionByExpression(exprs, child, numPartitions) + // Case 3 + case Repartition(numPartitions, _, r: RepartitionByExpression) => + r.copy(numPartitions = Some(numPartitions)) + // Case 3 + case RepartitionByExpression(exprs, Repartition(_, _, child), numPartitions) => + RepartitionByExpression(exprs, child, numPartitions) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseRepartitionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseRepartitionSuite.scala new file mode 100644 index 0000000000..8952c72fe4 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseRepartitionSuite.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor + +class CollapseRepartitionSuite extends PlanTest { + object Optimize extends RuleExecutor[LogicalPlan] { + val batches = + Batch("CollapseRepartition", FixedPoint(10), + CollapseRepartition) :: Nil + } + + val testRelation = LocalRelation('a.int, 'b.int) + + test("collapse two adjacent repartitions into one") { + val query = testRelation + .repartition(10) + .repartition(20) + + val optimized = Optimize.execute(query.analyze) + val correctAnswer = testRelation.repartition(20).analyze + + comparePlans(optimized, correctAnswer) + } + + test("collapse repartition and repartitionBy into one") { + val query = testRelation + .repartition(10) + .distribute('a)(20) + + val optimized = Optimize.execute(query.analyze) + val correctAnswer = testRelation.distribute('a)(20).analyze + + comparePlans(optimized, correctAnswer) + } + + test("collapse repartitionBy and repartition into one") { + 
val query = testRelation + .distribute('a)(20) + .repartition(10) + + val optimized = Optimize.execute(query.analyze) + val correctAnswer = testRelation.distribute('a)(10).analyze + + comparePlans(optimized, correctAnswer) + } + + test("collapse two adjacent repartitionBys into one") { + val query = testRelation + .distribute('b)(10) + .distribute('a)(20) + + val optimized = Optimize.execute(query.analyze) + val correctAnswer = testRelation.distribute('a)(20).analyze + + comparePlans(optimized, correctAnswer) + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index 456948d645..fbe236e196 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -151,9 +151,9 @@ class PlanParserSuite extends PlanTest { ("", basePlan), (" order by a, b desc", basePlan.orderBy('a.asc, 'b.desc)), (" sort by a, b desc", basePlan.sortBy('a.asc, 'b.desc)), - (" distribute by a, b", basePlan.distribute('a, 'b)), - (" distribute by a sort by b", basePlan.distribute('a).sortBy('b.asc)), - (" cluster by a, b", basePlan.distribute('a, 'b).sortBy('a.asc, 'b.asc)) + (" distribute by a, b", basePlan.distribute('a, 'b)()), + (" distribute by a sort by b", basePlan.distribute('a)().sortBy('b.asc)), + (" cluster by a, b", basePlan.distribute('a, 'b)().sortBy('a.asc, 'b.asc)) ) orderSortDistrClusterClauses.foreach { -- GitLab