Commit 8c3b0052 authored by Cheng Lian, committed by Michael Armbrust

[SPARK-6450] [SQL] Fixes metastore Parquet table conversion

The `ParquetConversions` analysis rule builds a hash map from the original `MetastoreRelation` instances to the newly created `ParquetRelation2` instances. However, `MetastoreRelation.equals` doesn't compare output attributes. Thus, if a single metastore Parquet table appears multiple times in a query, all of its occurrences collapse into a single entry in the hash map, and the conversion is not performed correctly.

The proper fix would be to override `equals` and `hashCode` in `MetastoreRelation`. Unfortunately, that breaks more tests than expected; it's possible that those tests were ill-formed from the very beginning. As the 1.3.1 release is approaching, we'd like to keep this change surgical to avoid potential regressions. The fix proposed here is to use both the metastore relation and its output attributes as the key of the hash map used in `ParquetConversions`.
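
As a minimal, self-contained illustration of both the bug and the workaround (the `Relation` class below is a hypothetical stand-in, not the actual Spark class): when a key type's `equals`/`hashCode` ignore a field, distinct instances collapse into one map entry, while keying by a (relation, output) pair keeps them apart.

```scala
// Hypothetical stand-in for MetastoreRelation: equality deliberately
// ignores `output`, mirroring how MetastoreRelation.equals behaves.
class Relation(val table: String, val output: Seq[String]) {
  override def equals(other: Any): Boolean = other match {
    case r: Relation => r.table == table // output attributes not compared
    case _           => false
  }
  override def hashCode: Int = table.hashCode
}

object Spark6450Sketch extends App {
  // Two occurrences of the same table in one query, each resolved
  // with fresh attribute IDs.
  val r1 = new Relation("ms_convert", Seq("key#1"))
  val r2 = new Relation("ms_convert", Seq("key#2"))

  // Keyed by the relation alone, the second entry overwrites the first.
  val byRelation = Map(r1 -> "converted1", r2 -> "converted2")
  assert(byRelation.size == 1)

  // Keyed by (relation, output) pairs, both occurrences survive,
  // because Tuple2 equality compares both components.
  val byRelationAndOutput =
    Map((r1 -> r1.output) -> "converted1", (r2 -> r2.output) -> "converted2")
  assert(byRelationAndOutput.size == 2)
}
```

Since `Tuple2` equality and hashing take both components into account, two occurrences of the same table carrying distinct output attribute IDs map to distinct keys, so each one gets its own `ParquetRelation2`.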


Author: Cheng Lian <lian@databricks.com>

Closes #5183 from liancheng/spark-6450 and squashes the following commits:

3536780 [Cheng Lian] Fixes metastore Parquet table conversion
parent d44a3362
@@ -459,7 +459,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
           relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
         val parquetRelation = convertToParquetRelation(relation)
         val attributedRewrites = relation.output.zip(parquetRelation.output)
-        (relation, parquetRelation, attributedRewrites)
+        (relation -> relation.output, parquetRelation, attributedRewrites)

       // Write path
       case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _)
@@ -470,7 +470,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
           relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
         val parquetRelation = convertToParquetRelation(relation)
         val attributedRewrites = relation.output.zip(parquetRelation.output)
-        (relation, parquetRelation, attributedRewrites)
+        (relation -> relation.output, parquetRelation, attributedRewrites)

       // Read path
       case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
@@ -479,33 +479,35 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
           relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
         val parquetRelation = convertToParquetRelation(relation)
         val attributedRewrites = relation.output.zip(parquetRelation.output)
-        (relation, parquetRelation, attributedRewrites)
+        (relation -> relation.output, parquetRelation, attributedRewrites)
     }

+    // Quick fix for SPARK-6450: Notice that we're using both the MetastoreRelation instances and
+    // their output attributes as the key of the map. This is because MetastoreRelation.equals
+    // doesn't take output attributes into account, thus multiple MetastoreRelation instances
+    // pointing to the same table get collapsed into a single entry in the map. A proper fix for
+    // this should be overriding equals & hashCode in MetastoreRelation.
     val relationMap = toBeReplaced.map(r => (r._1, r._2)).toMap
     val attributedRewrites = AttributeMap(toBeReplaced.map(_._3).fold(Nil)(_ ++: _))

     // Replaces all `MetastoreRelation`s with corresponding `ParquetRelation2`s, and fixes
     // attribute IDs referenced in other nodes.
     plan.transformUp {
-      case r: MetastoreRelation if relationMap.contains(r) => {
-        val parquetRelation = relationMap(r)
-        val withAlias =
-          r.alias.map(a => Subquery(a, parquetRelation)).getOrElse(
-            Subquery(r.tableName, parquetRelation))
+      case r: MetastoreRelation if relationMap.contains(r -> r.output) =>
+        val parquetRelation = relationMap(r -> r.output)
+        val alias = r.alias.getOrElse(r.tableName)
+        Subquery(alias, parquetRelation)

-        withAlias
-      }
       case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite)
-        if relationMap.contains(r) => {
-        val parquetRelation = relationMap(r)
+        if relationMap.contains(r -> r.output) =>
+        val parquetRelation = relationMap(r -> r.output)
         InsertIntoTable(parquetRelation, partition, child, overwrite)
-      }
+
       case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite)
-        if relationMap.contains(r) => {
-        val parquetRelation = relationMap(r)
+        if relationMap.contains(r -> r.output) =>
+        val parquetRelation = relationMap(r -> r.output)
         InsertIntoTable(parquetRelation, partition, child, overwrite)
-      }
+
       case other => other.transformExpressions {
         case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
       }
@@ -365,6 +365,31 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
     sql("DROP TABLE IF EXISTS test_insert_parquet")
   }

+  test("SPARK-6450 regression test") {
+    sql(
+      """CREATE TABLE IF NOT EXISTS ms_convert (key INT)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    // This shouldn't throw AnalysisException
+    val analyzed = sql(
+      """SELECT key FROM ms_convert
+        |UNION ALL
+        |SELECT key FROM ms_convert
+      """.stripMargin).queryExecution.analyzed
+
+    assertResult(2) {
+      analyzed.collect {
+        case r @ LogicalRelation(_: ParquetRelation2) => r
+      }.size
+    }
+
+    sql("DROP TABLE ms_convert")
+  }
 }

 class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {