Jun 10, 2014
SPARK-1416: PySpark support for SequenceFile and Hadoop InputFormats · f971d6cb
Nick Pentreath authored
      So I finally resurrected this PR. It seems the old one against the incubator mirror is no longer available, so I cannot reference it.
      
      This adds initial support for reading Hadoop ```SequenceFile```s, as well as arbitrary Hadoop ```InputFormat```s, in PySpark.
      
      # Overview
      The basics are as follows:
1. The ```PythonRDD``` object contains the relevant methods, which are in turn invoked by ```SparkContext``` in PySpark
      2. The SequenceFile or InputFormat is read on the Scala side and converted from ```Writable``` instances to the relevant Scala classes (in the case of primitives)
      3. Pyrolite is used to serialize Java objects. If this fails, the fallback is ```toString```
4. ```PickleSerializer``` on the Python side deserializes the pickled data.
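
For illustration, here is a minimal sketch of this read path from the Python side (the path and file contents are hypothetical):
```python
# Reading a SequenceFile of (IntWritable, Text) pairs: the call goes
# through SparkContext to PythonRDD on the Scala side, Writables are
# converted to Scala/Java types, pickled by Pyrolite, and deserialized
# in Python by PickleSerializer.
rdd = sc.sequenceFile("hdfs:///tmp/example-seqfile")  # hypothetical path
rdd.take(2)  # e.g. [(1, u'a'), (2, u'b')]
```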
      
      This works "out the box" for simple ```Writable```s:
      * ```Text```
      * ```IntWritable```, ```DoubleWritable```, ```FloatWritable```
      * ```NullWritable```
      * ```BooleanWritable```
      * ```BytesWritable```
      * ```MapWritable```
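
For example, assuming a SequenceFile written with ```Text``` keys and ```MapWritable``` values (path and contents hypothetical), the pairs should arrive in Python as native types:
```python
rdd = sc.sequenceFile("hdfs:///tmp/example-mapwritable")  # hypothetical path
rdd.first()  # e.g. (u'key1', {u'a': 1.0}) - Text -> unicode, MapWritable -> dict
```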
      
It also works for simple, "struct-like" classes. Due to the way Pyrolite works, this requires that the classes satisfy the JavaBeans conventions (i.e. fields with a no-arg constructor and getters/setters). (Perhaps in the future some sugar for case classes and reflection could be added.)
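
As a rough sketch (all ```com.example``` names are hypothetical), reading a custom ```InputFormat``` whose values are such JavaBean-style objects might look like:
```python
# MyBean must follow the JavaBeans conventions (no-arg constructor,
# getters/setters) so Pyrolite can pickle it for the Python side.
rdd = sc.newAPIHadoopFile(
    "hdfs:///tmp/example-beans",          # hypothetical path
    "com.example.MyBeanInputFormat",      # hypothetical InputFormat class
    "org.apache.hadoop.io.NullWritable",
    "com.example.MyBean")                 # hypothetical JavaBean value class
```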
      
I've tested it out with ```EsInputFormat``` as an example and it works very nicely:
      ```python
      conf = {"es.resource" : "index/type" }
      rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat", "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)
      rdd.first()
      ```
      
I suspect for things like HBase/Cassandra it will be a bit trickier to get it to work out of the box.
      
      # Some things still outstanding:
1. ~~Requires ```msgpack-python``` and will fail without it. As originally discussed with Josh, add an ```as_strings``` argument that defaults to ```False``` and can be used if ```msgpack-python``` is not available~~
      2. ~~I see from https://github.com/apache/spark/pull/363 that Pyrolite is being used there for SerDe between Scala and Python. @ahirreddy @mateiz what is the plan behind this - is Pyrolite preferred? It seems from a cursory glance that adapting the ```msgpack```-based SerDe here to use Pyrolite wouldn't be too hard~~
3. ~~Support the key and value "wrapper" that would allow a Scala/Java function to be plugged in to transform whatever the key/value ```Writable``` class is into something that can be serialized (e.g. convert some custom Writable to a JavaBean or ```java.util.Map``` that can be easily serialized)~~ (see the converter sketch after this list)
4. Support ```saveAsSequenceFile```, ```saveAsHadoopFile```, etc. This would require SerDe in the reverse direction, which can be handled by Pyrolite. I will work on this as a separate PR.
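
On item 3, a sketch of how a plugged-in converter is specified from the Python side, in the spirit of the HBase example added in this PR (treat the converter class name as illustrative):
```python
conf = {"hbase.zookeeper.quorum": "localhost",
        "hbase.mapreduce.inputtable": "test"}
rdd = sc.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    valueConverter="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter",
    conf=conf)
```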
      
      Author: Nick Pentreath <nick.pentreath@gmail.com>
      
      Closes #455 from MLnick/pyspark-inputformats and squashes the following commits:
      
268df7e [Nick Pentreath] Documentation changes per @pwendell comments
      761269b [Nick Pentreath] Address @pwendell comments, simplify default writable conversions and remove registry.
      4c972d8 [Nick Pentreath] Add license headers
      d150431 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
      cde6af9 [Nick Pentreath] Parameterize converter trait
      5ebacfa [Nick Pentreath] Update docs for PySpark input formats
      a985492 [Nick Pentreath] Move Converter examples to own package
      365d0be [Nick Pentreath] Make classes private[python]. Add docs and @Experimental annotation to Converter interface.
      eeb8205 [Nick Pentreath] Fix path relative to SPARK_HOME in tests
      1eaa08b [Nick Pentreath] HBase -> Cassandra app name oversight
      3f90c3e [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
      2c18513 [Nick Pentreath] Add examples for reading HBase and Cassandra InputFormats from Python
      b65606f [Nick Pentreath] Add converter interface
5757f6e [Nick Pentreath] Default key/value classes for sequenceFile are None
      085b55f [Nick Pentreath] Move input format tests to tests.py and clean up docs
      43eb728 [Nick Pentreath] PySpark InputFormats docs into programming guide
      94beedc [Nick Pentreath] Clean up args in PythonRDD. Set key/value converter defaults to None for PySpark context.py methods
      1a4a1d6 [Nick Pentreath] Address @mateiz style comments
      01e0813 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
      15a7d07 [Nick Pentreath] Remove default args for key/value classes. Arg names to camelCase
      9fe6bd5 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
      84fe8e3 [Nick Pentreath] Python programming guide space formatting
      d0f52b6 [Nick Pentreath] Python programming guide
      7caa73a [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
      93ef995 [Nick Pentreath] Add back context.py changes
      9ef1896 [Nick Pentreath] Recover earlier changes lost in previous merge for serializers.py
      077ecb2 [Nick Pentreath] Recover earlier changes lost in previous merge for context.py
      5af4770 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
      35b8e3a [Nick Pentreath] Another fix for test ordering
      bef3afb [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
      e001b94 [Nick Pentreath] Fix test failures due to ordering
      78978d9 [Nick Pentreath] Add doc for SequenceFile and InputFormat support to Python programming guide
      64eb051 [Nick Pentreath] Scalastyle fix
      e7552fa [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
      44f2857 [Nick Pentreath] Remove msgpack dependency and switch serialization to Pyrolite, plus some clean up and refactoring
      c0ebfb6 [Nick Pentreath] Change sequencefile test data generator to easily be called from PySpark tests
      1d7c17c [Nick Pentreath] Amend tests to auto-generate sequencefile data in temp dir
      17a656b [Nick Pentreath] remove binary sequencefile for tests
      f60959e [Nick Pentreath] Remove msgpack dependency and serializer from PySpark
      450e0a2 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
      31a2fff [Nick Pentreath] Scalastyle fixes
      fc5099e [Nick Pentreath] Add Apache license headers
      4e08983 [Nick Pentreath] Clean up docs for PySpark context methods
      b20ec7e [Nick Pentreath] Clean up merge duplicate dependencies
      951c117 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
      f6aac55 [Nick Pentreath] Bring back msgpack
      9d2256e [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
      1bbbfb0 [Nick Pentreath] Clean up SparkBuild from merge
      a67dfad [Nick Pentreath] Clean up Msgpack serialization and registering
      7237263 [Nick Pentreath] Add back msgpack serializer and hadoop file code lost during merging
      25da1ca [Nick Pentreath] Add generator for nulls, bools, bytes and maps
      65360d5 [Nick Pentreath] Adding test SequenceFiles
      0c612e5 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
      d72bf18 [Nick Pentreath] msgpack
      dd57922 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
      e67212a [Nick Pentreath] Add back msgpack dependency
      f2d76a0 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
      41856a5 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
      97ef708 [Nick Pentreath] Remove old writeToStream
      2beeedb [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
      795a763 [Nick Pentreath] Change name to WriteInputFormatTestDataGenerator. Cleanup some var names. Use SPARK_HOME in path for writing test sequencefile data.
      174f520 [Nick Pentreath] Add back graphx settings
      703ee65 [Nick Pentreath] Add back msgpack
      619c0fa [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
      1c8efbc [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
      eb40036 [Nick Pentreath] Remove unused comment lines
      4d7ef2e [Nick Pentreath] Fix indentation
      f1d73e3 [Nick Pentreath] mergeConfs returns a copy rather than mutating one of the input arguments
      0f5cd84 [Nick Pentreath] Remove unused pair UTF8 class. Add comments to msgpack deserializer
      4294cbb [Nick Pentreath] Add old Hadoop api methods. Clean up and expand comments. Clean up argument names
818a1e6 [Nick Pentreath] Add sequencefile and Hadoop InputFormat support to PythonRDD
      4e7c9e3 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
c304cc8 [Nick Pentreath] Adding supporting sequencefiles for tests. Cleaning up
      4b0a43f [Nick Pentreath] Refactoring utils into own objects. Cleaning up old commented-out code
      d86325f [Nick Pentreath] Initial WIP of PySpark support for SequenceFile and arbitrary Hadoop InputFormat