Skip to content
Snippets Groups Projects
Commit d95e4d9d authored by Dilip Biswal's avatar Dilip Biswal Committed by Herman van Hovell
Browse files

[SPARK-20334][SQL] Return a better error message when correlated predicates...

[SPARK-20334][SQL] Return a better error message when correlated predicates contain aggregate expression that has mixture of outer and local references.

## What changes were proposed in this pull request?
Address a follow up in [comment](https://github.com/apache/spark/pull/16954#discussion_r105718880)
Currently subqueries with correlated predicates containing aggregate expression having mixture of outer references and local references generate a codegen error like following :

```SQL
SELECT t1a
FROM   t1
GROUP  BY 1
HAVING EXISTS (SELECT 1
               FROM  t2
               WHERE t2a < min(t1a + t2a));
```
Exception snippet.
```
Cannot evaluate expression: min((input[0, int, false] + input[4, int, false]))
	at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:226)
	at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression.doGenCode(interfaces.scala:87)
	at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:106)
	at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:103)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:103)

```
After this PR, a better error message is issued.
```
org.apache.spark.sql.AnalysisException
Error in query: Found an aggregate expression in a correlated
predicate that has both outer and local references, which is not supported yet.
Aggregate expression: min((t1.`t1a` + t2.`t2a`)),
Outer references: t1.`t1a`,
Local references: t2.`t2a`.;
```
## How was this patch tested?
Added tests in SQLQueryTestSuite.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #17636 from dilipbiswal/subquery_followup1.
parent b2ebadfd
No related branches found
No related tags found
No related merge requests found
......@@ -1204,6 +1204,28 @@ class Analyzer(
private def checkAndGetOuterReferences(sub: LogicalPlan): Seq[Expression] = {
val outerReferences = ArrayBuffer.empty[Expression]
// Validate that correlated aggregate expression do not contain a mixture
// of outer and local references.
def checkMixedReferencesInsideAggregateExpr(expr: Expression): Unit = {
expr.foreach {
case a: AggregateExpression if containsOuter(a) =>
val outer = a.collect { case OuterReference(e) => e.toAttribute }
val local = a.references -- outer
if (local.nonEmpty) {
val msg =
s"""
|Found an aggregate expression in a correlated predicate that has both
|outer and local references, which is not supported yet.
|Aggregate expression: ${SubExprUtils.stripOuterReference(a).sql},
|Outer references: ${outer.map(_.sql).mkString(", ")},
|Local references: ${local.map(_.sql).mkString(", ")}.
""".stripMargin.replace("\n", " ").trim()
failAnalysis(msg)
}
case _ =>
}
}
// Make sure a plan's subtree does not contain outer references
def failOnOuterReferenceInSubTree(p: LogicalPlan): Unit = {
if (hasOuterReferences(p)) {
......@@ -1211,9 +1233,12 @@ class Analyzer(
}
}
// Make sure a plan's expressions do not contain outer references
def failOnOuterReference(p: LogicalPlan): Unit = {
if (p.expressions.exists(containsOuter)) {
// Make sure a plan's expressions do not contain :
// 1. Aggregate expressions that have mixture of outer and local references.
// 2. Expressions containing outer references on plan nodes other than Filter.
def failOnInvalidOuterReference(p: LogicalPlan): Unit = {
p.expressions.foreach(checkMixedReferencesInsideAggregateExpr)
if (!p.isInstanceOf[Filter] && p.expressions.exists(containsOuter)) {
failAnalysis(
"Expressions referencing the outer query are not supported outside of WHERE/HAVING " +
s"clauses:\n$p")
......@@ -1283,9 +1308,9 @@ class Analyzer(
// These operators can be anywhere in a correlated subquery.
// so long as they do not host outer references in the operators.
case s: Sort =>
failOnOuterReference(s)
failOnInvalidOuterReference(s)
case r: RepartitionByExpression =>
failOnOuterReference(r)
failOnInvalidOuterReference(r)
// Category 3:
// Filter is one of the two operators allowed to host correlated expressions.
......@@ -1299,6 +1324,8 @@ class Analyzer(
case _: EqualTo | _: EqualNullSafe => false
case _ => true
}
failOnInvalidOuterReference(f)
// The aggregate expressions are treated in a special way by getOuterReferences. If the
// aggregate expression contains only outer reference attributes then the entire aggregate
// expression is isolated as an OuterReference.
......@@ -1308,7 +1335,7 @@ class Analyzer(
// Project cannot host any correlated expressions
// but can be anywhere in a correlated subquery.
case p: Project =>
failOnOuterReference(p)
failOnInvalidOuterReference(p)
// Aggregate cannot host any correlated expressions
// It can be on a correlation path if the correlation contains
......@@ -1316,7 +1343,7 @@ class Analyzer(
// It cannot be on a correlation path if the correlation has
// non-equality correlated predicates.
case a: Aggregate =>
failOnOuterReference(a)
failOnInvalidOuterReference(a)
failOnNonEqualCorrelatedPredicate(foundNonEqualCorrelatedPred, a)
// Join can host correlated expressions.
......@@ -1324,7 +1351,7 @@ class Analyzer(
joinType match {
// Inner join, like Filter, can be anywhere.
case _: InnerLike =>
failOnOuterReference(j)
failOnInvalidOuterReference(j)
// Left outer join's right operand cannot be on a correlation path.
// LeftAnti and ExistenceJoin are special cases of LeftOuter.
......@@ -1335,12 +1362,12 @@ class Analyzer(
// Any correlated references in the subplan
// of the right operand cannot be pulled up.
case LeftOuter | LeftSemi | LeftAnti | ExistenceJoin(_) =>
failOnOuterReference(j)
failOnInvalidOuterReference(j)
failOnOuterReferenceInSubTree(right)
// Likewise, Right outer join's left operand cannot be on a correlation path.
case RightOuter =>
failOnOuterReference(j)
failOnInvalidOuterReference(j)
failOnOuterReferenceInSubTree(left)
// Any other join types not explicitly listed above,
......@@ -1356,7 +1383,7 @@ class Analyzer(
// Note:
// Generator with join=false is treated as Category 4.
case g: Generate if g.join =>
failOnOuterReference(g)
failOnInvalidOuterReference(g)
// Category 4: Any other operators not in the above 3 categories
// cannot be on a correlation path, that is they are allowed only
......
-- The test file contains negative test cases
-- of invalid queries where error messages are expected.
create temporary view t1 as select * from values
CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES
(1, 2, 3)
as t1(t1a, t1b, t1c);
AS t1(t1a, t1b, t1c);
create temporary view t2 as select * from values
CREATE TEMPORARY VIEW t2 AS SELECT * FROM VALUES
(1, 0, 1)
as t2(t2a, t2b, t2c);
AS t2(t2a, t2b, t2c);
create temporary view t3 as select * from values
CREATE TEMPORARY VIEW t3 AS SELECT * FROM VALUES
(3, 1, 2)
as t3(t3a, t3b, t3c);
AS t3(t3a, t3b, t3c);
-- TC 01.01
-- The column t2b in the SELECT of the subquery is invalid
-- because it is neither an aggregate function nor a GROUP BY column.
select t1a, t2b
from t1, t2
where t1b = t2c
and t2b = (select max(avg)
from (select t2b, avg(t2b) avg
from t2
where t2a = t1.t1b
SELECT t1a, t2b
FROM t1, t2
WHERE t1b = t2c
AND t2b = (SELECT max(avg)
FROM (SELECT t2b, avg(t2b) avg
FROM t2
WHERE t2a = t1.t1b
)
)
;
-- TC 01.02
-- Invalid due to the column t2b not part of the output from table t2.
select *
from t1
where t1a in (select min(t2a)
from t2
group by t2c
having t2c in (select max(t3c)
from t3
group by t3b
having t3b > t2b ))
SELECT *
FROM t1
WHERE t1a IN (SELECT min(t2a)
FROM t2
GROUP BY t2c
HAVING t2c IN (SELECT max(t3c)
FROM t3
GROUP BY t3b
HAVING t3b > t2b ))
;
-- TC 01.03
-- Invalid due to mixure of outer and local references under an AggegatedExpression
-- in a correlated predicate
SELECT t1a
FROM t1
GROUP BY 1
HAVING EXISTS (SELECT 1
FROM t2
WHERE t2a < min(t1a + t2a));
-- TC 01.04
-- Invalid due to mixure of outer and local references under an AggegatedExpression
SELECT t1a
FROM t1
WHERE t1a IN (SELECT t2a
FROM t2
WHERE EXISTS (SELECT 1
FROM t3
GROUP BY 1
HAVING min(t2a + t3a) > 1));
-- TC 01.05
-- Invalid due to outer reference appearing in projection list
SELECT t1a
FROM t1
WHERE t1a IN (SELECT t2a
FROM t2
WHERE EXISTS (SELECT min(t2a)
FROM t3));
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 5
-- Number of queries: 8
-- !query 0
create temporary view t1 as select * from values
CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES
(1, 2, 3)
as t1(t1a, t1b, t1c)
AS t1(t1a, t1b, t1c)
-- !query 0 schema
struct<>
-- !query 0 output
......@@ -13,9 +13,9 @@ struct<>
-- !query 1
create temporary view t2 as select * from values
CREATE TEMPORARY VIEW t2 AS SELECT * FROM VALUES
(1, 0, 1)
as t2(t2a, t2b, t2c)
AS t2(t2a, t2b, t2c)
-- !query 1 schema
struct<>
-- !query 1 output
......@@ -23,9 +23,9 @@ struct<>
-- !query 2
create temporary view t3 as select * from values
CREATE TEMPORARY VIEW t3 AS SELECT * FROM VALUES
(3, 1, 2)
as t3(t3a, t3b, t3c)
AS t3(t3a, t3b, t3c)
-- !query 2 schema
struct<>
-- !query 2 output
......@@ -33,13 +33,13 @@ struct<>
-- !query 3
select t1a, t2b
from t1, t2
where t1b = t2c
and t2b = (select max(avg)
from (select t2b, avg(t2b) avg
from t2
where t2a = t1.t1b
SELECT t1a, t2b
FROM t1, t2
WHERE t1b = t2c
AND t2b = (SELECT max(avg)
FROM (SELECT t2b, avg(t2b) avg
FROM t2
WHERE t2a = t1.t1b
)
)
-- !query 3 schema
......@@ -50,17 +50,67 @@ grouping expressions sequence is empty, and 't2.`t2b`' is not an aggregate funct
-- !query 4
select *
from t1
where t1a in (select min(t2a)
from t2
group by t2c
having t2c in (select max(t3c)
from t3
group by t3b
having t3b > t2b ))
SELECT *
FROM t1
WHERE t1a IN (SELECT min(t2a)
FROM t2
GROUP BY t2c
HAVING t2c IN (SELECT max(t3c)
FROM t3
GROUP BY t3b
HAVING t3b > t2b ))
-- !query 4 schema
struct<>
-- !query 4 output
org.apache.spark.sql.AnalysisException
resolved attribute(s) t2b#x missing from min(t2a)#x,t2c#x in operator !Filter t2c#x IN (list#x [t2b#x]);
-- !query 5
SELECT t1a
FROM t1
GROUP BY 1
HAVING EXISTS (SELECT 1
FROM t2
WHERE t2a < min(t1a + t2a))
-- !query 5 schema
struct<>
-- !query 5 output
org.apache.spark.sql.AnalysisException
Found an aggregate expression in a correlated predicate that has both outer and local references, which is not supported yet. Aggregate expression: min((t1.`t1a` + t2.`t2a`)), Outer references: t1.`t1a`, Local references: t2.`t2a`.;
-- !query 6
SELECT t1a
FROM t1
WHERE t1a IN (SELECT t2a
FROM t2
WHERE EXISTS (SELECT 1
FROM t3
GROUP BY 1
HAVING min(t2a + t3a) > 1))
-- !query 6 schema
struct<>
-- !query 6 output
org.apache.spark.sql.AnalysisException
Found an aggregate expression in a correlated predicate that has both outer and local references, which is not supported yet. Aggregate expression: min((t2.`t2a` + t3.`t3a`)), Outer references: t2.`t2a`, Local references: t3.`t3a`.;
-- !query 7
SELECT t1a
FROM t1
WHERE t1a IN (SELECT t2a
FROM t2
WHERE EXISTS (SELECT min(t2a)
FROM t3))
-- !query 7 schema
struct<>
-- !query 7 output
org.apache.spark.sql.AnalysisException
Expressions referencing the outer query are not supported outside of WHERE/HAVING clauses:
Aggregate [min(outer(t2a#x)) AS min(outer())#x]
+- SubqueryAlias t3
+- Project [t3a#x, t3b#x, t3c#x]
+- SubqueryAlias t3
+- LocalRelation [t3a#x, t3b#x, t3c#x]
;
......@@ -822,12 +822,25 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
checkAnswer(
sql(
"""
| select c2
| from t1
| where exists (select *
| from t2 lateral view explode(arr_c2) q as c2
where t1.c1 = t2.c1)""".stripMargin),
| SELECT c2
| FROM t1
| WHERE EXISTS (SELECT *
| FROM t2 LATERAL VIEW explode(arr_c2) q AS c2
WHERE t1.c1 = t2.c1)""".stripMargin),
Row(1) :: Row(0) :: Nil)
val msg1 = intercept[AnalysisException] {
sql(
"""
| SELECT c1
| FROM t2
| WHERE EXISTS (SELECT *
| FROM t1 LATERAL VIEW explode(t2.arr_c2) q AS c2
| WHERE t1.c1 = t2.c1)
""".stripMargin)
}
assert(msg1.getMessage.contains(
"Expressions referencing the outer query are not supported outside of WHERE/HAVING"))
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment