Commit 1e35e969 authored by Liang-Chi Hsieh's avatar Liang-Chi Hsieh Committed by Davies Liu

[SPARK-17817] [PYSPARK] [FOLLOWUP] PySpark RDD Repartitioning Results in Highly Skewed Partition Sizes

## What changes were proposed in this pull request?

This is a follow-up to #15389, which fixed the skew by calling `_to_java_object_rdd()`. Because that call can be expensive, this change instead decreases the serialization batch size, which fixes the skew as well.

Simple benchmark:

    import time
    num_partitions = 20000
    a = sc.parallelize(range(int(1e6)), 2)
    start = time.time()
    l = a.repartition(num_partitions).glom().map(len).collect()
    end = time.time()
    print(end - start)

- Before: 419.447577953 s
- `_to_java_object_rdd()`: 421.916361094 s
- Decreasing the batch size: 423.712255955 s
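The skew happens because, on the JVM side, `coalesce` distributes whole serialized batches rather than individual Python elements: each batch is one opaque pickled blob. A toy model (plain Python, no Spark; `distribute` is an illustrative helper, not a Spark API) shows why one giant batch per input partition cannot be spread out, while small batches can:

```python
def distribute(elements, batch_size, num_partitions):
    """Toy model: round-robin serialized *batches* (not elements) over
    output partitions, and return the element count per partition."""
    batches = [elements[i:i + batch_size]
               for i in range(0, len(elements), batch_size)]
    sizes = [0] * num_partitions
    for i, batch in enumerate(batches):
        # Each batch is indivisible, like one pickled blob on the JVM side.
        sizes[i % num_partitions] += len(batch)
    return sizes

elements = list(range(1000))

# One huge batch: everything lands in a single output partition (skewed).
print(distribute(elements, batch_size=1000, num_partitions=4))
# [1000, 0, 0, 0]

# Small batches, as in this patch: an almost perfectly even split.
print(distribute(elements, batch_size=10, num_partitions=4))
# [250, 250, 250, 250]
```

Both fixes restore evenness; they differ only in cost: converting to a Java-object RDD deserializes every element, while re-batching just re-pickles with a smaller batch size.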

## How was this patch tested?

Jenkins tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #15445 from viirya/repartition-batch-size.
parent cd106b05
    @@ -2029,12 +2029,12 @@ class RDD(object):
             [[1, 2, 3, 4, 5]]
             """
             if shuffle:
    -            # In Scala's repartition code, we will distribute elements evenly across output
    -            # partitions. However, the RDD from Python is serialized as a single binary data,
    -            # so the distribution fails and produces highly skewed partitions. We need to
    -            # convert it to a RDD of java object before repartitioning.
    -            data_java_rdd = self._to_java_object_rdd().coalesce(numPartitions, shuffle)
    -            jrdd = self.ctx._jvm.SerDeUtil.javaToPython(data_java_rdd)
    +            # Decrease the batch size in order to distribute evenly the elements across output
    +            # partitions. Otherwise, repartition will possibly produce highly skewed partitions.
    +            batchSize = min(10, self.ctx._batchSize or 1024)
    +            ser = BatchedSerializer(PickleSerializer(), batchSize)
    +            selfCopy = self._reserialize(ser)
    +            jrdd = selfCopy._jrdd.coalesce(numPartitions, shuffle)
             else:
                 jrdd = self._jrdd.coalesce(numPartitions, shuffle)
             return RDD(jrdd, self.ctx, self._jrdd_deserializer)
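The key line in the new code is `batchSize = min(10, self.ctx._batchSize or 1024)`. A quick standalone check of that expression's behavior (`pick_batch_size` is a hypothetical name for illustration; in PySpark, a context batch size of 0 or `None` means automatic/unbounded batching):

```python
def pick_batch_size(ctx_batch_size):
    # Mirrors the patch: an unset/auto batch size (0 or None) falls back
    # to 1024, and the result is capped at 10 so that each serialized
    # batch stays small enough to be distributed evenly.
    return min(10, ctx_batch_size or 1024)

print(pick_batch_size(0))     # 10  (auto batching -> capped at 10)
print(pick_batch_size(1))     # 1   (already small, kept as-is)
print(pick_batch_size(1024))  # 10  (large explicit size -> capped)
```

Capping at 10 keeps batches fine-grained during the shuffle; the RDD's original serializer is untouched, since `_reserialize` produces a copy used only for this `coalesce`.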