Commit a7992ffa authored by Cheng Lian, committed by Michael Armbrust

[SPARK-6555] [SQL] Overrides equals() and hashCode() for MetastoreRelation

Also removes temporary workarounds made in #5183 and #5251.

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/5289)
<!-- Reviewable:end -->

Author: Cheng Lian <lian@databricks.com>

Closes #5289 from liancheng/spark-6555 and squashes the following commits:

d0095ac [Cheng Lian] Removes unused imports
cfafeeb [Cheng Lian] Removes outdated comment
75a2746 [Cheng Lian] Overrides equals() and hashCode() for MetastoreRelation
parent d01a6d8c
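For context, the behaviour this commit fixes can be reproduced outside Spark with a self-contained sketch. `AttrRef` and `SimpleRelation` below are hypothetical stand-ins, not Spark classes, for MetastoreRelation and its output attributes: when equality ignores the output attribute IDs, two scans of the same table collapse into one map entry, which is the SPARK-6450 symptom the removed workaround guarded against.

```scala
// Minimal sketch, not Spark code: AttrRef / SimpleRelation are hypothetical stand-ins.
case class AttrRef(name: String, exprId: Long)

// Equality deliberately ignores `output`, mimicking the old MetastoreRelation, whose
// case-class equality only covered (databaseName, tableName, alias).
class SimpleRelation(val db: String, val table: String, val output: Seq[AttrRef]) {
  override def equals(other: Any): Boolean = other match {
    case r: SimpleRelation => db == r.db && table == r.table
    case _ => false
  }
  override def hashCode(): Int = (db, table).hashCode()
}

object EqualityDemo extends App {
  // Two scans of the same table, each with fresh attribute IDs.
  val scan1 = new SimpleRelation("default", "src", Seq(AttrRef("key", 1L)))
  val scan2 = new SimpleRelation("default", "src", Seq(AttrRef("key", 2L)))

  // With output ignored, both scans collapse into a single map entry.
  println(Map(scan1 -> "converted#1", scan2 -> "converted#2").size) // 1

  // Keying by (relation, output) -- the removed SPARK-6450 workaround -- or overriding
  // equals/hashCode to include output (this commit) keeps them distinct.
  println(Map((scan1, scan1.output) -> "converted#1", (scan2, scan2.output) -> "converted#2").size) // 2
}
```

Overriding equals() and hashCode() to include output, as the diff below does for MetastoreRelation, makes the plain relation usable as the map key again.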
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
 import java.io.IOException
 import java.util.{List => JList}
 
+import com.google.common.base.Objects
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import org.apache.hadoop.hive.metastore.api.{FieldSchema, Partition => TPartition, Table => TTable}
 import org.apache.hadoop.hive.metastore.{TableType, Warehouse}
@@ -465,7 +466,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
             relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
           val parquetRelation = convertToParquetRelation(relation)
           val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation -> relation.output, parquetRelation, attributedRewrites)
+          (relation, parquetRelation, attributedRewrites)
 
         // Write path
         case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _)
@@ -476,7 +477,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
             relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
           val parquetRelation = convertToParquetRelation(relation)
           val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation -> relation.output, parquetRelation, attributedRewrites)
+          (relation, parquetRelation, attributedRewrites)
 
         // Read path
         case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
@@ -485,33 +486,28 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
             relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
           val parquetRelation = convertToParquetRelation(relation)
           val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation -> relation.output, parquetRelation, attributedRewrites)
+          (relation, parquetRelation, attributedRewrites)
       }
 
-      // Quick fix for SPARK-6450: Notice that we're using both the MetastoreRelation instances and
-      // their output attributes as the key of the map. This is because MetastoreRelation.equals
-      // doesn't take output attributes into account, thus multiple MetastoreRelation instances
-      // pointing to the same table get collapsed into a single entry in the map. A proper fix for
-      // this should be overriding equals & hashCode in MetastoreRelation.
       val relationMap = toBeReplaced.map(r => (r._1, r._2)).toMap
       val attributedRewrites = AttributeMap(toBeReplaced.map(_._3).fold(Nil)(_ ++: _))
 
       // Replaces all `MetastoreRelation`s with corresponding `ParquetRelation2`s, and fixes
      // attribute IDs referenced in other nodes.
       plan.transformUp {
-        case r: MetastoreRelation if relationMap.contains(r -> r.output) =>
-          val parquetRelation = relationMap(r -> r.output)
+        case r: MetastoreRelation if relationMap.contains(r) =>
+          val parquetRelation = relationMap(r)
           val alias = r.alias.getOrElse(r.tableName)
           Subquery(alias, parquetRelation)
 
         case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite)
-          if relationMap.contains(r -> r.output) =>
-          val parquetRelation = relationMap(r -> r.output)
+          if relationMap.contains(r) =>
+          val parquetRelation = relationMap(r)
           InsertIntoTable(parquetRelation, partition, child, overwrite)
 
         case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite)
-          if relationMap.contains(r -> r.output) =>
-          val parquetRelation = relationMap(r -> r.output)
+          if relationMap.contains(r) =>
+          val parquetRelation = relationMap(r)
           InsertIntoTable(parquetRelation, partition, child, overwrite)
 
         case other => other.transformExpressions {
@@ -707,6 +703,19 @@ private[hive] case class MetastoreRelation
   self: Product =>
 
+  override def equals(other: scala.Any): Boolean = other match {
+    case relation: MetastoreRelation =>
+      databaseName == relation.databaseName &&
+        tableName == relation.tableName &&
+        alias == relation.alias &&
+        output == relation.output
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    Objects.hashCode(databaseName, tableName, alias, output)
+  }
+
   // TODO: Can we use org.apache.hadoop.hive.ql.metadata.Table as the type of table and
   // use org.apache.hadoop.hive.ql.metadata.Partition as the type of elements of partitions.
   // Right now, using org.apache.hadoop.hive.ql.metadata.Table and
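A quick sanity check of the contract the new methods rely on, sketched with Guava's `Objects.hashCode` and hypothetical field values rather than a real MetastoreRelation: equals() and hashCode() draw on the same four fields, so hash-based lookups such as `relationMap.contains(r)` in the earlier hunk observe value equality.

```scala
import com.google.common.base.Objects

// Sketch with hypothetical values (not Spark code): relations that agree on
// (databaseName, tableName, alias, output) must also agree on hashCode,
// otherwise hash-based lookups like relationMap.contains(r) would miss.
object HashContractDemo extends App {
  val a = ("default", "src", None, Seq("key#1"))
  val b = ("default", "src", None, Seq("key#1"))

  val hashA = Objects.hashCode(a._1, a._2, a._3, a._4)
  val hashB = Objects.hashCode(b._1, b._2, b._3, b._4)

  println(a == b && hashA == hashB) // true
}
```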
@@ -786,10 +795,7 @@ private[hive] case class MetastoreRelation
   val columnOrdinals = AttributeMap(attributes.zipWithIndex)
 
   override def newInstance() = {
-    val newCopy = MetastoreRelation(databaseName, tableName, alias)(table, partitions)(sqlContext)
-    // The project here is an ugly hack to work around the fact that MetastoreRelation's
-    // equals method is broken. Please remove this when SPARK-6555 is fixed.
-    Project(newCopy.output, newCopy)
+    MetastoreRelation(databaseName, tableName, alias)(table, partitions)(sqlContext)
   }
 }
@@ -22,10 +22,12 @@ import org.apache.spark.sql.hive.test.TestHive
 class HivePlanTest extends QueryTest {
   import TestHive._
+  import TestHive.implicits._
 
   test("udf constant folding") {
-    val optimized = sql("SELECT cos(null) FROM src").queryExecution.optimizedPlan
-    val correctAnswer = sql("SELECT cast(null as double) FROM src").queryExecution.optimizedPlan
+    Seq.empty[Tuple1[Int]].toDF("a").registerTempTable("t")
+    val optimized = sql("SELECT cos(null) FROM t").queryExecution.optimizedPlan
+    val correctAnswer = sql("SELECT cast(null as double) FROM t").queryExecution.optimizedPlan
 
     comparePlans(optimized, correctAnswer)
   }