Skip to content
Snippets Groups Projects
Commit 2a105134 authored by Dongjoon Hyun's avatar Dongjoon Hyun Committed by Herman van Hovell
Browse files

[SPARK-16771][SQL] WITH clause should not fall into infinite loop.

## What changes were proposed in this pull request?

This PR changes the CTE resolving rule to use only **forward-declared** tables in order to prevent infinite loops. More specifically, new logic is like the following.

* Resolve CTEs in `WITH` clauses first before replacing the main SQL body.
* When resolving CTEs, only forward-declared CTEs or base tables are referenced.
  - Self-referencing is not allowed any more.
  - Cross-referencing is not allowed any more.

**Reported Error Scenarios**
```scala
scala> sql("WITH t AS (SELECT 1 FROM t) SELECT * FROM t")
java.lang.StackOverflowError
...
scala> sql("WITH t1 AS (SELECT * FROM t2), t2 AS (SELECT 2 FROM t1) SELECT * FROM t1, t2")
java.lang.StackOverflowError
...
```
Note that `t`, `t1`, and `t2` are not declared in database. Spark falls into infinite loops before resolving table names.

## How was this patch tested?

Pass the Jenkins tests with new two testcases.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #14397 from dongjoon-hyun/SPARK-16771-TREENODE.
parent bbae20ad
No related branches found
No related tags found
No related merge requests found
......@@ -125,22 +125,22 @@ class Analyzer(
object CTESubstitution extends Rule[LogicalPlan] {
// TODO allow subquery to define CTE
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case With(child, relations) => substituteCTE(child, relations)
case With(child, relations) =>
substituteCTE(child, relations.foldLeft(Seq.empty[(String, LogicalPlan)]) {
case (resolved, (name, relation)) =>
resolved :+ name -> ResolveRelations(substituteCTE(relation, resolved))
})
case other => other
}
def substituteCTE(plan: LogicalPlan, cteRelations: Map[String, LogicalPlan]): LogicalPlan = {
plan transform {
// In hive, if there is same table name in database and CTE definition,
// hive will use the table in database, not the CTE one.
// Taking into account the reasonableness and the implementation complexity,
// here use the CTE definition first, check table name only and ignore database name
// see https://github.com/apache/spark/pull/4929#discussion_r27186638 for more info
def substituteCTE(plan: LogicalPlan, cteRelations: Seq[(String, LogicalPlan)]): LogicalPlan = {
plan transformDown {
case u : UnresolvedRelation =>
val substituted = cteRelations.get(u.tableIdentifier.table).map { relation =>
val withAlias = u.alias.map(SubqueryAlias(_, relation))
withAlias.getOrElse(relation)
}
val substituted = cteRelations.find(x => resolver(x._1, u.tableIdentifier.table))
.map(_._2).map { relation =>
val withAlias = u.alias.map(SubqueryAlias(_, relation))
withAlias.getOrElse(relation)
}
substituted.getOrElse(u)
case other =>
// This cannot be done in ResolveSubquery because ResolveSubquery does not know the CTE.
......
......@@ -97,7 +97,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
}
// Check for duplicate names.
checkDuplicateKeys(ctes, ctx)
With(query, ctes.toMap)
With(query, ctes)
}
}
......
......@@ -392,11 +392,10 @@ case class InsertIntoTable(
* This operator will be removed during analysis and the relations will be substituted into child.
*
* @param child The final query of this CTE.
* @param cteRelations Queries that this CTE defined,
* key is the alias of the CTE definition,
* value is the CTE definition.
* @param cteRelations A sequence of pair (alias, the CTE definition) that this CTE defined
* Each CTE can see the base tables and the previously defined CTEs only.
*/
case class With(child: LogicalPlan, cteRelations: Map[String, SubqueryAlias]) extends UnaryNode {
case class With(child: LogicalPlan, cteRelations: Seq[(String, SubqueryAlias)]) extends UnaryNode {
override def output: Seq[Attribute] = child.output
}
......
......@@ -81,7 +81,7 @@ class PlanParserSuite extends PlanTest {
val ctes = namedPlans.map {
case (name, cte) =>
name -> SubqueryAlias(name, cte)
}.toMap
}
With(plan, ctes)
}
assertEqual(
......
create temporary view t as select * from values 0, 1, 2 as t(id);
create temporary view t2 as select * from values 0, 1 as t(id);
-- WITH clause should not fall into infinite loop by referencing self
WITH s AS (SELECT 1 FROM s) SELECT * FROM s;
-- WITH clause should reference the base table
WITH t AS (SELECT 1 FROM t) SELECT * FROM t;
-- WITH clause should not allow cross reference
WITH s1 AS (SELECT 1 FROM s2), s2 AS (SELECT 1 FROM s1) SELECT * FROM s1, s2;
-- WITH clause should reference the previous CTE
WITH t1 AS (SELECT * FROM t2), t2 AS (SELECT 2 FROM t1) SELECT * FROM t1, t2;
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 6
-- !query 0
create temporary view t as select * from values 0, 1, 2 as t(id)
-- !query 0 schema
struct<>
-- !query 0 output
-- !query 1
create temporary view t2 as select * from values 0, 1 as t(id)
-- !query 1 schema
struct<>
-- !query 1 output
-- !query 2
WITH s AS (SELECT 1 FROM s) SELECT * FROM s
-- !query 2 schema
struct<>
-- !query 2 output
org.apache.spark.sql.AnalysisException
Table or view not found: s; line 1 pos 25
-- !query 3
WITH t AS (SELECT 1 FROM t) SELECT * FROM t
-- !query 3 schema
struct<1:int>
-- !query 3 output
1
1
1
-- !query 4
WITH s1 AS (SELECT 1 FROM s2), s2 AS (SELECT 1 FROM s1) SELECT * FROM s1, s2
-- !query 4 schema
struct<>
-- !query 4 output
org.apache.spark.sql.AnalysisException
Table or view not found: s2; line 1 pos 26
-- !query 5
WITH t1 AS (SELECT * FROM t2), t2 AS (SELECT 2 FROM t1) SELECT * FROM t1, t2
-- !query 5 schema
struct<id:int,2:int>
-- !query 5 output
0 2
0 2
1 2
1 2
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment