Skip to content
Snippets Groups Projects
Commit fd6b3101 authored by Wenchen Fan, committed by Michael Armbrust
Browse files

[SPARK-9113] [SQL] enable analysis check code for self join

The check was unreachable before, as `case operator: LogicalPlan` catches everything already.

Author: Wenchen Fan <cloud0fan@outlook.com>

Closes #7449 from cloud-fan/tmp and squashes the following commits:

2bb6637 [Wenchen Fan] add test
5493aea [Wenchen Fan] add the check back
27221a7 [Wenchen Fan] remove unnecessary analysis check code for self join
parent 15fc2ffe
No related branches found
No related tags found
No related merge requests found
...@@ -316,7 +316,7 @@ class Analyzer( ...@@ -316,7 +316,7 @@ class Analyzer(
) )
// Special handling for cases when self-join introduce duplicate expression ids. // Special handling for cases when self-join introduce duplicate expression ids.
case j @ Join(left, right, _, _) if left.outputSet.intersect(right.outputSet).nonEmpty => case j @ Join(left, right, _, _) if !j.selfJoinResolved =>
val conflictingAttributes = left.outputSet.intersect(right.outputSet) val conflictingAttributes = left.outputSet.intersect(right.outputSet)
logDebug(s"Conflicting attributes ${conflictingAttributes.mkString(",")} in $j") logDebug(s"Conflicting attributes ${conflictingAttributes.mkString(",")} in $j")
......
...@@ -109,29 +109,27 @@ trait CheckAnalysis { ...@@ -109,29 +109,27 @@ trait CheckAnalysis {
s"resolved attribute(s) $missingAttributes missing from $input " + s"resolved attribute(s) $missingAttributes missing from $input " +
s"in operator ${operator.simpleString}") s"in operator ${operator.simpleString}")
case o if !o.resolved =>
failAnalysis(
s"unresolved operator ${operator.simpleString}")
case p @ Project(exprs, _) if containsMultipleGenerators(exprs) => case p @ Project(exprs, _) if containsMultipleGenerators(exprs) =>
failAnalysis( failAnalysis(
s"""Only a single table generating function is allowed in a SELECT clause, found: s"""Only a single table generating function is allowed in a SELECT clause, found:
| ${exprs.map(_.prettyString).mkString(",")}""".stripMargin) | ${exprs.map(_.prettyString).mkString(",")}""".stripMargin)
// Special handling for cases when self-join introduce duplicate expression ids.
case j @ Join(left, right, _, _) if left.outputSet.intersect(right.outputSet).nonEmpty =>
val conflictingAttributes = left.outputSet.intersect(right.outputSet)
failAnalysis(
s"""
|Failure when resolving conflicting references in Join:
|$plan
|Conflicting attributes: ${conflictingAttributes.mkString(",")}
|""".stripMargin)
case o if !o.resolved =>
failAnalysis(
s"unresolved operator ${operator.simpleString}")
case _ => // Analysis successful! case _ => // Analysis successful!
} }
// Special handling for cases when self-join introduce duplicate expression ids.
case j @ Join(left, right, _, _) if left.outputSet.intersect(right.outputSet).nonEmpty =>
val conflictingAttributes = left.outputSet.intersect(right.outputSet)
failAnalysis(
s"""
|Failure when resolving conflicting references in Join:
|$plan
|Conflicting attributes: ${conflictingAttributes.mkString(",")}
|""".stripMargin)
} }
extendedCheckRules.foreach(_(plan)) extendedCheckRules.foreach(_(plan))
} }
......
...@@ -123,11 +123,11 @@ case class Join( ...@@ -123,11 +123,11 @@ case class Join(
} }
} }
private def selfJoinResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty def selfJoinResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty
// Joins are only resolved if they don't introduce ambiguious expression ids. // Joins are only resolved if they don't introduce ambiguous expression ids.
override lazy val resolved: Boolean = { override lazy val resolved: Boolean = {
childrenResolved && !expressions.exists(!_.resolved) && selfJoinResolved childrenResolved && expressions.forall(_.resolved) && selfJoinResolved
} }
} }
......
...@@ -23,10 +23,11 @@ import org.apache.spark.SparkFunSuite ...@@ -23,10 +23,11 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._ import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.{InternalRow, SimpleCatalystConf} import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.types._
case class TestFunction( case class TestFunction(
children: Seq[Expression], children: Seq[Expression],
...@@ -164,4 +165,13 @@ class AnalysisErrorSuite extends SparkFunSuite with BeforeAndAfter { ...@@ -164,4 +165,13 @@ class AnalysisErrorSuite extends SparkFunSuite with BeforeAndAfter {
assert(message.contains("resolved attribute(s) a#1 missing from a#2")) assert(message.contains("resolved attribute(s) a#1 missing from a#2"))
} }
// Checks that analysis rejects a join whose left and right sides are the same relation,
// and that the resulting error message describes the conflicting references.
test("error test for self-join") {
  val join = Join(testRelation, testRelation, Inner, None)
  val error = intercept[AnalysisException] {
    SimpleAnalyzer.checkAnalysis(join)
  }
  // The `contains` results must be asserted; as bare expressions they were silently
  // discarded and the test could never fail on a wrong or missing message.
  assert(error.message.contains("Failure when resolving conflicting references in Join"))
  assert(error.message.contains("Conflicting attributes"))
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment