Commit 6722aca8 authored by Liang-Chi Hsieh, committed by Cheng Lian

[SPARK-8785] [SQL] Improve Parquet schema merging

JIRA: https://issues.apache.org/jira/browse/SPARK-8785

Currently, Parquet schema merging (`ParquetRelation2.readSchema`) may spend a lot of time merging duplicate schemas. We can select only the distinct schemas and merge those instead.
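A minimal sketch of the idea (`mergeDistinct`, `parse`, and `merge` are illustrative stand-ins, not the actual Spark API; in the real patch the strings are the serialized schemas from the Parquet footers, the parser is `DataType.fromJson`, and the merge is `StructType#merge`):

    import scala.collection.mutable

    // Deduplicate by serialized form first, so each distinct schema is
    // parsed and merged only once instead of once per footer.
    def mergeDistinct[A](serialized: Seq[String],
                         parse: String => A,
                         merge: (A, A) => A): Option[A] = {
      val seen = mutable.HashSet.empty[String]
      // HashSet#add returns false for an element that is already present,
      // so `filter(seen.add)` keeps only first occurrences.
      serialized.filter(seen.add).map(parse).reduceOption(merge)
    }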

Author: Liang-Chi Hsieh <viirya@gmail.com>
Author: Liang-Chi Hsieh <viirya@appier.com>

Closes #7182 from viirya/improve_parquet_merging and squashes the following commits:

5cf934f [Liang-Chi Hsieh] Refactor it to make it faster.
f3411ea [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into improve_parquet_merging
a63c3ff [Liang-Chi Hsieh] Improve Parquet schema merging.
parent bf02e377
@@ -21,6 +21,7 @@ import java.net.URI
 import java.util.{List => JList}
 
 import scala.collection.JavaConversions._
+import scala.collection.mutable
 import scala.util.Try
 
 import com.google.common.base.Objects
@@ -30,8 +31,9 @@ import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.hadoop._
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.apache.parquet.hadoop.metadata.{FileMetaData, CompressionCodecName}
 import org.apache.parquet.hadoop.util.ContextUtil
+import org.apache.parquet.schema.MessageType
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -508,44 +510,56 @@ private[sql] object ParquetRelation2 extends Logging {
   private[parquet] def readSchema(
       footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
-    footers.map { footer =>
+
+    def parseParquetSchema(schema: MessageType): StructType = {
+      StructType.fromAttributes(
+        // TODO Really no need to use `Attribute` here, we only need to know the data type.
+        ParquetTypesConverter.convertToAttributes(
+          schema,
+          sqlContext.conf.isParquetBinaryAsString,
+          sqlContext.conf.isParquetINT96AsTimestamp))
+    }
+
+    val seen = mutable.HashSet[String]()
+    val finalSchemas: Seq[StructType] = footers.flatMap { footer =>
       val metadata = footer.getParquetMetadata.getFileMetaData
-      val parquetSchema = metadata.getSchema
-      val maybeSparkSchema = metadata
+      val serializedSchema = metadata
         .getKeyValueMetaData
         .toMap
         .get(RowReadSupport.SPARK_METADATA_KEY)
-        .flatMap { serializedSchema =>
-          // Don't throw even if we failed to parse the serialized Spark schema. Just fallback to
-          // whatever is available.
-          Try(DataType.fromJson(serializedSchema))
-            .recover { case _: Throwable =>
-              logInfo(
-                s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
-                  "falling back to the deprecated DataType.fromCaseClassString parser.")
-              DataType.fromCaseClassString(serializedSchema)
-            }
-            .recover { case cause: Throwable =>
-              logWarning(
-                s"""Failed to parse serialized Spark schema in Parquet key-value metadata:
-                   |\t$serializedSchema
-                 """.stripMargin,
-                cause)
-            }
-            .map(_.asInstanceOf[StructType])
-            .toOption
-        }
-
-      maybeSparkSchema.getOrElse {
-        // Falls back to Parquet schema if Spark SQL schema is absent.
-        StructType.fromAttributes(
-          // TODO Really no need to use `Attribute` here, we only need to know the data type.
-          ParquetTypesConverter.convertToAttributes(
-            parquetSchema,
-            sqlContext.conf.isParquetBinaryAsString,
-            sqlContext.conf.isParquetINT96AsTimestamp))
+      if (serializedSchema == None) {
+        // Falls back to Parquet schema if no Spark SQL schema found.
+        Some(parseParquetSchema(metadata.getSchema))
+      } else if (!seen.contains(serializedSchema.get)) {
+        seen += serializedSchema.get
+
+        // Don't throw even if we failed to parse the serialized Spark schema. Just fallback to
+        // whatever is available.
+        Some(Try(DataType.fromJson(serializedSchema.get))
+          .recover { case _: Throwable =>
+            logInfo(
+              s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
+                "falling back to the deprecated DataType.fromCaseClassString parser.")
+            DataType.fromCaseClassString(serializedSchema.get)
+          }
+          .recover { case cause: Throwable =>
+            logWarning(
+              s"""Failed to parse serialized Spark schema in Parquet key-value metadata:
+                 |\t$serializedSchema
+               """.stripMargin,
+              cause)
+          }
+          .map(_.asInstanceOf[StructType])
+          .getOrElse {
+            // Falls back to Parquet schema if Spark SQL schema can't be parsed.
+            parseParquetSchema(metadata.getSchema)
+          })
+      } else {
+        None
       }
-    }.reduceOption { (left, right) =>
+    }
+
+    finalSchemas.reduceOption { (left, right) =>
       try left.merge(right) catch { case e: Throwable =>
         throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
       }
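The `Try(...).recover(...).getOrElse(...)` chain above keeps the same parse-with-fallbacks shape as before: the JSON parser first, then the deprecated case-class-string parser, then the raw Parquet schema. A self-contained sketch of that shape (`parseJson`, `parseLegacy`, and `fallback` are placeholders, not Spark APIs):

    import scala.util.Try

    // Try the primary parser; on any failure try the legacy parser; if that
    // also fails, return the fallback value instead of throwing.
    def parseWithFallbacks[A](s: String,
                              parseJson: String => A,
                              parseLegacy: String => A,
                              fallback: => A): A =
      Try(parseJson(s))
        .recover { case _: Throwable => parseLegacy(s) }
        .getOrElse(fallback)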