Commit 24823293, authored by Uri Laserson, committed by Matei Zaharia

[SPARK-3389] Add Converter for ease of Parquet reading in PySpark

https://issues.apache.org/jira/browse/SPARK-3389

Author: Uri Laserson <laserson@cloudera.com>

Closes #2256 from laserson/SPARK-3389 and squashes the following commits:

0ed363e [Uri Laserson] PEP8'd the python file
0b4b380 [Uri Laserson] Moved converter to examples and added python example
eecf4dc [Uri Laserson] [SPARK-3389] Add Converter for ease of Parquet reading in PySpark
parent 5b922bb4
examples/src/main/python/parquet_inputformat.py:
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import sys
from pyspark import SparkContext
"""
Read data file users.parquet in local Spark distro:
$ cd $SPARK_HOME
$ export AVRO_PARQUET_JARS=/path/to/parquet-avro-1.5.0.jar
$ ./bin/spark-submit --driver-class-path /path/to/example/jar \\
--jars $AVRO_PARQUET_JARS \\
./examples/src/main/python/parquet_inputformat.py \\
examples/src/main/resources/users.parquet
<...lots of log output...>
{u'favorite_color': None, u'name': u'Alyssa', u'favorite_numbers': [3, 9, 15, 20]}
{u'favorite_color': u'red', u'name': u'Ben', u'favorite_numbers': []}
<...more log output...>
"""
if __name__ == "__main__":
    if len(sys.argv) != 2:
        print >> sys.stderr, """
        Usage: parquet_inputformat.py <data_file>

        Run with example jar:
        ./bin/spark-submit --driver-class-path /path/to/example/jar \\
        /path/to/examples/parquet_inputformat.py <data_file>

        Assumes you have Parquet data stored in <data_file>.
        """
        exit(-1)

    path = sys.argv[1]
    sc = SparkContext(appName="ParquetInputFormat")

    # AvroParquetInputFormat yields (java.lang.Void, IndexedRecord) pairs; the
    # value converter turns each IndexedRecord into a Java Map, which PySpark
    # hands to Python as a dict.
    parquet_rdd = sc.newAPIHadoopFile(
        path,
        'parquet.avro.AvroParquetInputFormat',
        'java.lang.Void',
        'org.apache.avro.generic.IndexedRecord',
        valueConverter='org.apache.spark.examples.pythonconverters.IndexedRecordToJavaConverter')

    # Keys are always null (Void), so keep only the converted values.
    output = parquet_rdd.map(lambda x: x[1]).collect()
    for k in output:
        print k
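Since AvroParquetInputFormat produces (java.lang.Void, IndexedRecord) pairs, the keys reaching Python are always None, and the map(lambda x: x[1]) above simply discards them. PySpark's values() is an equivalent spelling; a minimal variant, assuming the same parquet_rdd:

    # values() drops the Void keys, same as map(lambda x: x[1]) above.
    for user in parquet_rdd.values().collect():
        print user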
{"type": "record", "namespace": "example.avro", "name": "User", "fields": [{"type": "string", "name": "name"}, {"type": ["string", "null"], "name": "favorite_color"}, {"type": {"items": "int", "type": "array"}, "name": "favorite_numbers"}]}
@@ -30,21 +30,28 @@ import org.apache.spark.api.python.Converter
 import org.apache.spark.SparkException
 
-/**
- * Implementation of [[org.apache.spark.api.python.Converter]] that converts
- * an Avro Record wrapped in an AvroKey (or AvroValue) to a Java Map. It tries
- * to work with all 3 Avro data mappings (Generic, Specific and Reflect).
- */
-class AvroWrapperToJavaConverter extends Converter[Any, Any] {
-  override def convert(obj: Any): Any = {
+
+object AvroConversionUtil extends Serializable {
+  def fromAvro(obj: Any, schema: Schema): Any = {
     if (obj == null) {
       return null
     }
-    obj.asInstanceOf[AvroWrapper[_]].datum() match {
-      case null => null
-      case record: IndexedRecord => unpackRecord(record)
-      case other => throw new SparkException(
-        s"Unsupported top-level Avro data type ${other.getClass.getName}")
+    schema.getType match {
+      case UNION => unpackUnion(obj, schema)
+      case ARRAY => unpackArray(obj, schema)
+      case FIXED => unpackFixed(obj, schema)
+      case MAP => unpackMap(obj, schema)
+      case BYTES => unpackBytes(obj)
+      case RECORD => unpackRecord(obj)
+      case STRING => obj.toString
+      case ENUM => obj.toString
+      case NULL => obj
+      case BOOLEAN => obj
+      case DOUBLE => obj
+      case FLOAT => obj
+      case INT => obj
+      case LONG => obj
+      case other => throw new SparkException(
+        s"Unknown Avro schema type ${other.getName}")
     }
   }
 
@@ -103,28 +110,37 @@ class AvroWrapperToJavaConverter extends Converter[Any, Any] {
         "Unions may only consist of a concrete type and null")
     }
   }
-
-  def fromAvro(obj: Any, schema: Schema): Any = {
-    if (obj == null) {
-      return null
-    }
-    schema.getType match {
-      case UNION => unpackUnion(obj, schema)
-      case ARRAY => unpackArray(obj, schema)
-      case FIXED => unpackFixed(obj, schema)
-      case MAP => unpackMap(obj, schema)
-      case BYTES => unpackBytes(obj)
-      case RECORD => unpackRecord(obj)
-      case STRING => obj.toString
-      case ENUM => obj.toString
-      case NULL => obj
-      case BOOLEAN => obj
-      case DOUBLE => obj
-      case FLOAT => obj
-      case INT => obj
-      case LONG => obj
-      case other => throw new SparkException(
-        s"Unknown Avro schema type ${other.getName}")
-    }
-  }
 }
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts
+ * an Avro IndexedRecord (e.g., derived from AvroParquetInputFormat) to a Java Map.
+ */
+class IndexedRecordToJavaConverter extends Converter[IndexedRecord, JMap[String, Any]]{
+  override def convert(record: IndexedRecord): JMap[String, Any] = {
+    if (record == null) {
+      return null
+    }
+    val map = new java.util.HashMap[String, Any]
+    AvroConversionUtil.unpackRecord(record)
+  }
+}
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts
+ * an Avro Record wrapped in an AvroKey (or AvroValue) to a Java Map. It tries
+ * to work with all 3 Avro data mappings (Generic, Specific and Reflect).
+ */
+class AvroWrapperToJavaConverter extends Converter[Any, Any] {
+  override def convert(obj: Any): Any = {
+    if (obj == null) {
+      return null
+    }
+    obj.asInstanceOf[AvroWrapper[_]].datum() match {
+      case null => null
+      case record: IndexedRecord => AvroConversionUtil.unpackRecord(record)
+      case other => throw new SparkException(
+        s"Unsupported top-level Avro data type ${other.getClass.getName}")
+    }
+  }
+}
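For comparison, AvroWrapperToJavaConverter (refactored above to delegate to AvroConversionUtil) fills the same role for plain Avro container files read through AvroKeyInputFormat, where each record arrives wrapped in an AvroKey on the key side instead of as the value. A sketch of the corresponding PySpark call, mirroring the Parquet example above (the input path is hypothetical):

    from pyspark import SparkContext

    sc = SparkContext(appName="AvroInputFormat")

    # AvroKeyInputFormat yields (AvroKey, NullWritable) pairs; the key
    # converter unwraps each AvroKey and converts its datum to a Java Map,
    # which PySpark hands to Python as a dict.
    avro_rdd = sc.newAPIHadoopFile(
        '/path/to/users.avro',  # hypothetical input path
        'org.apache.avro.mapreduce.AvroKeyInputFormat',
        'org.apache.avro.mapred.AvroKey',
        'org.apache.hadoop.io.NullWritable',
        keyConverter='org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter')

    # The data rides on the keys; the values are NullWritable placeholders.
    for record in avro_rdd.keys().collect():
        print record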