Commit 14978b78 authored by Cheng Lian, committed by Davies Liu

[SPARK-10395] [SQL] Simplifies CatalystReadSupport

Please refer to [SPARK-10395] [1] for details.

[1]: https://issues.apache.org/jira/browse/SPARK-10395

Author: Cheng Lian <lian@databricks.com>

Closes #8553 from liancheng/spark-10395/simplify-parquet-read-support.
parent 353c30bd
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.util.{Map => JMap}
 
-import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter, mapAsScalaMapConverter}
+import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
@@ -29,34 +29,62 @@ import org.apache.parquet.schema.Type.Repetition
 import org.apache.parquet.schema._
 
 import org.apache.spark.Logging
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types._
 
+/**
+ * A Parquet [[ReadSupport]] implementation for reading Parquet records as Catalyst
+ * [[InternalRow]]s.
+ *
+ * The API of [[ReadSupport]] is a little over-complicated for historical reasons. In older
+ * versions of parquet-mr (say 1.6.0rc3 and prior), [[ReadSupport]] needed to be instantiated
+ * and initialized twice, on both the driver side and the executor side. The [[init()]] method
+ * is for driver-side initialization, while [[prepareForRead()]] is for the executor side.
+ * However, starting from parquet-mr 1.6.0, this is no longer the case: [[ReadSupport]] is
+ * instantiated and initialized on the executor side only. So, theoretically, it is now fine to
+ * combine these two methods into a single initialization method. The only reason (that I can
+ * think of) to keep them separate is backwards compatibility with the parquet-mr API.
+ *
+ * For this reason, we no longer rely on [[ReadContext]] to pass the requested schema from
+ * [[init()]] to [[prepareForRead()]], but use a private `var` for simplicity.
+ */
 private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging {
-  // Called after `init()` when initializing Parquet record reader.
+  private var catalystRequestedSchema: StructType = _
+
+  /**
+   * Called on executor side before [[prepareForRead()]] and instantiating actual Parquet record
+   * readers. Responsible for figuring out Parquet requested schema used for column pruning.
+   */
+  override def init(context: InitContext): ReadContext = {
+    catalystRequestedSchema = {
+      // scalastyle:off jobcontext
+      val conf = context.getConfiguration
+      // scalastyle:on jobcontext
+      val schemaString = conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
+      assert(schemaString != null, "Parquet requested schema not set.")
+      StructType.fromString(schemaString)
+    }
+
+    val parquetRequestedSchema =
+      CatalystReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
+
+    new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
+  }
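For context, here is a minimal sketch of the driver-side half of this contract: the requested schema is serialized into the Hadoop configuration under the key that the new `init()` reads back with `StructType.fromString`. The key literal and the `setRequestedSchema` helper below are illustrative assumptions, not part of this commit.

// Sketch (not part of this commit): publishing the requested schema that
// the new init() above reads back and asserts is present.
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object RequestedSchemaSketch {
  // Assumed to mirror CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA.
  val SparkRowRequestedSchema = "org.apache.spark.sql.parquet.row.requested_schema"

  // Hypothetical helper: StructType round-trips through its JSON form, which
  // StructType.fromString can parse on the other side.
  def setRequestedSchema(conf: Configuration, schema: StructType): Unit =
    conf.set(SparkRowRequestedSchema, schema.json)

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val requested = StructType(Seq(
      StructField("id", LongType, nullable = false),
      StructField("name", StringType, nullable = true)))
    setRequestedSchema(conf, requested)
    // init() would now find this schema and clip the Parquet file schema to it.
    println(conf.get(SparkRowRequestedSchema))
  }
}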
+
+  /**
+   * Called on executor side after [[init()]], before instantiating actual Parquet record
+   * readers. Responsible for instantiating [[RecordMaterializer]], which is used for converting
+   * Parquet records to Catalyst [[InternalRow]]s.
+   */
   override def prepareForRead(
       conf: Configuration,
       keyValueMetaData: JMap[String, String],
       fileSchema: MessageType,
       readContext: ReadContext): RecordMaterializer[InternalRow] = {
     log.debug(s"Preparing for read Parquet file with message type: $fileSchema")
-    val toCatalyst = new CatalystSchemaConverter(conf)
     val parquetRequestedSchema = readContext.getRequestedSchema
-    val catalystRequestedSchema =
-      Option(readContext.getReadSupportMetadata).map(_.asScala).flatMap { metadata =>
-        metadata
-          // First tries to read requested schema, which may result from projections
-          .get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
-          // If not available, tries to read Catalyst schema from file metadata. It's only
-          // available if the target file is written by Spark SQL.
-          .orElse(metadata.get(CatalystReadSupport.SPARK_METADATA_KEY))
-      }.map(StructType.fromString).getOrElse {
-        logInfo("Catalyst schema not available, falling back to Parquet schema")
-        toCatalyst.convert(parquetRequestedSchema)
-      }
 
     logInfo {
       s"""Going to read the following fields from the Parquet file:
          |
@@ -69,36 +97,6 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
     new CatalystRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema)
   }
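The fallback chain deleted above is the subtle part of the old `prepareForRead()`, so a minimal standalone sketch of its resolution order may help. The key literals are assumptions matching the constants referenced in the diff, and plain strings stand in for schemas to keep the sketch self-contained.

// Sketch (not part of this commit) of the old schema-resolution order:
// 1. requested schema from read-support metadata (projection pushdown),
// 2. Catalyst schema stored in file metadata (files written by Spark SQL),
// 3. conversion from the Parquet schema as a last resort.
object SchemaResolutionSketch {
  // Assumed values of CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA and
  // CatalystReadSupport.SPARK_METADATA_KEY.
  val RequestedSchemaKey = "org.apache.spark.sql.parquet.row.requested_schema"
  val MetadataKey = "org.apache.spark.sql.parquet.row.metadata"

  def resolveSchema(
      metadata: Option[Map[String, String]],
      convertFromParquet: => String): String =
    metadata.flatMap { m =>
      m.get(RequestedSchemaKey).orElse(m.get(MetadataKey))
    }.getOrElse(convertFromParquet)

  def main(args: Array[String]): Unit = {
    // No metadata at all: falls back to converting the Parquet schema.
    println(resolveSchema(None, "converted-from-parquet"))
    // The requested schema wins over file metadata when both are present.
    println(resolveSchema(
      Some(Map(RequestedSchemaKey -> "requested", MetadataKey -> "stored")),
      "unused"))
  }
}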
 
-  // Called before `prepareForRead()` when initializing Parquet record reader.
-  override def init(context: InitContext): ReadContext = {
-    val conf = {
-      // scalastyle:off jobcontext
-      context.getConfiguration
-      // scalastyle:on jobcontext
-    }
-
-    // If the target file was written by Spark SQL, we should be able to find a serialized
-    // Catalyst schema of this file from its metadata.
-    val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA))
-
-    // Optional schema of requested columns, in the form of a string serialized from a Catalyst
-    // `StructType` containing all requested columns.
-    val maybeRequestedSchema = Option(conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA))
-
-    val parquetRequestedSchema =
-      maybeRequestedSchema.fold(context.getFileSchema) { schemaString =>
-        val catalystRequestedSchema = StructType.fromString(schemaString)
-        CatalystReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
-      }
-
-    val metadata =
-      Map.empty[String, String] ++
-        maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++
-        maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _)
-
-    new ReadContext(parquetRequestedSchema, metadata.asJava)
-  }
 }
 
 private[parquet] object CatalystReadSupport {
...
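`clipParquetSchema`, defined in the collapsed companion object above and called from the new `init()`, tailors the Parquet file schema to the requested Catalyst schema so that unrequested columns are never read. A rough sketch of the idea, restricted to top-level fields, follows; the real implementation also clips nested structs, arrays, and maps, which this sketch does not attempt.

// Sketch (not the actual implementation): prune a Parquet MessageType down
// to the top-level fields named by the requested Catalyst schema.
import scala.collection.JavaConverters._

import org.apache.parquet.schema.MessageType
import org.apache.spark.sql.types.StructType

object ClipSchemaSketch {
  def clipTopLevel(fileSchema: MessageType, requested: StructType): MessageType = {
    val wanted = requested.fieldNames.toSet
    // Keep only the file-schema fields that the Catalyst schema asks for.
    val kept = fileSchema.getFields.asScala.filter(f => wanted.contains(f.getName))
    new MessageType(fileSchema.getName, kept.asJava)
  }
}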