From aa4ca8b873fd83e64e5faea6f7febcc830e30b02 Mon Sep 17 00:00:00 2001
From: Michael Armbrust <michael@databricks.com>
Date: Thu, 12 Feb 2015 13:11:28 -0800
Subject: [PATCH] [SQL] Improve error messages

Author: Michael Armbrust <michael@databricks.com>
Author: wangfei <wangfei1@huawei.com>

Closes #4558 from marmbrus/errorMessages and squashes the following commits:

5e5ab50 [Michael Armbrust] Merge pull request #15 from scwf/errorMessages
fa38881 [wangfei] fix for grouping__id
f279a71 [wangfei] make right references for ScriptTransformation
d29fbde [Michael Armbrust] extra case
1a797b4 [Michael Armbrust] comments
d4e9015 [Michael Armbrust] add comment
af9e668 [Michael Armbrust] no braces
34eb3a4 [Michael Armbrust] more work
6197cd5 [Michael Armbrust] [SQL] Better error messages for analysis failures
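
For illustration, a minimal sketch of the new behavior (reusing the
testRelation helper from AnalysisSuite; the column name 'abcd is
hypothetical):

    // Selecting a column that does not exist now fails analysis with a
    // targeted message naming the attribute and the available inputs,
    // e.g. "cannot resolve 'abcd' given input columns a":
    val e = intercept[AnalysisException] {
      caseSensitiveAnalyze(testRelation.select('abcd))
    }
    assert(e.getMessage contains "cannot resolve")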
---
 .../sql/catalyst/analysis/Analyzer.scala      | 123 ++++++++++--------
 .../catalyst/expressions/AttributeSet.scala   |   6 +-
 .../expressions/namedExpressions.scala        |   2 +-
 .../plans/logical/ScriptTransformation.scala  |   6 +-
 .../spark/sql/catalyst/trees/TreeNode.scala   |   9 ++
 .../sql/catalyst/analysis/AnalysisSuite.scala |  79 +++++++----
 .../BooleanSimplificationSuite.scala          |   4 +-
 .../optimizer/ConstantFoldingSuite.scala      |   4 +-
 .../optimizer/FilterPushdownSuite.scala       |  12 +-
 .../catalyst/optimizer/OptimizeInSuite.scala  |   4 +-
 .../optimizer/UnionPushdownSuite.scala        |   4 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala  |   6 +-
 .../apache/spark/sql/hive/HiveContext.scala   |   4 +-
 .../spark/sql/hive/execution/commands.scala   |   4 +-
 14 files changed, 164 insertions(+), 103 deletions(-)
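
Note on the new TreeNode.foreachUp that CheckResolution relies on: it visits
all children before the node itself (a post-order traversal), so the first
error raised is the innermost failure rather than the cascade it causes
further up the plan. A standalone sketch of the traversal order, using a
hypothetical Node type for illustration only:

    case class Node(name: String, children: Seq[Node] = Nil) {
      def foreachUp(f: Node => Unit): Unit = {
        children.foreach(_.foreachUp(f))  // recurse into subtrees first
        f(this)                           // then visit this node
      }
    }

    val tree = Node("root", Seq(Node("mid", Seq(Node("leaf")))))
    tree.foreachUp(n => println(n.name)) // prints: leaf, mid, root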

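The separate CheckAggregation rule is folded into CheckResolution, with a
more actionable message. A sketch, reusing testRelation2 from AnalysisSuite:

    // Grouping by 'a while selecting the non-aggregated, non-grouped 'b now
    // fails with "expression 'b' is neither present in the group by, nor is
    // it an aggregate function. ..." instead of the old
    // "Expression not in GROUP BY: ...".
    val e = intercept[AnalysisException] {
      caseSensitiveAnalyze(testRelation2.groupBy('a)('b))
    }
    assert(e.getMessage contains "group by")
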
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 2d1fa106a2..58a7003977 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.types.{ArrayType, StructField, StructType, IntegerType}
+import org.apache.spark.sql.types._
 
 /**
  * A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing
@@ -66,32 +66,82 @@ class Analyzer(catalog: Catalog,
       typeCoercionRules ++
       extendedRules : _*),
     Batch("Check Analysis", Once,
-      CheckResolution ::
-      CheckAggregation ::
-      Nil: _*),
-    Batch("AnalysisOperators", fixedPoint,
-      EliminateAnalysisOperators)
+      CheckResolution),
+    Batch("Remove SubQueries", fixedPoint,
+      EliminateSubQueries)
   )
 
   /**
    * Makes sure all attributes and logical plans have been resolved.
    */
   object CheckResolution extends Rule[LogicalPlan] {
+    def failAnalysis(msg: String) = { throw new AnalysisException(msg) }
+
     def apply(plan: LogicalPlan): LogicalPlan = {
-      plan.transformUp {
-        case p if p.expressions.exists(!_.resolved) =>
-          val missing = p.expressions.filterNot(_.resolved).map(_.prettyString).mkString(",")
-          val from = p.inputSet.map(_.name).mkString("{", ", ", "}")
-
-          throw new AnalysisException(s"Cannot resolve '$missing' given input columns $from")
-        case p if !p.resolved && p.childrenResolved =>
-          throw new AnalysisException(s"Unresolved operator in the query plan ${p.simpleString}")
-      } match {
-        // As a backstop, use the root node to check that the entire plan tree is resolved.
-        case p if !p.resolved =>
-          throw new AnalysisException(s"Unresolved operator in the query plan ${p.simpleString}")
-        case p => p
+      // We transform up and order the rules so as to catch the first possible failure instead
+      // of the result of cascading resolution failures.
+      plan.foreachUp {
+        case operator: LogicalPlan =>
+          operator transformExpressionsUp {
+            case a: Attribute if !a.resolved =>
+              val from = operator.inputSet.map(_.name).mkString(", ")
+              failAnalysis(s"cannot resolve '${a.prettyString}' given input columns $from")
+
+            case c: Cast if !c.resolved =>
+              failAnalysis(
+                s"invalid cast from ${c.child.dataType.simpleString} to ${c.dataType.simpleString}")
+
+            case b: BinaryExpression if !b.resolved =>
+              failAnalysis(
+                s"invalid expression ${b.prettyString} " +
+                s"between ${b.left.simpleString} and ${b.right.simpleString}")
+          }
+
+          operator match {
+            case f: Filter if f.condition.dataType != BooleanType =>
+              failAnalysis(
+                s"filter expression '${f.condition.prettyString}' " +
+                s"of type ${f.condition.dataType.simpleString} is not a boolean.")
+
+            case aggregatePlan @ Aggregate(groupingExprs, aggregateExprs, child) =>
+              def checkValidAggregateExpression(expr: Expression): Unit = expr match {
+                case _: AggregateExpression => // OK
+                case e: Attribute if !groupingExprs.contains(e) =>
+                  failAnalysis(
+                    s"expression '${e.prettyString}' is neither present in the group by, " +
+                    s"nor is it an aggregate function. " +
+                     "Add to group by or wrap in first() if you don't care which value you get.")
+                case e if groupingExprs.contains(e) => // OK
+                case e if e.references.isEmpty => // OK
+                case e => e.children.foreach(checkValidAggregateExpression)
+              }
+
+              val cleaned = aggregateExprs.map(_.transform {
+                // Should trim aliases around `GetField`s. These aliases are introduced while
+                // resolving struct field accesses, because `GetField` is not a `NamedExpression`.
+                // (Should we just turn `GetField` into a `NamedExpression`?)
+                case Alias(g, _) => g
+              })
+
+              cleaned.foreach(checkValidAggregateExpression)
+
+            case o if o.children.nonEmpty &&
+              !o.references.filter(_.name != "grouping__id").subsetOf(o.inputSet) =>
+              val missingAttributes = (o.references -- o.inputSet).map(_.prettyString).mkString(",")
+              val input = o.inputSet.map(_.prettyString).mkString(",")
+
+              failAnalysis(s"resolved attributes $missingAttributes missing from $input")
+
+            // Catch all
+            case o if !o.resolved =>
+              failAnalysis(
+                s"unresolved operator ${operator.simpleString}")
+
+            case _ => // Analysis successful!
+          }
       }
+
+      plan
     }
   }
 
@@ -192,37 +242,6 @@ class Analyzer(catalog: Catalog,
     }
   }
 
-  /**
-   * Checks for non-aggregated attributes with aggregation
-   */
-  object CheckAggregation extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = {
-      plan.transform {
-        case aggregatePlan @ Aggregate(groupingExprs, aggregateExprs, child) =>
-          def isValidAggregateExpression(expr: Expression): Boolean = expr match {
-            case _: AggregateExpression => true
-            case e: Attribute => groupingExprs.contains(e)
-            case e if groupingExprs.contains(e) => true
-            case e if e.references.isEmpty => true
-            case e => e.children.forall(isValidAggregateExpression)
-          }
-
-          aggregateExprs.find { e =>
-            !isValidAggregateExpression(e.transform {
-              // Should trim aliases around `GetField`s. These aliases are introduced while
-              // resolving struct field accesses, because `GetField` is not a `NamedExpression`.
-              // (Should we just turn `GetField` into a `NamedExpression`?)
-              case Alias(g: GetField, _) => g
-            })
-          }.foreach { e =>
-            throw new TreeNodeException(plan, s"Expression not in GROUP BY: $e")
-          }
-
-          aggregatePlan
-      }
-    }
-  }
-
   /**
    * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
    */
@@ -230,7 +249,7 @@ class Analyzer(catalog: Catalog,
     def apply(plan: LogicalPlan): LogicalPlan = plan transform {
       case i @ InsertIntoTable(UnresolvedRelation(tableIdentifier, alias), _, _, _) =>
         i.copy(
-          table = EliminateAnalysisOperators(catalog.lookupRelation(tableIdentifier, alias)))
+          table = EliminateSubQueries(catalog.lookupRelation(tableIdentifier, alias)))
       case UnresolvedRelation(tableIdentifier, alias) =>
         catalog.lookupRelation(tableIdentifier, alias)
     }
@@ -477,7 +496,7 @@ class Analyzer(catalog: Catalog,
  * only required to provide scoping information for attributes and can be removed once analysis is
  * complete.
  */
-object EliminateAnalysisOperators extends Rule[LogicalPlan] {
+object EliminateSubQueries extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case Subquery(_, child) => child
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
index 171845ad14..a9ba0be596 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
@@ -20,7 +20,11 @@ package org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.analysis.Star
 
 protected class AttributeEquals(val a: Attribute) {
-  override def hashCode() = a.exprId.hashCode()
+  override def hashCode() = a match {
+    case ar: AttributeReference => ar.exprId.hashCode()
+    case a => a.hashCode()
+  }
+
   override def equals(other: Any) = (a, other.asInstanceOf[AttributeEquals].a) match {
     case (a1: AttributeReference, a2: AttributeReference) => a1.exprId == a2.exprId
     case (a1, a2) => a1 == a2
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index f77c56311c..62c062be6d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -218,7 +218,7 @@ case class PrettyAttribute(name: String) extends Attribute with trees.LeafNode[E
   override def exprId: ExprId = ???
   override def eval(input: Row): EvaluatedType = ???
   override def nullable: Boolean = ???
-  override def dataType: DataType = ???
+  override def dataType: DataType = NullType
 }
 
 object VirtualColumn {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala
index cfe2c7a39a..ccf5291219 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Attribute, Expression}
 
 /**
  * Transforms the input by forking and running the specified script.
@@ -32,7 +32,9 @@ case class ScriptTransformation(
     script: String,
     output: Seq[Attribute],
     child: LogicalPlan,
-    ioschema: ScriptInputOutputSchema) extends UnaryNode
+    ioschema: ScriptInputOutputSchema) extends UnaryNode {
+  override def references: AttributeSet = AttributeSet(input.flatMap(_.references))
+}
 
 /**
  * A placeholder for implementation specific input and output properties when passing data
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 2013ae4f7b..e0930b056d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -46,6 +46,15 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] {
     children.foreach(_.foreach(f))
   }
 
+  /**
+   * Runs the given function recursively on [[children]] then on this node.
+   * @param f the function to be applied to each node in the tree.
+   */
+  def foreachUp(f: BaseType => Unit): Unit = {
+    children.foreach(_.foreachUp(f))
+    f(this)
+  }
+
   /**
    * Returns a Seq containing the result of applying the given function to each
    * node in this tree in a preorder traversal.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index f011a5ff15..e70c651e14 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{Literal, Alias, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.types._
 
@@ -108,24 +108,56 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
         testRelation)
   }
 
-  test("throw errors for unresolved attributes during analysis") {
-    val e = intercept[AnalysisException] {
-      caseSensitiveAnalyze(Project(Seq(UnresolvedAttribute("abcd")), testRelation))
+  def errorTest(
+      name: String,
+      plan: LogicalPlan,
+      errorMessages: Seq[String],
+      caseSensitive: Boolean = true) = {
+    test(name) {
+      val error = intercept[AnalysisException] {
+        if (caseSensitive) {
+          caseSensitiveAnalyze(plan)
+        } else {
+          caseInsensitiveAnalyze(plan)
+        }
+      }
+
+      errorMessages.foreach(m => assert(error.getMessage contains m))
     }
-    assert(e.getMessage().toLowerCase.contains("cannot resolve"))
   }
 
-  test("throw errors for unresolved plans during analysis") {
-    case class UnresolvedTestPlan() extends LeafNode {
-      override lazy val resolved = false
-      override def output = Nil
-    }
-    val e = intercept[AnalysisException] {
-      caseSensitiveAnalyze(UnresolvedTestPlan())
-    }
-    assert(e.getMessage().toLowerCase.contains("unresolved"))
+  errorTest(
+    "unresolved attributes",
+    testRelation.select('abcd),
+    "cannot resolve" :: "abcd" :: Nil)
+
+  errorTest(
+    "bad casts",
+    testRelation.select(Literal(1).cast(BinaryType).as('badCast)),
+    "invalid cast" :: Literal(1).dataType.simpleString :: BinaryType.simpleString :: Nil)
+
+  errorTest(
+    "non-boolean filters",
+    testRelation.where(Literal(1)),
+    "filter" :: "'1'" :: "not a boolean" :: Literal(1).dataType.simpleString :: Nil)
+
+  errorTest(
+    "missing group by",
+    testRelation2.groupBy('a)('b),
+    "'b'" :: "group by" :: Nil
+  )
+
+  case class UnresolvedTestPlan() extends LeafNode {
+    override lazy val resolved = false
+    override def output = Nil
   }
 
+  errorTest(
+    "catch all unresolved plan",
+    UnresolvedTestPlan(),
+    "unresolved" :: Nil)
+
+
   test("divide should be casted into fractional types") {
     val testRelation2 = LocalRelation(
       AttributeReference("a", StringType)(),
@@ -134,18 +166,15 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
       AttributeReference("d", DecimalType.Unlimited)(),
       AttributeReference("e", ShortType)())
 
-    val expr0 = 'a / 2
-    val expr1 = 'a / 'b
-    val expr2 = 'a / 'c
-    val expr3 = 'a / 'd
-    val expr4 = 'e / 'e
-    val plan = caseInsensitiveAnalyze(Project(
-      Alias(expr0, s"Analyzer($expr0)")() ::
-      Alias(expr1, s"Analyzer($expr1)")() ::
-      Alias(expr2, s"Analyzer($expr2)")() ::
-      Alias(expr3, s"Analyzer($expr3)")() ::
-      Alias(expr4, s"Analyzer($expr4)")() :: Nil, testRelation2))
+    val plan = caseInsensitiveAnalyze(
+      testRelation2.select(
+        'a / Literal(2) as 'div1,
+        'a / 'b as 'div2,
+        'a / 'c as 'div3,
+        'a / 'd as 'div4,
+        'e / 'e as 'div5))
     val pl = plan.asInstanceOf[Project].projectList
+
     assert(pl(0).dataType == DoubleType)
     assert(pl(1).dataType == DoubleType)
     assert(pl(2).dataType == DoubleType)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
index 264a0eff37..72f06e26e0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
+import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.PlanTest
@@ -30,7 +30,7 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper {
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
       Batch("AnalysisNodes", Once,
-        EliminateAnalysisOperators) ::
+        EliminateSubQueries) ::
       Batch("Constant Folding", FixedPoint(50),
         NullPropagation,
         ConstantFolding,
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
index e22c625058..ef10c0aece 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedGetField, EliminateAnalysisOperators}
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedGetField, EliminateSubQueries}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.plans.PlanTest
@@ -33,7 +33,7 @@ class ConstantFoldingSuite extends PlanTest {
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
       Batch("AnalysisNodes", Once,
-        EliminateAnalysisOperators) ::
+        EliminateSubQueries) ::
       Batch("ConstantFolding", Once,
         ConstantFolding,
         BooleanSimplification) :: Nil
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 1158b5dfc6..0b74bacb18 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.analysis
-import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
+import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
 import org.apache.spark.sql.catalyst.expressions.Explode
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.{PlanTest, LeftOuter, RightOuter}
@@ -32,7 +32,7 @@ class FilterPushdownSuite extends PlanTest {
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
       Batch("Subqueries", Once,
-        EliminateAnalysisOperators) ::
+        EliminateSubQueries) ::
       Batch("Filter Pushdown", Once,
         CombineFilters,
         PushPredicateThroughProject,
@@ -351,7 +351,7 @@ class FilterPushdownSuite extends PlanTest {
     }
     val optimized = Optimize(originalQuery.analyze)
 
-    comparePlans(analysis.EliminateAnalysisOperators(originalQuery.analyze), optimized)
+    comparePlans(analysis.EliminateSubQueries(originalQuery.analyze), optimized)
   }
 
   test("joins: conjunctive predicates") {
@@ -370,7 +370,7 @@ class FilterPushdownSuite extends PlanTest {
       left.join(right, condition = Some("x.b".attr === "y.b".attr))
         .analyze
 
-    comparePlans(optimized, analysis.EliminateAnalysisOperators(correctAnswer))
+    comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer))
   }
 
   test("joins: conjunctive predicates #2") {
@@ -389,7 +389,7 @@ class FilterPushdownSuite extends PlanTest {
       left.join(right, condition = Some("x.b".attr === "y.b".attr))
         .analyze
 
-    comparePlans(optimized, analysis.EliminateAnalysisOperators(correctAnswer))
+    comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer))
   }
 
   test("joins: conjunctive predicates #3") {
@@ -412,7 +412,7 @@ class FilterPushdownSuite extends PlanTest {
           condition = Some("z.a".attr === "x.b".attr))
         .analyze
 
-    comparePlans(optimized, analysis.EliminateAnalysisOperators(correctAnswer))
+    comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer))
   }
 
   val testRelationWithArrayType = LocalRelation('a.int, 'b.int, 'c_arr.array(IntegerType))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
index da912ab382..233e329cb2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.optimizer
 
 import scala.collection.immutable.HashSet
-import org.apache.spark.sql.catalyst.analysis.{EliminateAnalysisOperators, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.plans.PlanTest
@@ -34,7 +34,7 @@ class OptimizeInSuite extends PlanTest {
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
       Batch("AnalysisNodes", Once,
-        EliminateAnalysisOperators) ::
+        EliminateSubQueries) ::
       Batch("ConstantFolding", Once,
         ConstantFolding,
         BooleanSimplification,
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnionPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnionPushdownSuite.scala
index dfef87bd91..a54751dfa9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnionPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnionPushdownSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.analysis
-import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
+import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.{PlanTest, LeftOuter, RightOuter}
 import org.apache.spark.sql.catalyst.rules._
@@ -29,7 +29,7 @@ class UnionPushdownSuite extends PlanTest {
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
       Batch("Subqueries", Once,
-        EliminateAnalysisOperators) ::
+        EliminateSubQueries) ::
       Batch("Union Pushdown", Once,
         UnionPushdown) :: Nil
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index bba8899651..a1c8cf58f2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -806,10 +806,8 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
   test("throw errors for non-aggregate attributes with aggregation") {
     def checkAggregation(query: String, isInvalidQuery: Boolean = true) {
       if (isInvalidQuery) {
-        val e = intercept[TreeNodeException[LogicalPlan]](sql(query).queryExecution.analyzed)
-        assert(
-          e.getMessage.startsWith("Expression not in GROUP BY"),
-          "Non-aggregate attribute(s) not detected\n")
+        val e = intercept[AnalysisException](sql(query).queryExecution.analyzed)
+        assert(e.getMessage contains "group by")
       } else {
         // Should not throw
         sql(query).queryExecution.analyzed
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 7ae6ed6f84..ddc7b181d4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -37,7 +37,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperators, OverrideCatalog, OverrideFunctionRegistry}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateSubQueries, OverrideCatalog, OverrideFunctionRegistry}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, SetCommand, QueryExecutionException}
 import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DescribeHiveTableCommand}
@@ -104,7 +104,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
    */
   @Experimental
   def analyze(tableName: String) {
-    val relation = EliminateAnalysisOperators(catalog.lookupRelation(Seq(tableName)))
+    val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName)))
 
     relation match {
       case relation: MetastoreRelation =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index f6bea1c6a6..ce0db1125c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
+import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.{DataFrame, SQLContext}
@@ -175,7 +175,7 @@ case class CreateMetastoreDataSourceAsSelect(
           val resolved =
             ResolvedDataSource(sqlContext, Some(query.schema), provider, optionsWithPath)
           val createdRelation = LogicalRelation(resolved.relation)
-          EliminateAnalysisOperators(sqlContext.table(tableName).logicalPlan) match {
+          EliminateSubQueries(sqlContext.table(tableName).logicalPlan) match {
             case l @ LogicalRelation(i: InsertableRelation) =>
               if (l.schema != createdRelation.schema) {
                 val errorDescription =
-- 
GitLab