diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 6a01a23124d950210c84ea8c876029588ec850d4..f20f0ad99f865193e33b2b78c97cb207048da2fc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
 import java.io.IOException
 import java.util.{List => JList}
 
+import com.google.common.base.Objects
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import org.apache.hadoop.hive.metastore.api.{FieldSchema, Partition => TPartition, Table => TTable}
 import org.apache.hadoop.hive.metastore.{TableType, Warehouse}
@@ -465,7 +466,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
               relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
           val parquetRelation = convertToParquetRelation(relation)
           val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation -> relation.output, parquetRelation, attributedRewrites)
+          (relation, parquetRelation, attributedRewrites)
 
         // Write path
         case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _)
@@ -476,7 +477,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
             relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
           val parquetRelation = convertToParquetRelation(relation)
           val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation -> relation.output, parquetRelation, attributedRewrites)
+          (relation, parquetRelation, attributedRewrites)
 
         // Read path
         case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
@@ -485,33 +486,28 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
               relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
           val parquetRelation = convertToParquetRelation(relation)
           val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation -> relation.output, parquetRelation, attributedRewrites)
+          (relation, parquetRelation, attributedRewrites)
       }
 
-      // Quick fix for SPARK-6450: Notice that we're using both the MetastoreRelation instances and
-      // their output attributes as the key of the map. This is because MetastoreRelation.equals
-      // doesn't take output attributes into account, thus multiple MetastoreRelation instances
-      // pointing to the same table get collapsed into a single entry in the map. A proper fix for
-      // this should be overriding equals & hashCode in MetastoreRelation.
       val relationMap = toBeReplaced.map(r => (r._1, r._2)).toMap
       val attributedRewrites = AttributeMap(toBeReplaced.map(_._3).fold(Nil)(_ ++: _))
 
       // Replaces all `MetastoreRelation`s with corresponding `ParquetRelation2`s, and fixes
       // attribute IDs referenced in other nodes.
       plan.transformUp {
-        case r: MetastoreRelation if relationMap.contains(r -> r.output) =>
-          val parquetRelation = relationMap(r -> r.output)
+        case r: MetastoreRelation if relationMap.contains(r) =>
+          val parquetRelation = relationMap(r)
           val alias = r.alias.getOrElse(r.tableName)
           Subquery(alias, parquetRelation)
 
         case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite)
-          if relationMap.contains(r -> r.output) =>
-          val parquetRelation = relationMap(r -> r.output)
+          if relationMap.contains(r) =>
+          val parquetRelation = relationMap(r)
           InsertIntoTable(parquetRelation, partition, child, overwrite)
 
         case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite)
-          if relationMap.contains(r -> r.output) =>
-          val parquetRelation = relationMap(r -> r.output)
+          if relationMap.contains(r) =>
+          val parquetRelation = relationMap(r)
           InsertIntoTable(parquetRelation, partition, child, overwrite)
 
         case other => other.transformExpressions {
@@ -707,6 +703,19 @@ private[hive] case class MetastoreRelation
 
   self: Product =>
 
+  override def equals(other: scala.Any): Boolean = other match {
+    case relation: MetastoreRelation =>
+      databaseName == relation.databaseName &&
+        tableName == relation.tableName &&
+        alias == relation.alias &&
+        output == relation.output
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    Objects.hashCode(databaseName, tableName, alias, output)
+  }
+
   // TODO: Can we use org.apache.hadoop.hive.ql.metadata.Table as the type of table and
   // use org.apache.hadoop.hive.ql.metadata.Partition as the type of elements of partitions.
   // Right now, using org.apache.hadoop.hive.ql.metadata.Table and
@@ -786,10 +795,7 @@ private[hive] case class MetastoreRelation
   val columnOrdinals = AttributeMap(attributes.zipWithIndex)
 
   override def newInstance() = {
-    val newCopy = MetastoreRelation(databaseName, tableName, alias)(table, partitions)(sqlContext)
-    // The project here is an ugly hack to work around the fact that MetastoreRelation's
-    // equals method is broken.  Please remove this when SPARK-6555 is fixed.
-    Project(newCopy.output, newCopy)
+    MetastoreRelation(databaseName, tableName, alias)(table, partitions)(sqlContext)
   }
 }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
index c939e6e99d28a741ca721faae868e4866c3b9676..bdb53ddf59c19776b3d3582dfca2a4a347277037 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
@@ -22,10 +22,12 @@ import org.apache.spark.sql.hive.test.TestHive
 
 class HivePlanTest extends QueryTest {
   import TestHive._
+  import TestHive.implicits._
 
   test("udf constant folding") {
-    val optimized = sql("SELECT cos(null) FROM src").queryExecution.optimizedPlan
-    val correctAnswer = sql("SELECT cast(null as double) FROM src").queryExecution.optimizedPlan
+    Seq.empty[Tuple1[Int]].toDF("a").registerTempTable("t")
+    val optimized = sql("SELECT cos(null) FROM t").queryExecution.optimizedPlan
+    val correctAnswer = sql("SELECT cast(null as double) FROM t").queryExecution.optimizedPlan
 
     comparePlans(optimized, correctAnswer)
   }