Commit ae2370e7 authored by Cheng Lian, committed by Yin Huai

[SPARK-10005] [SQL] Fixes schema merging for nested structs

In case of schema merging, we only handled first-level fields when converting Parquet groups to `InternalRow`s; nested struct fields were not handled properly.

For example, the schema of a Parquet file to be read can be:

```
message individual {
  required group f1 {
    optional binary f11 (utf8);
  }
}
```

while the global schema is:

```
message global {
  required group f1 {
    optional binary f11 (utf8);
    optional int32 f12;
  }
}
```
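
Such divergent per-file schemas typically arise when DataFrames with different nested widths are appended to the same path and then read back with schema merging enabled. Below is a minimal reproduction sketched along the lines of the regression test added in this patch; the `reproduce` helper, the temporary path, and the column shapes are illustrative, not part of the change itself:

```
import java.nio.file.Files

import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}

// A sketch mirroring the regression test added below; assumes an existing SQLContext.
def reproduce(sqlContext: SQLContext): Unit = {
  import sqlContext.implicits._
  val path = Files.createTempDirectory("spark-10005").resolve("data").toString

  def append(df: DataFrame): Unit = df.write.mode(SaveMode.Append).parquet(path)

  // Two Parquet files sharing a single struct column, but with a different
  // number of nested fields (two vs. three).
  append((1 to 2).map(i => Tuple1((i, i))).toDF())
  append((1 to 2).map(i => Tuple1((i, i, i))).toDF())

  // With schema merging enabled, rows from the narrower file should come back
  // with the missing nested field padded with null rather than failing.
  sqlContext.read.option("mergeSchema", "true").parquet(path).show()
}
```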

This PR fixes this issue by padding missing fields when creating actual converters.
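
In simplified form, the padding step looks like the sketch below. It mirrors the change to `CatalystRowConverter` in the diff that follows; the standalone `padParquetFields` helper and its `convertField` parameter are illustrative (the actual code obtains the conversion from a `CatalystSchemaConverter`):

```
import scala.collection.JavaConverters._

import org.apache.parquet.schema.{GroupType, Type}
import org.apache.spark.sql.types.{StructField, StructType}

// Pads the per-file Parquet fields with Parquet counterparts of the Catalyst
// fields that this particular file does not contain, then orders everything by
// the position each field has in the global (merged) Catalyst schema, so that
// converters line up with the right ordinals.
def padParquetFields(
    parquetType: GroupType,
    catalystType: StructType,
    convertField: StructField => Type): Seq[Type] = {
  val parquetFields = parquetType.getFields.asScala.toSeq
  val parquetFieldNames = parquetFields.map(_.getName).toSet

  // Fields present in the global schema but missing from this Parquet file.
  val missingFields = catalystType.filterNot(f => parquetFieldNames.contains(f.name))

  (parquetFields ++ missingFields.map(convertField)).sortBy { f =>
    catalystType.indexWhere(_.name == f.getName)
  }
}
```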

Author: Cheng Lian <lian@databricks.com>

Closes #8228 from liancheng/spark-10005/nested-schema-merging.
parent cf016075
......@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType
private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging {
// Called after `init()` when initializing Parquet record reader.
override def prepareForRead(
conf: Configuration,
keyValueMetaData: JMap[String, String],
......@@ -51,19 +52,30 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
// available if the target file is written by Spark SQL.
.orElse(metadata.get(CatalystReadSupport.SPARK_METADATA_KEY))
}.map(StructType.fromString).getOrElse {
logDebug("Catalyst schema not available, falling back to Parquet schema")
logInfo("Catalyst schema not available, falling back to Parquet schema")
toCatalyst.convert(parquetRequestedSchema)
}
logDebug(s"Catalyst schema used to read Parquet files: $catalystRequestedSchema")
logInfo {
s"""Going to read the following fields from the Parquet file:
|
|Parquet form:
|$parquetRequestedSchema
|
|Catalyst form:
|$catalystRequestedSchema
""".stripMargin
}
new CatalystRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema)
}
// Called before `prepareForRead()` when initializing Parquet record reader.
override def init(context: InitContext): ReadContext = {
val conf = context.getConfiguration
// If the target file was written by Spark SQL, we should be able to find a serialized Catalyst
// schema of this file from its the metadata.
// schema of this file from its metadata.
val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA))
// Optional schema of requested columns, in the form of a string serialized from a Catalyst
......@@ -141,7 +153,6 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++
maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _)
logInfo(s"Going to read Parquet file with these requested columns: $parquetRequestedSchema")
new ReadContext(parquetRequestedSchema, metadata)
}
}
......
......@@ -25,9 +25,10 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.parquet.column.Dictionary
import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
import org.apache.parquet.schema.OriginalType.LIST
import org.apache.parquet.schema.OriginalType.{LIST, INT_32, UTF8}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema.{GroupType, PrimitiveType, Type}
import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
......@@ -88,12 +89,54 @@ private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUp
}
/**
* A [[CatalystRowConverter]] is used to convert Parquet "structs" into Spark SQL [[InternalRow]]s.
* Since any Parquet record is also a struct, this converter can also be used as root converter.
* A [[CatalystRowConverter]] is used to convert Parquet records into Catalyst [[InternalRow]]s.
* Since Catalyst `StructType` is also a Parquet record, this converter can be used as root
* converter. Take the following Parquet type as an example:
* {{{
* message root {
* required int32 f1;
* optional group f2 {
* required double f21;
* optional binary f22 (utf8);
* }
* }
* }}}
* 5 converters will be created:
*
* - a root [[CatalystRowConverter]] for [[MessageType]] `root`, which contains:
* - a [[CatalystPrimitiveConverter]] for required [[INT_32]] field `f1`, and
* - a nested [[CatalystRowConverter]] for optional [[GroupType]] `f2`, which contains:
* - a [[CatalystPrimitiveConverter]] for required [[DOUBLE]] field `f21`, and
* - a [[CatalystStringConverter]] for optional [[UTF8]] string field `f22`
*
* When used as a root converter, [[NoopUpdater]] should be used since root converters don't have
* any "parent" container.
*
* @note Constructor argument [[parquetType]] refers to requested fields of the actual schema of the
* Parquet file being read, while constructor argument [[catalystType]] refers to requested
* fields of the global schema. The key difference is that, in case of schema merging,
* [[parquetType]] can be a subset of [[catalystType]]. For example, it's possible to have
* the following [[catalystType]]:
* {{{
* new StructType()
* .add("f1", IntegerType, nullable = false)
* .add("f2", StringType, nullable = true)
* .add("f3", new StructType()
* .add("f31", DoubleType, nullable = false)
* .add("f32", IntegerType, nullable = true)
* .add("f33", StringType, nullable = true), nullable = false)
* }}}
* and the following [[parquetType]] (`f2` and `f32` are missing):
* {{{
* message root {
* required int32 f1;
* required group f3 {
* required double f31;
* optional binary f33 (utf8);
* }
* }
* }}}
*
* @param parquetType Parquet schema of Parquet records
* @param catalystType Spark SQL schema that corresponds to the Parquet record type
* @param updater An updater which propagates converted field values to the parent container
......@@ -126,7 +169,24 @@ private[parquet] class CatalystRowConverter(
// Converters for each field.
private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
parquetType.getFields.zip(catalystType).zipWithIndex.map {
// In case of schema merging, `parquetType` can be a subset of `catalystType`. We need to pad
// those missing fields and create converters for them, although values of these fields are
// always null.
val paddedParquetFields = {
val parquetFields = parquetType.getFields
val parquetFieldNames = parquetFields.map(_.getName).toSet
val missingFields = catalystType.filterNot(f => parquetFieldNames.contains(f.name))
// We don't need to worry about feature flag arguments like `assumeBinaryIsString` when
// creating the schema converter here, since values of missing fields are always null.
val toParquet = new CatalystSchemaConverter()
(parquetFields ++ missingFields.map(toParquet.convertField)).sortBy { f =>
catalystType.indexWhere(_.name == f.getName)
}
}
paddedParquetFields.zip(catalystType).zipWithIndex.map {
case ((parquetFieldType, catalystField), ordinal) =>
// Converted field value should be set to the `ordinal`-th cell of `currentRow`
newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal))
......
......@@ -72,18 +72,9 @@ private[parquet] class CatalystSchemaConverter(
followParquetFormatSpec = conf.followParquetFormatSpec)
def this(conf: Configuration) = this(
assumeBinaryIsString =
conf.getBoolean(
SQLConf.PARQUET_BINARY_AS_STRING.key,
SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get),
assumeInt96IsTimestamp =
conf.getBoolean(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get),
followParquetFormatSpec =
conf.getBoolean(
SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key,
SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get))
assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
followParquetFormatSpec = conf.get(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key).toBoolean)
/**
* Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].
......
......@@ -21,7 +21,7 @@ import java.io.File
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{QueryTest, Row, SQLConf}
import org.apache.spark.sql._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
......@@ -201,4 +201,32 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
assert(Decimal("67123.45") === Decimal(decimal))
}
}
test("SPARK-10005 Schema merging for nested struct") {
val sqlContext = _sqlContext
import sqlContext.implicits._
withTempPath { dir =>
val path = dir.getCanonicalPath
def append(df: DataFrame): Unit = {
df.write.mode(SaveMode.Append).parquet(path)
}
// Note that both the following two DataFrames contain a single struct column with multiple
// nested fields.
append((1 to 2).map(i => Tuple1((i, i))).toDF())
append((1 to 2).map(i => Tuple1((i, i, i))).toDF())
withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true") {
checkAnswer(
sqlContext.read.option("mergeSchema", "true").parquet(path),
Seq(
Row(Row(1, 1, null)),
Row(Row(2, 2, null)),
Row(Row(1, 1, 1)),
Row(Row(2, 2, 2))))
}
}
}
}