diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index 0284ecc0d9f75b16f9c15ecf556cfed2bc748028..0c2ebb0e5b7390fceea362f58ef5cec7b5edbadd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -22,7 +22,7 @@ import scala.util.Try
 
 import org.json4s.JsonDSL._
 
-import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.SparkException
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, InterpretedOrdering}
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, LegacyTypeStringParser}
@@ -389,6 +389,11 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
 
 object StructType extends AbstractDataType {
 
+  /**
+   * A key used in field metadata to indicate that the field comes from the result of merging
+   * two different StructTypes that do not always contain the field. That is to say, the field
+   * might be missing (optional) from one of the StructTypes.
+   */
   private[sql] val metadataKeyForOptionalField = "_OPTIONAL_"
 
   override private[sql] def defaultConcreteType: DataType = new StructType
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index e0a113a1b362a1045f9bc4e08d2677fd790f69df..426263fa445a0b2fc93045ce39570afa7cefe8e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import java.io.Serializable
-
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.io.api.Binary
@@ -26,18 +24,10 @@ import org.apache.parquet.io.api.Binary
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types._
 
+/**
+ * Some utility function to convert Spark data source filters to Parquet filters.
+ */
 private[sql] object ParquetFilters {
-  case class SetInFilter[T <: Comparable[T]](
-    valueSet: Set[T]) extends UserDefinedPredicate[T] with Serializable {
-
-    override def keep(value: T): Boolean = {
-      value != null && valueSet.contains(value)
-    }
-
-    override def canDrop(statistics: Statistics[T]): Boolean = false
-
-    override def inverseCanDrop(statistics: Statistics[T]): Boolean = false
-  }
 
   private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
     case BooleanType =>
@@ -154,36 +144,16 @@
         FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
   }
 
-  private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = {
-    case IntegerType =>
-      (n: String, v: Set[Any]) =>
-        FilterApi.userDefined(intColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Integer]]))
-    case LongType =>
-      (n: String, v: Set[Any]) =>
-        FilterApi.userDefined(longColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Long]]))
-    case FloatType =>
-      (n: String, v: Set[Any]) =>
-        FilterApi.userDefined(floatColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Float]]))
-    case DoubleType =>
-      (n: String, v: Set[Any]) =>
-        FilterApi.userDefined(doubleColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Double]]))
-    case StringType =>
-      (n: String, v: Set[Any]) =>
-        FilterApi.userDefined(binaryColumn(n),
-          SetInFilter(v.map(s => Binary.fromString(s.asInstanceOf[String]))))
-    case BinaryType =>
-      (n: String, v: Set[Any]) =>
-        FilterApi.userDefined(binaryColumn(n),
-          SetInFilter(v.map(e => Binary.fromReusedByteArray(e.asInstanceOf[Array[Byte]]))))
-  }
-
   /**
+   * Returns a map from name of the column to the data type, if predicate push down applies
+   * (i.e. not an optional field).
+   *
    * SPARK-11955: The optional fields will have metadata StructType.metadataKeyForOptionalField.
    * These fields only exist in one side of merged schemas. Due to that, we can't push down filters
-   * using such fields, otherwise Parquet library will throw exception. Here we filter out such
-   * fields.
+   * using such fields, otherwise Parquet library will throw exception (PARQUET-389).
+   * Here we filter out such fields.
    */
-  private def getFieldMap(dataType: DataType): Array[(String, DataType)] = dataType match {
+  private def getFieldMap(dataType: DataType): Map[String, DataType] = dataType match {
     case StructType(fields) =>
       // Here we don't flatten the fields in the nested schema but just look up through
       // root fields. Currently, accessing to nested fields does not push down filters
@@ -191,15 +161,15 @@ private[sql] object ParquetFilters {
       fields.filter { f =>
         !f.metadata.contains(StructType.metadataKeyForOptionalField) ||
           !f.metadata.getBoolean(StructType.metadataKeyForOptionalField)
-      }.map(f => f.name -> f.dataType)
-    case _ => Array.empty[(String, DataType)]
+      }.map(f => f.name -> f.dataType).toMap
+    case _ => Map.empty[String, DataType]
   }
 
   /**
   * Converts data sources filters to Parquet filter predicates.
   */
  def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
-    val dataTypeOf = getFieldMap(schema).toMap
+    val dataTypeOf = getFieldMap(schema)
 
    // NOTE:
    //
@@ -242,9 +212,6 @@ private[sql] object ParquetFilters {
      case sources.GreaterThanOrEqual(name, value) if dataTypeOf.contains(name) =>
        makeGtEq.lift(dataTypeOf(name)).map(_(name, value))
 
-      case sources.In(name, valueSet) =>
-        makeInSet.lift(dataTypeOf(name)).map(_(name, valueSet.toSet))
-
      case sources.And(lhs, rhs) =>
        // At here, it is not safe to just convert one side if we do not understand the
        // other side. Here is an example used to explain the reason.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 84fdcfea3c8f35305db262ed84ba67444b6b83eb..f59d474d00ec2b632da7192c6e2e777a37fe45cd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -514,36 +514,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     }
   }
 
-  test("SPARK-11164: test the parquet filter in") {
-    import testImplicits._
-    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
-        withTempPath { dir =>
-          val path = s"${dir.getCanonicalPath}/table1"
-          (1 to 5).map(i => (i.toFloat, i%3)).toDF("a", "b").write.parquet(path)
-
-          // When a filter is pushed to Parquet, Parquet can apply it to every row.
-          // So, we can check the number of rows returned from the Parquet
-          // to make sure our filter pushdown work.
-          val df = spark.read.parquet(path).where("b in (0,2)")
-          assert(stripSparkFilter(df).count == 3)
-
-          val df1 = spark.read.parquet(path).where("not (b in (1))")
-          assert(stripSparkFilter(df1).count == 3)
-
-          val df2 = spark.read.parquet(path).where("not (b in (1,3) or a <= 2)")
-          assert(stripSparkFilter(df2).count == 2)
-
-          val df3 = spark.read.parquet(path).where("not (b in (1,3) and a <= 2)")
-          assert(stripSparkFilter(df3).count == 4)
-
-          val df4 = spark.read.parquet(path).where("not (a <= 2)")
-          assert(stripSparkFilter(df4).count == 3)
-        }
-      }
-    }
-  }
-
   test("SPARK-16371 Do not push down filters when inner name and outer name are the same") {
     withParquetDataFrame((1 to 4).map(i => Tuple1(Tuple1(i)))) { implicit df =>
       // Here the schema becomes as below:
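
The sketch below is not part of the patch; it only illustrates the optional-field behaviour the new doc comments describe. A field carrying the StructType.metadataKeyForOptionalField metadata (the "_OPTIONAL_" key) exists in only one side of two merged Parquet schemas, so getFieldMap excludes it and no predicate is ever pushed down against it. The object name and the inlined copy of the filtering logic are illustrative, not code from this change.

// Minimal sketch, assuming only the Spark SQL types API; not part of the patch above.
import org.apache.spark.sql.types._

object OptionalFieldPushdownSketch {
  def main(args: Array[String]): Unit = {
    // Mark "b" the way schema merging would: it is present in only one of the merged schemas.
    val optionalMeta = new MetadataBuilder().putBoolean("_OPTIONAL_", true).build()

    val mergedSchema = StructType(Seq(
      StructField("a", IntegerType),                               // present in both schemas
      StructField("b", StringType, nullable = true, optionalMeta)  // optional (merged-in) field
    ))

    // Same check getFieldMap performs: optional fields never reach the pushdown map,
    // so Parquet is never asked to evaluate a predicate on a column a file may not have.
    val pushdownColumns: Map[String, DataType] = mergedSchema.fields
      .filterNot(f => f.metadata.contains("_OPTIONAL_") && f.metadata.getBoolean("_OPTIONAL_"))
      .map(f => f.name -> f.dataType)
      .toMap

    println(pushdownColumns) // Map(a -> IntegerType): no filter will be pushed down on "b"
  }
}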
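
A second sketch, again not from the patch and using illustrative names, shows the kind of conversion createFilter still performs for the comparison filters that remain supported: a Spark data source filter on a column of known type becomes a Parquet FilterPredicate. After this change, sources.In produces no Parquet predicate at all and is evaluated by Spark on the rows returned from the scan.

// Minimal sketch, assuming the Parquet and Spark SQL APIs shown in the patch; the helper
// gtEqFor is a hypothetical stand-in for the IntegerType branch of makeGtEq.
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.spark.sql.sources
import org.apache.spark.sql.types.{DataType, IntegerType}

object GtEqConversionSketch {
  def gtEqFor(dt: DataType): Option[(String, Any) => FilterPredicate] = dt match {
    case IntegerType =>
      Some((n: String, v: Any) => FilterApi.gtEq(FilterApi.intColumn(n), v.asInstanceOf[Integer]))
    case _ => None // unsupported type: nothing is pushed down, Spark filters the rows itself
  }

  def main(args: Array[String]): Unit = {
    val filter = sources.GreaterThanOrEqual("b", 2)
    val predicate = gtEqFor(IntegerType).map(_(filter.attribute, filter.value))
    println(predicate) // e.g. Some(gteq(b, 2)); Parquet can skip row groups whose max(b) < 2
  }
}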