SPARK-1416: PySpark support for SequenceFile and Hadoop InputFormats
Nick Pentreath authored (commit f971d6cb)

    So I finally resurrected this PR. It seems the old one against the incubator mirror is no longer available, so I cannot reference it.
    
    This adds initial support for reading Hadoop ```SequenceFile```s, as well as arbitrary Hadoop ```InputFormat```s, in PySpark.
    
    # Overview
    The basics are as follows:
1. The ```PythonRDD``` object contains the relevant methods, which are in turn invoked by ```SparkContext``` in PySpark
2. The SequenceFile or InputFormat is read on the Scala side, and the ```Writable``` instances are converted to the relevant Scala classes (in the case of primitives)
3. Pyrolite is used to serialize the Java objects. If this fails, the fallback is ```toString```
4. ```PickleSerializer``` on the Python side deserializes the pickled bytes
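
For example, a single read from Python exercises all four steps (a minimal sketch; the path is illustrative):
```python
# 1. sc.sequenceFile(...) invokes the corresponding PythonRDD method on the JVM
# 2. The JVM side reads the SequenceFile and converts the Writables to
#    Scala/Java types (e.g. Text -> String, IntWritable -> Int)
# 3. Pyrolite pickles the converted objects (falling back to toString)
# 4. PickleSerializer unpickles them into Python objects here
rdd = sc.sequenceFile("hdfs:///path/to/seqfile")
rdd.first()
```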
    
This works "out of the box" for simple ```Writable```s:
    * ```Text```
    * ```IntWritable```, ```DoubleWritable```, ```FloatWritable```
    * ```NullWritable```
    * ```BooleanWritable```
    * ```BytesWritable```
    * ```MapWritable```
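
Under the default conversions these arrive as plain Python types (a sketch based on the conversion step above; exact mappings may differ at the margins):
```python
# Illustrative Writable -> Python mappings under the default conversions:
#   Text            -> unicode
#   IntWritable     -> int
#   DoubleWritable  -> float
#   FloatWritable   -> float
#   NullWritable    -> None
#   BooleanWritable -> bool
#   BytesWritable   -> bytearray
#   MapWritable     -> dict
rdd = sc.sequenceFile("hdfs:///tmp/bools")  # e.g. a (Text, BooleanWritable) file
rdd.collect()  # -> [(u'k1', True), (u'k2', False)]
```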
    
It also works for simple, "struct-like" classes. Due to the way Pyrolite works, this requires that the classes satisfy the JavaBeans conventions (i.e. fields with a no-arg constructor and getters/setters). (Perhaps in the future some sugar for case classes and reflection could be added.)
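
For instance, a value class following the bean conventions would arrive on the Python side as a dict of its properties (a sketch under that assumption; the InputFormat and bean class names here are hypothetical):
```python
# Hypothetical: an InputFormat whose value class is a JavaBean with
# getName()/getAge(). Pyrolite pickles such beans as dicts of their
# properties, so they can be indexed directly from Python.
rdd = sc.newAPIHadoopFile(
    "hdfs:///path/to/data",               # illustrative path
    "com.example.PersonInputFormat",      # hypothetical InputFormat
    "org.apache.hadoop.io.NullWritable",
    "com.example.PersonBean")             # hypothetical bean value class
person = rdd.first()[1]  # e.g. {'name': u'alice', 'age': 42}
```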
    
I've tested it out with ```EsInputFormat``` as an example and it works very nicely:
    ```python
conf = {"es.resource": "index/type"}
rdd = sc.newAPIHadoopRDD(
    "org.elasticsearch.hadoop.mr.EsInputFormat",
    "org.apache.hadoop.io.NullWritable",
    "org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=conf)
    rdd.first()
    ```
    
I suspect that for things like HBase/Cassandra it will be a bit trickier to get working out of the box.
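
The ```Converter``` interface added here is intended to help in those cases; plugging one in from Python would look roughly like this (a sketch modeled on the HBase example added in this PR; the converter class names and configuration keys are illustrative):
```python
# Sketch: reading HBase rows via custom key/value converters that turn
# the HBase Writables into something Pyrolite can pickle. The converter
# class names are illustrative, not guaranteed to match the repo.
conf = {"hbase.zookeeper.quorum": "localhost",
        "hbase.mapreduce.inputtable": "test_table"}
rdd = sc.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    keyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter",
    valueConverter="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter",
    conf=conf)
rdd.first()
```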
    
    # Some things still outstanding:
1. ~~Requires ```msgpack-python``` and will fail without it. As originally discussed with Josh, add an ```as_strings``` argument that defaults to ```False```, which can be used if ```msgpack-python``` is not available~~
    2. ~~I see from https://github.com/apache/spark/pull/363 that Pyrolite is being used there for SerDe between Scala and Python. @ahirreddy @mateiz what is the plan behind this - is Pyrolite preferred? It seems from a cursory glance that adapting the ```msgpack```-based SerDe here to use Pyrolite wouldn't be too hard~~
    3. ~~Support the key and value "wrapper" that would allow a Scala/Java function to be plugged in that would transform whatever the key/value Writable class is into something that can be serialized (e.g. convert some custom Writable to a JavaBean or ```java.util.Map``` that can be easily serialized)~~
4. Support ```saveAsSequenceFile``` and ```saveAsHadoopFile``` etc. This would require SerDe in the reverse direction, which can be handled by Pyrolite. Will work on this as a separate PR (see the sketch below)
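
Purely for illustration, the write path might eventually look like this (hypothetical; this PR does not implement it):
```python
# Hypothetical reverse direction (not part of this PR): pickle on the
# Python side, unpickle with Pyrolite on the JVM side, and convert the
# results back to Writables before writing.
rdd = sc.parallelize([(1, u"a"), (2, u"b")])
rdd.saveAsSequenceFile("hdfs:///tmp/out")  # hypothetical PySpark method
```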
    
    Author: Nick Pentreath <nick.pentreath@gmail.com>
    
    Closes #455 from MLnick/pyspark-inputformats and squashes the following commits:
    
268df7e [Nick Pentreath] Documentation changes per @pwendell comments
    761269b [Nick Pentreath] Address @pwendell comments, simplify default writable conversions and remove registry.
    4c972d8 [Nick Pentreath] Add license headers
    d150431 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
    cde6af9 [Nick Pentreath] Parameterize converter trait
    5ebacfa [Nick Pentreath] Update docs for PySpark input formats
    a985492 [Nick Pentreath] Move Converter examples to own package
    365d0be [Nick Pentreath] Make classes private[python]. Add docs and @Experimental annotation to Converter interface.
    eeb8205 [Nick Pentreath] Fix path relative to SPARK_HOME in tests
    1eaa08b [Nick Pentreath] HBase -> Cassandra app name oversight
    3f90c3e [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
    2c18513 [Nick Pentreath] Add examples for reading HBase and Cassandra InputFormats from Python
    b65606f [Nick Pentreath] Add converter interface
5757f6e [Nick Pentreath] Default key/value classes for sequenceFile are None
    085b55f [Nick Pentreath] Move input format tests to tests.py and clean up docs
    43eb728 [Nick Pentreath] PySpark InputFormats docs into programming guide
    94beedc [Nick Pentreath] Clean up args in PythonRDD. Set key/value converter defaults to None for PySpark context.py methods
    1a4a1d6 [Nick Pentreath] Address @mateiz style comments
    01e0813 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
    15a7d07 [Nick Pentreath] Remove default args for key/value classes. Arg names to camelCase
    9fe6bd5 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
    84fe8e3 [Nick Pentreath] Python programming guide space formatting
    d0f52b6 [Nick Pentreath] Python programming guide
    7caa73a [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
    93ef995 [Nick Pentreath] Add back context.py changes
    9ef1896 [Nick Pentreath] Recover earlier changes lost in previous merge for serializers.py
    077ecb2 [Nick Pentreath] Recover earlier changes lost in previous merge for context.py
    5af4770 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
    35b8e3a [Nick Pentreath] Another fix for test ordering
    bef3afb [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
    e001b94 [Nick Pentreath] Fix test failures due to ordering
    78978d9 [Nick Pentreath] Add doc for SequenceFile and InputFormat support to Python programming guide
    64eb051 [Nick Pentreath] Scalastyle fix
    e7552fa [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
    44f2857 [Nick Pentreath] Remove msgpack dependency and switch serialization to Pyrolite, plus some clean up and refactoring
    c0ebfb6 [Nick Pentreath] Change sequencefile test data generator to easily be called from PySpark tests
    1d7c17c [Nick Pentreath] Amend tests to auto-generate sequencefile data in temp dir
    17a656b [Nick Pentreath] remove binary sequencefile for tests
    f60959e [Nick Pentreath] Remove msgpack dependency and serializer from PySpark
    450e0a2 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
    31a2fff [Nick Pentreath] Scalastyle fixes
    fc5099e [Nick Pentreath] Add Apache license headers
    4e08983 [Nick Pentreath] Clean up docs for PySpark context methods
    b20ec7e [Nick Pentreath] Clean up merge duplicate dependencies
    951c117 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
    f6aac55 [Nick Pentreath] Bring back msgpack
    9d2256e [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
    1bbbfb0 [Nick Pentreath] Clean up SparkBuild from merge
    a67dfad [Nick Pentreath] Clean up Msgpack serialization and registering
    7237263 [Nick Pentreath] Add back msgpack serializer and hadoop file code lost during merging
    25da1ca [Nick Pentreath] Add generator for nulls, bools, bytes and maps
    65360d5 [Nick Pentreath] Adding test SequenceFiles
    0c612e5 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
    d72bf18 [Nick Pentreath] msgpack
    dd57922 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
    e67212a [Nick Pentreath] Add back msgpack dependency
    f2d76a0 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
    41856a5 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
    97ef708 [Nick Pentreath] Remove old writeToStream
    2beeedb [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
    795a763 [Nick Pentreath] Change name to WriteInputFormatTestDataGenerator. Cleanup some var names. Use SPARK_HOME in path for writing test sequencefile data.
    174f520 [Nick Pentreath] Add back graphx settings
    703ee65 [Nick Pentreath] Add back msgpack
    619c0fa [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
    1c8efbc [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
    eb40036 [Nick Pentreath] Remove unused comment lines
    4d7ef2e [Nick Pentreath] Fix indentation
    f1d73e3 [Nick Pentreath] mergeConfs returns a copy rather than mutating one of the input arguments
    0f5cd84 [Nick Pentreath] Remove unused pair UTF8 class. Add comments to msgpack deserializer
    4294cbb [Nick Pentreath] Add old Hadoop api methods. Clean up and expand comments. Clean up argument names
818a1e6 [Nick Pentreath] Add sequencefile and Hadoop InputFormat support to PythonRDD
    4e7c9e3 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
c304cc8 [Nick Pentreath] Adding supporting sequencefiles for tests. Cleaning up
    4b0a43f [Nick Pentreath] Refactoring utils into own objects. Cleaning up old commented-out code
    d86325f [Nick Pentreath] Initial WIP of PySpark support for SequenceFile and arbitrary Hadoop InputFormat