Skip to content
Snippets Groups Projects
Commit a90c5cd8 authored by Josh Rosen's avatar Josh Rosen Committed by Wenchen Fan
Browse files

[SPARK-20686][SQL] PropagateEmptyRelation incorrectly handles aggregate without grouping

## What changes were proposed in this pull request?

The query

```
SELECT 1 FROM (SELECT COUNT(*) WHERE FALSE) t1
```

should return a single row of output because the subquery is an aggregate without a group-by and thus should return a single row. However, Spark incorrectly returns zero rows.

This is caused by SPARK-16208 / #13906, a patch which added an optimizer rule to propagate EmptyRelation through operators. The logic for handling aggregates is wrong: it checks whether aggregate expressions are non-empty for deciding whether the output should be empty, whereas it should be checking grouping expressions instead:

An aggregate with non-empty grouping expression will return one output row per group. If the input to the grouped aggregate is empty then all groups will be empty and thus the output will be empty. It doesn't matter whether the aggregation output columns include aggregate expressions since that won't affect the number of output rows.

If the grouping expressions are empty, however, then the aggregate will always produce a single output row and thus we cannot propagate the EmptyRelation.

The current implementation is incorrect and also misses an optimization opportunity by not propagating EmptyRelation in the case where a grouped aggregate has aggregate expressions (in other words, `SELECT COUNT(*) from emptyRelation GROUP BY x` would _not_ be optimized to `EmptyRelation` in the old code, even though it safely could be).

This patch resolves this issue by modifying `PropagateEmptyRelation` to consider only the presence/absence of grouping expressions, not the aggregate functions themselves, when deciding whether to propagate EmptyRelation.

## How was this patch tested?

- Added end-to-end regression tests in `SQLQueryTest`'s `group-by.sql` file.
- Updated unit tests in `PropagateEmptyRelationSuite`.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #17929 from JoshRosen/fix-PropagateEmptyRelation.
parent 3d2131ab
No related branches found
No related tags found
No related merge requests found
......@@ -18,7 +18,6 @@
package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
......@@ -30,7 +29,7 @@ import org.apache.spark.sql.catalyst.rules._
* - Join with one or two empty children (including Intersect/Except).
* 2. Unary-node Logical Plans
* - Project/Filter/Sample/Join/Limit/Repartition with all empty children.
* - Aggregate with all empty children and without AggregateFunction expressions like COUNT.
* - Aggregate with all empty children and at least one grouping expression.
* - Generate(Explode) with all empty children. Others like Hive UDTF may return results.
*/
object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
......@@ -39,10 +38,6 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
case _ => false
}
private def containsAggregateExpression(e: Expression): Boolean = {
e.collectFirst { case _: AggregateFunction => () }.isDefined
}
private def empty(plan: LogicalPlan) = LocalRelation(plan.output, data = Seq.empty)
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
......@@ -68,8 +63,13 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
case _: LocalLimit => empty(p)
case _: Repartition => empty(p)
case _: RepartitionByExpression => empty(p)
// AggregateExpressions like COUNT(*) return their results like 0.
case Aggregate(_, ae, _) if !ae.exists(containsAggregateExpression) => empty(p)
// An aggregate with non-empty group expression will return one output row per group when the
// input to the aggregate is not empty. If the input to the aggregate is empty then all groups
// will be empty and thus the output will be empty.
//
// If the grouping expressions are empty, however, then the aggregate will always produce a
// single output row and thus we cannot propagate the EmptyRelation.
case Aggregate(ge, _, _) if ge.nonEmpty => empty(p)
// Generators like Hive-style UDTF may return their records within `close`.
case Generate(_: Explode, _, _, _, _, _) => empty(p)
case _ => p
......
......@@ -142,7 +142,7 @@ class PropagateEmptyRelationSuite extends PlanTest {
comparePlans(optimized, correctAnswer.analyze)
}
test("propagate empty relation through Aggregate without aggregate function") {
test("propagate empty relation through Aggregate with grouping expressions") {
val query = testRelation1
.where(false)
.groupBy('a)('a, ('a + 1).as('x))
......@@ -153,13 +153,13 @@ class PropagateEmptyRelationSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}
test("don't propagate empty relation through Aggregate with aggregate function") {
test("don't propagate empty relation through Aggregate without grouping expressions") {
val query = testRelation1
.where(false)
.groupBy('a)(count('a))
.groupBy()()
val optimized = Optimize.execute(query.analyze)
val correctAnswer = LocalRelation('a.int).groupBy('a)(count('a)).analyze
val correctAnswer = LocalRelation('a.int).groupBy()().analyze
comparePlans(optimized, correctAnswer)
}
......
......@@ -53,3 +53,10 @@ set spark.sql.groupByAliases=false;
-- Check analysis exceptions
SELECT a AS k, COUNT(b) FROM testData GROUP BY k;
-- Aggregate with empty input and non-empty GroupBy expressions.
SELECT a, COUNT(1) FROM testData WHERE false GROUP BY a;
-- Aggregate with empty input and empty GroupBy expressions.
SELECT COUNT(1) FROM testData WHERE false;
SELECT 1 FROM (SELECT COUNT(1) FROM testData WHERE false) t;
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 22
-- Number of queries: 25
-- !query 0
......@@ -203,3 +203,27 @@ struct<>
-- !query 21 output
org.apache.spark.sql.AnalysisException
cannot resolve '`k`' given input columns: [a, b]; line 1 pos 47
-- !query 22
SELECT a, COUNT(1) FROM testData WHERE false GROUP BY a
-- !query 22 schema
struct<a:int,count(1):bigint>
-- !query 22 output
-- !query 23
SELECT COUNT(1) FROM testData WHERE false
-- !query 23 schema
struct<count(1):bigint>
-- !query 23 output
0
-- !query 24
SELECT 1 FROM (SELECT COUNT(1) FROM testData WHERE false) t
-- !query 24 schema
struct<1:int>
-- !query 24 output
1
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment