Skip to content
Snippets Groups Projects
Commit 0887e5e8 authored by Cheng Lian's avatar Cheng Lian
Browse files

[SPARK-11153][SQL] Disables Parquet filter push-down for string and binary columns

Due to PARQUET-251, `BINARY` columns in existing Parquet files may be written with corrupted statistics information. This information is used by filter push-down optimization. Since Spark 1.5 turns on Parquet filter push-down by default, we may end up with wrong query results. PARQUET-251 has been fixed in parquet-mr 1.8.1, but Spark 1.5 is still using 1.7.0.

This affects all Spark SQL data types that can be mapped to Parquet `BINARY`, namely:

- `StringType`

- `BinaryType`

- `DecimalType`

  (But Spark SQL doesn't support pushing down filters involving `DecimalType` columns for now.)

To avoid wrong query results, we should disable filter push-down for columns of `StringType` and `BinaryType` until we upgrade to parquet-mr 1.8.1 or later.

Author: Cheng Lian <lian@databricks.com>

Closes #9152 from liancheng/spark-11153.workaround-parquet-251.
parent a3ab6714
No related branches found
No related tags found
No related merge requests found
...@@ -60,6 +60,8 @@ private[sql] object ParquetFilters { ...@@ -60,6 +60,8 @@ private[sql] object ParquetFilters {
case DoubleType => case DoubleType =>
(n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double]) (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
// See https://issues.apache.org/jira/browse/SPARK-11153
/*
// Binary.fromString and Binary.fromByteArray don't accept null values // Binary.fromString and Binary.fromByteArray don't accept null values
case StringType => case StringType =>
(n: String, v: Any) => FilterApi.eq( (n: String, v: Any) => FilterApi.eq(
...@@ -69,6 +71,7 @@ private[sql] object ParquetFilters { ...@@ -69,6 +71,7 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.eq( (n: String, v: Any) => FilterApi.eq(
binaryColumn(n), binaryColumn(n),
Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull) Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
*/
} }
private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = { private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
...@@ -82,6 +85,9 @@ private[sql] object ParquetFilters { ...@@ -82,6 +85,9 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[java.lang.Float]) (n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType => case DoubleType =>
(n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double]) (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
// See https://issues.apache.org/jira/browse/SPARK-11153
/*
case StringType => case StringType =>
(n: String, v: Any) => FilterApi.notEq( (n: String, v: Any) => FilterApi.notEq(
binaryColumn(n), binaryColumn(n),
...@@ -90,6 +96,7 @@ private[sql] object ParquetFilters { ...@@ -90,6 +96,7 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.notEq( (n: String, v: Any) => FilterApi.notEq(
binaryColumn(n), binaryColumn(n),
Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull) Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
*/
} }
private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = { private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
...@@ -101,6 +108,9 @@ private[sql] object ParquetFilters { ...@@ -101,6 +108,9 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[java.lang.Float]) (n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType => case DoubleType =>
(n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double]) (n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
// See https://issues.apache.org/jira/browse/SPARK-11153
/*
case StringType => case StringType =>
(n: String, v: Any) => (n: String, v: Any) =>
FilterApi.lt(binaryColumn(n), FilterApi.lt(binaryColumn(n),
...@@ -108,6 +118,7 @@ private[sql] object ParquetFilters { ...@@ -108,6 +118,7 @@ private[sql] object ParquetFilters {
case BinaryType => case BinaryType =>
(n: String, v: Any) => (n: String, v: Any) =>
FilterApi.lt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]])) FilterApi.lt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
*/
} }
private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = { private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
...@@ -119,6 +130,9 @@ private[sql] object ParquetFilters { ...@@ -119,6 +130,9 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[java.lang.Float]) (n: String, v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType => case DoubleType =>
(n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double]) (n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
// See https://issues.apache.org/jira/browse/SPARK-11153
/*
case StringType => case StringType =>
(n: String, v: Any) => (n: String, v: Any) =>
FilterApi.ltEq(binaryColumn(n), FilterApi.ltEq(binaryColumn(n),
...@@ -126,6 +140,7 @@ private[sql] object ParquetFilters { ...@@ -126,6 +140,7 @@ private[sql] object ParquetFilters {
case BinaryType => case BinaryType =>
(n: String, v: Any) => (n: String, v: Any) =>
FilterApi.ltEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]])) FilterApi.ltEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
*/
} }
private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = { private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
...@@ -137,6 +152,9 @@ private[sql] object ParquetFilters { ...@@ -137,6 +152,9 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[java.lang.Float]) (n: String, v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType => case DoubleType =>
(n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double]) (n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
// See https://issues.apache.org/jira/browse/SPARK-11153
/*
case StringType => case StringType =>
(n: String, v: Any) => (n: String, v: Any) =>
FilterApi.gt(binaryColumn(n), FilterApi.gt(binaryColumn(n),
...@@ -144,6 +162,7 @@ private[sql] object ParquetFilters { ...@@ -144,6 +162,7 @@ private[sql] object ParquetFilters {
case BinaryType => case BinaryType =>
(n: String, v: Any) => (n: String, v: Any) =>
FilterApi.gt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]])) FilterApi.gt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
*/
} }
private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = { private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
...@@ -155,6 +174,9 @@ private[sql] object ParquetFilters { ...@@ -155,6 +174,9 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[java.lang.Float]) (n: String, v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType => case DoubleType =>
(n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double]) (n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
// See https://issues.apache.org/jira/browse/SPARK-11153
/*
case StringType => case StringType =>
(n: String, v: Any) => (n: String, v: Any) =>
FilterApi.gtEq(binaryColumn(n), FilterApi.gtEq(binaryColumn(n),
...@@ -162,6 +184,7 @@ private[sql] object ParquetFilters { ...@@ -162,6 +184,7 @@ private[sql] object ParquetFilters {
case BinaryType => case BinaryType =>
(n: String, v: Any) => (n: String, v: Any) =>
FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]])) FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
*/
} }
private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = { private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = {
...@@ -177,6 +200,9 @@ private[sql] object ParquetFilters { ...@@ -177,6 +200,9 @@ private[sql] object ParquetFilters {
case DoubleType => case DoubleType =>
(n: String, v: Set[Any]) => (n: String, v: Set[Any]) =>
FilterApi.userDefined(doubleColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Double]])) FilterApi.userDefined(doubleColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Double]]))
// See https://issues.apache.org/jira/browse/SPARK-11153
/*
case StringType => case StringType =>
(n: String, v: Set[Any]) => (n: String, v: Set[Any]) =>
FilterApi.userDefined(binaryColumn(n), FilterApi.userDefined(binaryColumn(n),
...@@ -185,6 +211,7 @@ private[sql] object ParquetFilters { ...@@ -185,6 +211,7 @@ private[sql] object ParquetFilters {
(n: String, v: Set[Any]) => (n: String, v: Set[Any]) =>
FilterApi.userDefined(binaryColumn(n), FilterApi.userDefined(binaryColumn(n),
SetInFilter(v.map(e => Binary.fromByteArray(e.asInstanceOf[Array[Byte]])))) SetInFilter(v.map(e => Binary.fromByteArray(e.asInstanceOf[Array[Byte]]))))
*/
} }
/** /**
......
...@@ -219,7 +219,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex ...@@ -219,7 +219,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
} }
} }
test("filter pushdown - string") { // See https://issues.apache.org/jira/browse/SPARK-11153
ignore("filter pushdown - string") {
withParquetDataFrame((1 to 4).map(i => Tuple1(i.toString))) { implicit df => withParquetDataFrame((1 to 4).map(i => Tuple1(i.toString))) { implicit df =>
checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row]) checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
checkFilterPredicate( checkFilterPredicate(
...@@ -247,7 +248,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex ...@@ -247,7 +248,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
} }
} }
test("filter pushdown - binary") { // See https://issues.apache.org/jira/browse/SPARK-11153
ignore("filter pushdown - binary") {
implicit class IntToBinary(int: Int) { implicit class IntToBinary(int: Int) {
def b: Array[Byte] = int.toString.getBytes("UTF-8") def b: Array[Byte] = int.toString.getBytes("UTF-8")
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment