  1. Nov 04, 2014
    • Davies Liu's avatar
      [SPARK-3886] [PySpark] simplify serializer, use AutoBatchedSerializer by default. · e4f42631
      Davies Liu authored
      This PR simplifies the serializers: a batched serializer (AutoBatchedSerializer by default) is always used, even when the batch size is 1.
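
      A minimal sketch of the auto-batching idea (illustrative only, not the actual AutoBatchedSerializer code): pickle objects in batches and adjust the batch size based on how large the serialized chunks turn out to be.

      ```python
      import pickle
      import struct

      def dump_auto_batched(iterator, stream, target_size=1 << 16):
          """Sketch: write length-prefixed pickled batches, growing the batch
          size while the serialized chunks stay below target_size (assumed)."""
          batch_size = 1
          iterator = iter(iterator)
          while True:
              batch = []
              for obj in iterator:
                  batch.append(obj)
                  if len(batch) >= batch_size:
                      break
              if not batch:
                  return
              chunk = pickle.dumps(batch, 2)
              stream.write(struct.pack("!I", len(chunk)))
              stream.write(chunk)
              if len(chunk) < target_size:
                  batch_size *= 2        # chunks are small: pack more objects next time
              elif len(chunk) > target_size * 10 and batch_size > 1:
                  batch_size //= 2       # chunks are huge: back off
      ```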
      
      Author: Davies Liu <davies@databricks.com>
      
      This patch had conflicts when merged, resolved by
      Committer: Josh Rosen <joshrosen@databricks.com>
      
      Closes #2920 from davies/fix_autobatch and squashes the following commits:
      
      e544ef9 [Davies Liu] revert unrelated change
      6880b14 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
      1d557fc [Davies Liu] fix tests
      8180907 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
      76abdce [Davies Liu] clean up
      53fa60b [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
      d7ac751 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
      2cc2497 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
      b4292ce [Davies Liu] fix bug in master
      d79744c [Davies Liu] recover hive tests
      be37ece [Davies Liu] refactor
      eb3938d [Davies Liu] refactor serializer in scala
      8d77ef2 [Davies Liu] simplify serializer, use AutoBatchedSerializer by default.
      e4f42631
  2. Nov 03, 2014
    • Xiangrui Meng's avatar
      [SPARK-4148][PySpark] fix seed distribution and add some tests for rdd.sample · 3cca1962
      Xiangrui Meng authored
      The current way of seed distribution makes the random sequences from partition i and i+1 offset by 1.
      
      ~~~
      In [14]: import random
      
      In [15]: r1 = random.Random(10)
      
      In [16]: r1.randint(0, 1)
      Out[16]: 1
      
      In [17]: r1.random()
      Out[17]: 0.4288890546751146
      
      In [18]: r1.random()
      Out[18]: 0.5780913011344704
      
      In [19]: r2 = random.Random(10)
      
      In [20]: r2.randint(0, 1)
      Out[20]: 1
      
      In [21]: r2.randint(0, 1)
      Out[21]: 0
      
      In [22]: r2.random()
      Out[22]: 0.5780913011344704
      ~~~
      
      Note: The new tests are not for this bug fix.
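
      One way to avoid this kind of off-by-one overlap (a sketch of the general idea, not necessarily the exact fix in this PR; `partition_rng` is a hypothetical helper) is to give each partition its own generator whose seed mixes the user seed with the partition index, instead of advancing a single generator by a per-partition offset:

      ```python
      import random

      def partition_rng(initial_seed, partition_index):
          """Illustrative: derive an independent RNG for one partition."""
          # Mixing the seed with the partition index yields unrelated streams,
          # rather than sequences shifted by one position.
          per_partition_seed = hash((initial_seed, partition_index))
          return random.Random(per_partition_seed)

      r3 = partition_rng(10, 3)
      r4 = partition_rng(10, 4)
      assert r3.random() != r4.random()
      ```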
      
      Author: Xiangrui Meng <meng@databricks.com>
      
      Closes #3010 from mengxr/SPARK-4148 and squashes the following commits:
      
      869ae4b [Xiangrui Meng] move tests tests.py
      c1bacd9 [Xiangrui Meng] fix seed distribution and add some tests for rdd.sample
      3cca1962
  3. Oct 31, 2014
    • Xiangrui Meng's avatar
      [SPARK-4150][PySpark] return self in rdd.setName · f1e7361f
      Xiangrui Meng authored
      Then we can do `rdd.setName('abc').cache().count()`.
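
      A minimal sketch of the change (the actual method lives in rdd.py; the Py4J call is assumed from context):

      ```python
      def setName(self, name):
          """Assign a name to this RDD and return the RDD itself so that
          calls can be chained, e.g. rdd.setName('abc').cache().count()."""
          self._jrdd.setName(name)
          return self  # previously nothing (None) was returned
      ```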
      
      Author: Xiangrui Meng <meng@databricks.com>
      
      Closes #3011 from mengxr/rdd-setname and squashes the following commits:
      
      10d0d60 [Xiangrui Meng] update test
      4ac3bbd [Xiangrui Meng] return self in rdd.setName
      f1e7361f
  4. Oct 13, 2014
    • yingjieMiao's avatar
      [Spark] RDD take() method: overestimate too much · 49bbdcb6
      yingjieMiao authored
      In the comment (Line 1083), it says: "Otherwise, interpolate the number of partitions we need to try, but overestimate it by 50%."
      
      `(1.5 * num * partsScanned / buf.size).toInt` is the estimate of the total number of partitions needed. On each iteration, only the increment `(1.5 * num * partsScanned / buf.size).toInt - partsScanned` should be added.
      The existing implementation grows `partsScanned` exponentially (roughly `x_{n+1} >= (1.5 + 1) * x_n`).

      This could be a performance problem (unless it is the intended behavior).
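
      A worked example of the difference (numbers are illustrative): the extra `partsScanned` partitions added each round are what make the growth compound to roughly 2.5x per iteration.

      ```python
      num, parts_scanned, buf_size = 1000, 4, 100

      estimate = int(1.5 * num * parts_scanned / buf_size)   # 60: estimated total partitions needed
      old_next = parts_scanned + estimate                    # 64: current code adds the whole estimate
      new_next = parts_scanned + (estimate - parts_scanned)  # 60: proposed code adds only the increment

      print(estimate, old_next, new_next)
      ```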
      
      Author: yingjieMiao <yingjie@42go.com>
      
      Closes #2648 from yingjieMiao/rdd_take and squashes the following commits:
      
      d758218 [yingjieMiao] scala style fix
      a8e74bb [yingjieMiao] python style fix
      4b6e777 [yingjieMiao] infix operator style fix
      4391d3b [yingjieMiao] typo fix.
      692f4e6 [yingjieMiao] cap numPartsToTry
      c4483dc [yingjieMiao] style fix
      1d2c410 [yingjieMiao] also change in rdd.py and AsyncRDD
      d31ff7e [yingjieMiao] handle the edge case after 1 iteration
      a2aa36b [yingjieMiao] RDD take method: overestimate too much
      49bbdcb6
  5. Oct 11, 2014
    • cocoatomo's avatar
      [SPARK-3909][PySpark][Doc] A corrupted format in Sphinx documents and building warnings · 7a3f589e
      cocoatomo authored
      The Sphinx documents contain corrupted ReST formatting and produce some warnings.

      The purpose of this issue is the same as https://issues.apache.org/jira/browse/SPARK-3773.

      commit: 0e8203f4

      Output:
      ```
      $ cd ./python/docs
      $ make clean html
      rm -rf _build/*
      sphinx-build -b html -d _build/doctrees   . _build/html
      Making output directory...
      Running Sphinx v1.2.3
      loading pickled environment... not yet created
      building [html]: targets for 4 source files that are out of date
      updating environment: 4 added, 0 changed, 0 removed
      reading sources... [100%] pyspark.sql
      /Users/<user>/MyRepos/Scala/spark/python/pyspark/mllib/feature.py:docstring of pyspark.mllib.feature.Word2VecModel.findSynonyms:4: WARNING: Field list ends without a blank line; unexpected unindent.
      /Users/<user>/MyRepos/Scala/spark/python/pyspark/mllib/feature.py:docstring of pyspark.mllib.feature.Word2VecModel.transform:3: WARNING: Field list ends without a blank line; unexpected unindent.
      /Users/<user>/MyRepos/Scala/spark/python/pyspark/sql.py:docstring of pyspark.sql:4: WARNING: Bullet list ends without a blank line; unexpected unindent.
      looking for now-outdated files... none found
      pickling environment... done
      checking consistency... done
      preparing documents... done
      writing output... [100%] pyspark.sql
      writing additional files... (12 module code pages) _modules/index search
      copying static files... WARNING: html_static_path entry u'/Users/<user>/MyRepos/Scala/spark/python/docs/_static' does not exist
      done
      copying extra files... done
      dumping search index... done
      dumping object inventory... done
      build succeeded, 4 warnings.
      
      Build finished. The HTML pages are in _build/html.
      ```
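
      The warnings above are typical ReST issues: a field list (`:param:`/`:return:`) must be followed by a blank line before any unindented text. A hypothetical before/after docstring showing the shape of the fix (not the actual feature.py code):

      ```python
      # Bad: unindented text directly after the field list triggers
      # "Field list ends without a blank line; unexpected unindent."
      def find_synonyms_bad(word, num):
          """Find synonyms of a word.

          :param word: a word or a vector
          :param num: number of synonyms to find
          Returns an array of (word, cosineSimilarity).
          """

      # Good: the field list is closed by a blank line (or the return value
      # is documented as another field).
      def find_synonyms_good(word, num):
          """Find synonyms of a word.

          :param word: a word or a vector
          :param num: number of synonyms to find

          :return: an array of (word, cosineSimilarity)
          """
      ```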
      
      Author: cocoatomo <cocoatomo77@gmail.com>
      
      Closes #2766 from cocoatomo/issues/3909-sphinx-build-warnings and squashes the following commits:
      
      2c7faa8 [cocoatomo] [SPARK-3909][PySpark][Doc] A corrupted format in Sphinx documents and building warnings
      7a3f589e
  6. Oct 07, 2014
  7. Oct 06, 2014
    • cocoatomo's avatar
      [SPARK-3773][PySpark][Doc] Sphinx build warning · 2300eb58
      cocoatomo authored
      When building the Sphinx documents for PySpark, we get 12 warnings.
      Almost all of them are caused by docstrings in broken ReST format.

      To reproduce this issue, run the following commands on commit 6e27cb63.
      
      ```bash
      $ cd ./python/docs
      $ make clean html
      ...
      /Users/<user>/MyRepos/Scala/spark/python/pyspark/__init__.py:docstring of pyspark.SparkContext.sequenceFile:4: ERROR: Unexpected indentation.
      /Users/<user>/MyRepos/Scala/spark/python/pyspark/__init__.py:docstring of pyspark.RDD.saveAsSequenceFile:4: ERROR: Unexpected indentation.
      /Users/<user>/MyRepos/Scala/spark/python/pyspark/mllib/classification.py:docstring of pyspark.mllib.classification.LogisticRegressionWithSGD.train:14: ERROR: Unexpected indentation.
      /Users/<user>/MyRepos/Scala/spark/python/pyspark/mllib/classification.py:docstring of pyspark.mllib.classification.LogisticRegressionWithSGD.train:16: WARNING: Definition list ends without a blank line; unexpected unindent.
      /Users/<user>/MyRepos/Scala/spark/python/pyspark/mllib/classification.py:docstring of pyspark.mllib.classification.LogisticRegressionWithSGD.train:17: WARNING: Block quote ends without a blank line; unexpected unindent.
      /Users/<user>/MyRepos/Scala/spark/python/pyspark/mllib/classification.py:docstring of pyspark.mllib.classification.SVMWithSGD.train:14: ERROR: Unexpected indentation.
      /Users/<user>/MyRepos/Scala/spark/python/pyspark/mllib/classification.py:docstring of pyspark.mllib.classification.SVMWithSGD.train:16: WARNING: Definition list ends without a blank line; unexpected unindent.
      /Users/<user>/MyRepos/Scala/spark/python/pyspark/mllib/classification.py:docstring of pyspark.mllib.classification.SVMWithSGD.train:17: WARNING: Block quote ends without a blank line; unexpected unindent.
      /Users/<user>/MyRepos/Scala/spark/python/docs/pyspark.mllib.rst:50: WARNING: missing attribute mentioned in :members: or __all__: module pyspark.mllib.regression, attribute RidgeRegressionModelLinearRegressionWithSGD
      /Users/<user>/MyRepos/Scala/spark/python/pyspark/mllib/tree.py:docstring of pyspark.mllib.tree.DecisionTreeModel.predict:3: ERROR: Unexpected indentation.
      ...
      checking consistency... /Users/<user>/MyRepos/Scala/spark/python/docs/modules.rst:: WARNING: document isn't included in any toctree
      ...
      copying static files... WARNING: html_static_path entry u'/Users/<user>/MyRepos/Scala/spark/python/docs/_static' does not exist
      ...
      build succeeded, 12 warnings.
      ```
      
      Author: cocoatomo <cocoatomo77@gmail.com>
      
      Closes #2653 from cocoatomo/issues/3773-sphinx-build-warnings and squashes the following commits:
      
      6f65661 [cocoatomo] [SPARK-3773][PySpark][Doc] Sphinx build warning
      2300eb58
  8. Oct 01, 2014
    • Davies Liu's avatar
      [SPARK-3749] [PySpark] fix bugs in broadcast large closure of RDD · abf588f4
      Davies Liu authored
      1. a broadcast is triggered unexpectedly
      2. a file descriptor is leaked in the JVM (also leaked in parallelize())
      3. the broadcast is not unpersisted in the JVM after the RDD is no longer used

      cc JoshRosen, sorry for these bugs.
      
      Author: Davies Liu <davies.liu@gmail.com>
      
      Closes #2603 from davies/fix_broadcast and squashes the following commits:
      
      080a743 [Davies Liu] fix bugs in broadcast large closure of RDD
      abf588f4
  9. Sep 30, 2014
    • Davies Liu's avatar
      [SPARK-3478] [PySpark] Profile the Python tasks · c5414b68
      Davies Liu authored
      This patch adds profiling support for PySpark. It shows the profiling results
      before the driver exits; here is one example:
      
      ```
      ============================================================
      Profile of RDD<id=3>
      ============================================================
               5146507 function calls (5146487 primitive calls) in 71.094 seconds
      
         Ordered by: internal time, cumulative time
      
         ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        5144576   68.331    0.000   68.331    0.000 statcounter.py:44(merge)
             20    2.735    0.137   71.071    3.554 statcounter.py:33(__init__)
             20    0.017    0.001    0.017    0.001 {cPickle.dumps}
           1024    0.003    0.000    0.003    0.000 t.py:16(<lambda>)
             20    0.001    0.000    0.001    0.000 {reduce}
             21    0.001    0.000    0.001    0.000 {cPickle.loads}
             20    0.001    0.000    0.001    0.000 copy_reg.py:95(_slotnames)
             41    0.001    0.000    0.001    0.000 serializers.py:461(read_int)
             40    0.001    0.000    0.002    0.000 serializers.py:179(_batched)
             62    0.000    0.000    0.000    0.000 {method 'read' of 'file' objects}
             20    0.000    0.000   71.072    3.554 rdd.py:863(<lambda>)
             20    0.000    0.000    0.001    0.000 serializers.py:198(load_stream)
          40/20    0.000    0.000   71.072    3.554 rdd.py:2093(pipeline_func)
             41    0.000    0.000    0.002    0.000 serializers.py:130(load_stream)
             40    0.000    0.000   71.072    1.777 rdd.py:304(func)
             20    0.000    0.000   71.094    3.555 worker.py:82(process)
      ```
      
      Users can also show the profile results manually with `sc.show_profiles()` or dump them to disk
      with `sc.dump_profiles(path)`, for example:
      
      ```python
      >>> sc._conf.set("spark.python.profile", "true")
      >>> rdd = sc.parallelize(range(100)).map(str)
      >>> rdd.count()
      100
      >>> sc.show_profiles()
      ============================================================
      Profile of RDD<id=1>
      ============================================================
               284 function calls (276 primitive calls) in 0.001 seconds
      
         Ordered by: internal time, cumulative time
      
         ncalls  tottime  percall  cumtime  percall filename:lineno(function)
              4    0.000    0.000    0.000    0.000 serializers.py:198(load_stream)
              4    0.000    0.000    0.000    0.000 {reduce}
           12/4    0.000    0.000    0.001    0.000 rdd.py:2092(pipeline_func)
              4    0.000    0.000    0.000    0.000 {cPickle.loads}
              4    0.000    0.000    0.000    0.000 {cPickle.dumps}
            104    0.000    0.000    0.000    0.000 rdd.py:852(<genexpr>)
              8    0.000    0.000    0.000    0.000 serializers.py:461(read_int)
             12    0.000    0.000    0.000    0.000 rdd.py:303(func)
      ```
      Profiling is disabled by default and can be enabled with "spark.python.profile=true".

      Users can also have the results dumped to disk automatically for future analysis, via "spark.python.profile.dump=path_to_dump".

      This is a bugfix of #2351. cc JoshRosen
      
      Author: Davies Liu <davies.liu@gmail.com>
      
      Closes #2556 from davies/profiler and squashes the following commits:
      
      e68df5a [Davies Liu] Merge branch 'master' of github.com:apache/spark into profiler
      858e74c [Davies Liu] compatitable with python 2.6
      7ef2aa0 [Davies Liu] bugfix, add tests for show_profiles and dump_profiles()
      2b0daf2 [Davies Liu] fix docs
      7a56c24 [Davies Liu] bugfix
      cba9463 [Davies Liu] move show_profiles and dump_profiles to SparkContext
      fb9565b [Davies Liu] Merge branch 'master' of github.com:apache/spark into profiler
      116d52a [Davies Liu] Merge branch 'master' of github.com:apache/spark into profiler
      09d02c3 [Davies Liu] Merge branch 'master' into profiler
      c23865c [Davies Liu] Merge branch 'master' into profiler
      15d6f18 [Davies Liu] add docs for two configs
      dadee1a [Davies Liu] add docs string and clear profiles after show or dump
      4f8309d [Davies Liu] address comment, add tests
      0a5b6eb [Davies Liu] fix Python UDF
      4b20494 [Davies Liu] add profile for python
      c5414b68
  10. Sep 26, 2014
    • Josh Rosen's avatar
      Revert "[SPARK-3478] [PySpark] Profile the Python tasks" · f872e4fb
      Josh Rosen authored
      This reverts commit 1aa549ba.
      f872e4fb
    • Davies Liu's avatar
      [SPARK-3478] [PySpark] Profile the Python tasks · 1aa549ba
      Davies Liu authored
      This patch adds profiling support for PySpark. It shows the profiling results
      before the driver exits; here is one example:
      
      ```
      ============================================================
      Profile of RDD<id=3>
      ============================================================
               5146507 function calls (5146487 primitive calls) in 71.094 seconds
      
         Ordered by: internal time, cumulative time
      
         ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        5144576   68.331    0.000   68.331    0.000 statcounter.py:44(merge)
             20    2.735    0.137   71.071    3.554 statcounter.py:33(__init__)
             20    0.017    0.001    0.017    0.001 {cPickle.dumps}
           1024    0.003    0.000    0.003    0.000 t.py:16(<lambda>)
             20    0.001    0.000    0.001    0.000 {reduce}
             21    0.001    0.000    0.001    0.000 {cPickle.loads}
             20    0.001    0.000    0.001    0.000 copy_reg.py:95(_slotnames)
             41    0.001    0.000    0.001    0.000 serializers.py:461(read_int)
             40    0.001    0.000    0.002    0.000 serializers.py:179(_batched)
             62    0.000    0.000    0.000    0.000 {method 'read' of 'file' objects}
             20    0.000    0.000   71.072    3.554 rdd.py:863(<lambda>)
             20    0.000    0.000    0.001    0.000 serializers.py:198(load_stream)
          40/20    0.000    0.000   71.072    3.554 rdd.py:2093(pipeline_func)
             41    0.000    0.000    0.002    0.000 serializers.py:130(load_stream)
             40    0.000    0.000   71.072    1.777 rdd.py:304(func)
             20    0.000    0.000   71.094    3.555 worker.py:82(process)
      ```
      
      Users can also show the profile results manually with `sc.show_profiles()` or dump them to disk
      with `sc.dump_profiles(path)`, for example:
      
      ```python
      >>> sc._conf.set("spark.python.profile", "true")
      >>> rdd = sc.parallelize(range(100)).map(str)
      >>> rdd.count()
      100
      >>> sc.show_profiles()
      ============================================================
      Profile of RDD<id=1>
      ============================================================
               284 function calls (276 primitive calls) in 0.001 seconds
      
         Ordered by: internal time, cumulative time
      
         ncalls  tottime  percall  cumtime  percall filename:lineno(function)
              4    0.000    0.000    0.000    0.000 serializers.py:198(load_stream)
              4    0.000    0.000    0.000    0.000 {reduce}
           12/4    0.000    0.000    0.001    0.000 rdd.py:2092(pipeline_func)
              4    0.000    0.000    0.000    0.000 {cPickle.loads}
              4    0.000    0.000    0.000    0.000 {cPickle.dumps}
            104    0.000    0.000    0.000    0.000 rdd.py:852(<genexpr>)
              8    0.000    0.000    0.000    0.000 serializers.py:461(read_int)
             12    0.000    0.000    0.000    0.000 rdd.py:303(func)
      ```
      Profiling is disabled by default and can be enabled with "spark.python.profile=true".

      Users can also have the results dumped to disk automatically for future analysis, via "spark.python.profile.dump=path_to_dump".
      
      Author: Davies Liu <davies.liu@gmail.com>
      
      Closes #2351 from davies/profiler and squashes the following commits:
      
      7ef2aa0 [Davies Liu] bugfix, add tests for show_profiles and dump_profiles()
      2b0daf2 [Davies Liu] fix docs
      7a56c24 [Davies Liu] bugfix
      cba9463 [Davies Liu] move show_profiles and dump_profiles to SparkContext
      fb9565b [Davies Liu] Merge branch 'master' of github.com:apache/spark into profiler
      116d52a [Davies Liu] Merge branch 'master' of github.com:apache/spark into profiler
      09d02c3 [Davies Liu] Merge branch 'master' into profiler
      c23865c [Davies Liu] Merge branch 'master' into profiler
      15d6f18 [Davies Liu] add docs for two configs
      dadee1a [Davies Liu] add docs string and clear profiles after show or dump
      4f8309d [Davies Liu] address comment, add tests
      0a5b6eb [Davies Liu] fix Python UDF
      4b20494 [Davies Liu] add profile for python
      1aa549ba
  11. Sep 24, 2014
    • Aaron Staple's avatar
      [SPARK-546] Add full outer join to RDD and DStream. · 8ca4ecb6
      Aaron Staple authored
      leftOuterJoin and rightOuterJoin are already implemented.  This patch adds fullOuterJoin.
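
      A quick PySpark usage example of the new operation (illustrative values):

      ```python
      x = sc.parallelize([("a", 1), ("b", 4)])
      y = sc.parallelize([("a", 2), ("c", 8)])
      sorted(x.fullOuterJoin(y).collect())
      # [('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]
      ```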
      
      Author: Aaron Staple <aaron.staple@gmail.com>
      
      Closes #1395 from staple/SPARK-546 and squashes the following commits:
      
      1f5595c [Aaron Staple] Fix python style
      7ac0aa9 [Aaron Staple] [SPARK-546] Add full outer join to RDD and DStream.
      3b5d137 [Aaron Staple] In JavaPairDStream, make class tag specification in rightOuterJoin consistent with other functions.
      31f2956 [Aaron Staple] Fix left outer join documentation comments.
      8ca4ecb6
  12. Sep 19, 2014
    • Davies Liu's avatar
      [SPARK-3491] [MLlib] [PySpark] use pickle to serialize data in MLlib · fce5e251
      Davies Liu authored
      Currently, we serialize data between the JVM and Python manually, case by case; this cannot scale to support so many APIs in MLlib.

      This patch addresses the problem by serializing the data with the pickle protocol, using the Pyrolite library to serialize/deserialize on the JVM side. The pickle protocol can be easily extended to support custom classes.

      All the modules are refactored to use this protocol.

      Known issues: there will be some performance regression (both CPU and memory; the serialized data is larger).
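
      As an illustration from the Python side, MLlib objects now round-trip through plain pickle (Pyrolite speaks the same protocol in the JVM); a minimal sketch:

      ```python
      import pickle
      from pyspark.mllib.regression import LabeledPoint

      lp = LabeledPoint(1.0, [0.5, 2.0])
      restored = pickle.loads(pickle.dumps(lp, 2))
      assert restored.label == lp.label
      ```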
      
      Author: Davies Liu <davies.liu@gmail.com>
      
      Closes #2378 from davies/pickle_mllib and squashes the following commits:
      
      dffbba2 [Davies Liu] Merge branch 'master' of github.com:apache/spark into pickle_mllib
      810f97f [Davies Liu] fix equal of matrix
      032cd62 [Davies Liu] add more type check and conversion for user_product
      bd738ab [Davies Liu] address comments
      e431377 [Davies Liu] fix cache of rdd, refactor
      19d0967 [Davies Liu] refactor Picklers
      2511e76 [Davies Liu] cleanup
      1fccf1a [Davies Liu] address comments
      a2cc855 [Davies Liu] fix tests
      9ceff73 [Davies Liu] test size of serialized Rating
      44e0551 [Davies Liu] fix cache
      a379a81 [Davies Liu] fix pickle array in python2.7
      df625c7 [Davies Liu] Merge commit '154d141' into pickle_mllib
      154d141 [Davies Liu] fix autobatchedpickler
      44736d7 [Davies Liu] speed up pickling array in Python 2.7
      e1d1bfc [Davies Liu] refactor
      708dc02 [Davies Liu] fix tests
      9dcfb63 [Davies Liu] fix style
      88034f0 [Davies Liu] rafactor, address comments
      46a501e [Davies Liu] choose batch size automatically
      df19464 [Davies Liu] memorize the module and class name during pickleing
      f3506c5 [Davies Liu] Merge branch 'master' into pickle_mllib
      722dd96 [Davies Liu] cleanup _common.py
      0ee1525 [Davies Liu] remove outdated tests
      b02e34f [Davies Liu] remove _common.py
      84c721d [Davies Liu] Merge branch 'master' into pickle_mllib
      4d7963e [Davies Liu] remove muanlly serialization
      6d26b03 [Davies Liu] fix tests
      c383544 [Davies Liu] classification
      f2a0856 [Davies Liu] mllib/regression
      d9f691f [Davies Liu] mllib/util
      cccb8b1 [Davies Liu] mllib/tree
      8fe166a [Davies Liu] Merge branch 'pickle' into pickle_mllib
      aa2287e [Davies Liu] random
      f1544c4 [Davies Liu] refactor clustering
      52d1350 [Davies Liu] use new protocol in mllib/stat
      b30ef35 [Davies Liu] use pickle to serialize data for mllib/recommendation
      f44f771 [Davies Liu] enable tests about array
      3908f5c [Davies Liu] Merge branch 'master' into pickle
      c77c87b [Davies Liu] cleanup debugging code
      60e4e2f [Davies Liu] support unpickle array.array for Python 2.6
      fce5e251
  13. Sep 18, 2014
    • Davies Liu's avatar
      [SPARK-3554] [PySpark] use broadcast automatically for large closure · e77fa81a
      Davies Liu authored
      Py4J cannot handle large strings efficiently, so we should use a broadcast for large closures automatically. (Broadcasts use the local filesystem to pass data through.)
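
      A minimal sketch of the idea (the threshold and helper name are assumptions, not the actual PySpark internals; `command` stands for an already-picklable representation of the task, since PySpark itself pickles closures with cloudpickle):

      ```python
      import pickle

      LARGE_CLOSURE_THRESHOLD = 1 << 20  # assumed ~1 MB cutoff, illustrative only

      def prepare_command(sc, command):
          """Sketch: return either the pickled command or a broadcast of it."""
          pickled = pickle.dumps(command, 2)
          if len(pickled) > LARGE_CLOSURE_THRESHOLD:
              # Broadcasts travel through the local filesystem, which copes with
              # large payloads far better than a giant Py4J string argument.
              return sc.broadcast(pickled)
          return pickled
      ```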
      
      Author: Davies Liu <davies.liu@gmail.com>
      
      Closes #2417 from davies/command and squashes the following commits:
      
      fbf4e97 [Davies Liu] bugfix
      aefd508 [Davies Liu] use broadcast automatically for large closure
      e77fa81a
  14. Sep 16, 2014
    • Matthew Farrellee's avatar
      [SPARK-3519] add distinct(n) to PySpark · 9d5fa763
      Matthew Farrellee authored
      Added missing rdd.distinct(numPartitions) and associated tests
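
      A usage example of the new overload (illustrative values):

      ```python
      rdd = sc.parallelize([1, 1, 2, 3, 3, 3])
      sorted(rdd.distinct(2).collect())   # deduplicate using 2 partitions
      # [1, 2, 3]
      ```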
      
      Author: Matthew Farrellee <matt@redhat.com>
      
      Closes #2383 from mattf/SPARK-3519 and squashes the following commits:
      
      30b837a [Matthew Farrellee] Combine test cases to save on JVM startups
      6bc4a2c [Matthew Farrellee] [SPARK-3519] add distinct(n) to SchemaRDD in PySpark
      7a17f2b [Matthew Farrellee] [SPARK-3519] add distinct(n) to PySpark
      9d5fa763
  15. Sep 15, 2014
    • Aaron Staple's avatar
      [SPARK-1087] Move python traceback utilities into new traceback_utils.py file. · 60050f42
      Aaron Staple authored
      Also made some cosmetic cleanups.
      
      Author: Aaron Staple <aaron.staple@gmail.com>
      
      Closes #2385 from staple/SPARK-1087 and squashes the following commits:
      
      7b3bb13 [Aaron Staple] Address review comments, cosmetic cleanups.
      10ba6e1 [Aaron Staple] [SPARK-1087] Move python traceback utilities into new traceback_utils.py file.
      60050f42
  16. Sep 12, 2014
  17. Sep 08, 2014
    • Sandy Ryza's avatar
      SPARK-2978. Transformation with MR shuffle semantics · 16a73c24
      Sandy Ryza authored
      I didn't add this to the transformations list in the docs because it's kind of obscure, but would be happy to do so if others think it would be helpful.
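
      Assuming this corresponds to the `repartitionAndSortWithinPartitions` transformation in the Spark API, a small PySpark usage sketch:

      ```python
      rdd = sc.parallelize([(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)])
      # Partition by key parity, then sort each partition by key in one shuffle.
      rdd2 = rdd.repartitionAndSortWithinPartitions(2, lambda x: x % 2)
      rdd2.glom().collect()
      # e.g. [[(0, 5), (0, 8), (2, 6)], [(1, 3), (3, 8), (3, 8)]]
      ```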
      
      Author: Sandy Ryza <sandy@cloudera.com>
      
      Closes #2274 from sryza/sandy-spark-2978 and squashes the following commits:
      
      4a5332a [Sandy Ryza] Fix Java test
      c04b447 [Sandy Ryza] Fix Python doc and add back deleted code
      433ad5b [Sandy Ryza] Add Java test
      4c25a54 [Sandy Ryza] Add s at the end and a couple other fixes
      9b0ba99 [Sandy Ryza] Fix compilation
      36e0571 [Sandy Ryza] Fix import ordering
      48c12c2 [Sandy Ryza] Add Java version and additional doc
      e5381cd [Sandy Ryza] Fix python style warnings
      f147634 [Sandy Ryza] SPARK-2978. Transformation with MR shuffle semantics
      16a73c24
  18. Sep 06, 2014
    • Davies Liu's avatar
      [SPARK-2334] fix AttributeError when call PipelineRDD.id() · 110fb8b2
      Davies Liu authored
      The underlying JavaRDD for PipelineRDD is created lazily; it is delayed until _jrdd is called.

      The id of the JavaRDD is cached as `_id`, which saves an RPC call through Py4J on later calls.
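
      A minimal sketch of the caching pattern (not the exact PipelinedRDD code):

      ```python
      def id(self):
          if self._id is None:
              # Touching _jrdd forces the lazy JavaRDD to be created; the id
              # costs one Py4J round trip and is then cached.
              self._id = self._jrdd.id()
          return self._id
      ```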
      
      closes #1276
      
      Author: Davies Liu <davies.liu@gmail.com>
      
      Closes #2296 from davies/id and squashes the following commits:
      
      e197958 [Davies Liu] fix style
      9721716 [Davies Liu] fix id of PipelineRDD
      110fb8b2
    • Holden Karau's avatar
      Spark-3406 add a default storage level to python RDD persist API · da35330e
      Holden Karau authored
      Author: Holden Karau <holden@pigscanfly.ca>
      
      Closes #2280 from holdenk/SPARK-3406-Python-RDD-persist-api-does-not-have-default-storage-level and squashes the following commits:
      
      33eaade [Holden Karau] As Josh pointed out, sql also override persist. Make persist behave the same as in the underlying RDD as well
      e658227 [Holden Karau] Fix the test I added
      e95a6c5 [Holden Karau] The Python persist function did not have a default storageLevel unlike the Scala API. Noticed this issue because we got a bug report back from the book where we had documented it as if it was the same as the Scala API
      da35330e
  19. Sep 05, 2014
    • Andrew Ash's avatar
      SPARK-3211 .take() is OOM-prone with empty partitions · ba5bcadd
      Andrew Ash authored
      Instead of jumping straight from 1 partition to all partitions, grow exponentially:
      double the number of partitions to attempt each time.
      
      Fix proposed by Paul Nepywoda
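
      A sketch of the strategy (illustrative, not the exact code): while the buffer is still empty, grow the number of partitions to try geometrically rather than scanning everything at once.

      ```python
      def next_parts_to_try(parts_scanned, buf_len, num, total_parts):
          """Illustrative: decide how many partitions take() scans next."""
          if buf_len == 0:
              # Nothing found yet: grow geometrically (a follow-up commit here
              # quadruples rather than doubles for a minor speedup).
              grow = parts_scanned * 4
          else:
              # Some rows found: interpolate how many partitions should be
              # enough, with a 50% overestimate.
              grow = max(int(1.5 * num * parts_scanned / buf_len) - parts_scanned, 1)
          return min(parts_scanned + grow, total_parts)
      ```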
      
      Author: Andrew Ash <andrew@andrewash.com>
      
      Closes #2117 from ash211/SPARK-3211 and squashes the following commits:
      
      8b2299a [Andrew Ash] Quadruple instead of double for a minor speedup
      e5f7e4d [Andrew Ash] Update comment to better reflect what we're doing
      09a27f7 [Andrew Ash] Update PySpark to be less OOM-prone as well
      3a156b8 [Andrew Ash] SPARK-3211 .take() is OOM-prone with empty partitions
      ba5bcadd
  20. Sep 03, 2014
    • Davies Liu's avatar
      [SPARK-3309] [PySpark] Put all public API in __all__ · 6481d274
      Davies Liu authored
      Put all public APIs in __all__, and also put them all in pyspark.__init__.py, so that all the documentation for the public API can be obtained via `pydoc pyspark`. This can also be used by other programs (such as Sphinx or Epydoc) to generate documentation only for public APIs.
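
      A tiny sketch of the convention (the excerpts and name lists are illustrative, not the exact contents of the files):

      ```python
      # pyspark/rdd.py (illustrative excerpt)
      __all__ = ["RDD"]               # only names listed here are public

      # pyspark/__init__.py (illustrative excerpt)
      from pyspark.context import SparkContext
      from pyspark.rdd import RDD

      __all__ = ["SparkContext", "RDD"]
      # `pydoc pyspark` (or Sphinx/Epydoc) now documents exactly the public API.
      ```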
      
      Author: Davies Liu <davies.liu@gmail.com>
      
      Closes #2205 from davies/public and squashes the following commits:
      
      c6c5567 [Davies Liu] fix message
      f7b35be [Davies Liu] put SchemeRDD, Row in pyspark.sql module
      7e3016a [Davies Liu] add __all__ in mllib
      6281b48 [Davies Liu] fix doc for SchemaRDD
      6caab21 [Davies Liu] add public interfaces into pyspark.__init__.py
      6481d274
  21. Sep 02, 2014
    • Davies Liu's avatar
      [SPARK-2871] [PySpark] add countApproxDistinct() API · e2c901b4
      Davies Liu authored
      RDD.countApproxDistinct(relativeSD=0.05):
      
              :: Experimental ::
              Return approximate number of distinct elements in the RDD.
      
              The algorithm used is based on streamlib's implementation of
              "HyperLogLog in Practice: Algorithmic Engineering of a State
              of The Art Cardinality Estimation Algorithm", available
              <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
      
              This supports all types of objects that Pyrolite can serialize,
              i.e., nearly all built-in types.
      
              param relativeSD Relative accuracy. Smaller values create
                                 counters that require more space.
                                 It must be greater than 0.000017.
      
              >>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct()
              >>> 950 < n < 1050
              True
              >>> n = sc.parallelize([i % 20 for i in range(1000)]).countApproxDistinct()
              >>> 18 < n < 22
              True
      
      Author: Davies Liu <davies.liu@gmail.com>
      
      Closes #2142 from davies/countApproxDistinct and squashes the following commits:
      
      e20da47 [Davies Liu] remove the correction in Python
      c38c4e4 [Davies Liu] fix doc tests
      2ab157c [Davies Liu] fix doc tests
      9d2565f [Davies Liu] add commments and link for hash collision correction
      d306492 [Davies Liu] change range of hash of tuple to [0, maxint]
      ded624f [Davies Liu] calculate hash in Python
      4cba98f [Davies Liu] add more tests
      a85a8c6 [Davies Liu] Merge branch 'master' into countApproxDistinct
      e97e342 [Davies Liu] add countApproxDistinct()
      e2c901b4
  22. Aug 27, 2014
    • Davies Liu's avatar
      [SPARK-2871] [PySpark] add RDD.lookup(key) · 4fa2fda8
      Davies Liu authored
      RDD.lookup(key)
      
              Return the list of values in the RDD for key `key`. This operation
              is done efficiently if the RDD has a known partitioner by only
              searching the partition that the key maps to.
      
              >>> l = range(1000)
              >>> rdd = sc.parallelize(zip(l, l), 10)
              >>> rdd.lookup(42)  # slow
              [42]
              >>> sorted = rdd.sortByKey()
              >>> sorted.lookup(42)  # fast
              [42]
      
      It also cleans up the code in RDD.py and fixes several bugs (related to preservesPartitioning).
      
      Author: Davies Liu <davies.liu@gmail.com>
      
      Closes #2093 from davies/lookup and squashes the following commits:
      
      1789cd4 [Davies Liu] `f` in foreach could be generator or not.
      2871b80 [Davies Liu] Merge branch 'master' into lookup
      c6390ea [Davies Liu] address all comments
      0f1bce8 [Davies Liu] add test case for lookup()
      be0e8ba [Davies Liu] fix preservesPartitioning
      eb1305d [Davies Liu] add RDD.lookup(key)
      4fa2fda8
  23. Aug 26, 2014
    • Davies Liu's avatar
      [SPARK-3073] [PySpark] use external sort in sortBy() and sortByKey() · f1e71d4c
      Davies Liu authored
      Use an external sort to support sorting large datasets in the reduce stage.
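
      From the user's point of view nothing changes; sortByKey() simply spills to disk when a partition no longer fits in the Python worker's memory budget. A usage sketch (assuming `spark.python.worker.memory` is the relevant limit):

      ```python
      from pyspark import SparkConf, SparkContext

      # Cap the Python worker memory so that large sorts spill to disk.
      conf = SparkConf().set("spark.python.worker.memory", "512m")
      sc = SparkContext(conf=conf)

      big = sc.parallelize(((i % 100000, i) for i in xrange(10 ** 7)), 20)
      big.sortByKey().first()   # the reduce-stage sort spills instead of running out of memory
      ```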
      
      Author: Davies Liu <davies.liu@gmail.com>
      
      Closes #1978 from davies/sort and squashes the following commits:
      
      bbcd9ba [Davies Liu] check spilled bytes in tests
      b125d2f [Davies Liu] add test for external sort in rdd
      eae0176 [Davies Liu] choose different disks from different processes and instances
      1f075ed [Davies Liu] Merge branch 'master' into sort
      eb53ca6 [Davies Liu] Merge branch 'master' into sort
      644abaf [Davies Liu] add license in LICENSE
      19f7873 [Davies Liu] improve tests
      55602ee [Davies Liu] use external sort in sortBy() and sortByKey()
      f1e71d4c
    • Davies Liu's avatar
      [SPARK-2871] [PySpark] add histgram() API · 3cedc4f4
      Davies Liu authored
      RDD.histogram(buckets)
      
              Compute a histogram using the provided buckets. The buckets
              are all open to the right except for the last which is closed.
              e.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50],
              which means 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1
              and 50 we would have a histogram of 1,0,1.
      
              If your histogram is evenly spaced (e.g. [0, 10, 20, 30]),
              this can be switched from an O(log n) insertion to O(1) per
              element (where n = # buckets).
      
              Buckets must be sorted and not contain any duplicates, must be
              at least two elements.
      
              If `buckets` is a number, it generates buckets that are
              evenly spaced between the minimum and maximum of the RDD. For
              example, if the min value is 0 and the max is 100, given buckets
              as 2, the resulting buckets will be [0,50) [50,100]. buckets must
              be at least 1. If the RDD contains infinity or NaN, an exception
              is thrown. If the elements in the RDD do not vary (max == min),
              a single bucket is always returned.

              It returns a tuple of buckets and histogram.
      
              >>> rdd = sc.parallelize(range(51))
              >>> rdd.histogram(2)
              ([0, 25, 50], [25, 26])
              >>> rdd.histogram([0, 5, 25, 50])
              ([0, 5, 25, 50], [5, 20, 26])
              >>> rdd.histogram([0, 15, 30, 45, 60], True)
              ([0, 15, 30, 45, 60], [15, 15, 15, 6])
              >>> rdd = sc.parallelize(["ab", "ac", "b", "bd", "ef"])
              >>> rdd.histogram(("a", "b", "c"))
              (('a', 'b', 'c'), [2, 2])
      
      Closes #122, which is a duplicate.
      
      Author: Davies Liu <davies.liu@gmail.com>
      
      Closes #2091 from davies/histgram and squashes the following commits:
      
      a322f8a [Davies Liu] fix deprecation of e.message
      84e85fa [Davies Liu] remove evenBuckets, add more tests (including str)
      d9a0722 [Davies Liu] address comments
      0e18a2d [Davies Liu] add histgram() API
      3cedc4f4
  24. Aug 24, 2014
    • Davies Liu's avatar
      [SPARK-2871] [PySpark] add zipWithIndex() and zipWithUniqueId() · fb0db772
      Davies Liu authored
      RDD.zipWithIndex()
      
              Zips this RDD with its element indices.
      
              The ordering is first based on the partition index and then the
              ordering of items within each partition. So the first item in
              the first partition gets index 0, and the last item in the last
              partition receives the largest index.
      
              This method needs to trigger a spark job when this RDD contains
              more than one partitions.
      
              >>> sc.parallelize(range(4), 2).zipWithIndex().collect()
              [(0, 0), (1, 1), (2, 2), (3, 3)]
      
      RDD.zipWithUniqueId()
      
              Zips this RDD with generated unique Long ids.
      
              Items in the kth partition will get ids k, n+k, 2*n+k, ..., where
              n is the number of partitions. So there may exist gaps, but this
              method won't trigger a spark job, which is different from
              L{zipWithIndex}
      
              >>> sc.parallelize(range(4), 2).zipWithUniqueId().collect()
              [(0, 0), (2, 1), (1, 2), (3, 3)]
      
      Author: Davies Liu <davies.liu@gmail.com>
      
      Closes #2092 from davies/zipWith and squashes the following commits:
      
      cebe5bf [Davies Liu] improve test cases, reverse the order of index
      0d2a128 [Davies Liu] add zipWithIndex() and zipWithUniqueId()
      fb0db772
  25. Aug 23, 2014
    • Davies Liu's avatar
      [SPARK-2871] [PySpark] add approx API for RDD · 8df4dad4
      Davies Liu authored
      RDD.countApprox(self, timeout, confidence=0.95)
      
              :: Experimental ::
              Approximate version of count() that returns a potentially incomplete
              result within a timeout, even if not all tasks have finished.
      
              >>> rdd = sc.parallelize(range(1000), 10)
              >>> rdd.countApprox(1000, 1.0)
              1000
      
      RDD.sumApprox(self, timeout, confidence=0.95)
      
              Approximate operation to return the sum within a timeout
              or meet the confidence.
      
              >>> rdd = sc.parallelize(range(1000), 10)
              >>> r = sum(xrange(1000))
              >>> (rdd.sumApprox(1000) - r) / r < 0.05
      
      RDD.meanApprox(self, timeout, confidence=0.95)
      
              :: Experimental ::
              Approximate operation to return the mean within a timeout
              or meet the confidence.
      
              >>> rdd = sc.parallelize(range(1000), 10)
              >>> r = sum(xrange(1000)) / 1000.0
              >>> (rdd.meanApprox(1000) - r) / r < 0.05
              True
      
      Author: Davies Liu <davies.liu@gmail.com>
      
      Closes #2095 from davies/approx and squashes the following commits:
      
      e8c252b [Davies Liu] add approx API for RDD
      8df4dad4
    • Davies Liu's avatar
      [SPARK-2871] [PySpark] add `key` argument for max(), min() and top(n) · db436e36
      Davies Liu authored
      RDD.max(key=None)
      
              param key: A function used to generate key for comparing
      
              >>> rdd = sc.parallelize([1.0, 5.0, 43.0, 10.0])
              >>> rdd.max()
              43.0
              >>> rdd.max(key=str)
              5.0
      
      RDD.min(key=None)
      
              Find the minimum item in this RDD.
      
              param key: A function used to generate key for comparing
      
              >>> rdd = sc.parallelize([2.0, 5.0, 43.0, 10.0])
              >>> rdd.min()
              2.0
              >>> rdd.min(key=str)
              10.0
      
      RDD.top(num, key=None)
      
              Get the top N elements from a RDD.
      
              Note: It returns the list sorted in descending order.
              >>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
              [12]
              >>> sc.parallelize([2, 3, 4, 5, 6], 2).top(2)
              [6, 5]
              >>> sc.parallelize([10, 4, 2, 12, 3]).top(3, key=str)
              [4, 3, 2]
      
      Author: Davies Liu <davies.liu@gmail.com>
      
      Closes #2094 from davies/cmp and squashes the following commits:
      
      ccbaf25 [Davies Liu] add `key` to top()
      ad7e374 [Davies Liu] fix tests
      2f63512 [Davies Liu] change `comp` to `key` in min/max
      dd91e08 [Davies Liu] add `comp` argument for RDD.max() and RDD.min()
      db436e36
  26. Aug 20, 2014
    • Davies Liu's avatar
      [SPARK-3141] [PySpark] fix sortByKey() with take() · 0a7ef633
      Davies Liu authored
      Fix sortByKey() with take()
      
      The function `f` used in mapPartitions should always return an iterator.
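
      The bug class, in a hypothetical illustration (not the actual sortByKey code): every code path of a function passed to mapPartitions has to produce an iterator (or iterable), not None.

      ```python
      # Buggy shape: one branch forgets to return anything, so the code
      # downstream of mapPartitions receives None instead of an iterator.
      def sort_partition_bad(iterator):
          data = list(iterator)
          if len(data) <= 1:
              return data        # fine here...
          sorted(data)           # ...but this branch falls through and returns None

      # Fixed shape: always return an iterator.
      def sort_partition_good(iterator):
          return iter(sorted(iterator))

      # rdd.mapPartitions(sort_partition_good).take(5)
      ```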
      
      Author: Davies Liu <davies.liu@gmail.com>
      
      Closes #2045 from davies/fix_sortbykey and squashes the following commits:
      
      1160f59 [Davies Liu] fix sortByKey() with take()
      0a7ef633
  27. Aug 19, 2014
    • Davies Liu's avatar
      [SPARK-2790] [PySpark] fix zip with serializers which have different batch sizes. · d7e80c25
      Davies Liu authored
      If two RDDs have different batch sizes in their serializers, the one with the smaller batch size is re-serialized, and then RDD.zip() is called in Spark.
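
      A plain-Python sketch of the re-serialization idea (illustrative helper, not the PySpark internals): regroup one side's batches so that both sides yield the same number of top-level chunks before they are zipped.

      ```python
      def rebatch(stream_of_batches, new_batch_size):
          """Illustrative: flatten incoming batches and regroup them."""
          batch = []
          for incoming in stream_of_batches:
              for item in incoming:
                  batch.append(item)
                  if len(batch) == new_batch_size:
                      yield batch
                      batch = []
          if batch:
              yield batch

      # Align a 3-per-batch stream with a 2-per-batch one before zipping.
      left = rebatch([[1, 2, 3], [4, 5, 6]], 2)
      right = [["a", "b"], ["c", "d"], ["e", "f"]]
      pairs = [(x, y) for lb, rb in zip(left, right) for x, y in zip(lb, rb)]
      ```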
      
      Author: Davies Liu <davies.liu@gmail.com>
      
      Closes #1894 from davies/zip and squashes the following commits:
      
      c4652ea [Davies Liu] add more test cases
      6d05fc8 [Davies Liu] Merge branch 'master' into zip
      813b1e4 [Davies Liu] add more tests for failed cases
      a4aafda [Davies Liu] fix zip with serializers which have different batch sizes.
      d7e80c25
  28. Aug 18, 2014
    • Josh Rosen's avatar
      [SPARK-3114] [PySpark] Fix Python UDFs in Spark SQL. · 1f1819b2
      Josh Rosen authored
      This fixes SPARK-3114, an issue where we inadvertently broke Python UDFs in Spark SQL.
      
      This PR modifies the test runner script to always run the PySpark SQL tests, irrespective of whether SparkSQL itself has been modified.  It also includes Davies' fix for the bug.
      
      Closes #2026.
      
      Author: Josh Rosen <joshrosen@apache.org>
      Author: Davies Liu <davies.liu@gmail.com>
      
      Closes #2027 from JoshRosen/pyspark-sql-fix and squashes the following commits:
      
      9af2708 [Davies Liu] bugfix: disable compression of command
      0d8d3a4 [Josh Rosen] Always run Python Spark SQL tests.
      1f1819b2
    • Davies Liu's avatar
      [SPARK-3103] [PySpark] fix saveAsTextFile() with utf-8 · d1d0ee41
      Davies Liu authored
      Bugfix: it raised an exception when trying to encode non-ASCII strings into unicode. It should only encode unicode as "utf-8".
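
      A sketch of the intended conversion (Python 2 types, since PySpark of this era runs on Python 2): only unicode values are encoded; byte strings pass through untouched.

      ```python
      def encode_line(x):
          """Sketch of the fixed behaviour for saveAsTextFile()."""
          if not isinstance(x, basestring):
              x = unicode(x)              # non-strings become unicode first
          if isinstance(x, unicode):
              x = x.encode("utf-8")       # only unicode is encoded
          return x                        # plain str (bytes) is written as-is
      ```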
      
      Author: Davies Liu <davies.liu@gmail.com>
      
      Closes #2018 from davies/fix_utf8 and squashes the following commits:
      
      4db7967 [Davies Liu] fix saveAsTextFile() with utf-8
      d1d0ee41
  29. Aug 16, 2014
    • Davies Liu's avatar
      [SPARK-1065] [PySpark] improve supporting for large broadcast · 2fc8aca0
      Davies Liu authored
      Passing large objects through Py4J is very slow (and costs a lot of memory), so broadcast objects are passed via files (similar to parallelize()).

      Add an option to keep the object in the driver (False by default) to save memory in the driver.
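
      Usage is unchanged; the file-based path simply makes something like this practical (sizes are illustrative):

      ```python
      big_table = dict((i, str(i)) for i in xrange(5 * 10 ** 6))   # tens of MB once pickled
      b = sc.broadcast(big_table)                                  # shipped via files, not Py4J
      hits = sc.parallelize(xrange(1000)).map(lambda k: b.value.get(k)).count()
      ```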
      
      Author: Davies Liu <davies.liu@gmail.com>
      
      Closes #1912 from davies/broadcast and squashes the following commits:
      
      e06df4a [Davies Liu] load broadcast from disk in driver automatically
      db3f232 [Davies Liu] fix serialization of accumulator
      631a827 [Davies Liu] Merge branch 'master' into broadcast
      c7baa8c [Davies Liu] compress serrialized broadcast and command
      9a7161f [Davies Liu] fix doc tests
      e93cf4b [Davies Liu] address comments: add test
      6226189 [Davies Liu] improve large broadcast
      2fc8aca0
  30. Aug 13, 2014
    • Davies Liu's avatar
      [SPARK-2983] [PySpark] improve performance of sortByKey() · 434bea1c
      Davies Liu authored
      1. skip partitionBy() when the number of partitions is 1
      2. use bisect_left (O(log N)) instead of a linear scan (O(N)) in the range partitioner (see the sketch below)
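
      A sketch of the bisect-based lookup (the bounds are illustrative): given sorted partition upper bounds, the target partition is found with `bisect_left` instead of walking the list.

      ```python
      from bisect import bisect_left

      bounds = [10, 20, 30]            # illustrative split points for 4 partitions

      def range_partition(key):
          # O(log N) search for the partition whose range contains `key`.
          return bisect_left(bounds, key)

      assert [range_partition(k) for k in (5, 10, 25, 99)] == [0, 0, 2, 3]
      ```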
      
      Author: Davies Liu <davies.liu@gmail.com>
      
      Closes #1898 from davies/sort and squashes the following commits:
      
      0a9608b [Davies Liu] Merge branch 'master' into sort
      1cf9565 [Davies Liu] improve performance of sortByKey()
      434bea1c
  31. Aug 06, 2014
    • RJ Nowling's avatar
      [PySpark] Add blanklines to Python docstrings so example code renders correctly · e537b33c
      RJ Nowling authored
      Author: RJ Nowling <rnowling@gmail.com>
      
      Closes #1808 from rnowling/pyspark_docs and squashes the following commits:
      
      c06d774 [RJ Nowling] Add blanklines to Python docstrings so example code renders correctly
      e537b33c
    • Nicholas Chammas's avatar
      [SPARK-2627] [PySpark] have the build enforce PEP 8 automatically · d614967b
      Nicholas Chammas authored
      As described in [SPARK-2627](https://issues.apache.org/jira/browse/SPARK-2627), we'd like Python code to automatically be checked for PEP 8 compliance by Jenkins. This pull request aims to do that.
      
      Notes:
      * We may need to install [`pep8`](https://pypi.python.org/pypi/pep8) on the build server.
      * I'm expecting tests to fail now that PEP 8 compliance is being checked as part of the build. I'm fine with cleaning up any remaining PEP 8 violations as part of this pull request.
      * I did not understand why the RAT and scalastyle reports are saved to text files. I did the same for the PEP 8 check, but only so that the console output style can match those for the RAT and scalastyle checks. The PEP 8 report is removed right after the check is complete.
      * Updates to the ["Contributing to Spark"](https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark) guide will be submitted elsewhere, as I don't believe that text is part of the Spark repo.
      
      Author: Nicholas Chammas <nicholas.chammas@gmail.com>
      Author: nchammas <nicholas.chammas@gmail.com>
      
      Closes #1744 from nchammas/master and squashes the following commits:
      
      274b238 [Nicholas Chammas] [SPARK-2627] [PySpark] minor indentation changes
      983d963 [nchammas] Merge pull request #5 from apache/master
      1db5314 [nchammas] Merge pull request #4 from apache/master
      0e0245f [Nicholas Chammas] [SPARK-2627] undo erroneous whitespace fixes
      bf30942 [Nicholas Chammas] [SPARK-2627] PEP8: comment spacing
      6db9a44 [nchammas] Merge pull request #3 from apache/master
      7b4750e [Nicholas Chammas] merge upstream changes
      91b7584 [Nicholas Chammas] [SPARK-2627] undo unnecessary line breaks
      44e3e56 [Nicholas Chammas] [SPARK-2627] use tox.ini to exclude files
      b09fae2 [Nicholas Chammas] don't wrap comments unnecessarily
      bfb9f9f [Nicholas Chammas] [SPARK-2627] keep up with the PEP 8 fixes
      9da347f [nchammas] Merge pull request #2 from apache/master
      aa5b4b5 [Nicholas Chammas] [SPARK-2627] follow Spark bash style for if blocks
      d0a83b9 [Nicholas Chammas] [SPARK-2627] check that pep8 downloaded fine
      dffb5dd [Nicholas Chammas] [SPARK-2627] download pep8 at runtime
      a1ce7ae [Nicholas Chammas] [SPARK-2627] space out test report sections
      21da538 [Nicholas Chammas] [SPARK-2627] it's PEP 8, not PEP8
      6f4900b [Nicholas Chammas] [SPARK-2627] more misc PEP 8 fixes
      fe57ed0 [Nicholas Chammas] removing merge conflict backups
      9c01d4c [nchammas] Merge pull request #1 from apache/master
      9a66cb0 [Nicholas Chammas] resolving merge conflicts
      a31ccc4 [Nicholas Chammas] [SPARK-2627] miscellaneous PEP 8 fixes
      beaa9ac [Nicholas Chammas] [SPARK-2627] fail check on non-zero status
      723ed39 [Nicholas Chammas] always delete the report file
      0541ebb [Nicholas Chammas] [SPARK-2627] call Python linter from run-tests
      12440fa [Nicholas Chammas] [SPARK-2627] add Scala linter
      61c07b9 [Nicholas Chammas] [SPARK-2627] add Python linter
      75ad552 [Nicholas Chammas] make check output style consistent
      d614967b
  32. Aug 01, 2014
    • Davies Liu's avatar
      [SPARK-2010] [PySpark] [SQL] support nested structure in SchemaRDD · 880eabec
      Davies Liu authored
      Convert Row in JavaSchemaRDD into Array[Any] and unpickle it as a tuple in Python, then convert it into a namedtuple, so users can access fields just like attributes.

      This lets nested structures be accessed as objects; it also reduces the size of the serialized data and improves performance.
      
      root
       |-- field1: integer (nullable = true)
       |-- field2: string (nullable = true)
       |-- field3: struct (nullable = true)
       |    |-- field4: integer (nullable = true)
       |    |-- field5: array (nullable = true)
       |    |    |-- element: integer (containsNull = false)
       |-- field6: array (nullable = true)
       |    |-- element: struct (containsNull = false)
       |    |    |-- field7: string (nullable = true)
      
      Then we can access them by row.field3.field5[0] or row.field6[5].field7.

      It also infers the schema in Python, converts Row/dict/namedtuple/objects into tuples before serialization, then calls applySchema in the JVM. During inferSchema(), the top-level dict in a row becomes a StructType, but any nested dictionary becomes a MapType.

      You can use pyspark.sql.Row to convert an unnamed structure into a Row object, making the RDD's schema inferable. For example:
      
      ctx.inferSchema(rdd.map(lambda x: Row(a=x[0], b=x[1])))
      
      Or you could use Row to create a class just like namedtuple, for example:
      
      Person = Row("name", "age")
      ctx.inferSchema(rdd.map(lambda x: Person(*x)))
      
      You can also call applySchema to apply a schema to an RDD of tuples/lists and turn it into a SchemaRDD. The `schema` should be a StructType; see the API docs for details.

      schema = StructType([StructField("name", StringType(), True),
                           StructField("age", IntegerType(), True)])
      ctx.applySchema(rdd, schema)
      
      PS: In order to use namedtuple to inferSchema, you should make namedtuple picklable.
      
      Author: Davies Liu <davies.liu@gmail.com>
      
      Closes #1598 from davies/nested and squashes the following commits:
      
      f1d15b6 [Davies Liu] verify schema with the first few rows
      8852aaf [Davies Liu] check type of schema
      abe9e6e [Davies Liu] address comments
      61b2292 [Davies Liu] add @deprecated to pythonToJavaMap
      1e5b801 [Davies Liu] improve cache of classes
      51aa135 [Davies Liu] use Row to infer schema
      e9c0d5c [Davies Liu] remove string typed schema
      353a3f2 [Davies Liu] fix code style
      63de8f8 [Davies Liu] fix typo
      c79ca67 [Davies Liu] fix serialization of nested data
      6b258b5 [Davies Liu] fix pep8
      9d8447c [Davies Liu] apply schema provided by string of names
      f5df97f [Davies Liu] refactor, address comments
      9d9af55 [Davies Liu] use arrry to applySchema and infer schema in Python
      84679b3 [Davies Liu] Merge branch 'master' of github.com:apache/spark into nested
      0eaaf56 [Davies Liu] fix doc tests
      b3559b4 [Davies Liu] use generated Row instead of namedtuple
      c4ddc30 [Davies Liu] fix conflict between name of fields and variables
      7f6f251 [Davies Liu] address all comments
      d69d397 [Davies Liu] refactor
      2cc2d45 [Davies Liu] refactor
      182fb46 [Davies Liu] refactor
      bc6e9e1 [Davies Liu] switch to new Schema API
      547bf3e [Davies Liu] Merge branch 'master' into nested
      a435b5a [Davies Liu] add docs and code refactor
      2c8debc [Davies Liu] Merge branch 'master' into nested
      644665a [Davies Liu] use tuple and namedtuple for schemardd
      880eabec
  33. Jul 30, 2014
    • Kan Zhang's avatar
      [SPARK-2024] Add saveAsSequenceFile to PySpark · 94d1f46f
      Kan Zhang authored
      JIRA issue: https://issues.apache.org/jira/browse/SPARK-2024
      
      This PR is a followup to #455 and adds capabilities for saving PySpark RDDs using SequenceFile or any Hadoop OutputFormats.
      
      * Added RDD methods ```saveAsSequenceFile```, ```saveAsHadoopFile``` and ```saveAsHadoopDataset```, for both old and new MapReduce APIs (a usage example follows this list).
      
      * Default converter for converting common data types to Writables. Users may specify custom converters to convert to desired data types.
      
      * No out-of-box support for reading/writing arrays, since ArrayWritable itself doesn't have a no-arg constructor for creating an empty instance upon reading. Users need to provide ArrayWritable subtypes. Custom converters for converting arrays to suitable ArrayWritable subtypes are also needed when writing. When reading, the default converter will convert any custom ArrayWritable subtypes to ```Object[]``` and they get pickled to Python tuples.
      
      * Added HBase and Cassandra output examples to show how custom output formats and converters can be used.
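
      An illustrative round trip with the new API (the path is hypothetical):

      ```python
      rdd = sc.parallelize([(1, "a"), (2, "b"), (3, "c")])
      rdd.saveAsSequenceFile("/tmp/seq_demo")              # keys/values converted to Writables
      sorted(sc.sequenceFile("/tmp/seq_demo").collect())   # read back through the #455 reader
      # e.g. [(1, u'a'), (2, u'b'), (3, u'c')]
      ```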
      
      cc MLnick mateiz ahirreddy pwendell
      
      Author: Kan Zhang <kzhang@apache.org>
      
      Closes #1338 from kanzhang/SPARK-2024 and squashes the following commits:
      
      c01e3ef [Kan Zhang] [SPARK-2024] code formatting
      6591e37 [Kan Zhang] [SPARK-2024] renaming pickled -> pickledRDD
      d998ad6 [Kan Zhang] [SPARK-2024] refectoring to get method params below 10
      57a7a5e [Kan Zhang] [SPARK-2024] correcting typo
      75ca5bd [Kan Zhang] [SPARK-2024] Better type checking for batch serialized RDD
      0bdec55 [Kan Zhang] [SPARK-2024] Refactoring newly added tests
      9f39ff4 [Kan Zhang] [SPARK-2024] Adding 2 saveAsHadoopDataset tests
      0c134f3 [Kan Zhang] [SPARK-2024] Test refactoring and adding couple unbatched cases
      7a176df [Kan Zhang] [SPARK-2024] Add saveAsSequenceFile to PySpark
      94d1f46f
  34. Jul 26, 2014
    • Josh Rosen's avatar
      [SPARK-2601] [PySpark] Fix Py4J error when transforming pickleFiles · ba46bbed
      Josh Rosen authored
      Similar to SPARK-1034, the problem was that Py4J didn’t cope well with the fake ClassTags used in the Java API.  It doesn’t look like there’s any reason why PythonRDD needs to take a ClassTag, since it just ignores the type of the previous RDD, so I removed the type parameter and we no longer pass ClassTags from Python.
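
      An illustrative reproduction of the scenario this fixes (the path is hypothetical): transforming an RDD read back from a pickle file.

      ```python
      sc.parallelize(range(10)).saveAsPickleFile("/tmp/pickle_demo", 3)
      loaded = sc.pickleFile("/tmp/pickle_demo", 3)
      loaded.map(lambda x: x * 2).count()   # previously failed with a Py4J error about ClassTags
      ```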
      
      Author: Josh Rosen <joshrosen@apache.org>
      
      Closes #1605 from JoshRosen/spark-2601 and squashes the following commits:
      
      b68e118 [Josh Rosen] Fix Py4J error when transforming pickleFiles [SPARK-2601]
      ba46bbed