From 776d183c82b424ef7c3cae30537d8afe9b9eee83 Mon Sep 17 00:00:00 2001
From: Ryan Blue <blue@apache.org>
Date: Fri, 27 May 2016 16:59:38 -0700
Subject: [PATCH] [SPARK-9876][SQL] Update Parquet to 1.8.1.

## What changes were proposed in this pull request?

This includes minimal changes to get Spark using the current release of Parquet, 1.8.1.

## How was this patch tested?

This uses the existing Parquet tests.

Author: Ryan Blue <blue@apache.org>

Closes #13280 from rdblue/SPARK-9876-update-parquet.
---
 dev/deps/spark-deps-hadoop-2.2                |  11 ++-
 dev/deps/spark-deps-hadoop-2.3                |  11 ++-
 dev/deps/spark-deps-hadoop-2.4                |  11 ++-
 dev/deps/spark-deps-hadoop-2.6                |  11 ++-
 dev/deps/spark-deps-hadoop-2.7                |  11 ++-
 pom.xml                                       |   2 +-
 .../SpecificParquetRecordReaderBase.java      |  20 +++--
 .../parquet/CatalystReadSupport.scala         |  12 ++-
 .../parquet/CatalystSchemaConverter.scala     |  16 ++++
 .../datasources/parquet/ParquetFilters.scala  |  83 ++++---------------
 .../parquet/ParquetSchemaSuite.scala          |  20 +++--
 11 files changed, 91 insertions(+), 117 deletions(-)

diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index 578691cc93..deec033c21 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -129,14 +129,13 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.8.jar
-parquet-column-1.7.0.jar
-parquet-common-1.7.0.jar
-parquet-encoding-1.7.0.jar
+parquet-column-1.8.1.jar
+parquet-common-1.8.1.jar
+parquet-encoding-1.8.1.jar
 parquet-format-2.3.0-incubating.jar
-parquet-generator-1.7.0.jar
-parquet-hadoop-1.7.0.jar
+parquet-hadoop-1.8.1.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.7.0.jar
+parquet-jackson-1.8.1.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index fc6306f366..43c7dd3580 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -136,14 +136,13 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.8.jar
-parquet-column-1.7.0.jar
-parquet-common-1.7.0.jar
-parquet-encoding-1.7.0.jar
+parquet-column-1.8.1.jar
+parquet-common-1.8.1.jar
+parquet-encoding-1.8.1.jar
 parquet-format-2.3.0-incubating.jar
-parquet-generator-1.7.0.jar
-parquet-hadoop-1.7.0.jar
+parquet-hadoop-1.8.1.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.7.0.jar
+parquet-jackson-1.8.1.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index dee1417c79..7186b305a8 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -136,14 +136,13 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.8.jar
-parquet-column-1.7.0.jar
-parquet-common-1.7.0.jar
-parquet-encoding-1.7.0.jar
+parquet-column-1.8.1.jar
+parquet-common-1.8.1.jar
+parquet-encoding-1.8.1.jar
 parquet-format-2.3.0-incubating.jar
-parquet-generator-1.7.0.jar
-parquet-hadoop-1.7.0.jar
+parquet-hadoop-1.8.1.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.7.0.jar
+parquet-jackson-1.8.1.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 9695661b9c..3e4ed74cc6 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -144,14 +144,13 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.8.jar
-parquet-column-1.7.0.jar
-parquet-common-1.7.0.jar
-parquet-encoding-1.7.0.jar
+parquet-column-1.8.1.jar
+parquet-common-1.8.1.jar
+parquet-encoding-1.8.1.jar
 parquet-format-2.3.0-incubating.jar
-parquet-generator-1.7.0.jar
-parquet-hadoop-1.7.0.jar
+parquet-hadoop-1.8.1.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.7.0.jar
+parquet-jackson-1.8.1.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 18c136ed63..6b999538a3 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -145,14 +145,13 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.8.jar
-parquet-column-1.7.0.jar
-parquet-common-1.7.0.jar
-parquet-encoding-1.7.0.jar
+parquet-column-1.8.1.jar
+parquet-common-1.8.1.jar
+parquet-encoding-1.8.1.jar
 parquet-format-2.3.0-incubating.jar
-parquet-generator-1.7.0.jar
-parquet-hadoop-1.7.0.jar
+parquet-hadoop-1.8.1.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.7.0.jar
+parquet-jackson-1.8.1.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar
diff --git a/pom.xml b/pom.xml
index 3fa0eeb5f0..ce9aa9aa00 100644
--- a/pom.xml
+++ b/pom.xml
@@ -133,7 +133,7 @@
     <!-- Version used for internal directory structure -->
     <hive.version.short>1.2.1</hive.version.short>
     <derby.version>10.11.1.1</derby.version>
-    <parquet.version>1.7.0</parquet.version>
+    <parquet.version>1.8.1</parquet.version>
     <hive.parquet.version>1.6.0</hive.parquet.version>
     <jetty.version>9.2.16.v20160414</jetty.version>
     <javaxservlet.version>3.1.0</javaxservlet.version>
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index cbe8f78164..3f7a872ff6 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -58,6 +58,8 @@ import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.ConfigurationUtil;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
 import org.apache.parquet.schema.Types;
 
 import org.apache.spark.sql.types.StructType;
@@ -186,15 +188,19 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
     if (columns == null) {
       this.requestedSchema = fileSchema;
     } else {
-      Types.MessageTypeBuilder builder = Types.buildMessage();
-      for (String s: columns) {
-        if (!fileSchema.containsField(s)) {
-          throw new IOException("Can only project existing columns. Unknown field: " + s +
-            " File schema:\n" + fileSchema);
+      if (columns.size() > 0) {
+        Types.MessageTypeBuilder builder = Types.buildMessage();
+        for (String s: columns) {
+          if (!fileSchema.containsField(s)) {
+            throw new IOException("Can only project existing columns. Unknown field: " + s +
+              " File schema:\n" + fileSchema);
+          }
+          builder.addFields(fileSchema.getType(s));
         }
-        builder.addFields(fileSchema.getType(s));
+        this.requestedSchema = builder.named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME());
+      } else {
+        this.requestedSchema = CatalystSchemaConverter.EMPTY_MESSAGE();
       }
-      this.requestedSchema = builder.named("spark_schema");
     }
     this.sparkSchema = new CatalystSchemaConverter(config).convert(requestedSchema);
     this.reader = new ParquetFileReader(config, file, blocks, requestedSchema.getColumns());
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index 850e807b86..9c885b252f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -109,10 +109,14 @@ private[parquet] object CatalystReadSupport {
    */
   def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
     val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
-    Types
-      .buildMessage()
-      .addFields(clippedParquetFields: _*)
-      .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
+    if (clippedParquetFields.isEmpty) {
+      CatalystSchemaConverter.EMPTY_MESSAGE
+    } else {
+      Types
+        .buildMessage()
+        .addFields(clippedParquetFields: _*)
+        .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
+    }
   }
 
   private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index 6f6340f541..3688c3e2b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -538,6 +538,22 @@ private[parquet] class CatalystSchemaConverter(
 private[parquet] object CatalystSchemaConverter {
   val SPARK_PARQUET_SCHEMA_NAME = "spark_schema"
 
+  // !! HACK ALERT !!
+  //
+  // PARQUET-363 & PARQUET-278: parquet-mr 1.8.1 doesn't allow constructing empty GroupType,
+  // which prevents us to avoid selecting any columns for queries like `SELECT COUNT(*) FROM t`.
+  // This issue has been fixed in parquet-mr 1.8.2-SNAPSHOT.
+  //
+  // To workaround this problem, here we first construct a `MessageType` with a single dummy
+  // field, and then remove the field to obtain an empty `MessageType`.
+  //
+  // TODO Reverts this change after upgrading parquet-mr to 1.8.2+
+  val EMPTY_MESSAGE = Types
+    .buildMessage()
+    .required(PrimitiveType.PrimitiveTypeName.INT32).named("dummy")
+    .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
+  EMPTY_MESSAGE.getFields.clear()
+
   def checkFieldName(name: String): Unit = {
     // ,;{}()\n\t= and space are special characters in Parquet schema
     checkConversionRequirement(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 95afdc789f..6240812501 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -22,8 +22,6 @@ import java.io.Serializable
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.io.api.Binary
-import org.apache.parquet.schema.OriginalType
-import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types._
@@ -53,18 +51,15 @@ private[sql] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    // See https://issues.apache.org/jira/browse/SPARK-11153
-    /*
     // Binary.fromString and Binary.fromByteArray don't accept null values
     case StringType =>
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
-        Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8"))).orNull)
+        Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
     case BinaryType =>
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
-        Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-    */
+        Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
   }
 
   private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -79,17 +74,14 @@ private[sql] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    // See https://issues.apache.org/jira/browse/SPARK-11153
-    /*
     case StringType =>
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
-        Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8"))).orNull)
+        Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
     case BinaryType =>
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
-        Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-    */
+        Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
   }
 
   private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -102,16 +94,13 @@ private[sql] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    // See https://issues.apache.org/jira/browse/SPARK-11153
-    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.lt(binaryColumn(n),
-          Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
+          Binary.fromString(v.asInstanceOf[String]))
     case BinaryType =>
       (n: String, v: Any) =>
-        FilterApi.lt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
-    */
+        FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
   }
 
   private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -124,16 +113,13 @@ private[sql] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    // See https://issues.apache.org/jira/browse/SPARK-11153
-    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.ltEq(binaryColumn(n),
-          Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
+          Binary.fromString(v.asInstanceOf[String]))
     case BinaryType =>
       (n: String, v: Any) =>
-        FilterApi.ltEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
-    */
+        FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
   }
 
   private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -147,15 +133,13 @@ private[sql] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
     // See https://issues.apache.org/jira/browse/SPARK-11153
-    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.gt(binaryColumn(n),
-          Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
+          Binary.fromString(v.asInstanceOf[String]))
     case BinaryType =>
       (n: String, v: Any) =>
-        FilterApi.gt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
-    */
+        FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
   }
 
   private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -168,16 +152,13 @@ private[sql] object ParquetFilters {
     case DoubleType =>
      (n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    // See https://issues.apache.org/jira/browse/SPARK-11153
-    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.gtEq(binaryColumn(n),
-          Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
+          Binary.fromString(v.asInstanceOf[String]))
     case BinaryType =>
       (n: String, v: Any) =>
-        FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
-    */
+        FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
   }
 
   private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = {
@@ -194,17 +175,14 @@ private[sql] object ParquetFilters {
       (n: String, v: Set[Any]) =>
         FilterApi.userDefined(doubleColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Double]]))
 
-    // See https://issues.apache.org/jira/browse/SPARK-11153
-    /*
     case StringType =>
       (n: String, v: Set[Any]) =>
         FilterApi.userDefined(binaryColumn(n),
-          SetInFilter(v.map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8")))))
+          SetInFilter(v.map(s => Binary.fromString(s.asInstanceOf[String]))))
     case BinaryType =>
       (n: String, v: Set[Any]) =>
         FilterApi.userDefined(binaryColumn(n),
-          SetInFilter(v.map(e => Binary.fromByteArray(e.asInstanceOf[Array[Byte]]))))
-    */
+          SetInFilter(v.map(e => Binary.fromReusedByteArray(e.asInstanceOf[Array[Byte]]))))
   }
 
   /**
@@ -228,8 +206,6 @@
   def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
     val dataTypeOf = getFieldMap(schema).toMap
 
-    relaxParquetValidTypeMap
-
     // NOTE:
     //
     // For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
@@ -299,35 +275,4 @@ private[sql] object ParquetFilters {
       case _ => None
     }
   }
-
-  // !! HACK ALERT !!
-  //
-  // This lazy val is a workaround for PARQUET-201, and should be removed once we upgrade to
-  // parquet-mr 1.8.1 or higher versions.
-  //
-  // In Parquet, not all types of columns can be used for filter push-down optimization. The set
-  // of valid column types is controlled by `ValidTypeMap`. Unfortunately, in parquet-mr 1.7.0 and
-  // prior versions, the limitation is too strict, and doesn't allow `BINARY (ENUM)` columns to be
-  // pushed down.
-  //
-  // This restriction is problematic for Spark SQL, because Spark SQL doesn't have a type that maps
-  // to Parquet original type `ENUM` directly, and always converts `ENUM` to `StringType`. Thus,
-  // a predicate involving a `ENUM` field can be pushed-down as a string column, which is perfectly
-  // legal except that it fails the `ValidTypeMap` check.
-  //
-  // Here we add `BINARY (ENUM)` into `ValidTypeMap` lazily via reflection to workaround this issue.
-  private lazy val relaxParquetValidTypeMap: Unit = {
-    val constructor = Class
-      .forName(classOf[ValidTypeMap].getCanonicalName + "$FullTypeDescriptor")
-      .getDeclaredConstructor(classOf[PrimitiveTypeName], classOf[OriginalType])
-
-    constructor.setAccessible(true)
-    val enumTypeDescriptor = constructor
-      .newInstance(PrimitiveTypeName.BINARY, OriginalType.ENUM)
-      .asInstanceOf[AnyRef]
-
-    val addMethod = classOf[ValidTypeMap].getDeclaredMethods.find(_.getName == "add").get
-    addMethod.setAccessible(true)
-    addMethod.invoke(null, classOf[Binary], enumTypeDescriptor)
-  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 6db6492282..0b5038cb82 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
 
-import org.apache.parquet.schema.MessageTypeParser
+import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.ScalaReflection
@@ -1065,18 +1065,26 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       parquetSchema: String,
       catalystSchema: StructType,
       expectedSchema: String): Unit = {
+    testSchemaClipping(testName, parquetSchema, catalystSchema,
+      MessageTypeParser.parseMessageType(expectedSchema))
+  }
+
+  private def testSchemaClipping(
+      testName: String,
+      parquetSchema: String,
+      catalystSchema: StructType,
+      expectedSchema: MessageType): Unit = {
     test(s"Clipping - $testName") {
-      val expected = MessageTypeParser.parseMessageType(expectedSchema)
       val actual = CatalystReadSupport.clipParquetSchema(
         MessageTypeParser.parseMessageType(parquetSchema), catalystSchema)
 
       try {
-        expected.checkContains(actual)
-        actual.checkContains(expected)
+        expectedSchema.checkContains(actual)
+        actual.checkContains(expectedSchema)
       } catch { case cause: Throwable =>
         fail(
           s"""Expected clipped schema:
-             |$expected
+             |$expectedSchema
             |Actual clipped schema:
             |$actual
           """.stripMargin,
@@ -1429,7 +1437,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
 
     catalystSchema = new StructType(),
 
-    expectedSchema = "message root {}")
+    expectedSchema = CatalystSchemaConverter.EMPTY_MESSAGE)
 
   testSchemaClipping(
     "disjoint field sets",
-- 
GitLab
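
Editor's note (not part of the committed patch): the `EMPTY_MESSAGE` value added in `CatalystSchemaConverter` above is how the patch represents a zero-column projection (e.g. `SELECT COUNT(*) FROM t`) while parquet-mr 1.8.1 still rejects empty group types. A minimal standalone sketch of the same trick, assuming only the parquet-mr 1.8.1 API already used by the patch (`Types.buildMessage` and `GroupType.getFields` returning a mutable list); the object and value names here are illustrative, not from the patch:

```scala
import org.apache.parquet.schema.{MessageType, PrimitiveType, Types}

// Sketch only: mirrors the EMPTY_MESSAGE hack above. parquet-mr 1.8.1 throws when a
// group is built with zero fields (PARQUET-278 / PARQUET-363), so build a message with
// a single dummy INT32 field and then clear the field list to obtain an empty schema.
object EmptyMessageSketch {
  val emptyMessage: MessageType = {
    val message = Types
      .buildMessage()
      .required(PrimitiveType.PrimitiveTypeName.INT32).named("dummy")
      .named("spark_schema")
    message.getFields.clear() // getFields exposes the underlying mutable list
    message
  }
}
```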
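The filter push-down hunks above also move from `Binary.fromByteArray`, which is gone in parquet-mr 1.8.x, to `Binary.fromString` and `Binary.fromReusedByteArray`. A hedged sketch of building null-safe equality predicates with the new factory methods, following the same `Option(...).orNull` pattern as the re-enabled `StringType`/`BinaryType` cases in `ParquetFilters` (the helper names `eqString` and `eqBinary` are illustrative, not from the patch):

```scala
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.parquet.io.api.Binary

object BinaryFilterSketch {
  // String columns: convert through Binary.fromString (replaces fromByteArray + getBytes).
  def eqString(column: String, value: String): FilterPredicate =
    FilterApi.eq(
      FilterApi.binaryColumn(column),
      Option(value).map(s => Binary.fromString(s)).orNull)

  // Raw byte-array columns: wrap with Binary.fromReusedByteArray.
  def eqBinary(column: String, value: Array[Byte]): FilterPredicate =
    FilterApi.eq(
      FilterApi.binaryColumn(column),
      Option(value).map(b => Binary.fromReusedByteArray(b)).orNull)
}
```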