Skip to content
Snippets Groups Projects
Commit e8801d44 authored by ryanlecompte's avatar ryanlecompte
Browse files

use delegation for BoundedPriorityQueue, add Java API

parent 93b3f5e5
No related branches found
No related tags found
No related merge requests found
......@@ -731,19 +731,14 @@ abstract class RDD[T: ClassManifest](
* @return an array of top elements
*/
def top(num: Int)(implicit ord: Ordering[T]): Array[T] = {
val topK = mapPartitions { items =>
mapPartitions { items =>
val queue = new BoundedPriorityQueue[T](num)
queue ++= items
Iterator.single(queue)
}.reduce { (queue1, queue2) =>
queue1 ++= queue2
queue1
}
val builder = Array.newBuilder[T]
builder.sizeHint(topK.size)
builder ++= topK
builder.result()
}.toArray
}
/**
......
......@@ -86,7 +86,6 @@ JavaRDDLike[T, JavaRDD[T]] {
*/
def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] =
wrapRDD(rdd.subtract(other, p))
}
object JavaRDD {
......
package spark.api.java
import java.util.{List => JList}
import java.util.{List => JList, Comparator}
import scala.Tuple2
import scala.collection.JavaConversions._
......@@ -351,4 +351,29 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def toDebugString(): String = {
rdd.toDebugString
}
/**
* Returns the top K elements from this RDD as defined by
* the specified Comparator[T].
* @param num the number of top elements to return
* @param comp the comparator that defines the order
* @return an array of top elements
*/
def top(num: Int, comp: Comparator[T]): JList[T] = {
import scala.collection.JavaConversions._
val topElems = rdd.top(num)(Ordering.comparatorToOrdering(comp))
val arr: java.util.Collection[T] = topElems.toSeq
new java.util.ArrayList(arr)
}
/**
* Returns the top K elements from this RDD using the
* natural ordering for T.
* @param num the number of top elements to return
* @return an array of top elements
*/
def top(num: Int): JList[T] = {
val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[T]]
top(num, comp)
}
}
package spark.util
import java.io.Serializable
import java.util.{PriorityQueue => JPriorityQueue}
import scala.collection.generic.Growable
import scala.collection.JavaConverters._
/**
* Bounded priority queue. This class modifies the original PriorityQueue's
* add/offer methods such that only the top K elements are retained. The top
* K elements are defined by an implicit Ordering[A].
* Bounded priority queue. This class wraps the original PriorityQueue
* class and modifies it such that only the top K elements are retained.
* The top K elements are defined by an implicit Ordering[A].
*/
class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A])
extends JPriorityQueue[A](maxSize, ord) with Growable[A] {
extends Iterable[A] with Growable[A] with Serializable {
override def offer(a: A): Boolean = {
if (size < maxSize) super.offer(a)
else maybeReplaceLowest(a)
}
private val underlying = new JPriorityQueue[A](maxSize, ord)
override def add(a: A): Boolean = offer(a)
override def iterator: Iterator[A] = underlying.iterator.asScala
override def ++=(xs: TraversableOnce[A]): this.type = {
xs.foreach(add)
xs.foreach { this += _ }
this
}
override def +=(elem: A): this.type = {
add(elem)
if (size < maxSize) underlying.offer(elem)
else maybeReplaceLowest(elem)
this
}
......@@ -32,17 +32,14 @@ class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A])
this += elem1 += elem2 ++= elems
}
override def clear() { underlying.clear() }
private def maybeReplaceLowest(a: A): Boolean = {
val head = peek()
val head = underlying.peek()
if (head != null && ord.gt(a, head)) {
poll()
super.offer(a)
underlying.poll()
underlying.offer(a)
} else false
}
}
object BoundedPriorityQueue {
import scala.collection.JavaConverters._
implicit def asIterable[A](queue: BoundedPriorityQueue[A]): Iterable[A] = queue.asScala
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment