Commit 85b8f2c6 authored by Josh Rosen

Add epydoc API documentation for PySpark.

parent 2d98fff0
...@@ -25,10 +25,12 @@ To mark a block of code in your markdown to be syntax highlighted by jekyll duri
// supported languages too.
{% endhighlight %}
## API Docs (Scaladoc and Epydoc)
You can build just the Spark scaladoc by running `sbt/sbt doc` from the SPARK_PROJECT_ROOT directory.
Similarly, you can build just the PySpark epydoc by running `epydoc --config epydoc.conf` from the SPARK_PROJECT_ROOT/pyspark directory.
When you run `jekyll` in the docs directory, it will also copy over the scaladoc for the various Spark subprojects into the docs directory (and then also into the _site directory). We use a jekyll plugin to run `sbt/sbt doc` before building the site, so if you haven't run it (recently) it may take some time as it generates all of the scaladoc. The jekyll plugin also generates the PySpark docs using [epydoc](http://epydoc.sourceforge.net/).
NOTE: To skip the step of building and copying over the scaladoc when you build the docs, run `SKIP_SCALADOC=1 jekyll`. Similarly, `SKIP_EPYDOC=1 jekyll` will skip PySpark API doc generation.
...@@ -49,8 +49,14 @@
<li><a href="java-programming-guide.html">Java</a></li>
</ul>
</li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">API<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="api/core/index.html">Scala/Java (Scaladoc)</a></li>
<li><a href="api/pyspark/index.html">Python (Epydoc)</a></li>
</ul>
</li>
<li class="dropdown"> <li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">Deploying<b class="caret"></b></a> <a href="#" class="dropdown-toggle" data-toggle="dropdown">Deploying<b class="caret"></b></a>
......
...@@ -28,3 +28,20 @@ if ENV['SKIP_SCALADOC'] != '1'
cp_r(source + "/.", dest)
end
end
if ENV['SKIP_EPYDOC'] != '1'
puts "Moving to pyspark directory and building epydoc."
cd("../pyspark")
puts `epydoc --config epydoc.conf`
puts "Moving back into docs dir."
cd("../docs")
puts "echo making directory pyspark"
mkdir_p "pyspark"
puts "cp -r ../pyspark/docs/. api/pyspark"
cp_r("../pyspark/docs/.", "api/pyspark")
cd("..")
end
[epydoc] # Epydoc section marker (required by ConfigParser)
# Information about the project.
name: PySpark
url: http://spark-project.org
# The list of modules to document. Modules can be named using
# dotted names, module filenames, or package directory names.
# This option may be repeated.
modules: pyspark
# Write html output to the directory "docs/"
output: html
target: docs/
private: no
exclude: pyspark.cloudpickle pyspark.worker pyspark.join pyspark.serializers
pyspark.java_gateway pyspark.examples pyspark.shell
...@@ -11,6 +11,11 @@ from py4j.java_collections import ListConverter
class SparkContext(object):
"""
Main entry point for Spark functionality. A SparkContext represents the
connection to a Spark cluster, and can be used to create L{RDD}s and
broadcast variables on that cluster.
"""
gateway = launch_gateway()
jvm = gateway.jvm
...@@ -36,10 +41,16 @@ class SparkContext(object):
self._jsc.stop()
def stop(self):
"""
Shut down the SparkContext.
"""
self._jsc.stop()
self._jsc = None
def parallelize(self, c, numSlices=None):
"""
Distribute a local Python collection to form an RDD.
"""
numSlices = numSlices or self.defaultParallelism
# Calling the Java parallelize() method with an ArrayList is too slow,
# because it sends O(n) Py4J commands. As an alternative, serialized
...@@ -53,17 +64,30 @@ class SparkContext(object):
return RDD(jrdd, self)
def textFile(self, name, minSplits=None):
"""
Read a text file from HDFS, a local file system (available on all
nodes), or any Hadoop-supported file system URI, and return it as an
RDD of Strings.
"""
minSplits = minSplits or min(self.defaultParallelism, 2)
jrdd = self._jsc.textFile(name, minSplits)
return RDD(jrdd, self)
def union(self, rdds):
"""
Build the union of a list of RDDs.
"""
first = rdds[0]._jrdd
rest = [x._jrdd for x in rdds[1:]]
rest = ListConverter().convert(rest, self.gateway._gateway_client)
return RDD(self._jsc.union(first, rest), self)
def broadcast(self, value):
"""
Broadcast a read-only variable to the cluster, returning a C{Broadcast}
object for reading it in distributed functions. The variable will be
sent to each node only once.
"""
jbroadcast = self._jsc.broadcast(bytearray(dump_pickle(value)))
return Broadcast(jbroadcast.id(), value, jbroadcast,
self._pickled_broadcast_vars)
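The methods documented above cover the core of the Python SparkContext API. Below is a brief, hypothetical usage sketch: it assumes an already-constructed context named `sc` (as the doctests in this patch do), an illustrative file path, and that the returned Broadcast object exposes its payload via `.value`.

nums = sc.parallelize(range(10), 2)           # distribute a local collection as an RDD
lines = sc.textFile("input.txt")              # RDD of strings from a (hypothetical) file
factor = sc.broadcast(3)                      # read-only value shipped to each node once
scaled = nums.map(lambda x: x * factor.value) # use the broadcast value in a distributed function
both = sc.union([nums, sc.parallelize([100, 200])])
print scaled.collect()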
...@@ -18,24 +18,50 @@ from pyspark.join import python_join, python_left_outer_join, \
from py4j.java_collections import ListConverter, MapConverter
__all__ = ["RDD"]
class RDD(object):
"""
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
Represents an immutable, partitioned collection of elements that can be
operated on in parallel.
"""
def __init__(self, jrdd, ctx):
self._jrdd = jrdd
self.is_cached = False
self.ctx = ctx
@property
def context(self):
"""
The L{SparkContext} that this RDD was created on.
"""
return self.ctx
def cache(self):
"""
Persist this RDD with the default storage level (C{MEMORY_ONLY}).
"""
self.is_cached = True
self._jrdd.cache()
return self
# TODO persist(self, storageLevel)
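Because cache() returns the RDD itself, it chains directly onto a transformation. A small illustrative sketch (the file name is hypothetical):

events = sc.textFile("events.log").cache()
events.count()   # the first action computes and caches the partitions
events.count()   # later actions reuse the cached data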
def map(self, f, preservesPartitioning=False):
"""
Return a new RDD by applying a function to each element of this RDD.
"""
def func(iterator): return imap(f, iterator)
return PipelinedRDD(self, func, preservesPartitioning)
def flatMap(self, f, preservesPartitioning=False):
"""
Return a new RDD by first applying a function to all elements of this
RDD, and then flattening the results.
>>> rdd = sc.parallelize([2, 3, 4])
>>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
[1, 1, 1, 2, 2, 3]
...@@ -43,19 +69,25 @@ class RDD(object):
[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
"""
def func(iterator): return chain.from_iterable(imap(f, iterator))
return self.mapPartitions(func, preservesPartitioning)
def mapPartitions(self, f, preservesPartitioning=False):
"""
Return a new RDD by applying a function to each partition of this RDD.
>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> def f(iterator): yield sum(iterator)
>>> rdd.mapPartitions(f).collect()
[3, 7]
"""
return PipelinedRDD(self, f, preservesPartitioning)
# TODO: mapPartitionsWithSplit
def filter(self, f):
"""
Return a new RDD containing only the elements that satisfy a predicate.
>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.filter(lambda x: x % 2 == 0).collect()
[2, 4]
...@@ -65,6 +97,8 @@ class RDD(object):
def distinct(self):
"""
Return a new RDD containing the distinct elements in this RDD.
>>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
[1, 2, 3]
"""
...@@ -72,21 +106,25 @@ class RDD(object):
.reduceByKey(lambda x, _: x) \
.map(lambda (x, _): x)
# TODO: sampling needs to be re-implemented due to Batch
#def sample(self, withReplacement, fraction, seed):
# jrdd = self._jrdd.sample(withReplacement, fraction, seed)
# return RDD(jrdd, self.ctx)
#def takeSample(self, withReplacement, num, seed):
# vals = self._jrdd.takeSample(withReplacement, num, seed)
# return [load_pickle(bytes(x)) for x in vals]
def union(self, other):
"""
Return the union of this RDD and another one.
>>> rdd = sc.parallelize([1, 1, 2, 3])
>>> rdd.union(rdd).collect()
[1, 1, 2, 3, 1, 1, 2, 3]
Union of batched and unbatched RDDs (internal test):
>>> batchedRDD = sc.parallelize([Batch([1, 2, 3, 4, 5])])
>>> rdd.union(batchedRDD).collect()
[1, 1, 2, 3, 1, 2, 3, 4, 5]
...@@ -95,6 +133,8 @@ class RDD(object):
def __add__(self, other):
"""
Return the union of this RDD and another one.
>>> rdd = sc.parallelize([1, 1, 2, 3])
>>> (rdd + rdd).collect()
[1, 1, 2, 3, 1, 1, 2, 3]
...@@ -107,6 +147,9 @@ class RDD(object):
def glom(self):
"""
Return an RDD created by coalescing all elements within each partition
into a list.
>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> rdd.glom().first()
[1, 2]
...@@ -116,6 +159,10 @@ class RDD(object):
def cartesian(self, other):
"""
Return the Cartesian product of this RDD and another one, that is, the
RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and
C{b} is in C{other}.
>>> rdd = sc.parallelize([1, 2])
>>> sorted(rdd.cartesian(rdd).collect())
[(1, 1), (1, 2), (2, 1), (2, 2)]
...@@ -124,6 +171,8 @@ class RDD(object):
def groupBy(self, f, numSplits=None):
"""
Return an RDD of grouped items.
>>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
>>> result = rdd.groupBy(lambda x: x % 2).collect()
>>> sorted([(x, sorted(y)) for (x, y) in result])
...@@ -133,6 +182,8 @@ class RDD(object):
def pipe(self, command, env={}):
"""
Return an RDD created by piping elements to a forked external process.
>>> sc.parallelize([1, 2, 3]).pipe('cat').collect()
['1', '2', '3']
"""
...@@ -148,12 +199,17 @@ class RDD(object):
def foreach(self, f):
"""
Applies a function to all elements of this RDD.
>>> def f(x): print x
>>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
"""
self.map(f).collect() # Force evaluation
def collect(self):
"""
Return a list that contains all of the elements in this RDD.
"""
picklesInJava = self._jrdd.rdd().collect()
return list(self._collect_array_through_file(picklesInJava))
...@@ -176,6 +232,9 @@ class RDD(object):
def reduce(self, f):
"""
Reduces the elements of this RDD using the specified associative binary
operator.
>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
15
...@@ -198,9 +257,11 @@ class RDD(object):
"""
Aggregate the elements of each partition, and then the results for all
the partitions, using a given associative function and a neutral "zero
value."
The function C{op(t1, t2)} is allowed to modify C{t1} and return it
as its result value to avoid object allocation; however, it should not
modify C{t2}.
>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
...@@ -218,6 +279,8 @@ class RDD(object):
def sum(self):
"""
Add up the elements in this RDD.
>>> sc.parallelize([1.0, 2.0, 3.0]).sum()
6.0
"""
...@@ -225,6 +288,8 @@ class RDD(object):
def count(self):
"""
Return the number of elements in this RDD.
>>> sc.parallelize([2, 3, 4]).count()
3
>>> sc.parallelize([Batch([2, 3, 4])]).count()
...@@ -234,6 +299,9 @@ class RDD(object):
def countByValue(self):
"""
Return the count of each unique value in this RDD as a dictionary of
(value, count) pairs.
>>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
[(1, 2), (2, 3)]
"""
...@@ -250,6 +318,12 @@ class RDD(object):
def take(self, num):
"""
Take the first num elements of the RDD.
This currently scans the partitions *one by one*, so it will be slow if
a lot of partitions are required. In that case, use L{collect} to get
the whole RDD instead.
>>> sc.parallelize([2, 3, 4]).take(2)
[2, 3]
"""
...@@ -258,12 +332,18 @@ class RDD(object):
def first(self):
"""
Return the first element in this RDD.
>>> sc.parallelize([2, 3, 4]).first()
2
"""
return self.take(1)[0]
# TODO: add test and fix for use with Batch
def saveAsTextFile(self, path):
"""
Save this RDD as a text file, using string representations of elements.
"""
def func(iterator):
return (str(x).encode("utf-8") for x in iterator)
keyed = PipelinedRDD(self, func)
...@@ -274,6 +354,8 @@ class RDD(object):
def collectAsMap(self):
"""
Return the key-value pairs in this RDD to the master as a dictionary.
>>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
>>> m[1]
2
...@@ -284,6 +366,14 @@ class RDD(object):
def reduceByKey(self, func, numSplits=None):
"""
Merge the values for each key using an associative reduce function.
This will also perform the merging locally on each mapper before
sending results to a reducer, similarly to a "combiner" in MapReduce.
Output will be hash-partitioned with C{numSplits} splits, or the
default parallelism level if C{numSplits} is not specified.
>>> from operator import add
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.reduceByKey(add).collect())
...@@ -293,6 +383,12 @@ class RDD(object):
def reduceByKeyLocally(self, func):
"""
Merge the values for each key using an associative reduce function, but
return the results immediately to the master as a dictionary.
This will also perform the merging locally on each mapper before
sending results to a reducer, similarly to a "combiner" in MapReduce.
>>> from operator import add
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.reduceByKeyLocally(add).items())
...@@ -311,6 +407,9 @@ class RDD(object):
def countByKey(self):
"""
Count the number of elements for each key, and return the result to the
master as a dictionary.
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.countByKey().items()) >>> sorted(rdd.countByKey().items())
[('a', 2), ('b', 1)] [('a', 2), ('b', 1)]
...@@ -319,6 +418,14 @@ class RDD(object): ...@@ -319,6 +418,14 @@ class RDD(object):
def join(self, other, numSplits=None): def join(self, other, numSplits=None):
""" """
Return an RDD containing all pairs of elements with matching keys in
C{self} and C{other}.
Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
(k, v1) is in C{self} and (k, v2) is in C{other}.
Performs a hash join across the cluster.
>>> x = sc.parallelize([("a", 1), ("b", 4)]) >>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2), ("a", 3)]) >>> y = sc.parallelize([("a", 2), ("a", 3)])
>>> sorted(x.join(y).collect()) >>> sorted(x.join(y).collect())
...@@ -328,6 +435,14 @@ class RDD(object): ...@@ -328,6 +435,14 @@ class RDD(object):
def leftOuterJoin(self, other, numSplits=None): def leftOuterJoin(self, other, numSplits=None):
""" """
Perform a left outer join of C{self} and C{other}.
For each element (k, v) in C{self}, the resulting RDD will either
contain all pairs (k, (v, w)) for w in C{other}, or the pair
(k, (v, None)) if no elements in other have key k.
Hash-partitions the resulting RDD into the given number of partitions.
>>> x = sc.parallelize([("a", 1), ("b", 4)]) >>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)]) >>> y = sc.parallelize([("a", 2)])
>>> sorted(x.leftOuterJoin(y).collect()) >>> sorted(x.leftOuterJoin(y).collect())
...@@ -337,6 +452,14 @@ class RDD(object): ...@@ -337,6 +452,14 @@ class RDD(object):
def rightOuterJoin(self, other, numSplits=None): def rightOuterJoin(self, other, numSplits=None):
""" """
Perform a right outer join of C{self} and C{other}.
For each element (k, w) in C{other}, the resulting RDD will either
contain all pairs (k, (v, w)) for v in C{self}, or the pair (k, (None, w))
if no elements in C{self} have key k.
Hash-partitions the resulting RDD into the given number of partitions.
>>> x = sc.parallelize([("a", 1), ("b", 4)]) >>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)]) >>> y = sc.parallelize([("a", 2)])
>>> sorted(y.rightOuterJoin(x).collect()) >>> sorted(y.rightOuterJoin(x).collect())
...@@ -344,8 +467,11 @@ class RDD(object): ...@@ -344,8 +467,11 @@ class RDD(object):
""" """
return python_right_outer_join(self, other, numSplits) return python_right_outer_join(self, other, numSplits)
# TODO: add option to control map-side combining
def partitionBy(self, numSplits, hashFunc=hash):
"""
Return a copy of the RDD partitioned using the specified partitioner.
>>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
>>> sets = pairs.partitionBy(2).glom().collect()
>>> set(sets[0]).intersection(set(sets[1]))
...@@ -371,9 +497,27 @@ class RDD(object):
jrdd = jrdd.map(self.ctx.jvm.ExtractValue())
return RDD(jrdd, self.ctx)
# TODO: add control over map-side aggregation
def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
numSplits=None):
"""
Generic function to combine the elements for each key using a custom
set of aggregation functions.
Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
type" C. Note that V and C can be different -- for example, one might
group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).
Users provide three functions:
- C{createCombiner}, which turns a V into a C (e.g., creates
a one-element list)
- C{mergeValue}, to merge a V into a C (e.g., adds it to the end of
a list)
- C{mergeCombiners}, to combine two C's into a single one.
In addition, users can control the partitioning of the output RDD.
>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> def f(x): return x >>> def f(x): return x
>>> def add(a, b): return a + str(b) >>> def add(a, b): return a + str(b)
...@@ -402,8 +546,12 @@ class RDD(object): ...@@ -402,8 +546,12 @@ class RDD(object):
return combiners.iteritems() return combiners.iteritems()
return shuffled.mapPartitions(_mergeCombiners) return shuffled.mapPartitions(_mergeCombiners)
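As a concrete (hypothetical) illustration of the three functions described above, a per-key mean can be computed by combining each value into a running (sum, count) pair:

# Sketch only: per-key average via combineByKey's three user-supplied functions.
pairs = sc.parallelize([("a", 1), ("a", 3), ("b", 10)])
sum_counts = pairs.combineByKey(
    lambda v: (v, 1),                               # createCombiner: turn a V into a C
    lambda c, v: (c[0] + v, c[1] + 1),              # mergeValue: fold a V into a C
    lambda c1, c2: (c1[0] + c2[0], c1[1] + c2[1]))  # mergeCombiners: merge two C's
means = sum_counts.mapValues(lambda (s, n): float(s) / n)
sorted(means.collect())   # [('a', 2.0), ('b', 10.0)]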
# TODO: support variant with custom partitioner
def groupByKey(self, numSplits=None):
"""
Group the values for each key in the RDD into a single sequence.
Hash-partitions the resulting RDD into numSplits partitions.
>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(x.groupByKey().collect()) >>> sorted(x.groupByKey().collect())
[('a', [1, 1]), ('b', [1])] [('a', [1, 1]), ('b', [1])]
...@@ -422,20 +570,39 @@ class RDD(object): ...@@ -422,20 +570,39 @@ class RDD(object):
return self.combineByKey(createCombiner, mergeValue, mergeCombiners, return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
numSplits) numSplits)
# TODO: add tests
def flatMapValues(self, f):
"""
Pass each value in the key-value pair RDD through a flatMap function
without changing the keys; this also retains the original RDD's
partitioning.
"""
flat_map_fn = lambda (k, v): ((k, x) for x in f(v))
return self.flatMap(flat_map_fn, preservesPartitioning=True)
def mapValues(self, f):
"""
Pass each value in the key-value pair RDD through a map function
without changing the keys; this also retains the original RDD's
partitioning.
"""
map_values_fn = lambda (k, v): (k, f(v))
return self.map(map_values_fn, preservesPartitioning=True)
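Because mapValues and flatMapValues leave the keys untouched, the partitioning of a key-value RDD survives them; a brief hypothetical sketch:

# Sketch: only the values change, so the hash partitioning set up by
# partitionBy(2) is preserved and no extra shuffle is needed afterwards.
pairs = sc.parallelize([("a", 1), ("b", 4), ("a", 2)]).partitionBy(2)
doubled = pairs.mapValues(lambda v: v * 2)
expanded = pairs.flatMapValues(lambda v: range(v))   # one output pair per unit of v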
# TODO: support varargs cogroup of several RDDs.
def groupWith(self, other):
"""
Alias for cogroup.
"""
return self.cogroup(other)
# TODO: add variant with custom partitioner
def cogroup(self, other, numSplits=None):
"""
For each key k in C{self} or C{other}, return a resulting RDD that
contains a tuple with the list of values for that key in C{self} as well
as C{other}.
>>> x = sc.parallelize([("a", 1), ("b", 4)]) >>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)]) >>> y = sc.parallelize([("a", 2)])
>>> sorted(x.cogroup(y).collect()) >>> sorted(x.cogroup(y).collect())
......