Skip to content
Snippets Groups Projects
Commit 22ba2134 authored by Michael Armbrust, committed by Yin Huai
Browse files

[SPARK-13087][SQL] Fix group by function for sort based aggregation

It is not valid to call `toAttribute` on a `NamedExpression` unless we know for sure that the child produced that `NamedExpression`. The previous code worked fine when the grouping expressions were simple, but when a grouping expression was a derived value (for example `floor(a)`), the query failed at execution time.

Author: Michael Armbrust <michael@databricks.com>

Closes #11013 from marmbrus/groupByFunction-master.
parent b8666fd0
No related branches found
No related tags found
No related merge requests found
...@@ -33,15 +33,14 @@ object Utils { ...@@ -33,15 +33,14 @@ object Utils {
resultExpressions: Seq[NamedExpression], resultExpressions: Seq[NamedExpression],
child: SparkPlan): Seq[SparkPlan] = { child: SparkPlan): Seq[SparkPlan] = {
val groupingAttributes = groupingExpressions.map(_.toAttribute)
val completeAggregateExpressions = aggregateExpressions.map(_.copy(mode = Complete)) val completeAggregateExpressions = aggregateExpressions.map(_.copy(mode = Complete))
val completeAggregateAttributes = completeAggregateExpressions.map { val completeAggregateAttributes = completeAggregateExpressions.map {
expr => aggregateFunctionToAttribute(expr.aggregateFunction, expr.isDistinct) expr => aggregateFunctionToAttribute(expr.aggregateFunction, expr.isDistinct)
} }
SortBasedAggregate( SortBasedAggregate(
requiredChildDistributionExpressions = Some(groupingAttributes), requiredChildDistributionExpressions = Some(groupingExpressions),
groupingExpressions = groupingAttributes, groupingExpressions = groupingExpressions,
aggregateExpressions = completeAggregateExpressions, aggregateExpressions = completeAggregateExpressions,
aggregateAttributes = completeAggregateAttributes, aggregateAttributes = completeAggregateAttributes,
initialInputBufferOffset = 0, initialInputBufferOffset = 0,
......
...@@ -193,6 +193,14 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te ...@@ -193,6 +193,14 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te
sqlContext.dropTempTable("emptyTable") sqlContext.dropTempTable("emptyTable")
} }
// Regression test for SPARK-13087: grouping by a derived expression (`floor(a)`)
// rather than by a plain column must produce the expected result.
test("group by function") {
  Seq((1, 2)).toDF("a", "b").registerTempTable("data")
  try {
    checkAnswer(
      sql("SELECT floor(a) AS a, collect_set(b) FROM data GROUP BY floor(a) ORDER BY a"),
      Row(1, Array(2)) :: Nil)
  } finally {
    // Always drop the temp table, even when the assertion fails, so that "data"
    // does not leak into other tests sharing this SQLContext.
    sqlContext.dropTempTable("data")
  }
}
test("empty table") { test("empty table") {
// If there is no GROUP BY clause and the table is empty, we will generate a single row. // If there is no GROUP BY clause and the table is empty, we will generate a single row.
checkAnswer( checkAnswer(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment