From fe81f6c779213a91369ec61cf5489ad5c66cc49c Mon Sep 17 00:00:00 2001
From: Michael Armbrust <michael@databricks.com>
Date: Mon, 30 Mar 2015 22:24:12 +0800
Subject: [PATCH] [SPARK-6595][SQL] MetastoreRelation should be a
 MultiInstanceRelation

Now that we have `DataFrame`s it is possible to have multiple copies in a single query plan.  As such, it needs to inherit from `MultiInstanceRelation` or self joins will break.  I also add better debugging errors when our self join handling fails in case there are future bugs.

Author: Michael Armbrust <michael@databricks.com>

Closes #5251 from marmbrus/multiMetaStore and squashes the following commits:

4272f6d [Michael Armbrust] [SPARK-6595][SQL] MetastoreRelation should be MuliInstanceRelation
---
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 10 +++++++++-
 .../sql/catalyst/analysis/MultiInstanceRelation.scala |  2 +-
 .../apache/spark/sql/hive/HiveMetastoreCatalog.scala  | 11 +++++++++--
 .../spark/sql/hive/HiveMetastoreCatalogSuite.scala    |  8 ++++++++
 4 files changed, 27 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 44eceb0b37..ba1ac141b9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -252,7 +252,15 @@ class Analyzer(catalog: Catalog,
           case oldVersion @ Aggregate(_, aggregateExpressions, _)
               if findAliases(aggregateExpressions).intersect(conflictingAttributes).nonEmpty =>
             (oldVersion, oldVersion.copy(aggregateExpressions = newAliases(aggregateExpressions)))
-        }.head // Only handle first case found, others will be fixed on the next pass.
+        }.headOption.getOrElse { // Only handle first case, others will be fixed on the next pass.
+          sys.error(
+            s"""
+              |Failure when resolving conflicting references in Join:
+              |$plan
+              |
+              |Conflicting attributes: ${conflictingAttributes.mkString(",")}
+              """.stripMargin)
+        }
 
         val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
         val newRight = right transformUp {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
index 894c3500cf..35b74024a4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
@@ -30,5 +30,5 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
  * of itself with globally unique expression ids.
  */
 trait MultiInstanceRelation {
-  def newInstance(): this.type
+  def newInstance(): LogicalPlan
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d1a99555e9..203164ea84 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -32,7 +32,7 @@ import org.apache.hadoop.util.ReflectionUtils
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.{SaveMode, AnalysisException, SQLContext}
-import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Catalog, OverrideCatalog}
+import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NoSuchTableException, Catalog, OverrideCatalog}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical
@@ -697,7 +697,7 @@ private[hive] case class MetastoreRelation
     (databaseName: String, tableName: String, alias: Option[String])
     (val table: TTable, val partitions: Seq[TPartition])
     (@transient sqlContext: SQLContext)
-  extends LeafNode {
+  extends LeafNode with MultiInstanceRelation {
 
   self: Product =>
 
@@ -778,6 +778,13 @@ private[hive] case class MetastoreRelation
 
   /** An attribute map for determining the ordinal for non-partition columns. */
   val columnOrdinals = AttributeMap(attributes.zipWithIndex)
+
+  override def newInstance() = {
+    val newCopy = MetastoreRelation(databaseName, tableName, alias)(table, partitions)(sqlContext)
+    // The project here is an ugly hack to work around the fact that MetastoreRelation's
+    // equals method is broken.  Please remove this when SPARK-6555 is fixed.
+    Project(newCopy.output, newCopy)
+  }
 }
 
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index aad48ada52..fa8e11ffec 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hive
 
+import org.apache.spark.sql.hive.test.TestHive
 import org.scalatest.FunSuite
 
 import org.apache.spark.sql.test.ExamplePointUDT
@@ -36,4 +37,11 @@ class HiveMetastoreCatalogSuite extends FunSuite {
     assert(HiveMetastoreTypes.toMetastoreType(udt) ===
       HiveMetastoreTypes.toMetastoreType(udt.sqlType))
   }
+
+  test("duplicated metastore relations") {
+    import TestHive.implicits._
+    val df = TestHive.sql("SELECT * FROM src")
+    println(df.queryExecution)
+    df.as('a).join(df.as('b), $"a.key" === $"b.key")
+  }
 }
-- 
GitLab