From 77d046ec47a9bfa6323aa014869844c28e18e049 Mon Sep 17 00:00:00 2001
From: Sergey Serebryakov <sserebryakov@tesla.com>
Date: Mon, 21 Aug 2017 08:21:25 +0100
Subject: [PATCH] [SPARK-21782][CORE] Repartition creates skews when
 numPartitions is a power of 2

## Problem
When an RDD (particularly with a low item-per-partition ratio) is repartitioned to numPartitions = power of 2, the resulting partitions are very uneven-sized, due to using fixed seed to initialize PRNG, and using the PRNG only once. See details in https://issues.apache.org/jira/browse/SPARK-21782

## What changes were proposed in this pull request?
Instead of directly using `0, 1, 2,...` seeds to initialize `Random`, hash them with `scala.util.hashing.byteswap32()`.

## How was this patch tested?
`build/mvn -Dtest=none -DwildcardSuites=org.apache.spark.rdd.RDDSuite test`

Author: Sergey Serebryakov <sserebryakov@tesla.com>

Closes #18990 from megaserg/repartition-skew.
---
 core/src/main/scala/org/apache/spark/rdd/RDD.scala      | 3 ++-
 core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 6 ++++--
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 5435f59ea0..8798dfc925 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.io.Codec
 import scala.language.implicitConversions
 import scala.reflect.{classTag, ClassTag}
+import scala.util.hashing
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
 import org.apache.hadoop.io.{BytesWritable, NullWritable, Text}
@@ -448,7 +449,7 @@ abstract class RDD[T: ClassTag](
     if (shuffle) {
       /** Distributes elements evenly across output partitions, starting from a random partition. */
       val distributePartition = (index: Int, items: Iterator[T]) => {
-        var position = (new Random(index)).nextInt(numPartitions)
+        var position = (new Random(hashing.byteswap32(index))).nextInt(numPartitions)
         items.map { t =>
           // Note that the hash code of the key will just be the key itself. The HashPartitioner
           // will mod it with the number of total partitions.
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 386c0060f9..e994d724c4 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -347,16 +347,18 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
       val partitions = repartitioned.glom().collect()
       // assert all elements are present
       assert(repartitioned.collect().sortWith(_ > _).toSeq === input.toSeq.sortWith(_ > _).toSeq)
-      // assert no bucket is overloaded
+      // assert no bucket is overloaded or empty
       for (partition <- partitions) {
         val avg = input.size / finalPartitions
         val maxPossible = avg + initialPartitions
-        assert(partition.length <=  maxPossible)
+        assert(partition.length <= maxPossible)
+        assert(!partition.isEmpty)
       }
     }
 
     testSplitPartitions(Array.fill(100)(1), 10, 20)
     testSplitPartitions(Array.fill(10000)(1) ++ Array.fill(10000)(2), 20, 100)
+    testSplitPartitions(Array.fill(1000)(1), 250, 128)
   }
 
   test("coalesced RDDs") {
-- 
GitLab