From d5f12bfe8f0a98d6fee114bb24376668ebe2898e Mon Sep 17 00:00:00 2001
From: Yin Huai <yhuai@databricks.com>
Date: Tue, 17 Feb 2015 17:50:39 -0800
Subject: [PATCH] [SPARK-5875][SQL]logical.Project should not be resolved if it
 contains aggregates or generators

https://issues.apache.org/jira/browse/SPARK-5875 has a case to reproduce the bug and explain the root cause.

Author: Yin Huai <yhuai@databricks.com>

Closes #4663 from yhuai/projectResolved and squashes the following commits:

472f7b6 [Yin Huai] If a logical.Project has any AggregateExpression or Generator, it's resolved field should be false.
---
 .../plans/logical/basicOperators.scala        | 10 ++++++
 .../sql/catalyst/analysis/AnalysisSuite.scala | 13 +++++++-
 .../sql/hive/execution/SQLQuerySuite.scala    | 32 ++++++++++++++++++-
 3 files changed, 53 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 9628e93274..89544add74 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -23,6 +23,16 @@ import org.apache.spark.sql.types._
 
 case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
   def output = projectList.map(_.toAttribute)
+
+  override lazy val resolved: Boolean = {
+    val containsAggregatesOrGenerators = projectList.exists ( _.collect {
+        case agg: AggregateExpression => agg
+        case generator: Generator => generator
+      }.nonEmpty
+    )
+
+    !expressions.exists(!_.resolved) && childrenResolved && !containsAggregatesOrGenerators
+  }
 }
 
 /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index e70c651e14..aec7847356 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Literal, Alias, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.types._
 
@@ -58,6 +58,17 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
     assert(caseInsensitiveAnalyze(plan).resolved)
   }
 
+  test("check project's resolved") {
+    assert(Project(testRelation.output, testRelation).resolved)
+
+    assert(!Project(Seq(UnresolvedAttribute("a")), testRelation).resolved)
+
+    val explode = Explode(Nil, AttributeReference("a", IntegerType, nullable = true)())
+    assert(!Project(Seq(Alias(explode, "explode")()), testRelation).resolved)
+
+    assert(!Project(Seq(Alias(Count(Literal(1)), "count")()), testRelation).resolved)
+  }
+
   test("analyze project") {
     assert(
       caseSensitiveAnalyze(Project(Seq(UnresolvedAttribute("a")), testRelation)) ===
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index e8d9eec3d8..ae03bc5e99 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hive.execution
 
-import org.apache.spark.sql.hive.HiveShim
+import org.apache.spark.sql.hive.{MetastoreRelation, HiveShim}
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
@@ -316,4 +316,34 @@ class SQLQuerySuite extends QueryTest {
 
     dropTempTable("data")
   }
+
+  test("logical.Project should not be resolved if it contains aggregates or generators") {
+    // This test is used to test the fix of SPARK-5875.
+    // The original issue was that Project's resolved will be true when it contains
+    // AggregateExpressions or Generators. However, in this case, the Project
+    // is not in a valid state (cannot be executed). Because of this bug, the analysis rule of
+    // PreInsertionCasts will actually start to work before ImplicitGenerate and then
+    // generates an invalid query plan.
+    val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i+1}]}"""))
+    jsonRDD(rdd).registerTempTable("data")
+    val originalConf = getConf("spark.sql.hive.convertCTAS", "false")
+    setConf("spark.sql.hive.convertCTAS", "false")
+
+    sql("CREATE TABLE explodeTest (key bigInt)")
+    table("explodeTest").queryExecution.analyzed match {
+      case metastoreRelation: MetastoreRelation => // OK
+      case _ =>
+        fail("To correctly test the fix of SPARK-5875, explodeTest should be a MetastoreRelation")
+    }
+
+    sql(s"INSERT OVERWRITE TABLE explodeTest SELECT explode(a) AS val FROM data")
+    checkAnswer(
+      sql("SELECT key from explodeTest"),
+      (1 to 5).flatMap(i => Row(i) :: Row(i + 1) :: Nil)
+    )
+
+    sql("DROP TABLE explodeTest")
+    dropTempTable("data")
+    setConf("spark.sql.hive.convertCTAS", originalConf)
+  }
 }
-- 
GitLab