diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 106fb2960f57af092f23ee266ebde72c561bd181..af52040fa6872afa7eb0e20a6a0976c5b9190982 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -781,6 +781,18 @@ abstract class RDD[T: ClassManifest]( }.toArray } + /** + * Returns the top K elements from this RDD as defined by + * the specified implicit Ordering[T] and maintains the + * ordering. + * @param num the number of top elements to return + * @param ord the implicit ordering for T + * @return an array of top elements + */ + def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = { + top(num)(ord.reverse).sorted(ord) + } + /** * Save this RDD as a text file, using string representations of elements. */ diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index e41ae385c0d833323ca9bbcda89c5d64520a8fe2..fe17d1d5e7147f8754100e36ee9c2ad3932c898d 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -252,6 +252,24 @@ class RDDSuite extends FunSuite with SharedSparkContext { assert(topK.sorted === Array("b", "a")) } + test("takeOrdered with predefined ordering") { + val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + val rdd = sc.makeRDD(nums, 2) + val sortedTopK = rdd.takeOrdered(5) + assert(sortedTopK.size === 5) + assert(sortedTopK === Array(1, 2, 3, 4, 5)) + } + + test("takeOrdered with custom ordering") { + val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + implicit val ord = implicitly[Ordering[Int]].reverse + val rdd = sc.makeRDD(nums, 2) + val sortedTopK = rdd.takeOrdered(5) + assert(sortedTopK.size === 5) + assert(sortedTopK === Array(10, 9, 8, 7, 6)) + assert(sortedTopK === nums.sorted(ord).take(5)) + } + test("takeSample") { val data = sc.parallelize(1 to 100, 2) for (seed <- 1 to 5) {