From e7713adb99f6b377c2c2b79dba08d2ccf5fa8909 Mon Sep 17 00:00:00 2001
From: Stephen Haberman <stephen@exigencecorp.com>
Date: Sat, 16 Feb 2013 13:20:48 -0600
Subject: [PATCH] Move ParallelCollection into spark.rdd package.

---
 core/src/main/scala/spark/SparkContext.scala  |  6 +--
 .../ParallelCollectionRDD.scala}              | 17 +++-----
 .../ParallelCollectionSplitSuite.scala        | 40 +++++++++----------
 3 files changed, 29 insertions(+), 34 deletions(-)
 rename core/src/main/scala/spark/{ParallelCollection.scala => rdd/ParallelCollectionRDD.scala} (90%)
 rename core/src/test/scala/spark/{ => rdd}/ParallelCollectionSplitSuite.scala (83%)

diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 0efc00d5dd..047b57dc1f 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -39,7 +39,7 @@ import spark.broadcast._
 import spark.deploy.LocalSparkCluster
 import spark.partial.ApproximateEvaluator
 import spark.partial.PartialResult
-import rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD}
+import rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
 import scheduler.{ResultTask, ShuffleMapTask, DAGScheduler, TaskScheduler}
 import spark.scheduler.local.LocalScheduler
 import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler}
@@ -216,7 +216,7 @@ class SparkContext(
 
   /** Distribute a local Scala collection to form an RDD. */
   def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
-    new ParallelCollection[T](this, seq, numSlices, Map[Int, Seq[String]]())
+    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
   }
 
   /** Distribute a local Scala collection to form an RDD. */
@@ -229,7 +229,7 @@ class SparkContext(
     * Create a new partition for each collection item. */
    def makeRDD[T: ClassManifest](seq: Seq[(T, Seq[String])]): RDD[T] = {
     val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
-    new ParallelCollection[T](this, seq.map(_._1), seq.size, indexToPrefs)
+    new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs)
   }
 
   /**
diff --git a/core/src/main/scala/spark/ParallelCollection.scala b/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala
similarity index 90%
rename from core/src/main/scala/spark/ParallelCollection.scala
rename to core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala
index 10adcd53ec..e703794787 100644
--- a/core/src/main/scala/spark/ParallelCollection.scala
+++ b/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala
@@ -1,8 +1,9 @@
-package spark
+package spark.rdd
 
 import scala.collection.immutable.NumericRange
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.Map
+import spark.{RDD, TaskContext, SparkContext, Split}
 
 private[spark] class ParallelCollectionSplit[T: ClassManifest](
     val rddId: Long,
@@ -22,7 +23,7 @@ private[spark] class ParallelCollectionSplit[T: ClassManifest](
   override val index: Int = slice
 }
 
-private[spark] class ParallelCollection[T: ClassManifest](
+private[spark] class ParallelCollectionRDD[T: ClassManifest](
     @transient sc: SparkContext,
     @transient data: Seq[T],
     numSlices: Int,
@@ -33,26 +34,20 @@ private[spark] class ParallelCollection[T: ClassManifest](
   // instead.
   // UPDATE: A parallel collection can be checkpointed to HDFS, which achieves this goal.
 
-  @transient var splits_ : Array[Split] = {
-    val slices = ParallelCollection.slice(data, numSlices).toArray
+  override def getSplits: Array[Split] = {
+    val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
     slices.indices.map(i => new ParallelCollectionSplit(id, i, slices(i))).toArray
   }
 
-  override def getSplits = splits_
-
   override def compute(s: Split, context: TaskContext) =
     s.asInstanceOf[ParallelCollectionSplit[T]].iterator
 
   override def getPreferredLocations(s: Split): Seq[String] = {
     locationPrefs.getOrElse(s.index, Nil)
   }
-
-  override def clearDependencies() {
-    splits_ = null
-  }
 }
 
-private object ParallelCollection {
+private object ParallelCollectionRDD {
   /**
    * Slice a collection into numSlices sub-collections. One extra thing we do here is to treat Range
    * collections specially, encoding the slices as other Ranges to minimize memory cost. This makes
diff --git a/core/src/test/scala/spark/ParallelCollectionSplitSuite.scala b/core/src/test/scala/spark/rdd/ParallelCollectionSplitSuite.scala
similarity index 83%
rename from core/src/test/scala/spark/ParallelCollectionSplitSuite.scala
rename to core/src/test/scala/spark/rdd/ParallelCollectionSplitSuite.scala
index 450c69bd58..d27a2538e4 100644
--- a/core/src/test/scala/spark/ParallelCollectionSplitSuite.scala
+++ b/core/src/test/scala/spark/rdd/ParallelCollectionSplitSuite.scala
@@ -1,4 +1,4 @@
-package spark
+package spark.rdd
 
 import scala.collection.immutable.NumericRange
 
@@ -11,7 +11,7 @@ import org.scalacheck.Prop._
 class ParallelCollectionSplitSuite extends FunSuite with Checkers {
   test("one element per slice") {
     val data = Array(1, 2, 3)
-    val slices = ParallelCollection.slice(data, 3)
+    val slices = ParallelCollectionRDD.slice(data, 3)
     assert(slices.size === 3)
     assert(slices(0).mkString(",") === "1")
     assert(slices(1).mkString(",") === "2")
@@ -20,14 +20,14 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
   
   test("one slice") {
     val data = Array(1, 2, 3)
-    val slices = ParallelCollection.slice(data, 1)
+    val slices = ParallelCollectionRDD.slice(data, 1)
     assert(slices.size === 1)
     assert(slices(0).mkString(",") === "1,2,3")
   }
   
   test("equal slices") {
     val data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
-    val slices = ParallelCollection.slice(data, 3)
+    val slices = ParallelCollectionRDD.slice(data, 3)
     assert(slices.size === 3)
     assert(slices(0).mkString(",") === "1,2,3")
     assert(slices(1).mkString(",") === "4,5,6")
@@ -36,7 +36,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
   
   test("non-equal slices") {
     val data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
-    val slices = ParallelCollection.slice(data, 3)
+    val slices = ParallelCollectionRDD.slice(data, 3)
     assert(slices.size === 3)
     assert(slices(0).mkString(",") === "1,2,3")
     assert(slices(1).mkString(",") === "4,5,6")
@@ -45,7 +45,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
 
   test("splitting exclusive range") {
     val data = 0 until 100
-    val slices = ParallelCollection.slice(data, 3)
+    val slices = ParallelCollectionRDD.slice(data, 3)
     assert(slices.size === 3)
     assert(slices(0).mkString(",") === (0 to 32).mkString(","))
     assert(slices(1).mkString(",") === (33 to 65).mkString(","))
@@ -54,7 +54,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
 
   test("splitting inclusive range") {
     val data = 0 to 100
-    val slices = ParallelCollection.slice(data, 3)
+    val slices = ParallelCollectionRDD.slice(data, 3)
     assert(slices.size === 3)
     assert(slices(0).mkString(",") === (0 to 32).mkString(","))
     assert(slices(1).mkString(",") === (33 to 66).mkString(","))
@@ -63,24 +63,24 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
   
   test("empty data") {
     val data = new Array[Int](0)
-    val slices = ParallelCollection.slice(data, 5)
+    val slices = ParallelCollectionRDD.slice(data, 5)
     assert(slices.size === 5)
     for (slice <- slices) assert(slice.size === 0)
   }
  
   test("zero slices") {
     val data = Array(1, 2, 3)
-    intercept[IllegalArgumentException] { ParallelCollection.slice(data, 0) }
+    intercept[IllegalArgumentException] { ParallelCollectionRDD.slice(data, 0) }
   }
 
   test("negative number of slices") {
     val data = Array(1, 2, 3)
-    intercept[IllegalArgumentException] { ParallelCollection.slice(data, -5) }
+    intercept[IllegalArgumentException] { ParallelCollectionRDD.slice(data, -5) }
   }
   
   test("exclusive ranges sliced into ranges") {
     val data = 1 until 100
-    val slices = ParallelCollection.slice(data, 3)
+    val slices = ParallelCollectionRDD.slice(data, 3)
     assert(slices.size === 3)
     assert(slices.map(_.size).reduceLeft(_+_) === 99)
     assert(slices.forall(_.isInstanceOf[Range]))
@@ -88,7 +88,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
   
   test("inclusive ranges sliced into ranges") {
     val data = 1 to 100
-    val slices = ParallelCollection.slice(data, 3)
+    val slices = ParallelCollectionRDD.slice(data, 3)
     assert(slices.size === 3)
     assert(slices.map(_.size).reduceLeft(_+_) === 100)
     assert(slices.forall(_.isInstanceOf[Range]))
@@ -97,7 +97,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
   test("large ranges don't overflow") {
     val N = 100 * 1000 * 1000
     val data = 0 until N
-    val slices = ParallelCollection.slice(data, 40)
+    val slices = ParallelCollectionRDD.slice(data, 40)
     assert(slices.size === 40)
     for (i <- 0 until 40) {
       assert(slices(i).isInstanceOf[Range])
@@ -117,7 +117,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
       (tuple: (List[Int], Int)) =>
         val d = tuple._1
         val n = tuple._2
-        val slices = ParallelCollection.slice(d, n)
+        val slices = ParallelCollectionRDD.slice(d, n)
         ("n slices"    |: slices.size == n) &&
         ("concat to d" |: Seq.concat(slices: _*).mkString(",") == d.mkString(",")) &&
         ("equal sizes" |: slices.map(_.size).forall(x => x==d.size/n || x==d.size/n+1))
@@ -134,7 +134,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
     } yield (a until b by step, n)
     val prop = forAll(gen) {
       case (d: Range, n: Int) =>
-        val slices = ParallelCollection.slice(d, n)
+        val slices = ParallelCollectionRDD.slice(d, n)
         ("n slices"    |: slices.size == n) &&
         ("all ranges"  |: slices.forall(_.isInstanceOf[Range])) &&
         ("concat to d" |: Seq.concat(slices: _*).mkString(",") == d.mkString(",")) &&
@@ -152,7 +152,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
     } yield (a to b by step, n)
     val prop = forAll(gen) {
       case (d: Range, n: Int) =>
-        val slices = ParallelCollection.slice(d, n)
+        val slices = ParallelCollectionRDD.slice(d, n)
         ("n slices"    |: slices.size == n) &&
         ("all ranges"  |: slices.forall(_.isInstanceOf[Range])) &&
         ("concat to d" |: Seq.concat(slices: _*).mkString(",") == d.mkString(",")) &&
@@ -163,7 +163,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
   
   test("exclusive ranges of longs") {
     val data = 1L until 100L
-    val slices = ParallelCollection.slice(data, 3)
+    val slices = ParallelCollectionRDD.slice(data, 3)
     assert(slices.size === 3)
     assert(slices.map(_.size).reduceLeft(_+_) === 99)
     assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
@@ -171,7 +171,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
   
   test("inclusive ranges of longs") {
     val data = 1L to 100L
-    val slices = ParallelCollection.slice(data, 3)
+    val slices = ParallelCollectionRDD.slice(data, 3)
     assert(slices.size === 3)
     assert(slices.map(_.size).reduceLeft(_+_) === 100)
     assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
@@ -179,7 +179,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
   
   test("exclusive ranges of doubles") {
     val data = 1.0 until 100.0 by 1.0
-    val slices = ParallelCollection.slice(data, 3)
+    val slices = ParallelCollectionRDD.slice(data, 3)
     assert(slices.size === 3)
     assert(slices.map(_.size).reduceLeft(_+_) === 99)
     assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
@@ -187,7 +187,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
   
   test("inclusive ranges of doubles") {
     val data = 1.0 to 100.0 by 1.0
-    val slices = ParallelCollection.slice(data, 3)
+    val slices = ParallelCollectionRDD.slice(data, 3)
     assert(slices.size === 3)
     assert(slices.map(_.size).reduceLeft(_+_) === 100)
     assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
-- 
GitLab