From 789bdbe3d0d9558043872161bdfa148ec021a849 Mon Sep 17 00:00:00 2001
From: Wenchen Fan <wenchen@databricks.com>
Date: Wed, 10 May 2017 19:30:00 +0800
Subject: [PATCH] [SPARK-20688][SQL] correctly check analysis for scalar
 sub-queries

## What changes were proposed in this pull request?

In `CheckAnalysis`, we should call `checkAnalysis` for `ScalarSubquery` at the beginning, as later we will call `plan.output` which is invalid if `plan` is not resolved.

## How was this patch tested?

new regression test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17930 from cloud-fan/tmp.
---
 .../spark/sql/catalyst/analysis/CheckAnalysis.scala    |  6 +++---
 .../scala/org/apache/spark/sql/SubquerySuite.scala     | 10 +++++++++-
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 61797bc34d..ea4560aac7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -130,12 +130,13 @@ trait CheckAnalysis extends PredicateHelper {
             }
 
           case s @ ScalarSubquery(query, conditions, _) =>
+            checkAnalysis(query)
+
             // If no correlation, the output must be exactly one column
             if (conditions.isEmpty && query.output.size != 1) {
               failAnalysis(
                 s"Scalar subquery must return only one column, but got ${query.output.size}")
-            }
-            else if (conditions.nonEmpty) {
+            } else if (conditions.nonEmpty) {
               def checkAggregate(agg: Aggregate): Unit = {
                 // Make sure correlated scalar subqueries contain one row for every outer row by
                 // enforcing that they are aggregates containing exactly one aggregate expression.
@@ -179,7 +180,6 @@ trait CheckAnalysis extends PredicateHelper {
                 case fail => failAnalysis(s"Correlated scalar subqueries must be Aggregated: $fail")
               }
             }
-            checkAnalysis(query)
             s
 
           case s: SubqueryExpression =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 131abf7c1e..a01eb2a216 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -72,7 +72,7 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  test("rdd deserialization does not crash [SPARK-15791]") {
+  test("SPARK-15791: rdd deserialization does not crash") {
     sql("select (select 1 as b) as b").rdd.count()
   }
 
@@ -867,4 +867,12 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
       sql("select * from l, r where l.a = r.c + 1 AND (exists (select * from r) OR l.a = r.c)"),
       Row(3, 3.0, 2, 3.0) :: Row(3, 3.0, 2, 3.0) :: Nil)
   }
+
+  test("SPARK-20688: correctly check analysis for scalar sub-queries") {
+    withTempView("t") {
+      Seq(1 -> "a").toDF("i", "j").createTempView("t")
+      val e = intercept[AnalysisException](sql("SELECT (SELECT count(*) FROM t WHERE a = 1)"))
+      assert(e.message.contains("cannot resolve '`a`' given input columns: [i, j]"))
+    }
+  }
 }
-- 
GitLab