From 24705d0f46ce536bf829660f4506dcffd9ff799a Mon Sep 17 00:00:00 2001
From: seanm <sean.mcnamara@webtrends.com>
Date: Wed, 10 Jul 2013 10:33:11 -0700
Subject: [PATCH] adding takeOrdered() to RDD

---
 core/src/main/scala/spark/RDD.scala      | 12 ++++++++++++
 core/src/test/scala/spark/RDDSuite.scala | 18 ++++++++++++++++++
 2 files changed, 30 insertions(+)

diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 106fb2960f..af52040fa6 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -781,6 +781,18 @@ abstract class RDD[T: ClassManifest](
     }.toArray
   }
 
+  /**
+   * Returns the top K elements from this RDD as defined by
+   * the specified implicit Ordering[T] and maintains the
+   * ordering.
+   * @param num the number of top elements to return
+   * @param ord the implicit ordering for T
+   * @return an array of top elements
+   */
+  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = {
+    top(num)(ord.reverse).sorted(ord)
+  }
+
   /**
    * Save this RDD as a text file, using string representations of elements.
    */
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index e41ae385c0..fe17d1d5e7 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -252,6 +252,24 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(topK.sorted === Array("b", "a"))
   }
 
+  test("takeOrdered with predefined ordering") {
+    val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+    val rdd = sc.makeRDD(nums, 2)
+    val sortedTopK = rdd.takeOrdered(5)
+    assert(sortedTopK.size === 5)
+    assert(sortedTopK === Array(1, 2, 3, 4, 5))
+  }
+
+  test("takeOrdered with custom ordering") {
+    val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+    implicit val ord = implicitly[Ordering[Int]].reverse
+    val rdd = sc.makeRDD(nums, 2)
+    val sortedTopK = rdd.takeOrdered(5)
+    assert(sortedTopK.size === 5)
+    assert(sortedTopK === Array(10, 9, 8, 7, 6))
+    assert(sortedTopK === nums.sorted(ord).take(5))
+  }
+
   test("takeSample") {
     val data = sc.parallelize(1 to 100, 2)
     for (seed <- 1 to 5) {
-- 
GitLab