Commit 066ce0ff authored by Matei Zaharia

Merge pull request #492 from JoshRosen/pyspark-split-rename

Change numSplits to numPartitions in PySpark
parents 3b9f9294 2c966c98
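
This is a pure rename: every PySpark operation that previously took a numSplits argument now takes numPartitions, with unchanged behavior. The diff below covers the pure-Python join helpers and the pair-wise methods of the RDD class. As a minimal usage sketch (assuming an interactive session with a SparkContext bound to sc, as in the doctests shown in the diff), the renamed argument is passed exactly like the old one:

    >>> from operator import add
    >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> sorted(rdd.reduceByKey(add, numPartitions=2).collect())
    [('a', 2), ('b', 1)]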
@@ -32,13 +32,13 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 """

-def _do_python_join(rdd, other, numSplits, dispatch):
+def _do_python_join(rdd, other, numPartitions, dispatch):
     vs = rdd.map(lambda (k, v): (k, (1, v)))
     ws = other.map(lambda (k, v): (k, (2, v)))
-    return vs.union(ws).groupByKey(numSplits).flatMapValues(dispatch)
+    return vs.union(ws).groupByKey(numPartitions).flatMapValues(dispatch)


-def python_join(rdd, other, numSplits):
+def python_join(rdd, other, numPartitions):
     def dispatch(seq):
         vbuf, wbuf = [], []
         for (n, v) in seq:
@@ -47,10 +47,10 @@ def python_join(rdd, other, numSplits):
             elif n == 2:
                 wbuf.append(v)
         return [(v, w) for v in vbuf for w in wbuf]
-    return _do_python_join(rdd, other, numSplits, dispatch)
+    return _do_python_join(rdd, other, numPartitions, dispatch)


-def python_right_outer_join(rdd, other, numSplits):
+def python_right_outer_join(rdd, other, numPartitions):
     def dispatch(seq):
         vbuf, wbuf = [], []
         for (n, v) in seq:
@@ -61,10 +61,10 @@ def python_right_outer_join(rdd, other, numSplits):
         if not vbuf:
             vbuf.append(None)
         return [(v, w) for v in vbuf for w in wbuf]
-    return _do_python_join(rdd, other, numSplits, dispatch)
+    return _do_python_join(rdd, other, numPartitions, dispatch)


-def python_left_outer_join(rdd, other, numSplits):
+def python_left_outer_join(rdd, other, numPartitions):
     def dispatch(seq):
         vbuf, wbuf = [], []
         for (n, v) in seq:
@@ -75,10 +75,10 @@ def python_left_outer_join(rdd, other, numSplits):
         if not wbuf:
             wbuf.append(None)
         return [(v, w) for v in vbuf for w in wbuf]
-    return _do_python_join(rdd, other, numSplits, dispatch)
+    return _do_python_join(rdd, other, numPartitions, dispatch)


-def python_cogroup(rdd, other, numSplits):
+def python_cogroup(rdd, other, numPartitions):
     vs = rdd.map(lambda (k, v): (k, (1, v)))
     ws = other.map(lambda (k, v): (k, (2, v)))
     def dispatch(seq):
@@ -89,4 +89,4 @@ def python_cogroup(rdd, other, numSplits):
             elif n == 2:
                 wbuf.append(v)
         return (vbuf, wbuf)
-    return vs.union(ws).groupByKey(numSplits).mapValues(dispatch)
+    return vs.union(ws).groupByKey(numPartitions).mapValues(dispatch)
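
The join helpers above all share one pattern: values from the two RDDs are tagged with 1 or 2, unioned, grouped by key, and then a per-key dispatch function combines the tagged values. As an illustrative standalone sketch (plain Python mirroring the inner-join dispatch above, not part of PySpark):

    def dispatch(seq):
        # seq holds (tag, value) pairs for one key: tag 1 = left RDD, tag 2 = right RDD
        vbuf, wbuf = [], []
        for (n, v) in seq:
            if n == 1:
                vbuf.append(v)
            elif n == 2:
                wbuf.append(v)
        # inner join: cross product of the left and right values for this key
        return [(v, w) for v in vbuf for w in wbuf]

    # Key 'a' appears with value 1 on the left and values 2, 3 on the right:
    print(dispatch([(1, 1), (2, 2), (2, 3)]))  # prints [(1, 2), (1, 3)]

The outer-join variants differ only in padding an empty buffer with None, and python_cogroup returns the two buffers themselves.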
@@ -215,7 +215,7 @@ class RDD(object):
                 yield pair
         return java_cartesian.flatMap(unpack_batches)

-    def groupBy(self, f, numSplits=None):
+    def groupBy(self, f, numPartitions=None):
         """
         Return an RDD of grouped items.

@@ -224,7 +224,7 @@ class RDD(object):
         >>> sorted([(x, sorted(y)) for (x, y) in result])
         [(0, [2, 8]), (1, [1, 1, 3, 5])]
         """
-        return self.map(lambda x: (f(x), x)).groupByKey(numSplits)
+        return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)

     def pipe(self, command, env={}):
         """
@@ -274,7 +274,7 @@ class RDD(object):
     def reduce(self, f):
         """
         Reduces the elements of this RDD using the specified commutative and
         associative binary operator.

         >>> from operator import add
@@ -422,22 +422,22 @@ class RDD(object):
         """
         return dict(self.collect())

-    def reduceByKey(self, func, numSplits=None):
+    def reduceByKey(self, func, numPartitions=None):
         """
         Merge the values for each key using an associative reduce function.

         This will also perform the merging locally on each mapper before
         sending results to a reducer, similarly to a "combiner" in MapReduce.

-        Output will be hash-partitioned with C{numSplits} splits, or the
-        default parallelism level if C{numSplits} is not specified.
+        Output will be hash-partitioned with C{numPartitions} partitions, or
+        the default parallelism level if C{numPartitions} is not specified.

         >>> from operator import add
         >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
         >>> sorted(rdd.reduceByKey(add).collect())
         [('a', 2), ('b', 1)]
         """
-        return self.combineByKey(lambda x: x, func, func, numSplits)
+        return self.combineByKey(lambda x: x, func, func, numPartitions)

     def reduceByKeyLocally(self, func):
         """
@@ -474,7 +474,7 @@ class RDD(object):
         """
         return self.map(lambda x: x[0]).countByValue()

-    def join(self, other, numSplits=None):
+    def join(self, other, numPartitions=None):
         """
         Return an RDD containing all pairs of elements with matching keys in
         C{self} and C{other}.
@@ -489,9 +489,9 @@ class RDD(object):
         >>> sorted(x.join(y).collect())
         [('a', (1, 2)), ('a', (1, 3))]
         """
-        return python_join(self, other, numSplits)
+        return python_join(self, other, numPartitions)

-    def leftOuterJoin(self, other, numSplits=None):
+    def leftOuterJoin(self, other, numPartitions=None):
         """
         Perform a left outer join of C{self} and C{other}.

@@ -506,9 +506,9 @@ class RDD(object):
         >>> sorted(x.leftOuterJoin(y).collect())
         [('a', (1, 2)), ('b', (4, None))]
         """
-        return python_left_outer_join(self, other, numSplits)
+        return python_left_outer_join(self, other, numPartitions)

-    def rightOuterJoin(self, other, numSplits=None):
+    def rightOuterJoin(self, other, numPartitions=None):
         """
         Perform a right outer join of C{self} and C{other}.

@@ -523,10 +523,10 @@ class RDD(object):
         >>> sorted(y.rightOuterJoin(x).collect())
         [('a', (2, 1)), ('b', (None, 4))]
         """
-        return python_right_outer_join(self, other, numSplits)
+        return python_right_outer_join(self, other, numPartitions)

     # TODO: add option to control map-side combining
-    def partitionBy(self, numSplits, partitionFunc=hash):
+    def partitionBy(self, numPartitions, partitionFunc=hash):
         """
         Return a copy of the RDD partitioned using the specified partitioner.

@@ -535,22 +535,22 @@ class RDD(object):
         >>> set(sets[0]).intersection(set(sets[1]))
         set([])
         """
-        if numSplits is None:
-            numSplits = self.ctx.defaultParallelism
+        if numPartitions is None:
+            numPartitions = self.ctx.defaultParallelism
         # Transferring O(n) objects to Java is too expensive. Instead, we'll
-        # form the hash buckets in Python, transferring O(numSplits) objects
+        # form the hash buckets in Python, transferring O(numPartitions) objects
         # to Java. Each object is a (splitNumber, [objects]) pair.
         def add_shuffle_key(split, iterator):
             buckets = defaultdict(list)
             for (k, v) in iterator:
-                buckets[partitionFunc(k) % numSplits].append((k, v))
+                buckets[partitionFunc(k) % numPartitions].append((k, v))
             for (split, items) in buckets.iteritems():
                 yield str(split)
                 yield dump_pickle(Batch(items))
         keyed = PipelinedRDD(self, add_shuffle_key)
         keyed._bypass_serializer = True
         pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
-        partitioner = self.ctx._jvm.PythonPartitioner(numSplits,
+        partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
                                                       id(partitionFunc))
         jrdd = pairRDD.partitionBy(partitioner).values()
         rdd = RDD(jrdd, self.ctx)
@@ -561,7 +561,7 @@ class RDD(object):

     # TODO: add control over map-side aggregation
     def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
-                     numSplits=None):
+                     numPartitions=None):
         """
         Generic function to combine the elements for each key using a custom
         set of aggregation functions.
@@ -586,8 +586,8 @@ class RDD(object):
         >>> sorted(x.combineByKey(str, add, add).collect())
         [('a', '11'), ('b', '1')]
         """
-        if numSplits is None:
-            numSplits = self.ctx.defaultParallelism
+        if numPartitions is None:
+            numPartitions = self.ctx.defaultParallelism
         def combineLocally(iterator):
             combiners = {}
             for (k, v) in iterator:
@@ -597,7 +597,7 @@ class RDD(object):
                 combiners[k] = mergeValue(combiners[k], v)
             return combiners.iteritems()
         locally_combined = self.mapPartitions(combineLocally)
-        shuffled = locally_combined.partitionBy(numSplits)
+        shuffled = locally_combined.partitionBy(numPartitions)
         def _mergeCombiners(iterator):
             combiners = {}
             for (k, v) in iterator:
@@ -609,10 +609,10 @@ class RDD(object):
         return shuffled.mapPartitions(_mergeCombiners)

     # TODO: support variant with custom partitioner
-    def groupByKey(self, numSplits=None):
+    def groupByKey(self, numPartitions=None):
         """
         Group the values for each key in the RDD into a single sequence.
-        Hash-partitions the resulting RDD with into numSplits partitions.
+        Hash-partitions the resulting RDD with into numPartitions partitions.

         >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
         >>> sorted(x.groupByKey().collect())
@@ -630,7 +630,7 @@ class RDD(object):
             return a + b
         return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
-                                 numSplits)
+                                 numPartitions)

     # TODO: add tests
     def flatMapValues(self, f):
@@ -659,7 +659,7 @@ class RDD(object):
         return self.cogroup(other)

     # TODO: add variant with custom parittioner
-    def cogroup(self, other, numSplits=None):
+    def cogroup(self, other, numPartitions=None):
         """
         For each key k in C{self} or C{other}, return a resulting RDD that
         contains a tuple with the list of values for that key in C{self} as well
@@ -670,7 +670,7 @@ class RDD(object):
         >>> sorted(x.cogroup(y).collect())
         [('a', ([1], [2])), ('b', ([4], []))]
         """
-        return python_cogroup(self, other, numSplits)
+        return python_cogroup(self, other, numPartitions)

     # TODO: `lookup` is disabled because we can't make direct comparisons based
     # on the key; we need to compare the hash of the key to the hash of the
...
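
The same renamed argument flows through the pair-wise RDD methods shown above. A small sketch mirroring the join doctest (assuming an active SparkContext bound to sc; the RDD contents are chosen to reproduce the expected output shown in the diff):

    >>> x = sc.parallelize([("a", 1), ("b", 4)])
    >>> y = sc.parallelize([("a", 2), ("a", 3)])
    >>> sorted(x.join(y, numPartitions=2).collect())
    [('a', (1, 2)), ('a', (1, 3))]

Keys present in only one RDD (here "b") are dropped by the inner join; leftOuterJoin and rightOuterJoin keep them and fill the missing side with None, as in the doctests above.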