From b859853ba47b6323af0e31a4e2099e943221e1b1 Mon Sep 17 00:00:00 2001
From: Reynold Xin <rxin@apache.org>
Date: Wed, 26 Mar 2014 00:09:44 -0700
Subject: [PATCH] SPARK-1321 Use Guava's top k implementation rather than our
 BoundedPriorityQueue based implementation

Also updated the documentation for top and takeOrdered.

On my simple test of sorting 100 million (Int, Int) tuples using Spark, Guava's top k implementation (in Ordering) is much faster than the BoundedPriorityQueue implementation for roughly sorted input (10 - 20X faster), and still faster for purely random input (2 - 5X).

Author: Reynold Xin <rxin@apache.org>

Closes #229 from rxin/takeOrdered and squashes the following commits:

0d11844 [Reynold Xin] Use Guava's top k implementation rather than our BoundedPriorityQueue based implementation. Also updated the documentation for top and takeOrdered.
---
 .../main/scala/org/apache/spark/rdd/RDD.scala | 49 +++++++++++++------
 .../apache/spark/util/collection/Utils.scala  | 39 +++++++++++++++
 2 files changed, 72 insertions(+), 16 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/util/collection/Utils.scala

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 4f9d39f865..6af42248a5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -927,32 +927,49 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
-   * Returns the top K elements from this RDD as defined by
-   * the specified implicit Ordering[T].
+   * Returns the top K (largest) elements from this RDD as defined by the specified
+   * implicit Ordering[T]. This does the opposite of [[takeOrdered]]. For example:
+   * {{{
+   *   sc.parallelize([10, 4, 2, 12, 3]).top(1)
+   *   // returns [12]
+   *
+   *   sc.parallelize([2, 3, 4, 5, 6]).top(2)
+   *   // returns [6, 5]
+   * }}}
+   *
    * @param num the number of top elements to return
    * @param ord the implicit ordering for T
    * @return an array of top elements
    */
-  def top(num: Int)(implicit ord: Ordering[T]): Array[T] = {
-    mapPartitions { items =>
-      val queue = new BoundedPriorityQueue[T](num)
-      queue ++= items
-      Iterator.single(queue)
-    }.reduce { (queue1, queue2) =>
-      queue1 ++= queue2
-      queue1
-    }.toArray.sorted(ord.reverse)
-  }
+  def top(num: Int)(implicit ord: Ordering[T]): Array[T] = takeOrdered(num)(ord.reverse)
 
   /**
-   * Returns the first K elements from this RDD as defined by
-   * the specified implicit Ordering[T] and maintains the
-   * ordering.
+   * Returns the first K (smallest) elements from this RDD as defined by the specified
+   * implicit Ordering[T] and maintains the ordering. This does the opposite of [[top]].
+   * For example:
+   * {{{
+   *   sc.parallelize([10, 4, 2, 12, 3]).takeOrdered(1)
+   *   // returns [12]
+   *
+   *   sc.parallelize([2, 3, 4, 5, 6]).takeOrdered(2)
+   *   // returns [2, 3]
+   * }}}
+   *
    * @param num the number of top elements to return
    * @param ord the implicit ordering for T
    * @return an array of top elements
    */
-  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse)
+  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = {
+    mapPartitions { items =>
+      // Priority keeps the largest elements, so let's reverse the ordering.
+      val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
+      queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
+      Iterator.single(queue)
+    }.reduce { (queue1, queue2) =>
+      queue1 ++= queue2
+      queue1
+    }.toArray.sorted(ord)
+  }
 
   /**
    * Returns the max of this RDD as defined by the implicit Ordering[T].
diff --git a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
new file mode 100644
index 0000000000..c5268c0fae
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.collection.JavaConversions.{collectionAsScalaIterable, asJavaIterator}
+
+import com.google.common.collect.{Ordering => GuavaOrdering}
+
+/**
+ * Utility functions for collections.
+ */
+private[spark] object Utils {
+
+  /**
+   * Returns the first K elements from the input as defined by the specified implicit Ordering[T]
+   * and maintains the ordering.
+   */
+  def takeOrdered[T](input: Iterator[T], num: Int)(implicit ord: Ordering[T]): Iterator[T] = {
+    val ordering = new GuavaOrdering[T] {
+      override def compare(l: T, r: T) = ord.compare(l, r)
+    }
+    collectionAsScalaIterable(ordering.leastOf(asJavaIterator(input), num)).iterator
+  }
+}
-- 
GitLab