diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala index 80c25d0b0fb7a5b6a0e5f8a02518188ea5acba33..fffcc7c9ef53a2bbe3a984679450b11f01e9f259 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala @@ -105,12 +105,22 @@ case class AggregateExpression( } // We compute the same thing regardless of our final result. - override lazy val canonicalized: Expression = + override lazy val canonicalized: Expression = { + val normalizedAggFunc = mode match { + // For PartialMerge or Final mode, the input to the `aggregateFunction` is aggregate buffers, + // and the actual children of `aggregateFunction` is not used, here we normalize the expr id. + case PartialMerge | Final => aggregateFunction.transform { + case a: AttributeReference => a.withExprId(ExprId(0)) + } + case Partial | Complete => aggregateFunction + } + AggregateExpression( - aggregateFunction.canonicalized.asInstanceOf[AggregateFunction], + normalizedAggFunc.canonicalized.asInstanceOf[AggregateFunction], mode, isDistinct, ExprId(0)) + } override def children: Seq[Expression] = aggregateFunction :: Nil override def dataType: DataType = aggregateFunction.dataType diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 51faa333307b3b3ea687a3899ef1d9bb09331575..959fcf7c7548e32eb6eb8092c643db22c82cffcd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -286,7 +286,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT def recursiveTransform(arg: Any): AnyRef = arg match { case e: Expression => transformExpression(e) - case Some(e: Expression) => Some(transformExpression(e)) + case Some(value) => Some(recursiveTransform(value)) case m: Map[_, _] => m case d: DataType => d // Avoid unpacking Structs case seq: Traversable[_] => seq.map(recursiveTransform) @@ -320,7 +320,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT productIterator.flatMap { case e: Expression => e :: Nil - case Some(e: Expression) => e :: Nil + case s: Some[_] => seqToExpressions(s.toSeq) case seq: Traversable[_] => seqToExpressions(seq) case other => Nil }.toSeq diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala index 25e4ca060ae0221272423a7989db292ca8c025d8..aaf51b5b901113f6a1dcc95607a035a334893eb1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala @@ -18,12 +18,14 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.{DataFrame, QueryTest} +import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext /** * Tests for the sameResult function for [[SparkPlan]]s. */ class SameResultSuite extends QueryTest with SharedSQLContext { + import testImplicits._ test("FileSourceScanExec: different orders of data filters and partition filters") { withTempPath { path => @@ -46,4 +48,14 @@ class SameResultSuite extends QueryTest with SharedSQLContext { df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get .asInstanceOf[FileSourceScanExec] } + + test("SPARK-20725: partial aggregate should behave correctly for sameResult") { + val df1 = spark.range(10).agg(sum($"id")) + val df2 = spark.range(10).agg(sum($"id")) + assert(df1.queryExecution.executedPlan.sameResult(df2.queryExecution.executedPlan)) + + val df3 = spark.range(10).agg(sumDistinct($"id")) + val df4 = spark.range(10).agg(sumDistinct($"id")) + assert(df3.queryExecution.executedPlan.sameResult(df4.queryExecution.executedPlan)) + } }