Commit 59db9e9c authored by hyukjinkwon, committed by Cheng Lian

[SPARK-11103][SQL] Filter applied on merged Parquet schema with new column fails

When schema merging and predicate push-down are enabled together, queries fail because Parquet rejects pushed-down filters that reference columns absent from a file's schema. This is a known Parquet issue (https://issues.apache.org/jira/browse/PARQUET-389).

For now, this PR simply disables predicate push-down whenever a merged schema is used.
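For context, here is a minimal, self-contained sketch of the failing scenario. It assumes a Spark 1.5-era SQLContext; the object name and /tmp paths are illustrative and not part of the patch:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    object MergedSchemaFilterRepro {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("repro").setMaster("local[2]"))
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._

        // Both flags on together trigger the failure described above.
        sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")
        sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")

        // Two Parquet directories whose schemas only partially overlap: (a, b) vs. (c, b).
        sc.parallelize(1 to 3).map(i => (i, i.toString)).toDF("a", "b")
          .write.parquet("/tmp/repro/table1")
        sc.parallelize(1 to 3).map(i => (i, i.toString)).toDF("c", "b")
          .write.parquet("/tmp/repro/table2")

        // "c" exists in only one directory, so the pushed-down predicate references a
        // column missing from table1's schema; without this patch Parquet throws
        // (PARQUET-389).
        sqlContext.read.parquet("/tmp/repro/table1", "/tmp/repro/table2")
          .filter("c = 1")
          .show()

        sc.stop()
      }
    }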

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #9327 from HyukjinKwon/SPARK-11103.
parent 86d65265
@@ -292,6 +292,10 @@ private[sql] class ParquetRelation(
     val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
     val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp

+    // When schema merging is enabled and a filter references a column that does not
+    // exist in the schema, Parquet throws an exception (a known issue, PARQUET-389).
+    val safeParquetFilterPushDown = !shouldMergeSchemas && parquetFilterPushDown
+
     // Parquet row group size. We will use this value as the value for
     // mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the
     // values of these flags are smaller than the parquet row group size.
@@ -305,7 +309,7 @@ private[sql] class ParquetRelation(
       dataSchema,
       parquetBlockSize,
       useMetadataCache,
-      parquetFilterPushDown,
+      safeParquetFilterPushDown,
       assumeBinaryIsString,
       assumeInt96IsTimestamp) _
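To make the guard's behavior explicit, here is a standalone truth-table sketch (the names mirror the patch, but this is illustrative code, not Spark source):

    // Illustrative only: the effective push-down flag for each configuration combination.
    def safePushDown(shouldMergeSchemas: Boolean, parquetFilterPushDown: Boolean): Boolean =
      !shouldMergeSchemas && parquetFilterPushDown

    assert(safePushDown(shouldMergeSchemas = false, parquetFilterPushDown = true))   // push-down kept
    assert(!safePushDown(shouldMergeSchemas = true, parquetFilterPushDown = true))   // disabled by this patch
    assert(!safePushDown(shouldMergeSchemas = false, parquetFilterPushDown = false)) // user disabled
    assert(!safePushDown(shouldMergeSchemas = true, parquetFilterPushDown = false))  // both off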
@@ -316,4 +316,24 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
       }
     }
   }
+
+  test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
+      SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val pathOne = s"${dir.getCanonicalPath}/table1"
+        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
+        val pathTwo = s"${dir.getCanonicalPath}/table2"
+        (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)
+
+        // If the "c = 1" filter gets pushed down, this query throws the exception that
+        // Parquet emits for filters on missing columns (PARQUET-389).
+        checkAnswer(
+          sqlContext.read.parquet(pathOne, pathTwo).filter("c = 1"),
+          (1 to 1).map(i => Row(i, i.toString, null)))
+      }
+    }
+  }
 }
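Until this patch lands in a release, affected versions can work around the problem by disabling push-down whenever schema merging is needed. A sketch using the public configuration key and reader option from Spark 1.5 (the sqlContext handle and paths are assumed, not part of the patch):

    // Workaround sketch: evaluate filters in Spark instead of pushing them into Parquet.
    sqlContext.setConf("spark.sql.parquet.filterPushdown", "false")
    val merged = sqlContext.read
      .option("mergeSchema", "true")             // per-read schema merging
      .parquet("/data/table1", "/data/table2")   // illustrative paths
    merged.filter("c = 1").show()                // filter now runs in Spark, not in Parquet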