From a7992ffaf1e8adc9d2c225a986fa3162e8e130eb Mon Sep 17 00:00:00 2001
From: Cheng Lian <lian@databricks.com>
Date: Tue, 31 Mar 2015 11:18:25 -0700
Subject: [PATCH] [SPARK-6555] [SQL] Overrides equals() and hashCode() for
 MetastoreRelation

Also removes temporary workarounds made in #5183 and #5251.

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/5289)
<!-- Reviewable:end -->

Author: Cheng Lian <lian@databricks.com>

Closes #5289 from liancheng/spark-6555 and squashes the following commits:

d0095ac [Cheng Lian] Removes unused imports
cfafeeb [Cheng Lian] Removes outdated comment
75a2746 [Cheng Lian] Overrides equals() and hashCode() for MetastoreRelation
---
 .../spark/sql/hive/HiveMetastoreCatalog.scala | 42 +++++++++++--------
 .../sql/hive/execution/HivePlanTest.scala     |  6 ++-
 2 files changed, 28 insertions(+), 20 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 6a01a23124..f20f0ad99f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
 import java.io.IOException
 import java.util.{List => JList}
 
+import com.google.common.base.Objects
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import org.apache.hadoop.hive.metastore.api.{FieldSchema, Partition => TPartition, Table => TTable}
 import org.apache.hadoop.hive.metastore.{TableType, Warehouse}
@@ -465,7 +466,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
               relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
           val parquetRelation = convertToParquetRelation(relation)
           val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation -> relation.output, parquetRelation, attributedRewrites)
+          (relation, parquetRelation, attributedRewrites)
 
         // Write path
         case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _)
@@ -476,7 +477,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
             relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
           val parquetRelation = convertToParquetRelation(relation)
           val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation -> relation.output, parquetRelation, attributedRewrites)
+          (relation, parquetRelation, attributedRewrites)
 
         // Read path
         case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
@@ -485,33 +486,28 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
               relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
           val parquetRelation = convertToParquetRelation(relation)
           val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation -> relation.output, parquetRelation, attributedRewrites)
+          (relation, parquetRelation, attributedRewrites)
       }
 
-      // Quick fix for SPARK-6450: Notice that we're using both the MetastoreRelation instances and
-      // their output attributes as the key of the map. This is because MetastoreRelation.equals
-      // doesn't take output attributes into account, thus multiple MetastoreRelation instances
-      // pointing to the same table get collapsed into a single entry in the map. A proper fix for
-      // this should be overriding equals & hashCode in MetastoreRelation.
       val relationMap = toBeReplaced.map(r => (r._1, r._2)).toMap
       val attributedRewrites = AttributeMap(toBeReplaced.map(_._3).fold(Nil)(_ ++: _))
 
       // Replaces all `MetastoreRelation`s with corresponding `ParquetRelation2`s, and fixes
       // attribute IDs referenced in other nodes.
       plan.transformUp {
-        case r: MetastoreRelation if relationMap.contains(r -> r.output) =>
-          val parquetRelation = relationMap(r -> r.output)
+        case r: MetastoreRelation if relationMap.contains(r) =>
+          val parquetRelation = relationMap(r)
           val alias = r.alias.getOrElse(r.tableName)
           Subquery(alias, parquetRelation)
 
         case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite)
-          if relationMap.contains(r -> r.output) =>
-          val parquetRelation = relationMap(r -> r.output)
+          if relationMap.contains(r) =>
+          val parquetRelation = relationMap(r)
           InsertIntoTable(parquetRelation, partition, child, overwrite)
 
         case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite)
-          if relationMap.contains(r -> r.output) =>
-          val parquetRelation = relationMap(r -> r.output)
+          if relationMap.contains(r) =>
+          val parquetRelation = relationMap(r)
           InsertIntoTable(parquetRelation, partition, child, overwrite)
 
         case other => other.transformExpressions {
@@ -707,6 +703,19 @@ private[hive] case class MetastoreRelation
 
   self: Product =>
 
+  override def equals(other: scala.Any): Boolean = other match {
+    case relation: MetastoreRelation =>
+      databaseName == relation.databaseName &&
+        tableName == relation.tableName &&
+        alias == relation.alias &&
+        output == relation.output
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    Objects.hashCode(databaseName, tableName, alias, output)
+  }
+
   // TODO: Can we use org.apache.hadoop.hive.ql.metadata.Table as the type of table and
   // use org.apache.hadoop.hive.ql.metadata.Partition as the type of elements of partitions.
   // Right now, using org.apache.hadoop.hive.ql.metadata.Table and
@@ -786,10 +795,7 @@ private[hive] case class MetastoreRelation
   val columnOrdinals = AttributeMap(attributes.zipWithIndex)
 
   override def newInstance() = {
-    val newCopy = MetastoreRelation(databaseName, tableName, alias)(table, partitions)(sqlContext)
-    // The project here is an ugly hack to work around the fact that MetastoreRelation's
-    // equals method is broken.  Please remove this when SPARK-6555 is fixed.
-    Project(newCopy.output, newCopy)
+    MetastoreRelation(databaseName, tableName, alias)(table, partitions)(sqlContext)
   }
 }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
index c939e6e99d..bdb53ddf59 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
@@ -22,10 +22,12 @@ import org.apache.spark.sql.hive.test.TestHive
 
 class HivePlanTest extends QueryTest {
   import TestHive._
+  import TestHive.implicits._
 
   test("udf constant folding") {
-    val optimized = sql("SELECT cos(null) FROM src").queryExecution.optimizedPlan
-    val correctAnswer = sql("SELECT cast(null as double) FROM src").queryExecution.optimizedPlan
+    Seq.empty[Tuple1[Int]].toDF("a").registerTempTable("t")
+    val optimized = sql("SELECT cos(null) FROM t").queryExecution.optimizedPlan
+    val correctAnswer = sql("SELECT cast(null as double) FROM t").queryExecution.optimizedPlan
 
     comparePlans(optimized, correctAnswer)
   }
-- 
GitLab