From 59250fe51486908f9e3f3d9ef10aadbcb9b4d62d Mon Sep 17 00:00:00 2001
From: scwf <wangfei1@huawei.com>
Date: Wed, 13 May 2015 16:13:48 -0700
Subject: [PATCH] [SPARK-7303] [SQL] push down project if possible when the child is sort

Optimize the case of `project(_, sort)`; an example is:
`select key from (select * from testData order by key) t`

before this PR:
```
== Parsed Logical Plan ==
'Project ['key]
 'Subquery t
  'Sort ['key ASC], true
   'Project [*]
    'UnresolvedRelation [testData], None

== Analyzed Logical Plan ==
Project [key#0]
 Subquery t
  Sort [key#0 ASC], true
   Project [key#0,value#1]
    Subquery testData
     LogicalRDD [key#0,value#1], MapPartitionsRDD[1]

== Optimized Logical Plan ==
Project [key#0]
 Sort [key#0 ASC], true
  LogicalRDD [key#0,value#1], MapPartitionsRDD[1]

== Physical Plan ==
Project [key#0]
 Sort [key#0 ASC], true
  Exchange (RangePartitioning [key#0 ASC], 5), []
   PhysicalRDD [key#0,value#1], MapPartitionsRDD[1]
```

after this PR:
```
== Parsed Logical Plan ==
'Project ['key]
 'Subquery t
  'Sort ['key ASC], true
   'Project [*]
    'UnresolvedRelation [testData], None

== Analyzed Logical Plan ==
Project [key#0]
 Subquery t
  Sort [key#0 ASC], true
   Project [key#0,value#1]
    Subquery testData
     LogicalRDD [key#0,value#1], MapPartitionsRDD[1]

== Optimized Logical Plan ==
Sort [key#0 ASC], true
 Project [key#0]
  LogicalRDD [key#0,value#1], MapPartitionsRDD[1]

== Physical Plan ==
Sort [key#0 ASC], true
 Exchange (RangePartitioning [key#0 ASC], 5), []
  Project [key#0]
   PhysicalRDD [key#0,value#1], MapPartitionsRDD[1]
```

With this rule we first do column pruning on the table and then do the sorting.

Author: scwf <wangfei1@huawei.com>

This patch had conflicts when merged, resolved by
Committer: Michael Armbrust <michael@databricks.com>

Closes #5838 from scwf/pruning and squashes the following commits:

b00d833 [scwf] address michael's comment
e230155 [scwf] fix tests failure
b09b895 [scwf] improve column pruning
---
 .../sql/catalyst/optimizer/Optimizer.scala      |  5 +++
 .../optimizer/FilterPushdownSuite.scala         | 36 ++++++++++++++++++-
 2 files changed, 40 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index b163707cc9..c2818d957c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -156,6 +156,11 @@ object ColumnPruning extends Rule[LogicalPlan] {
     case Project(projectList, Limit(exp, child)) =>
       Limit(exp, Project(projectList, child))
 
+    // push down project if possible when the child is sort
+    case p @ Project(projectList, s @ Sort(_, _, grandChild))
+      if s.references.subsetOf(p.outputSet) =>
+      s.copy(child = Project(projectList, grandChild))
+
     // Eliminate no-op Projects
     case Project(projectList, child) if child.output == projectList => child
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 0c428f7231..be33cb9bb8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
-import org.apache.spark.sql.catalyst.expressions.{Count, Explode}
+import org.apache.spark.sql.catalyst.expressions.{SortOrder, Ascending, Count, Explode}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.{LeftSemi, PlanTest, LeftOuter, RightOuter}
 import org.apache.spark.sql.catalyst.rules._
@@ -542,4 +542,38 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, originalQuery)
   }
+
+  test("push down project past sort") {
+    val x = testRelation.subquery('x)
+
+    // push down valid
+    val originalQuery = {
+      x.select('a, 'b)
+        .sortBy(SortOrder('a, Ascending))
+        .select('a)
+    }
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val correctAnswer =
+      x.select('a)
+        .sortBy(SortOrder('a, Ascending)).analyze
+
+    comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer))
+
+    // push down invalid
+    val originalQuery1 = {
+      x.select('a, 'b)
+        .sortBy(SortOrder('a, Ascending))
+        .select('b)
+    }
+
+    val optimized1 = Optimize.execute(originalQuery1.analyze)
+    val correctAnswer1 =
+      x.select('a, 'b)
+        .sortBy(SortOrder('a, Ascending))
+        .select('b).analyze
+
+    comparePlans(optimized1, analysis.EliminateSubQueries(correctAnswer1))
+
+  }
 
 }
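For readers who want to reproduce the before/after plans from the commit message, below is a minimal sketch using the Spark 1.x DataFrame API. It is not part of the patch: the SparkContext `sc`, the sample data, and the `testData` temp-table registration are illustrative setup chosen to match the column names in the example query.
```
import org.apache.spark.sql.SQLContext

// Illustrative setup (not part of the patch): `sc` is an existing SparkContext,
// and `testData` is a temp table with the (key, value) columns used in the
// commit message's example.
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

sc.parallelize(1 to 100).map(i => (i, s"val_$i")).toDF("key", "value")
  .registerTempTable("testData")

// The query from the commit message. With the new ColumnPruning case, the
// optimized logical plan should show Project [key] below Sort instead of above it.
val df = sqlContext.sql("select key from (select * from testData order by key) t")
df.explain(true)   // prints the parsed, analyzed, optimized, and physical plans
```
The practical effect, as the physical plans above show, is that the Project now sits beneath the range-partitioning Exchange, so only the pruned `key` column is shuffled for the sort rather than the full rows.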