Commit 8e097890 authored by Josh Rosen, committed by Wenchen Fan

[SPARK-20686][SQL] PropagateEmptyRelation incorrectly handles aggregate without grouping


The query

```
SELECT 1 FROM (SELECT COUNT(*) WHERE FALSE) t1
```

should return a single row of output because the subquery is an aggregate without a group-by and thus should return a single row. However, Spark incorrectly returns zero rows.
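
The row-count semantics at play here can be illustrated without Spark. The following is a minimal sketch using plain Scala collections; `countAll`, `countByKey`, and `selectOne` are hypothetical helpers standing in for an ungrouped `COUNT`, a grouped `COUNT`, and the outer `SELECT 1`, not Spark APIs.

```scala
object AggregateRowCounts {
  // Ungrouped aggregate: always exactly one output row, even for empty input.
  def countAll(rows: Seq[Int]): Seq[Long] = Seq(rows.size.toLong)

  // Grouped aggregate: one output row per distinct key, so empty input gives no rows.
  def countByKey(rows: Seq[Int]): Seq[(Int, Long)] =
    rows.groupBy(identity).toSeq.map { case (key, group) => (key, group.size.toLong) }

  // Analogue of SELECT 1 FROM (subquery): one output row per subquery row.
  def selectOne(subquery: Seq[Long]): Seq[Int] = subquery.map(_ => 1)
}
```

Under these assumptions, `selectOne(countAll(Seq.empty))` yields one row, which mirrors the expected result of the query above, while `countByKey(Seq.empty)` yields none.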

This is caused by SPARK-16208 / #13906, a patch which added an optimizer rule to propagate EmptyRelation through operators. The logic for handling aggregates is wrong: it checks whether aggregate expressions are non-empty for deciding whether the output should be empty, whereas it should be checking grouping expressions instead:

An aggregate with non-empty grouping expression will return one output row per group. If the input to the grouped aggregate is empty then all groups will be empty and thus the output will be empty. It doesn't matter whether the aggregation output columns include aggregate expressions since that won't affect the number of output rows.

If the grouping expressions are empty, however, then the aggregate will always produce a single output row and thus we cannot propagate the EmptyRelation.

The current implementation is incorrect and also misses an optimization opportunity by not propagating EmptyRelation in the case where a grouped aggregate has aggregate expressions (in other words, `SELECT COUNT(*) from emptyRelation GROUP BY x` would _not_ be optimized to `EmptyRelation` in the old code, even though it safely could be).
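
To make the two failure modes concrete, here is a small sketch that enumerates the four possible shapes of an aggregate over an empty input and compares the old check with the grouping-based one. `AggShape` and both predicates are hypothetical simplifications written for illustration; they only mirror the conditions described in this message, not the actual Catalyst code.

```scala
object OldVsNewCheck {
  // Hypothetical summary of an Aggregate node whose input is empty.
  final case class AggShape(hasGrouping: Boolean, hasAggregateFunction: Boolean)

  // Old check as described above: propagate unless an aggregate function is present.
  def oldPropagates(a: AggShape): Boolean = !a.hasAggregateFunction

  // Grouping-based check: propagate only with at least one grouping expression.
  def newPropagates(a: AggShape): Boolean = a.hasGrouping

  val shapes: Seq[AggShape] = for {
    g <- Seq(true, false)
    f <- Seq(true, false)
  } yield AggShape(g, f)

  // Old check propagates although the aggregate must still emit one row.
  val wrongResults: Seq[AggShape] =
    shapes.filter(s => oldPropagates(s) && !newPropagates(s))

  // Old check keeps the aggregate although its output is certainly empty.
  val missedOptimizations: Seq[AggShape] =
    shapes.filter(s => !oldPropagates(s) && newPropagates(s))
}
```

In this model the two checks disagree on exactly two shapes: an ungrouped aggregate without an aggregate function (a wrong result) and a grouped aggregate with one (a missed optimization).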

This patch resolves this issue by modifying `PropagateEmptyRelation` to consider only the presence/absence of grouping expressions, not the aggregate functions themselves, when deciding whether to propagate EmptyRelation.
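
The grouping-based decision can be sketched on a toy plan tree. This is an illustrative model only: `Plan`, `Empty`, `Project`, `Aggregate`, and `propagateEmpty` below are hypothetical stand-ins defined for this example and are much simpler than Spark's Catalyst classes.

```scala
object EmptyPropagationSketch {
  // Toy plan nodes; hypothetical stand-ins, not Spark's Catalyst classes.
  sealed trait Plan
  case object Empty extends Plan
  final case class Project(columns: Seq[String], child: Plan) extends Plan
  final case class Aggregate(grouping: Seq[String], output: Seq[String], child: Plan)
      extends Plan

  // Rewrite children first (bottom-up), then decide whether this node is empty too.
  def propagateEmpty(plan: Plan): Plan = plan match {
    case Project(cols, child) =>
      propagateEmpty(child) match {
        case Empty => Empty
        case c     => Project(cols, c)
      }
    case Aggregate(grouping, output, child) =>
      propagateEmpty(child) match {
        // Only a grouped aggregate over empty input is itself empty.
        case Empty if grouping.nonEmpty => Empty
        case c                          => Aggregate(grouping, output, c)
      }
    case Empty => Empty
  }
}
```

With this sketch, a `Project` over an ungrouped `Aggregate` over `Empty` is left unchanged, whereas the same plan with a non-empty `grouping` collapses to `Empty`.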

- Added end-to-end regression tests in `SQLQueryTest`'s `group-by.sql` file.
- Updated unit tests in `PropagateEmptyRelationSuite`.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #17929 from JoshRosen/fix-PropagateEmptyRelation.

(cherry picked from commit a90c5cd8)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
parent 50f28dfe
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.catalyst.optimizer
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
@@ -30,7 +29,7 @@ import org.apache.spark.sql.catalyst.rules._
  *    - Join with one or two empty children (including Intersect/Except).
  * 2. Unary-node Logical Plans
  *    - Project/Filter/Sample/Join/Limit/Repartition with all empty children.
- *    - Aggregate with all empty children and without AggregateFunction expressions like COUNT.
+ *    - Aggregate with all empty children and at least one grouping expression.
  *    - Generate(Explode) with all empty children. Others like Hive UDTF may return results.
  */
 object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
@@ -39,10 +38,6 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }
-  private def containsAggregateExpression(e: Expression): Boolean = {
-    e.collectFirst { case _: AggregateFunction => () }.isDefined
-  }
   private def empty(plan: LogicalPlan) = LocalRelation(plan.output, data = Seq.empty)
   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
@@ -68,8 +63,13 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
       case _: LocalLimit => empty(p)
       case _: Repartition => empty(p)
       case _: RepartitionByExpression => empty(p)
-      // AggregateExpressions like COUNT(*) return their results like 0.
-      case Aggregate(_, ae, _) if !ae.exists(containsAggregateExpression) => empty(p)
+      // An aggregate with non-empty group expression will return one output row per group when the
+      // input to the aggregate is not empty. If the input to the aggregate is empty then all groups
+      // will be empty and thus the output will be empty.
+      //
+      // If the grouping expressions are empty, however, then the aggregate will always produce a
+      // single output row and thus we cannot propagate the EmptyRelation.
+      case Aggregate(ge, _, _) if ge.nonEmpty => empty(p)
       // Generators like Hive-style UDTF may return their records within `close`.
       case Generate(_: Explode, _, _, _, _, _) => empty(p)
       case _ => p
......
@@ -142,7 +142,7 @@ class PropagateEmptyRelationSuite extends PlanTest {
     comparePlans(optimized, correctAnswer.analyze)
   }
-  test("propagate empty relation through Aggregate without aggregate function") {
+  test("propagate empty relation through Aggregate with grouping expressions") {
     val query = testRelation1
       .where(false)
       .groupBy('a)('a, ('a + 1).as('x))
@@ -153,13 +153,13 @@ class PropagateEmptyRelationSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
-  test("don't propagate empty relation through Aggregate with aggregate function") {
+  test("don't propagate empty relation through Aggregate without grouping expressions") {
     val query = testRelation1
       .where(false)
-      .groupBy('a)(count('a))
+      .groupBy()()
     val optimized = Optimize.execute(query.analyze)
-    val correctAnswer = LocalRelation('a.int).groupBy('a)(count('a)).analyze
+    val correctAnswer = LocalRelation('a.int).groupBy()().analyze
     comparePlans(optimized, correctAnswer)
   }
......
@@ -35,3 +35,10 @@ FROM testData;
 -- Aggregate with foldable input and multiple distinct groups.
 SELECT COUNT(DISTINCT b), COUNT(DISTINCT b, c) FROM (SELECT 1 AS a, 2 AS b, 3 AS c) GROUP BY a;
+-- Aggregate with empty input and non-empty GroupBy expressions.
+SELECT a, COUNT(1) FROM testData WHERE false GROUP BY a;
+-- Aggregate with empty input and empty GroupBy expressions.
+SELECT COUNT(1) FROM testData WHERE false;
+SELECT 1 FROM (SELECT COUNT(1) FROM testData WHERE false) t;
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 15
+-- Number of queries: 18
 -- !query 0
@@ -139,3 +139,27 @@ SELECT COUNT(DISTINCT b), COUNT(DISTINCT b, c) FROM (SELECT 1 AS a, 2 AS b, 3 AS
 struct<count(DISTINCT b):bigint,count(DISTINCT b, c):bigint>
 -- !query 14 output
 1 1
+-- !query 15
+SELECT a, COUNT(1) FROM testData WHERE false GROUP BY a
+-- !query 15 schema
+struct<a:int,count(1):bigint>
+-- !query 15 output
+-- !query 16
+SELECT COUNT(1) FROM testData WHERE false
+-- !query 16 schema
+struct<count(1):bigint>
+-- !query 16 output
+0
+-- !query 17
+SELECT 1 FROM (SELECT COUNT(1) FROM testData WHERE false) t
+-- !query 17 schema
+struct<1:int>
+-- !query 17 output
+1