Skip to content
Snippets Groups Projects
Commit 62079816 authored by Antonio
Browse files

Added fixes to sorting

parent e93f6226
No related branches found
No related tags found
No related merge requests found
......@@ -377,7 +377,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
override val partitioner = prev.partitioner
override val dependencies = List(new OneToOneDependency(prev))
override def compute(split: Split) = {
prev.iterator(split).toList
prev.iterator(split).toArray
.sortWith((x, y) => if (ascending) x._1 < y._1 else x._1 > y._1).iterator
}
}
......
......@@ -36,27 +36,22 @@ class RangePartitioner[K <% Ordered[K],V](partitions: Int, rdd: RDD[(K,V)], asce
val rddSample = rdd.sample(true, frac, 1).collect.toList
.sortWith((x, y) => if (ascending) x._1 < y._1 else x._1 > y._1)
.map(_._1)
val bucketSize:Float = rddSample.size / partitions
val bucketSize = rddSample.size / partitions
val rangeBounds = rddSample.zipWithIndex.filter(_._2 % bucketSize == 0)
.map(_._1).slice(1, partitions)
def getPartition(key: Any): Int = {
key match {
case k:K => {
val p =
rangeBounds.zipWithIndex.foldLeft(0) {
case (part, (bound, index)) =>
if (k > bound) index + 1 else part
}
if (ascending) p else numPartitions-1-p
val k = key.asInstanceOf[K]
val p = rangeBounds.zipWithIndex.foldLeft(0) {
case (part, (bound, index)) =>
if (k > bound) index + 1 else part
}
case _ => 0
}
if (ascending) p else numPartitions-1-p
}
override def equals(other: Any): Boolean = other match {
case r: RangePartitioner[K,V] =>
r.numPartitions == numPartitions
case r: RangePartitioner[_,_] =>
r.numPartitions == numPartitions & r.rangeBounds == rangeBounds
case _ => false
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment