Skip to content
Snippets Groups Projects
Commit 81020144 authored by Cheng Lian's avatar Cheng Lian Committed by Michael Armbrust
Browse files

[SPARK-6575] [SQL] Adds configuration to disable schema merging while...

[SPARK-6575] [SQL] Adds configuration to disable schema merging while converting metastore Parquet tables

Consider a metastore Parquet table that

1. doesn't have schema evolution issue
2. has lots of data files and/or partitions

In this case, driver-side schema merging can be both slow and unnecessary. It would be good to have a configuration to let the user disable schema merging when converting such a metastore Parquet table.

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/5231)
<!-- Reviewable:end -->

Author: Cheng Lian <lian@databricks.com>

Closes #5231 from liancheng/spark-6575 and squashes the following commits:

cd96159 [Cheng Lian] Adds configuration to disable schema merging while converting metastore Parquet tables
parent a7992ffa
No related branches found
No related tags found
No related merge requests found
......@@ -57,6 +57,15 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
/**
 * When true, metastore Parquet tables are converted using Spark SQL's own Parquet code path
 * rather than the Hive SerDe. Controlled by "spark.sql.hive.convertMetastoreParquet",
 * which defaults to enabled.
 */
protected[sql] def convertMetastoreParquet: Boolean = {
  val flag = getConf("spark.sql.hive.convertMetastoreParquet", "true")
  "true" == flag
}
/**
 * When true, Parquet schemas that differ across data files but are compatible with each
 * other are merged while reading a metastore Parquet table.
 *
 * Only takes effect when "spark.sql.hive.convertMetastoreParquet" is also enabled;
 * defaults to disabled because merging can be slow for tables with many files/partitions.
 */
protected[sql] def convertMetastoreParquetWithSchemaMerging: Boolean = {
  val raw = getConf("spark.sql.hive.convertMetastoreParquet.mergeSchema", "false")
  raw == "true"
}
/**
* When true, a table created by a Hive CTAS statement (no USING clause) will be
* converted to a data source table, using the data source set by spark.sql.sources.default.
......
......@@ -218,6 +218,10 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
private def convertToParquetRelation(metastoreRelation: MetastoreRelation): LogicalRelation = {
val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
val parquetOptions = Map(
ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
// NOTE: Instead of passing Metastore schema directly to `ParquetRelation2`, we have to
// serialize the Metastore schema to JSON and pass it as a data source option because of the
......@@ -234,18 +238,10 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}
val partitionSpec = PartitionSpec(partitionSchema, partitions)
val paths = partitions.map(_.path)
LogicalRelation(
ParquetRelation2(
paths,
Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json),
None,
Some(partitionSpec))(hive))
LogicalRelation(ParquetRelation2(paths, parquetOptions, None, Some(partitionSpec))(hive))
} else {
val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
LogicalRelation(
ParquetRelation2(
paths,
Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json))(hive))
LogicalRelation(ParquetRelation2(paths, parquetOptions)(hive))
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment