Commit e1139dd6 authored by Michael Armbrust

[SPARK-3237][SQL] Fix parquet filters with UDFs

Author: Michael Armbrust <michael@databricks.com>

Closes #2153 from marmbrus/parquetFilters and squashes the following commits:

712731a [Michael Armbrust] Use closure serializer for sending filters.
1e83f80 [Michael Armbrust] Clean udf functions.
parent 3e2864e4
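
Background on the fix: a pushed-down filter that contains a ScalaUdf carries an arbitrary Scala closure, and the Kryo-backed SparkSqlSerializer previously used here does not reliably round-trip closures, while Spark's closure serializer (plain Java serialization by default) does. Below is a minimal, self-contained sketch of the serialize/encode/decode round trip the patch performs, with java.util.Base64 and plain Java serialization standing in for Guava's BaseEncoding and SparkEnv.get.closureSerializer; FilterRoundTrip and its udf are illustrative names, not code from the patch.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.Base64

object FilterRoundTrip {
  // Stand-in for a filter expression that captures a closure, as a ScalaUdf does.
  val udf: Int => Boolean = x => x > 1

  // Closure -> bytes -> base64 string, mirroring serializeFilterExpressions.
  def encode(obj: AnyRef): String = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()
    Base64.getEncoder.encodeToString(buffer.toByteArray)
  }

  // Base64 string -> bytes -> closure, mirroring the read side.
  def decode[T](encoded: String): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(Base64.getDecoder.decode(encoded)))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val restored = decode[Int => Boolean](encode(udf))
    println(restored(2))  // true: the closure survived the round trip
  }
}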
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
@@ -18,10 +18,14 @@
 package org.apache.spark.sql.catalyst.expressions

 import org.apache.spark.sql.catalyst.types.DataType
+import org.apache.spark.util.ClosureCleaner

 case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expression])
   extends Expression {

+  // Clean function when not called with default no-arg constructor.
+  if (function != null) { ClosureCleaner.clean(function) }
+
   type EvaluatedType = Any

   def nullable = true
...
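
Why ScalaUdf now cleans its function (context, not part of the patch text): on the Scala versions Spark 1.x targeted (2.10/2.11), an anonymous function compiles to an inner class holding an $outer reference to its enclosing scope, so serializing a UDF can fail on a non-serializable enclosing object even when the closure never uses it. ClosureCleaner.clean nulls out such unused references before the expression is shipped. A hypothetical sketch of the failure it prevents; QueryBuilder and CleanDemo are invented names:

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Not Serializable: on Scala 2.10/2.11, closures defined inside capture it as $outer.
class QueryBuilder {
  // Uses no QueryBuilder state, yet still drags the whole instance along.
  val isPositive: Int => Boolean = x => x > 0
}

object CleanDemo {
  def main(args: Array[String]): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try out.writeObject(new QueryBuilder().isPositive)
    catch {
      // Throws here without cleaning; ClosureCleaner.clean would null the
      // unused $outer so the function serializes on its own.
      case e: NotSerializableException => println(s"cannot serialize: $e")
    }
  }
}

(On Scala 2.12 and later, lambdas capture only what they actually use, so this particular demo only reproduces on the older compilers.)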
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
@@ -17,6 +17,8 @@
 package org.apache.spark.sql.parquet

+import java.nio.ByteBuffer
+
 import org.apache.hadoop.conf.Configuration

 import parquet.filter._
@@ -25,6 +27,7 @@ import parquet.column.ColumnReader
 import com.google.common.io.BaseEncoding

+import org.apache.spark.SparkEnv
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.expressions.{Predicate => CatalystPredicate}
 import org.apache.spark.sql.catalyst.expressions._
@@ -237,7 +240,8 @@ object ParquetFilters {
    */
   def serializeFilterExpressions(filters: Seq[Expression], conf: Configuration): Unit = {
     if (filters.length > 0) {
-      val serialized: Array[Byte] = SparkSqlSerializer.serialize(filters)
+      val serialized: Array[Byte] =
+        SparkEnv.get.closureSerializer.newInstance().serialize(filters).array()
       val encoded: String = BaseEncoding.base64().encode(serialized)
       conf.set(PARQUET_FILTER_DATA, encoded)
     }
@@ -252,7 +256,7 @@ object ParquetFilters {
     val data = conf.get(PARQUET_FILTER_DATA)
     if (data != null) {
       val decoded: Array[Byte] = BaseEncoding.base64().decode(data)
-      SparkSqlSerializer.deserialize(decoded)
+      SparkEnv.get.closureSerializer.newInstance().deserialize(ByteBuffer.wrap(decoded))
     } else {
       Seq()
     }
...
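
A note on the ByteBuffer plumbing in the hunks above: Spark's SerializerInstance API deals in ByteBuffers (serialize returns one, deserialize consumes one), while the base64/Configuration round trip wants a raw byte array, hence the .array() on the way out and the ByteBuffer.wrap on the way in. The conversion in isolation, as a trivial standalone sketch (BufferPlumbing is an invented name):

import java.nio.ByteBuffer

object BufferPlumbing {
  def main(args: Array[String]): Unit = {
    val fromSerializer: ByteBuffer = ByteBuffer.wrap("filters".getBytes("UTF-8"))
    val forEncoding: Array[Byte] = fromSerializer.array()          // ByteBuffer -> Array[Byte]
    val forDeserializer: ByteBuffer = ByteBuffer.wrap(forEncoding) // and back for deserialize
    println(forDeserializer.remaining())  // 7: same bytes after the round trip
  }
}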