From 65e8406029a0fe1e1c5c5d033d335b43f6743a04 Mon Sep 17 00:00:00 2001
From: Josh Rosen <joshrosen@eecs.berkeley.edu>
Date: Fri, 24 Aug 2012 21:07:26 -0700
Subject: [PATCH] Implement fold() in Python API.

---
 pyspark/pyspark/rdd.py | 20 +++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)

diff --git a/pyspark/pyspark/rdd.py b/pyspark/pyspark/rdd.py
index 7d280d8844..af7703fdfc 100644
--- a/pyspark/pyspark/rdd.py
+++ b/pyspark/pyspark/rdd.py
@@ -141,7 +141,25 @@ class RDD(object):
         vals = PipelinedRDD(self, func).collect()
         return reduce(f, vals)
 
-    # TODO: fold
+    def fold(self, zeroValue, op):
+        """
+        Aggregate the elements of each partition, and then the results for all
+        the partitions, using a given associative function and a neutral "zero
+        value." The function op(t1, t2) is allowed to modify t1 and return it
+        as its result value to avoid object allocation; however, it should not
+        modify t2.
+
+        >>> from operator import add
+        >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
+        15
+        """
+        def func(iterator):
+            acc = zeroValue
+            for obj in iterator:
+                acc = op(obj, acc)
+            yield acc
+        vals = PipelinedRDD(self, func).collect()
+        return reduce(op, vals, zeroValue)
 
     # TODO: aggregate
 
-- 
GitLab