Skip to content
Snippets Groups Projects
Commit fe81f6c7 authored by Michael Armbrust's avatar Michael Armbrust Committed by Cheng Lian
Browse files

[SPARK-6595][SQL] MetastoreRelation should be a MultiInstanceRelation

Now that we have `DataFrame`s it is possible to have multiple copies in a single query plan.  As such, it needs to inherit from `MultiInstanceRelation` or self joins will break.  I also add better debugging errors when our self join handling fails in case there are future bugs.

Author: Michael Armbrust <michael@databricks.com>

Closes #5251 from marmbrus/multiMetaStore and squashes the following commits:

4272f6d [Michael Armbrust] [SPARK-6595][SQL] MetastoreRelation should be MultiInstanceRelation
parent 19d4c392
No related branches found
No related tags found
No related merge requests found
...@@ -252,7 +252,15 @@ class Analyzer(catalog: Catalog, ...@@ -252,7 +252,15 @@ class Analyzer(catalog: Catalog,
case oldVersion @ Aggregate(_, aggregateExpressions, _) case oldVersion @ Aggregate(_, aggregateExpressions, _)
if findAliases(aggregateExpressions).intersect(conflictingAttributes).nonEmpty => if findAliases(aggregateExpressions).intersect(conflictingAttributes).nonEmpty =>
(oldVersion, oldVersion.copy(aggregateExpressions = newAliases(aggregateExpressions))) (oldVersion, oldVersion.copy(aggregateExpressions = newAliases(aggregateExpressions)))
}.head // Only handle first case found, others will be fixed on the next pass. }.headOption.getOrElse { // Only handle first case, others will be fixed on the next pass.
sys.error(
s"""
|Failure when resolving conflicting references in Join:
|$plan
|
|Conflicting attributes: ${conflictingAttributes.mkString(",")}
""".stripMargin)
}
val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output)) val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
val newRight = right transformUp { val newRight = right transformUp {
......
...@@ -30,5 +30,5 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan ...@@ -30,5 +30,5 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
* of itself with globally unique expression ids. * of itself with globally unique expression ids.
*/ */
trait MultiInstanceRelation { trait MultiInstanceRelation {
def newInstance(): this.type def newInstance(): LogicalPlan
} }
...@@ -32,7 +32,7 @@ import org.apache.hadoop.util.ReflectionUtils ...@@ -32,7 +32,7 @@ import org.apache.hadoop.util.ReflectionUtils
import org.apache.spark.Logging import org.apache.spark.Logging
import org.apache.spark.sql.{SaveMode, AnalysisException, SQLContext} import org.apache.spark.sql.{SaveMode, AnalysisException, SQLContext}
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Catalog, OverrideCatalog} import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NoSuchTableException, Catalog, OverrideCatalog}
import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical
...@@ -697,7 +697,7 @@ private[hive] case class MetastoreRelation ...@@ -697,7 +697,7 @@ private[hive] case class MetastoreRelation
(databaseName: String, tableName: String, alias: Option[String]) (databaseName: String, tableName: String, alias: Option[String])
(val table: TTable, val partitions: Seq[TPartition]) (val table: TTable, val partitions: Seq[TPartition])
(@transient sqlContext: SQLContext) (@transient sqlContext: SQLContext)
extends LeafNode { extends LeafNode with MultiInstanceRelation {
self: Product => self: Product =>
...@@ -778,6 +778,13 @@ private[hive] case class MetastoreRelation ...@@ -778,6 +778,13 @@ private[hive] case class MetastoreRelation
/** An attribute map for determining the ordinal for non-partition columns. */ /** An attribute map for determining the ordinal for non-partition columns. */
val columnOrdinals = AttributeMap(attributes.zipWithIndex) val columnOrdinals = AttributeMap(attributes.zipWithIndex)
// Produces a fresh copy of this relation with new expression ids so the same
// metastore table can appear more than once in a single query plan (self joins).
override def newInstance() = {
  val copied = MetastoreRelation(databaseName, tableName, alias)(table, partitions)(sqlContext)
  // Wrapping the copy in a Project is an ugly hack to work around the fact that
  // MetastoreRelation's equals method is broken. Remove this once SPARK-6555 is fixed.
  Project(copied.output, copied)
}
} }
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package org.apache.spark.sql.hive package org.apache.spark.sql.hive
import org.apache.spark.sql.hive.test.TestHive
import org.scalatest.FunSuite import org.scalatest.FunSuite
import org.apache.spark.sql.test.ExamplePointUDT import org.apache.spark.sql.test.ExamplePointUDT
...@@ -36,4 +37,11 @@ class HiveMetastoreCatalogSuite extends FunSuite { ...@@ -36,4 +37,11 @@ class HiveMetastoreCatalogSuite extends FunSuite {
assert(HiveMetastoreTypes.toMetastoreType(udt) === assert(HiveMetastoreTypes.toMetastoreType(udt) ===
HiveMetastoreTypes.toMetastoreType(udt.sqlType)) HiveMetastoreTypes.toMetastoreType(udt.sqlType))
} }
// Regression test for SPARK-6595: a MetastoreRelation must be a
// MultiInstanceRelation so that two copies of the same table can be
// self-joined without conflicting attribute ids. The join below would
// previously fail during analysis.
// Fix: removed a leftover debug `println(df.queryExecution)` that polluted
// test output and asserted nothing.
test("duplicated metastore relations") {
  import TestHive.implicits._
  val df = TestHive.sql("SELECT * FROM src")
  df.as('a).join(df.as('b), $"a.key" === $"b.key")
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment