From f90ad5d426cb726079c490a9bb4b1100e2b4e602 Mon Sep 17 00:00:00 2001
From: Niklas Wilcke <1wilcke@informatik.uni-hamburg.de>
Date: Tue, 4 Nov 2014 09:57:03 -0800
Subject: [PATCH] [Spark-4060] [MLlib] exposing special rdd functions to the
 public

Author: Niklas Wilcke <1wilcke@informatik.uni-hamburg.de>

Closes #2907 from numbnut/master and squashes the following commits:

7f7c767 [Niklas Wilcke] [Spark-4060] [MLlib] exposing special rdd functions to the public, #2907
---
 .../spark/mllib/evaluation/AreaUnderCurve.scala       |  2 +-
 .../org/apache/spark/mllib/rdd/RDDFunctions.scala     | 11 ++++++-----
 .../scala/org/apache/spark/mllib/rdd/SlidingRDD.scala |  5 +++--
 .../apache/spark/mllib/rdd/RDDFunctionsSuite.scala    |  6 +++---
 4 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala
index 7858ec6024..078fbfbe4f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala
@@ -43,7 +43,7 @@ private[evaluation] object AreaUnderCurve {
    */
   def of(curve: RDD[(Double, Double)]): Double = {
     curve.sliding(2).aggregate(0.0)(
-      seqOp = (auc: Double, points: Seq[(Double, Double)]) => auc + trapezoid(points),
+      seqOp = (auc: Double, points: Array[(Double, Double)]) => auc + trapezoid(points),
       combOp = _ + _
     )
   }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala
index b5e403bc8c..57c0768084 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala
@@ -20,6 +20,7 @@ package org.apache.spark.mllib.rdd
 import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.HashPartitioner
 import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.RDD
@@ -28,8 +29,8 @@ import org.apache.spark.util.Utils
 /**
  * Machine learning specific RDD functions.
  */
-private[mllib]
-class RDDFunctions[T: ClassTag](self: RDD[T]) {
+@DeveloperApi
+class RDDFunctions[T: ClassTag](self: RDD[T]) extends Serializable {
 
   /**
    * Returns a RDD from grouping items of its parent RDD in fixed size blocks by passing a sliding
@@ -39,10 +40,10 @@ class RDDFunctions[T: ClassTag](self: RDD[T]) {
    * trigger a Spark job if the parent RDD has more than one partitions and the window size is
    * greater than 1.
    */
-  def sliding(windowSize: Int): RDD[Seq[T]] = {
+  def sliding(windowSize: Int): RDD[Array[T]] = {
     require(windowSize > 0, s"Sliding window size must be positive, but got $windowSize.")
     if (windowSize == 1) {
-      self.map(Seq(_))
+      self.map(Array(_))
     } else {
       new SlidingRDD[T](self, windowSize)
     }
@@ -112,7 +113,7 @@ class RDDFunctions[T: ClassTag](self: RDD[T]) {
   }
 }
 
-private[mllib]
+@DeveloperApi
 object RDDFunctions {
 
   /** Implicit conversion from an RDD to RDDFunctions. */
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala
index dd80782c0f..35e81fcb3d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala
@@ -45,15 +45,16 @@ class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Seq[T]
  */
 private[mllib]
 class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int)
-  extends RDD[Seq[T]](parent) {
+  extends RDD[Array[T]](parent) {
 
   require(windowSize > 1, s"Window size must be greater than 1, but got $windowSize.")
 
-  override def compute(split: Partition, context: TaskContext): Iterator[Seq[T]] = {
+  override def compute(split: Partition, context: TaskContext): Iterator[Array[T]] = {
     val part = split.asInstanceOf[SlidingRDDPartition[T]]
     (firstParent[T].iterator(part.prev, context) ++ part.tail)
       .sliding(windowSize)
       .withPartial(false)
+      .map(_.toArray)
   }
 
   override def getPreferredLocations(split: Partition): Seq[String] =
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala
index 27a19f7932..4ef67a40b9 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala
@@ -42,9 +42,9 @@ class RDDFunctionsSuite extends FunSuite with LocalSparkContext {
     val data = Seq(Seq(1, 2, 3), Seq.empty[Int], Seq(4), Seq.empty[Int], Seq(5, 6, 7))
     val rdd = sc.parallelize(data, data.length).flatMap(s => s)
     assert(rdd.partitions.size === data.length)
-    val sliding = rdd.sliding(3)
-    val expected = data.flatMap(x => x).sliding(3).toList
-    assert(sliding.collect().toList === expected)
+    val sliding = rdd.sliding(3).collect().toSeq.map(_.toSeq)
+    val expected = data.flatMap(x => x).sliding(3).toSeq.map(_.toSeq)
+    assert(sliding === expected)
   }
 
   test("treeAggregate") {
-- 
GitLab