From 708794e8aae2c66bd291bab4f12117c33b57840c Mon Sep 17 00:00:00 2001 From: Wenchen Fan <cloud0fan@outlook.com> Date: Wed, 29 Jul 2015 00:08:45 -0700 Subject: [PATCH] [SPARK-9251][SQL] do not order by expressions which still need evaluation as an offline discussion with rxin , it's weird to be computing stuff while doing sorting, we should only order by bound reference during execution. Author: Wenchen Fan <cloud0fan@outlook.com> Closes #7593 from cloud-fan/sort and squashes the following commits: 7b1bef7 [Wenchen Fan] add test daf206d [Wenchen Fan] add more comments 289bee0 [Wenchen Fan] do not order by expressions which still need evaluation --- .../sql/catalyst/analysis/Analyzer.scala | 58 +++++++++++++++++++ .../sql/catalyst/expressions/random.scala | 4 +- .../plans/logical/basicOperators.scala | 13 +++-- .../sql/catalyst/analysis/AnalysisSuite.scala | 36 ++++++++++-- .../scala/org/apache/spark/sql/TestData.scala | 2 - 5 files changed, 101 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index a6ea0cc0a8..265f3d1e41 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -79,6 +79,7 @@ class Analyzer( ExtractWindowExpressions :: GlobalAggregates :: UnresolvedHavingClauseAttributes :: + RemoveEvaluationFromSort :: HiveTypeCoercion.typeCoercionRules ++ extendedResolutionRules : _*), Batch("Nondeterministic", Once, @@ -947,6 +948,63 @@ class Analyzer( Project(p.output, newPlan.withNewChildren(newChild :: Nil)) } } + + /** + * Removes all still-need-evaluate ordering expressions from sort and use an inner project to + * materialize them, finally use a outer project to project them away to keep the result same. + * Then we can make sure we only sort by [[AttributeReference]]s. + * + * As an example, + * {{{ + * Sort('a, 'b + 1, + * Relation('a, 'b)) + * }}} + * will be turned into: + * {{{ + * Project('a, 'b, + * Sort('a, '_sortCondition, + * Project('a, 'b, ('b + 1).as("_sortCondition"), + * Relation('a, 'b)))) + * }}} + */ + object RemoveEvaluationFromSort extends Rule[LogicalPlan] { + private def hasAlias(expr: Expression) = { + expr.find { + case a: Alias => true + case _ => false + }.isDefined + } + + override def apply(plan: LogicalPlan): LogicalPlan = plan transform { + // The ordering expressions have no effect to the output schema of `Sort`, + // so `Alias`s in ordering expressions are unnecessary and we should remove them. + case s @ Sort(ordering, _, _) if ordering.exists(hasAlias) => + val newOrdering = ordering.map(_.transformUp { + case Alias(child, _) => child + }.asInstanceOf[SortOrder]) + s.copy(order = newOrdering) + + case s @ Sort(ordering, global, child) + if s.expressions.forall(_.resolved) && s.childrenResolved && !s.hasNoEvaluation => + + val (ref, needEval) = ordering.partition(_.child.isInstanceOf[AttributeReference]) + + val namedExpr = needEval.map(_.child match { + case n: NamedExpression => n + case e => Alias(e, "_sortCondition")() + }) + + val newOrdering = ref ++ needEval.zip(namedExpr).map { case (order, ne) => + order.copy(child = ne.toAttribute) + } + + // Add still-need-evaluate ordering expressions into inner project and then project + // them away after the sort. + Project(child.output, + Sort(newOrdering, global, + Project(child.output ++ namedExpr, child))) + } + } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala index 8f30519697..62d3d204ca 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala @@ -66,7 +66,7 @@ case class Rand(seed: Long) extends RDG { val rngTerm = ctx.freshName("rng") val className = classOf[XORShiftRandom].getName ctx.addMutableState(className, rngTerm, - s"$rngTerm = new $className($seed + org.apache.spark.TaskContext.getPartitionId());") + s"$rngTerm = new $className(${seed}L + org.apache.spark.TaskContext.getPartitionId());") ev.isNull = "false" s""" final ${ctx.javaType(dataType)} ${ev.primitive} = $rngTerm.nextDouble(); @@ -89,7 +89,7 @@ case class Randn(seed: Long) extends RDG { val rngTerm = ctx.freshName("rng") val className = classOf[XORShiftRandom].getName ctx.addMutableState(className, rngTerm, - s"$rngTerm = new $className($seed + org.apache.spark.TaskContext.getPartitionId());") + s"$rngTerm = new $className(${seed}L + org.apache.spark.TaskContext.getPartitionId());") ev.isNull = "false" s""" final ${ctx.javaType(dataType)} ${ev.primitive} = $rngTerm.nextGaussian(); diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index af68358daf..ad5af19578 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -33,7 +33,7 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend }.nonEmpty ) - !expressions.exists(!_.resolved) && childrenResolved && !hasSpecialExpressions + expressions.forall(_.resolved) && childrenResolved && !hasSpecialExpressions } } @@ -67,7 +67,7 @@ case class Generate( generator.resolved && childrenResolved && generator.elementTypes.length == generatorOutput.length && - !generatorOutput.exists(!_.resolved) + generatorOutput.forall(_.resolved) } // we don't want the gOutput to be taken as part of the expressions @@ -187,7 +187,7 @@ case class WithWindowDefinition( } /** - * @param order The ordering expressions + * @param order The ordering expressions, should all be [[AttributeReference]] * @param global True means global sorting apply for entire data set, * False means sorting only apply within the partition. * @param child Child logical plan @@ -197,6 +197,11 @@ case class Sort( global: Boolean, child: LogicalPlan) extends UnaryNode { override def output: Seq[Attribute] = child.output + + def hasNoEvaluation: Boolean = order.forall(_.child.isInstanceOf[AttributeReference]) + + override lazy val resolved: Boolean = + expressions.forall(_.resolved) && childrenResolved && hasNoEvaluation } case class Aggregate( @@ -211,7 +216,7 @@ case class Aggregate( }.nonEmpty ) - !expressions.exists(!_.resolved) && childrenResolved && !hasWindowExpressions + expressions.forall(_.resolved) && childrenResolved && !hasWindowExpressions } override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 221b4e92f0..a86cefe941 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -165,11 +165,39 @@ class AnalysisSuite extends AnalysisTest { test("pull out nondeterministic expressions from Sort") { val plan = Sort(Seq(SortOrder(Rand(33), Ascending)), false, testRelation) - val projected = Alias(Rand(33), "_nondeterministic")() + val analyzed = caseSensitiveAnalyzer.execute(plan) + analyzed.transform { + case s: Sort if s.expressions.exists(!_.deterministic) => + fail("nondeterministic expressions are not allowed in Sort") + } + } + + test("remove still-need-evaluate ordering expressions from sort") { + val a = testRelation2.output(0) + val b = testRelation2.output(1) + + def makeOrder(e: Expression): SortOrder = SortOrder(e, Ascending) + + val noEvalOrdering = makeOrder(a) + val noEvalOrderingWithAlias = makeOrder(Alias(Alias(b, "name1")(), "name2")()) + + val needEvalExpr = Coalesce(Seq(a, Literal("1"))) + val needEvalExpr2 = Coalesce(Seq(a, b)) + val needEvalOrdering = makeOrder(needEvalExpr) + val needEvalOrdering2 = makeOrder(needEvalExpr2) + + val plan = Sort( + Seq(noEvalOrdering, noEvalOrderingWithAlias, needEvalOrdering, needEvalOrdering2), + false, testRelation2) + + val evaluatedOrdering = makeOrder(AttributeReference("_sortCondition", StringType)()) + val materializedExprs = Seq(needEvalExpr, needEvalExpr2).map(e => Alias(e, "_sortCondition")()) + val expected = - Project(testRelation.output, - Sort(Seq(SortOrder(projected.toAttribute, Ascending)), false, - Project(testRelation.output :+ projected, testRelation))) + Project(testRelation2.output, + Sort(Seq(makeOrder(a), makeOrder(b), evaluatedOrdering, evaluatedOrdering), false, + Project(testRelation2.output ++ materializedExprs, testRelation2))) + checkAnalysis(plan, expected) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala index 207d7a352c..e340f54850 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql -import java.sql.Timestamp - import org.apache.spark.sql.test.TestSQLContext.implicits._ import org.apache.spark.sql.test._ -- GitLab