Skip to content
Snippets Groups Projects
Commit d5f12bfe authored by Yin Huai's avatar Yin Huai Committed by Michael Armbrust
Browse files

[SPARK-5875][SQL] logical.Project should not be resolved if it contains aggregates or generators

https://issues.apache.org/jira/browse/SPARK-5875 has a case that reproduces the bug and explains the root cause.

Author: Yin Huai <yhuai@databricks.com>

Closes #4663 from yhuai/projectResolved and squashes the following commits:

472f7b6 [Yin Huai] If a logical.Project has any AggregateExpression or Generator, its resolved field should be false.
parent a51fc7ef
No related branches found
No related tags found
No related merge requests found
......@@ -23,6 +23,16 @@ import org.apache.spark.sql.types._
/**
 * Logical plan node that pairs a list of named expressions with a single child plan.
 *
 * @param projectList the named expressions carried by this node; `output` is derived from them
 * @param child       the single input plan of this unary node
 */
case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
  /** One attribute per entry of `projectList`, in the same order. */
  def output = projectList.map(_.toAttribute)

  /**
   * True only when every expression is resolved, the children are resolved, and `collect`
   * finds no [[AggregateExpression]] or [[Generator]] in any entry of `projectList`.
   *
   * A project list that still carries aggregates or generators is reported as unresolved
   * even if each individual expression claims to be resolved.
   */
  override lazy val resolved: Boolean = {
    // Computed first, exactly once, before it is combined with the other conditions.
    val hasAggregateOrGenerator = projectList.exists { named =>
      named.collect {
        case aggregate: AggregateExpression => aggregate
        case gen: Generator => gen
      }.nonEmpty
    }

    expressions.forall(_.resolved) && childrenResolved && !hasAggregateOrGenerator
  }
}
/**
......
......@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Literal, Alias, AttributeReference}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._
......@@ -58,6 +58,17 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
assert(caseInsensitiveAnalyze(plan).resolved)
}
test("check project's resolved") {
  // Baseline: a Project whose list is the relation's own output is expected to be resolved.
  val overRelationOutput = Project(testRelation.output, testRelation)
  assert(overRelationOutput.resolved)

  // An UnresolvedAttribute in the project list must leave the Project unresolved.
  val overUnresolvedAttribute = Project(Seq(UnresolvedAttribute("a")), testRelation)
  assert(!overUnresolvedAttribute.resolved)

  // Generator case: an aliased Explode in the project list must leave the Project unresolved.
  val attributeA = AttributeReference("a", IntegerType, nullable = true)()
  val explode = Explode(Nil, attributeA)
  val overGenerator = Project(Seq(Alias(explode, "explode")()), testRelation)
  assert(!overGenerator.resolved)

  // Aggregate case: an aliased Count in the project list must leave the Project unresolved.
  val overAggregate = Project(Seq(Alias(Count(Literal(1)), "count")()), testRelation)
  assert(!overAggregate.resolved)
}
test("analyze project") {
assert(
caseSensitiveAnalyze(Project(Seq(UnresolvedAttribute("a")), testRelation)) ===
......
......@@ -17,7 +17,7 @@
package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.hive.HiveShim
import org.apache.spark.sql.hive.{MetastoreRelation, HiveShim}
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
......@@ -316,4 +316,34 @@ class SQLQuerySuite extends QueryTest {
dropTempTable("data")
}
test("logical.Project should not be resolved if it contains aggregates or generators") {
  // Regression test for SPARK-5875.
  // Originally, Project.resolved could be true even though the Project still contained
  // AggregateExpressions or Generators, i.e. while it was not yet in an executable state.
  // Because of that, the PreInsertionCasts analysis rule could fire before ImplicitGenerate
  // and produce an invalid query plan.
  val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i+1}]}"""))
  jsonRDD(rdd).registerTempTable("data")

  // Remember the current setting so it can be restored no matter how the test ends.
  val originalConf = getConf("spark.sql.hive.convertCTAS", "false")
  setConf("spark.sql.hive.convertCTAS", "false")

  try {
    sql("CREATE TABLE explodeTest (key bigInt)")
    // The test only exercises the fix when the target table is a MetastoreRelation.
    table("explodeTest").queryExecution.analyzed match {
      case _: MetastoreRelation => // OK
      case _ =>
        fail("To correctly test the fix of SPARK-5875, explodeTest should be a MetastoreRelation")
    }

    sql("INSERT OVERWRITE TABLE explodeTest SELECT explode(a) AS val FROM data")
    // Each input record {"a":[i, i+1]} must contribute the two rows i and i + 1.
    checkAnswer(
      sql("SELECT key from explodeTest"),
      (1 to 5).flatMap(i => Row(i) :: Row(i + 1) :: Nil)
    )
  } finally {
    // Clean up even when an assertion above fails, so that the table, the temp table and
    // the modified setting do not leak into subsequent tests.
    try {
      // IF EXISTS keeps the cleanup from masking a failure of CREATE TABLE itself.
      sql("DROP TABLE IF EXISTS explodeTest")
      dropTempTable("data")
    } finally {
      setConf("spark.sql.hive.convertCTAS", originalConf)
    }
  }
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment