Commit d637a666 authored by Cheng Hao, committed by Michael Armbrust

[SPARK-10327] [SQL] Cache Table is not working while subquery has alias in its project list

```scala
    import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
    import org.apache.spark.sql.hive.execution.HiveTableScan
    sql("select key, value, key + 1 from src").registerTempTable("abc")
    cacheTable("abc")

    val sparkPlan = sql(
      """select a.key, b.key, c.key from
        |abc a join abc b on a.key=b.key
        |join abc c on a.key=c.key""".stripMargin).queryExecution.sparkPlan

    assert(sparkPlan.collect { case e: InMemoryColumnarTableScan => e }.size === 3) // failed
    assert(sparkPlan.collect { case e: HiveTableScan => e }.size === 0) // failed
```

Both assertions fail: in the physical plan below only one of the three scans reads from the cached InMemoryRelation, while the other two fall back to HiveTableScan. The actual plan is:

```
== Parsed Logical Plan ==
'Project [unresolvedalias('a.key),unresolvedalias('b.key),unresolvedalias('c.key)]
 'Join Inner, Some(('a.key = 'c.key))
  'Join Inner, Some(('a.key = 'b.key))
   'UnresolvedRelation [abc], Some(a)
   'UnresolvedRelation [abc], Some(b)
  'UnresolvedRelation [abc], Some(c)

== Analyzed Logical Plan ==
key: int, key: int, key: int
Project [key#14,key#61,key#66]
 Join Inner, Some((key#14 = key#66))
  Join Inner, Some((key#14 = key#61))
   Subquery a
    Subquery abc
     Project [key#14,value#15,(key#14 + 1) AS _c2#16]
      MetastoreRelation default, src, None
   Subquery b
    Subquery abc
     Project [key#61,value#62,(key#61 + 1) AS _c2#58]
      MetastoreRelation default, src, None
  Subquery c
   Subquery abc
    Project [key#66,value#67,(key#66 + 1) AS _c2#63]
     MetastoreRelation default, src, None

== Optimized Logical Plan ==
Project [key#14,key#61,key#66]
 Join Inner, Some((key#14 = key#66))
  Project [key#14,key#61]
   Join Inner, Some((key#14 = key#61))
    Project [key#14]
     InMemoryRelation [key#14,value#15,_c2#16], true, 10000, StorageLevel(true, true, false, true, 1), (Project [key#14,value#15,(key#14 + 1) AS _c2#16]), Some(abc)
    Project [key#61]
     MetastoreRelation default, src, None
  Project [key#66]
   MetastoreRelation default, src, None

== Physical Plan ==
TungstenProject [key#14,key#61,key#66]
 BroadcastHashJoin [key#14], [key#66], BuildRight
  TungstenProject [key#14,key#61]
   BroadcastHashJoin [key#14], [key#61], BuildRight
    ConvertToUnsafe
     InMemoryColumnarTableScan [key#14], (InMemoryRelation [key#14,value#15,_c2#16], true, 10000, StorageLevel(true, true, false, true, 1), (Project [key#14,value#15,(key#14 + 1) AS _c2#16]), Some(abc))
    ConvertToUnsafe
     HiveTableScan [key#61], (MetastoreRelation default, src, None)
  ConvertToUnsafe
   HiveTableScan [key#66], (MetastoreRelation default, src, None)
```
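
The second and third references to `abc` miss the cache because the cache lookup compares plans with `sameResult`, and the aliased column `(key + 1) AS _c2` receives a fresh, auto-generated expression id each time the subquery is analyzed (`_c2#16`, `_c2#58`, `_c2#63` above), so the plans never compare equal. The sketch below is a minimal, self-contained illustration of that effect; `ExprId` and `Alias` here are simplified stand-ins, not the Catalyst classes.

```scala
// Simplified stand-ins (not the Catalyst classes): shows how an auto-generated
// expression id makes two otherwise identical aliased projections unequal.
object AliasExprIdSketch {
  final case class ExprId(id: Long)
  final case class Alias(childSql: String, name: String, exprId: ExprId)

  private var counter = 0L
  private def freshId(): ExprId = { counter += 1; ExprId(counter) }

  def main(args: Array[String]): Unit = {
    // The same "key + 1 AS _c2" projection, analyzed twice, gets two different ids.
    val cached   = Alias("key + 1", "_c2", freshId())
    val reparsed = Alias("key + 1", "_c2", freshId())
    println(cached == reparsed)                   // false: only the exprId differs

    // Erasing the id, which is what the fix below does via ExprId(-1), restores equality.
    val erased = (a: Alias) => a.copy(exprId = ExprId(-1))
    println(erased(cached) == erased(reparsed))   // true
  }
}
```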

Author: Cheng Hao <hao.cheng@intel.com>

Closes #8494 from chenghao-intel/weird_cache.
parent 52b24a60
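The fix teaches `LogicalPlan.cleanArgs` to erase the auto-generated `exprId` of a top-level `Alias` before binding references, so that plans differing only in those ids still compare equal under `sameResult`: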
```diff
@@ -135,16 +135,25 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
   /** Args that have cleaned such that differences in expression id should not affect equality */
   protected lazy val cleanArgs: Seq[Any] = {
     val input = children.flatMap(_.output)
+    def cleanExpression(e: Expression) = e match {
+      case a: Alias =>
+        // As the root of the expression, Alias will always take an arbitrary exprId, we need
+        // to erase that for equality testing.
+        val cleanedExprId = Alias(a.child, a.name)(ExprId(-1), a.qualifiers)
+        BindReferences.bindReference(cleanedExprId, input, allowFailures = true)
+      case other => BindReferences.bindReference(other, input, allowFailures = true)
+    }
+
     productIterator.map {
       // Children are checked using sameResult above.
       case tn: TreeNode[_] if containsChild(tn) => null
-      case e: Expression => BindReferences.bindReference(e, input, allowFailures = true)
+      case e: Expression => cleanExpression(e)
       case s: Option[_] => s.map {
-        case e: Expression => BindReferences.bindReference(e, input, allowFailures = true)
+        case e: Expression => cleanExpression(e)
         case other => other
       }
       case s: Seq[_] => s.map {
-        case e: Expression => BindReferences.bindReference(e, input, allowFailures = true)
+        case e: Expression => cleanExpression(e)
         case other => other
       }
       case other => other
```
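
With this change, `sameResult` (which the cache lookup relies on) no longer distinguishes plans whose only difference is the id of an output alias. A quick way to sanity-check this from a `HiveContext`-backed `spark-shell`, assuming `src` and `sql` are in scope as in the repro above:

```scala
// Each analysis mints fresh exprIds for `key + 1 AS _c2`, yet the two plans
// should now be recognized as equivalent.
val p1 = sql("select key, value, key + 1 from src").queryExecution.analyzed
val p2 = sql("select key, value, key + 1 from src").queryExecution.analyzed
println(p1.sameResult(p2))  // expected: true once the Alias exprId is erased in cleanArgs
```

The accompanying regression test: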
```diff
@@ -17,6 +17,8 @@
 package org.apache.spark.sql
 
+import org.apache.spark.sql.execution.PhysicalRDD
+
 import scala.concurrent.duration._
 import scala.language.postfixOps
@@ -338,4 +340,18 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
       assert((accsSize - 2) == Accumulators.originals.size)
     }
   }
+
+  test("SPARK-10327 Cache Table is not working while subquery has alias in its project list") {
+    ctx.sparkContext.parallelize((1, 1) :: (2, 2) :: Nil)
+      .toDF("key", "value").selectExpr("key", "value", "key+1").registerTempTable("abc")
+    ctx.cacheTable("abc")
+
+    val sparkPlan = sql(
+      """select a.key, b.key, c.key from
+        |abc a join abc b on a.key=b.key
+        |join abc c on a.key=c.key""".stripMargin).queryExecution.sparkPlan
+
+    assert(sparkPlan.collect { case e: InMemoryColumnarTableScan => e }.size === 3)
+    assert(sparkPlan.collect { case e: PhysicalRDD => e }.size === 0)
+  }
 }
```
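
A note on the assertions in the new test: in this sql/core suite the temp table is built from a local DataFrame rather than a Hive table, so a cache miss would surface as a `PhysicalRDD` scan instead of a `HiveTableScan`; that is presumably why the second assertion checks for zero `PhysicalRDD` nodes rather than zero `HiveTableScan` nodes.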