Skip to content
Snippets Groups Projects
Commit b859853b authored by Reynold Xin's avatar Reynold Xin
Browse files

SPARK-1321 Use Guava's top k implementation rather than our...

SPARK-1321 Use Guava's top k implementation rather than our BoundedPriorityQueue based implementation

Also updated the documentation for top and takeOrdered.

On my simple test of sorting 100 million (Int, Int) tuples using Spark, Guava's top k implementation (in Ordering) is much faster than the BoundedPriorityQueue implementation for roughly sorted input (10 - 20X faster), and still faster for purely random input (2 - 5X).

Author: Reynold Xin <rxin@apache.org>

Closes #229 from rxin/takeOrdered and squashes the following commits:

0d11844 [Reynold Xin] Use Guava's top k implementation rather than our BoundedPriorityQueue based implementation. Also updated the documentation for top and takeOrdered.
parent 4f7d547b
No related branches found
No related tags found
No related merge requests found
......@@ -927,32 +927,49 @@ abstract class RDD[T: ClassTag](
}
/**
* Returns the top K elements from this RDD as defined by
* the specified implicit Ordering[T].
* Returns the top K (largest) elements from this RDD as defined by the specified
* implicit Ordering[T]. This does the opposite of [[takeOrdered]]. For example:
* {{{
* sc.parallelize([10, 4, 2, 12, 3]).top(1)
* // returns [12]
*
* sc.parallelize([2, 3, 4, 5, 6]).top(2)
* // returns [6, 5]
* }}}
*
* @param num the number of top elements to return
* @param ord the implicit ordering for T
* @return an array of top elements
*/
def top(num: Int)(implicit ord: Ordering[T]): Array[T] = {
mapPartitions { items =>
val queue = new BoundedPriorityQueue[T](num)
queue ++= items
Iterator.single(queue)
}.reduce { (queue1, queue2) =>
queue1 ++= queue2
queue1
}.toArray.sorted(ord.reverse)
}
def top(num: Int)(implicit ord: Ordering[T]): Array[T] = takeOrdered(num)(ord.reverse)
/**
* Returns the first K elements from this RDD as defined by
* the specified implicit Ordering[T] and maintains the
* ordering.
* Returns the first K (smallest) elements from this RDD as defined by the specified
* implicit Ordering[T] and maintains the ordering. This does the opposite of [[top]].
* For example:
* {{{
* sc.parallelize([10, 4, 2, 12, 3]).takeOrdered(1)
* // returns [12]
*
* sc.parallelize([2, 3, 4, 5, 6]).takeOrdered(2)
* // returns [2, 3]
* }}}
*
* @param num the number of top elements to return
* @param ord the implicit ordering for T
* @return an array of top elements
*/
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse)
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = {
mapPartitions { items =>
// Priority keeps the largest elements, so let's reverse the ordering.
val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
Iterator.single(queue)
}.reduce { (queue1, queue2) =>
queue1 ++= queue2
queue1
}.toArray.sorted(ord)
}
/**
* Returns the max of this RDD as defined by the implicit Ordering[T].
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.util.collection
import scala.collection.JavaConversions.{collectionAsScalaIterable, asJavaIterator}
import com.google.common.collect.{Ordering => GuavaOrdering}
/**
* Utility functions for collections.
*/
private[spark] object Utils {
/**
* Returns the first K elements from the input as defined by the specified implicit Ordering[T]
* and maintains the ordering.
*/
def takeOrdered[T](input: Iterator[T], num: Int)(implicit ord: Ordering[T]): Iterator[T] = {
val ordering = new GuavaOrdering[T] {
override def compare(l: T, r: T) = ord.compare(l, r)
}
collectionAsScalaIterable(ordering.leastOf(asJavaIterator(input), num)).iterator
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment