diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index 295fe81ce69e4c15907bb0b870be7806b2ce338f..4982a1aa1553b6fe9958cb86eb92d50fb6f43894 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -359,28 +359,28 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( def getValueClass() = implicitly[ClassManifest[V]].erasure } - class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest]( - self: RDD[(K, V)]) - extends Logging - with Serializable { +class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest]( + self: RDD[(K, V)]) + extends Logging + with Serializable { - def sortByKey(ascending: Boolean = true): RDD[(K,V)] = { - val rangePartitionedRDD = self.partitionBy(new RangePartitioner(self.splits.size, self, ascending)) - new SortedRDD(rangePartitionedRDD, ascending) - } + def sortByKey(ascending: Boolean = true): RDD[(K,V)] = { + val rangePartitionedRDD = self.partitionBy(new RangePartitioner(self.splits.size, self, ascending)) + new SortedRDD(rangePartitionedRDD, ascending) } - - class SortedRDD[K <% Ordered[K], V](prev: RDD[(K, V)], ascending: Boolean) - extends RDD[(K, V)](prev.context) { - - override def splits = prev.splits - override val partitioner = prev.partitioner - override val dependencies = List(new OneToOneDependency(prev)) - override def compute(split: Split) = { - prev.iterator(split).toArray - .sortWith((x, y) => if (ascending) x._1 < y._1 else x._1 > y._1).iterator - } - } +} + +class SortedRDD[K <% Ordered[K], V](prev: RDD[(K, V)], ascending: Boolean) + extends RDD[(K, V)](prev.context) { + + override def splits = prev.splits + override val partitioner = prev.partitioner + override val dependencies = List(new OneToOneDependency(prev)) + override def compute(split: Split) = { + prev.iterator(split).toArray + .sortWith((x, y) => if (ascending) x._1 < y._1 else x._1 > y._1).iterator + } +} class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)](prev.context) { override def splits = prev.splits