Skip to content
Snippets Groups Projects
Commit 7dd9fc67, authored by Kan Zhang and committed by Matei Zaharia
Browse files

[SPARK-1837] NumericRange should be partitioned in the same way as other sequences

Author: Kan Zhang <kzhang@apache.org>

Closes #776 from kanzhang/SPARK-1837 and squashes the following commits:

e48f018 [Kan Zhang] [SPARK-1837] code refactoring
67c33b5 [Kan Zhang] minor change
403f9b1 [Kan Zhang] [SPARK-1837] NumericRange should be partitioned in the same way as other sequences
parent b52603b0
No related branches found
No related tags found
No related merge requests found
...@@ -117,6 +117,15 @@ private object ParallelCollectionRDD { ...@@ -117,6 +117,15 @@ private object ParallelCollectionRDD {
if (numSlices < 1) { if (numSlices < 1) {
throw new IllegalArgumentException("Positive number of slices required") throw new IllegalArgumentException("Positive number of slices required")
} }
// Every kind of sequence has to be cut at the same index boundaries; otherwise
// operations such as RDD.zip() would not behave as expected.
// Yields, in order, the (start, end) index pair of each of the numSlices slices.
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
  Iterator.range(0, numSlices).map { sliceIndex =>
    // length is a Long, so both products are evaluated in Long arithmetic
    // before being narrowed back to Int.
    val lower = (sliceIndex * length) / numSlices
    val upper = ((sliceIndex + 1) * length) / numSlices
    (lower.toInt, upper.toInt)
  }
}
seq match { seq match {
case r: Range.Inclusive => { case r: Range.Inclusive => {
val sign = if (r.step < 0) { val sign = if (r.step < 0) {
...@@ -128,18 +137,17 @@ private object ParallelCollectionRDD { ...@@ -128,18 +137,17 @@ private object ParallelCollectionRDD {
r.start, r.end + sign, r.step).asInstanceOf[Seq[T]], numSlices) r.start, r.end + sign, r.step).asInstanceOf[Seq[T]], numSlices)
} }
case r: Range => { case r: Range => {
(0 until numSlices).map(i => { positions(r.length, numSlices).map({
val start = ((i * r.length.toLong) / numSlices).toInt case (start, end) =>
val end = (((i + 1) * r.length.toLong) / numSlices).toInt new Range(r.start + start * r.step, r.start + end * r.step, r.step)
new Range(r.start + start * r.step, r.start + end * r.step, r.step) }).toSeq.asInstanceOf[Seq[Seq[T]]]
}).asInstanceOf[Seq[Seq[T]]]
} }
case nr: NumericRange[_] => { case nr: NumericRange[_] => {
// For ranges of Long, Double, BigInteger, etc // For ranges of Long, Double, BigInteger, etc
val slices = new ArrayBuffer[Seq[T]](numSlices) val slices = new ArrayBuffer[Seq[T]](numSlices)
val sliceSize = (nr.size + numSlices - 1) / numSlices // Round up to catch everything
var r = nr var r = nr
for (i <- 0 until numSlices) { for ((start, end) <- positions(nr.length, numSlices)) {
val sliceSize = end - start
slices += r.take(sliceSize).asInstanceOf[Seq[T]] slices += r.take(sliceSize).asInstanceOf[Seq[T]]
r = r.drop(sliceSize) r = r.drop(sliceSize)
} }
...@@ -147,11 +155,10 @@ private object ParallelCollectionRDD { ...@@ -147,11 +155,10 @@ private object ParallelCollectionRDD {
} }
case _ => { case _ => {
val array = seq.toArray // To prevent O(n^2) operations for List etc val array = seq.toArray // To prevent O(n^2) operations for List etc
(0 until numSlices).map(i => { positions(array.length, numSlices).map({
val start = ((i * array.length.toLong) / numSlices).toInt case (start, end) =>
val end = (((i + 1) * array.length.toLong) / numSlices).toInt array.slice(start, end).toSeq
array.slice(start, end).toSeq }).toSeq
})
} }
} }
} }
......
...@@ -111,6 +111,24 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { ...@@ -111,6 +111,24 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
assert(slices.forall(_.isInstanceOf[Range])) assert(slices.forall(_.isInstanceOf[Range]))
} }
test("identical slice sizes between Range and NumericRange") {
  // Slice an Int range and a Long range over the same values into 4 pieces
  // and compare the per-slice sizes index by index.
  val rangeSlices = ParallelCollectionRDD.slice(1 to 7, 4)
  val numericSlices = ParallelCollectionRDD.slice(1L to 7L, 4)
  assert(rangeSlices.size === 4)
  rangeSlices.indices.foreach { idx =>
    assert(rangeSlices(idx).size === numericSlices(idx).size)
  }
}
test("identical slice sizes between List and NumericRange") {
  // Fewer elements (2) than slices (4): a List and a Long range over the same
  // values are sliced and their per-slice sizes compared index by index.
  val listSlices = ParallelCollectionRDD.slice(List(1, 2), 4)
  val numericSlices = ParallelCollectionRDD.slice(1L to 2L, 4)
  assert(listSlices.size === 4)
  listSlices.indices.foreach { idx =>
    assert(listSlices(idx).size === numericSlices(idx).size)
  }
}
test("large ranges don't overflow") { test("large ranges don't overflow") {
val N = 100 * 1000 * 1000 val N = 100 * 1000 * 1000
val data = 0 until N val data = 0 until N
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment