From 46307b2cd3e504d5cf5c7121b903eaa9c4da4c4b Mon Sep 17 00:00:00 2001 From: Peng <peng.meng@intel.com> Date: Wed, 19 Jul 2017 09:56:48 +0100 Subject: [PATCH] [SPARK-21401][ML][MLLIB] add poll function for BoundedPriorityQueue ## What changes were proposed in this pull request? The most of BoundedPriorityQueue usages in ML/MLLIB are: Get the value of BoundedPriorityQueue, then sort it. For example, in Word2Vec: pq.toSeq.sortBy(-_._2) in ALS, pq.toArray.sorted() The test results show using pq.poll is much faster than sort the value. It is good to add the poll function for BoundedPriorityQueue. ## How was this patch tested? The existing UT Author: Peng <peng.meng@intel.com> Author: Peng Meng <peng.meng@intel.com> Closes #18620 from mpjlu/add-poll. --- .../spark/util/BoundedPriorityQueue.scala | 4 ++ .../util/BoundedPriorityQueueSuite.scala | 51 +++++++++++++++++++ 2 files changed, 55 insertions(+) create mode 100644 core/src/test/scala/org/apache/spark/util/BoundedPriorityQueueSuite.scala diff --git a/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala index 1b2b1932e0..eff0aa4453 100644 --- a/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala +++ b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala @@ -51,6 +51,10 @@ private[spark] class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Orderin this } + def poll(): A = { + underlying.poll() + } + override def +=(elem1: A, elem2: A, elems: A*): this.type = { this += elem1 += elem2 ++= elems } diff --git a/core/src/test/scala/org/apache/spark/util/BoundedPriorityQueueSuite.scala b/core/src/test/scala/org/apache/spark/util/BoundedPriorityQueueSuite.scala new file mode 100644 index 0000000000..9465ca70e9 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/util/BoundedPriorityQueueSuite.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import org.apache.spark.SparkFunSuite + +class BoundedPriorityQueueSuite extends SparkFunSuite { + test("BoundedPriorityQueue poll test") { + val pq = new BoundedPriorityQueue[Double](4) + + pq += 0.1 + pq += 1.5 + pq += 1.0 + pq += 0.3 + pq += 0.01 + + assert(pq.isEmpty == false) + assert(pq.poll() == 0.1) + assert(pq.poll() == 0.3) + assert(pq.poll() == 1.0) + assert(pq.poll() == 1.5) + assert(pq.isEmpty == true) + + val pq2 = new BoundedPriorityQueue[(Int, Double)](4)(Ordering.by(_._2)) + pq2 += 1 -> 0.5 + pq2 += 5 -> 0.1 + pq2 += 3 -> 0.3 + pq2 += 4 -> 0.2 + pq2 += 1 -> 0.4 + + assert(pq2.poll()._2 == 0.2) + assert(pq2.poll()._2 == 0.3) + assert(pq2.poll()._2 == 0.4) + assert(pq2.poll()._2 == 0.5) + } +} -- GitLab