diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 5435f59ea0d28e29265999f5d66e2c7c61106f77..8798dfc925362b82b4453571825365e81ef84ee9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.io.Codec
 import scala.language.implicitConversions
 import scala.reflect.{classTag, ClassTag}
+import scala.util.hashing
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
 import org.apache.hadoop.io.{BytesWritable, NullWritable, Text}
@@ -448,7 +449,7 @@ abstract class RDD[T: ClassTag](
     if (shuffle) {
       /** Distributes elements evenly across output partitions, starting from a random partition. */
       val distributePartition = (index: Int, items: Iterator[T]) => {
-        var position = (new Random(index)).nextInt(numPartitions)
+        var position = (new Random(hashing.byteswap32(index))).nextInt(numPartitions)
         items.map { t =>
           // Note that the hash code of the key will just be the key itself. The HashPartitioner
           // will mod it with the number of total partitions.
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 386c0060f9c41ffbd62da5de2b3dde8b32813ce0..e994d724c462f822cd466d0afeaf4d6322de2916 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -347,16 +347,18 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
       val partitions = repartitioned.glom().collect()
       // assert all elements are present
       assert(repartitioned.collect().sortWith(_ > _).toSeq === input.toSeq.sortWith(_ > _).toSeq)
-      // assert no bucket is overloaded
+      // assert no bucket is overloaded or empty
       for (partition <- partitions) {
         val avg = input.size / finalPartitions
         val maxPossible = avg + initialPartitions
-        assert(partition.length <= maxPossible)
+        assert(partition.length <= maxPossible)
+        assert(!partition.isEmpty)
       }
     }
 
     testSplitPartitions(Array.fill(100)(1), 10, 20)
     testSplitPartitions(Array.fill(10000)(1) ++ Array.fill(10000)(2), 20, 100)
+    testSplitPartitions(Array.fill(1000)(1), 250, 128)
   }
 
   test("coalesced RDDs") {