Commit 340cc54e authored by Matei Zaharia

Merge pull request #471 from stephenh/parallelrdd

Move ParallelCollection into spark.rdd package.
parents 3260b612 e7713adb
@@ -39,7 +39,7 @@ import spark.broadcast._
 import spark.deploy.LocalSparkCluster
 import spark.partial.ApproximateEvaluator
 import spark.partial.PartialResult
-import rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD}
+import rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
 import scheduler.{ResultTask, ShuffleMapTask, DAGScheduler, TaskScheduler}
 import spark.scheduler.local.LocalScheduler
 import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler}
@@ -216,7 +216,7 @@ class SparkContext(
   /** Distribute a local Scala collection to form an RDD. */
   def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
-    new ParallelCollection[T](this, seq, numSlices, Map[Int, Seq[String]]())
+    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
   }
   /** Distribute a local Scala collection to form an RDD. */
@@ -229,7 +229,7 @@ class SparkContext(
    * Create a new partition for each collection item. */
   def makeRDD[T: ClassManifest](seq: Seq[(T, Seq[String])]): RDD[T] = {
     val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
-    new ParallelCollection[T](this, seq.map(_._1), seq.size, indexToPrefs)
+    new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs)
   }
   /**
......
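Note: for orientation, here is a minimal usage sketch of the two entry points this hunk touches. It is not part of the commit; the object name, the "local" master, and the hostnames are placeholders, and it targets this era's pre-Apache, ClassManifest-based API.

import spark.SparkContext

// Hedged sketch, assuming this era's SparkContext(master, jobName) constructor.
object ParallelizeSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "ParallelizeSketch")
    // Slice a local Seq into 10 partitions, each backed by a ParallelCollectionRDD split.
    val rdd = sc.parallelize(1 to 1000, 10)
    println(rdd.count())     // 1000
    // makeRDD's (T, Seq[String]) overload: one partition per element, each
    // recording the given hosts as preferred locations (hostnames made up).
    val placed = sc.makeRDD(Seq((1, Seq("host-a")), (2, Seq("host-b"))))
    println(placed.count())  // 2
    sc.stop()
  }
}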
-package spark
+package spark.rdd
 import scala.collection.immutable.NumericRange
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.Map
+import spark.{RDD, TaskContext, SparkContext, Split}
 private[spark] class ParallelCollectionSplit[T: ClassManifest](
     val rddId: Long,
@@ -22,7 +23,7 @@ private[spark] class ParallelCollectionSplit[T: ClassManifest](
   override val index: Int = slice
 }
-private[spark] class ParallelCollection[T: ClassManifest](
+private[spark] class ParallelCollectionRDD[T: ClassManifest](
     @transient sc: SparkContext,
     @transient data: Seq[T],
     numSlices: Int,
@@ -33,26 +34,20 @@ private[spark] class ParallelCollection[T: ClassManifest](
   // instead.
   // UPDATE: A parallel collection can be checkpointed to HDFS, which achieves this goal.
-  @transient var splits_ : Array[Split] = {
-    val slices = ParallelCollection.slice(data, numSlices).toArray
+  override def getSplits: Array[Split] = {
+    val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
     slices.indices.map(i => new ParallelCollectionSplit(id, i, slices(i))).toArray
   }
-  override def getSplits = splits_
   override def compute(s: Split, context: TaskContext) =
     s.asInstanceOf[ParallelCollectionSplit[T]].iterator
   override def getPreferredLocations(s: Split): Seq[String] = {
     locationPrefs.getOrElse(s.index, Nil)
   }
-  override def clearDependencies() {
-    splits_ = null
-  }
 }
-private object ParallelCollection {
+private object ParallelCollectionRDD {
   /**
   * Slice a collection into numSlices sub-collections. One extra thing we do here is to treat Range
   * collections specially, encoding the slices as other Ranges to minimize memory cost. This makes
......
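Note: the slice helper on the companion object above is what the test suite below exercises. As a standalone sketch (not the Spark source; the names SliceSketch, positions, and sliceRange are illustrative), the cut-point rule consistent with those tests puts slice i of a length-L sequence at indices [i*L/n, (i+1)*L/n). Long arithmetic avoids the overflow the "large ranges don't overflow" test guards against, and a Range slice can be re-encoded as another Range in O(1) memory:

// Hedged sketch of the slicing rule implied by the tests below.
object SliceSketch {
  // Cut points for numSlices slices of a length-L sequence; i * length is
  // computed in Long so huge ranges (e.g. 100M elements) don't overflow Int.
  def positions(length: Long, numSlices: Int): Seq[(Int, Int)] = {
    require(numSlices >= 1, "numSlices must be positive")
    (0 until numSlices).map { i =>
      ((i * length / numSlices).toInt, ((i + 1) * length / numSlices).toInt)
    }
  }

  // Re-encode each slice of a Range as another Range: no elements materialized.
  def sliceRange(r: Range, numSlices: Int): Seq[Range] =
    positions(r.length, numSlices).map { case (start, end) =>
      Range(r.start + start * r.step, r.start + end * r.step, r.step)
    }

  def main(args: Array[String]): Unit = {
    // Matches the "splitting exclusive range" test: 0 until 100 in 3 slices.
    sliceRange(0 until 100, 3).foreach(s => println(s"${s.head}..${s.last}"))
    // prints 0..32, 33..65, 66..99
  }
}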
-package spark
+package spark.rdd
 import scala.collection.immutable.NumericRange
@@ -11,7 +11,7 @@ import org.scalacheck.Prop._
 class ParallelCollectionSplitSuite extends FunSuite with Checkers {
   test("one element per slice") {
     val data = Array(1, 2, 3)
-    val slices = ParallelCollection.slice(data, 3)
+    val slices = ParallelCollectionRDD.slice(data, 3)
     assert(slices.size === 3)
     assert(slices(0).mkString(",") === "1")
     assert(slices(1).mkString(",") === "2")
@@ -20,14 +20,14 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
   test("one slice") {
     val data = Array(1, 2, 3)
-    val slices = ParallelCollection.slice(data, 1)
+    val slices = ParallelCollectionRDD.slice(data, 1)
     assert(slices.size === 1)
     assert(slices(0).mkString(",") === "1,2,3")
   }
   test("equal slices") {
     val data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
-    val slices = ParallelCollection.slice(data, 3)
+    val slices = ParallelCollectionRDD.slice(data, 3)
     assert(slices.size === 3)
     assert(slices(0).mkString(",") === "1,2,3")
     assert(slices(1).mkString(",") === "4,5,6")
@@ -36,7 +36,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
   test("non-equal slices") {
     val data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
-    val slices = ParallelCollection.slice(data, 3)
+    val slices = ParallelCollectionRDD.slice(data, 3)
     assert(slices.size === 3)
     assert(slices(0).mkString(",") === "1,2,3")
     assert(slices(1).mkString(",") === "4,5,6")
@@ -45,7 +45,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
   test("splitting exclusive range") {
     val data = 0 until 100
-    val slices = ParallelCollection.slice(data, 3)
+    val slices = ParallelCollectionRDD.slice(data, 3)
     assert(slices.size === 3)
     assert(slices(0).mkString(",") === (0 to 32).mkString(","))
     assert(slices(1).mkString(",") === (33 to 65).mkString(","))
@@ -54,7 +54,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
   test("splitting inclusive range") {
     val data = 0 to 100
-    val slices = ParallelCollection.slice(data, 3)
+    val slices = ParallelCollectionRDD.slice(data, 3)
     assert(slices.size === 3)
     assert(slices(0).mkString(",") === (0 to 32).mkString(","))
     assert(slices(1).mkString(",") === (33 to 66).mkString(","))
@@ -63,24 +63,24 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
   test("empty data") {
     val data = new Array[Int](0)
-    val slices = ParallelCollection.slice(data, 5)
+    val slices = ParallelCollectionRDD.slice(data, 5)
     assert(slices.size === 5)
     for (slice <- slices) assert(slice.size === 0)
   }
   test("zero slices") {
     val data = Array(1, 2, 3)
-    intercept[IllegalArgumentException] { ParallelCollection.slice(data, 0) }
+    intercept[IllegalArgumentException] { ParallelCollectionRDD.slice(data, 0) }
   }
   test("negative number of slices") {
     val data = Array(1, 2, 3)
-    intercept[IllegalArgumentException] { ParallelCollection.slice(data, -5) }
+    intercept[IllegalArgumentException] { ParallelCollectionRDD.slice(data, -5) }
   }
   test("exclusive ranges sliced into ranges") {
     val data = 1 until 100
-    val slices = ParallelCollection.slice(data, 3)
+    val slices = ParallelCollectionRDD.slice(data, 3)
     assert(slices.size === 3)
     assert(slices.map(_.size).reduceLeft(_+_) === 99)
     assert(slices.forall(_.isInstanceOf[Range]))
@@ -88,7 +88,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
   test("inclusive ranges sliced into ranges") {
     val data = 1 to 100
-    val slices = ParallelCollection.slice(data, 3)
+    val slices = ParallelCollectionRDD.slice(data, 3)
     assert(slices.size === 3)
     assert(slices.map(_.size).reduceLeft(_+_) === 100)
     assert(slices.forall(_.isInstanceOf[Range]))
@@ -97,7 +97,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
   test("large ranges don't overflow") {
     val N = 100 * 1000 * 1000
     val data = 0 until N
-    val slices = ParallelCollection.slice(data, 40)
+    val slices = ParallelCollectionRDD.slice(data, 40)
     assert(slices.size === 40)
     for (i <- 0 until 40) {
       assert(slices(i).isInstanceOf[Range])
@@ -117,7 +117,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
       (tuple: (List[Int], Int)) =>
         val d = tuple._1
        val n = tuple._2
-        val slices = ParallelCollection.slice(d, n)
+        val slices = ParallelCollectionRDD.slice(d, n)
        ("n slices" |: slices.size == n) &&
        ("concat to d" |: Seq.concat(slices: _*).mkString(",") == d.mkString(",")) &&
        ("equal sizes" |: slices.map(_.size).forall(x => x==d.size/n || x==d.size/n+1))
@@ -134,7 +134,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
     } yield (a until b by step, n)
     val prop = forAll(gen) {
       case (d: Range, n: Int) =>
-        val slices = ParallelCollection.slice(d, n)
+        val slices = ParallelCollectionRDD.slice(d, n)
        ("n slices" |: slices.size == n) &&
        ("all ranges" |: slices.forall(_.isInstanceOf[Range])) &&
        ("concat to d" |: Seq.concat(slices: _*).mkString(",") == d.mkString(",")) &&
@@ -152,7 +152,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
     } yield (a to b by step, n)
     val prop = forAll(gen) {
       case (d: Range, n: Int) =>
-        val slices = ParallelCollection.slice(d, n)
+        val slices = ParallelCollectionRDD.slice(d, n)
        ("n slices" |: slices.size == n) &&
        ("all ranges" |: slices.forall(_.isInstanceOf[Range])) &&
        ("concat to d" |: Seq.concat(slices: _*).mkString(",") == d.mkString(",")) &&
@@ -163,7 +163,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
   test("exclusive ranges of longs") {
     val data = 1L until 100L
-    val slices = ParallelCollection.slice(data, 3)
+    val slices = ParallelCollectionRDD.slice(data, 3)
     assert(slices.size === 3)
     assert(slices.map(_.size).reduceLeft(_+_) === 99)
     assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
@@ -171,7 +171,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
   test("inclusive ranges of longs") {
     val data = 1L to 100L
-    val slices = ParallelCollection.slice(data, 3)
+    val slices = ParallelCollectionRDD.slice(data, 3)
     assert(slices.size === 3)
     assert(slices.map(_.size).reduceLeft(_+_) === 100)
     assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
@@ -179,7 +179,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
   test("exclusive ranges of doubles") {
     val data = 1.0 until 100.0 by 1.0
-    val slices = ParallelCollection.slice(data, 3)
+    val slices = ParallelCollectionRDD.slice(data, 3)
     assert(slices.size === 3)
     assert(slices.map(_.size).reduceLeft(_+_) === 99)
     assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
@@ -187,7 +187,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
   test("inclusive ranges of doubles") {
     val data = 1.0 to 100.0 by 1.0
-    val slices = ParallelCollection.slice(data, 3)
+    val slices = ParallelCollectionRDD.slice(data, 3)
     assert(slices.size === 3)
     assert(slices.map(_.size).reduceLeft(_+_) === 100)
     assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
......
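Note: one property worth calling out from the generated tests above: with cut points at i*L/n, every slice size is either L/n or L/n + 1 (integer division), which is exactly what the "equal sizes" label asserts. A quick standalone check (the object name EqualSizesCheck is illustrative, not from the commit):

// Sizes for L = 10 elements in n = 3 slices come out as 3, 3, 4.
object EqualSizesCheck extends App {
  val (length, n) = (10, 3)
  val sizes = (0 until n).map(i => (i + 1) * length / n - i * length / n)
  println(sizes)  // Vector(3, 3, 4)
  assert(sizes.forall(s => s == length / n || s == length / n + 1))
}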