diff --git a/bagel/src/main/scala/spark/bagel/Bagel.scala b/bagel/src/main/scala/spark/bagel/Bagel.scala index 2f57c9c0fdc2b0b9add6bc49e77ce9748997babb..996ca2a8771e5c5d30b88ff0b97f0a2e3c85b955 100644 --- a/bagel/src/main/scala/spark/bagel/Bagel.scala +++ b/bagel/src/main/scala/spark/bagel/Bagel.scala @@ -30,8 +30,7 @@ object Bagel extends Logging { val aggregated = agg(verts, aggregator) val combinedMsgs = msgs.combineByKey( - combiner.createCombiner, combiner.mergeMsg, combiner.mergeCombiners, - splits, partitioner) + combiner.createCombiner _, combiner.mergeMsg _, combiner.mergeCombiners _, partitioner) val grouped = combinedMsgs.groupWith(verts) val (processed, numMsgs, numActiveVerts) = comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep)) diff --git a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala index 7084ff97d90d28e20785483ca8d450db73e7eced..8ce7abd03f6bd2fd5fa684cb0fea9b3cb4fd4b61 100644 --- a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala +++ b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala @@ -105,7 +105,6 @@ object WikipediaPageRankStandalone { ranks = (contribs.combineByKey((x: Double) => x, (x: Double, y: Double) => x + y, (x: Double, y: Double) => x + y, - numSplits, partitioner) .mapValues(sum => a/n + (1-a)*sum)) } diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index 8b63d1aba1eeff4fd9a0c1fc99f37a87d0a9a7ec..e880f9872f23c0ac11161c120761853ba6f3160f 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -60,7 +60,6 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, - numSplits: Int, partitioner: Partitioner): RDD[(K, C)] = { val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners) new ShuffledRDD(self, aggregator, partitioner) @@ -70,21 +69,15 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numSplits: Int): RDD[(K, C)] = { - combineByKey(createCombiner, mergeValue, mergeCombiners, numSplits, - new HashPartitioner(numSplits)) + combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numSplits)) } - def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = { - combineByKey[V]((v: V) => v, func, func, numSplits) + def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = { + combineByKey[V]((v: V) => v, func, func, partitioner) } - def groupByKey(numSplits: Int): RDD[(K, Seq[V])] = { - def createCombiner(v: V) = ArrayBuffer(v) - def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v - def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2 - val bufs = combineByKey[ArrayBuffer[V]]( - createCombiner _, mergeValue _, mergeCombiners _, numSplits) - bufs.asInstanceOf[RDD[(K, Seq[V])]] + def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = { + reduceByKey(new HashPartitioner(numSplits), func) } def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = { @@ -92,100 +85,90 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2 val bufs = combineByKey[ArrayBuffer[V]]( - 
createCombiner _, mergeValue _, mergeCombiners _, partitioner.numPartitions, partitioner) + createCombiner _, mergeValue _, mergeCombiners _, partitioner) bufs.asInstanceOf[RDD[(K, Seq[V])]] } + def groupByKey(numSplits: Int): RDD[(K, Seq[V])] = { + groupByKey(new HashPartitioner(numSplits)) + } + def partitionBy(partitioner: Partitioner): RDD[(K, V)] = { def createCombiner(v: V) = ArrayBuffer(v) def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2 val bufs = combineByKey[ArrayBuffer[V]]( - createCombiner _, mergeValue _, mergeCombiners _, partitioner.numPartitions, partitioner) + createCombiner _, mergeValue _, mergeCombiners _, partitioner) bufs.flatMapValues(buf => buf) } - def join[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, W))] = { - val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) } - val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) } - (vs ++ ws).groupByKey(numSplits).flatMap { - case (k, seq) => { - val vbuf = new ArrayBuffer[V] - val wbuf = new ArrayBuffer[W] - seq.foreach(_ match { - case Left(v) => vbuf += v - case Right(w) => wbuf += w - }) - for (v <- vbuf; w <- wbuf) yield (k, (v, w)) - } + def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = { + this.cogroup(other, partitioner).flatMapValues { + case (vs, ws) => + for (v <- vs.iterator; w <- ws.iterator) yield (v, w) } } - def leftOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, Option[W]))] = { - val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) } - val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) } - (vs ++ ws).groupByKey(numSplits).flatMap { - case (k, seq) => { - val vbuf = new ArrayBuffer[V] - val wbuf = new ArrayBuffer[Option[W]] - seq.foreach(_ match { - case Left(v) => vbuf += v - case Right(w) => wbuf += Some(w) - }) - if (wbuf.isEmpty) { - wbuf += None + def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = { + this.cogroup(other, partitioner).flatMapValues { + case (vs, ws) => + if (ws.isEmpty) { + vs.iterator.map(v => (v, None)) + } else { + for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w)) } - for (v <- vbuf; w <- wbuf) yield (k, (v, w)) - } } } - def rightOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (Option[V], W))] = { - val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) } - val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) } - (vs ++ ws).groupByKey(numSplits).flatMap { - case (k, seq) => { - val vbuf = new ArrayBuffer[Option[V]] - val wbuf = new ArrayBuffer[W] - seq.foreach(_ match { - case Left(v) => vbuf += Some(v) - case Right(w) => wbuf += w - }) - if (vbuf.isEmpty) { - vbuf += None + def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner) + : RDD[(K, (Option[V], W))] = { + this.cogroup(other, partitioner).flatMapValues { + case (vs, ws) => + if (vs.isEmpty) { + ws.iterator.map(w => (None, w)) + } else { + for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w) } - for (v <- vbuf; w <- wbuf) yield (k, (v, w)) - } } } def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) : RDD[(K, C)] = { - combineByKey(createCombiner, mergeValue, mergeCombiners, defaultParallelism) + combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self)) } def reduceByKey(func: (V, V) => V): RDD[(K, V)] = { - reduceByKey(func, 
defaultParallelism) + reduceByKey(defaultPartitioner(self), func) } def groupByKey(): RDD[(K, Seq[V])] = { - groupByKey(defaultParallelism) + groupByKey(defaultPartitioner(self)) } def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = { - join(other, defaultParallelism) + join(other, defaultPartitioner(self, other)) + } + + def join[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, W))] = { + join(other, new HashPartitioner(numSplits)) } def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = { - leftOuterJoin(other, defaultParallelism) + leftOuterJoin(other, defaultPartitioner(self, other)) + } + + def leftOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, Option[W]))] = { + leftOuterJoin(other, new HashPartitioner(numSplits)) } def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = { - rightOuterJoin(other, defaultParallelism) + rightOuterJoin(other, defaultPartitioner(self, other)) } - def defaultParallelism = self.context.defaultParallelism + def rightOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (Option[V], W))] = { + rightOuterJoin(other, new HashPartitioner(numSplits)) + } def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*) @@ -194,42 +177,72 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( new MappedValuesRDD(self, cleanF) } - def flatMapValues[U](f: V => Traversable[U]): RDD[(K, U)] = { + def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = { val cleanF = self.context.clean(f) new FlatMappedValuesRDD(self, cleanF) } - def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { - val part = self.partitioner match { - case Some(p) => p - case None => new HashPartitioner(defaultParallelism) - } + def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = { val cg = new CoGroupedRDD[K]( Seq(self.asInstanceOf[RDD[(_, _)]], other.asInstanceOf[RDD[(_, _)]]), - part) - val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)( - classManifest[K], - Manifests.seqSeqManifest) + partitioner) + val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest) prfs.mapValues { case Seq(vs, ws) => (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]]) } } - def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) + def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner) : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { - val part = self.partitioner match { - case Some(p) => p - case None => new HashPartitioner(defaultParallelism) - } - new CoGroupedRDD[K]( + val cg = new CoGroupedRDD[K]( Seq(self.asInstanceOf[RDD[(_, _)]], other1.asInstanceOf[RDD[(_, _)]], other2.asInstanceOf[RDD[(_, _)]]), - part).map { - case (k, Seq(vs, w1s, w2s)) => - (k, (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]])) + partitioner) + val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest) + prfs.mapValues { + case Seq(vs, w1s, w2s) => + (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]]) + } + } + + def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { + cogroup(other, defaultPartitioner(self, other)) + } + + def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) + : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { + cogroup(other1, other2, defaultPartitioner(self, other1, other2)) + } + + def cogroup[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (Seq[V], Seq[W]))] = { + cogroup(other, new HashPartitioner(numSplits)) + } + + def 
cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numSplits: Int) + : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { + cogroup(other1, other2, new HashPartitioner(numSplits)) + } + + def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { + cogroup(other, defaultPartitioner(self, other)) + } + + def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) + : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { + cogroup(other1, other2, defaultPartitioner(self, other1, other2)) + } + + /** + * Choose a partitioner to use for a cogroup-like operation between a number of RDDs. If any of + * the RDDs already has a partitioner, choose that one, otherwise use a default HashPartitioner. + */ + def defaultPartitioner(rdds: RDD[_]*): Partitioner = { + for (r <- rdds if r.partitioner != None) { + return r.partitioner.get } + return new HashPartitioner(self.context.defaultParallelism) } def lookup(key: K): Seq[V] = { @@ -376,6 +389,7 @@ class SortedRDD[K <% Ordered[K], V](prev: RDD[(K, V)], ascending: Boolean) override def splits = prev.splits override val partitioner = prev.partitioner override val dependencies = List(new OneToOneDependency(prev)) + override def compute(split: Split) = { prev.iterator(split).toArray .sortWith((x, y) => if (ascending) x._1 < y._1 else x._1 > y._1).iterator @@ -389,16 +403,15 @@ class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)] override def compute(split: Split) = prev.iterator(split).map{case (k, v) => (k, f(v))} } -class FlatMappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => Traversable[U]) +class FlatMappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => TraversableOnce[U]) extends RDD[(K, U)](prev.context) { override def splits = prev.splits override val dependencies = List(new OneToOneDependency(prev)) override val partitioner = prev.partitioner + override def compute(split: Split) = { - prev.iterator(split).toStream.flatMap { - case (k, v) => f(v).map(x => (k, x)) - }.iterator + prev.iterator(split).flatMap { case (k, v) => f(v).map(x => (k, x)) } } } diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala index ac61fe3b54526da22a0d812a485da167651a686e..024a4580acce5f4e10ad29c935c47fedaf12e0bb 100644 --- a/core/src/main/scala/spark/Partitioner.scala +++ b/core/src/main/scala/spark/Partitioner.scala @@ -26,8 +26,9 @@ class HashPartitioner(partitions: Int) extends Partitioner { } class RangePartitioner[K <% Ordered[K]: ClassManifest, V]( - partitions: Int, rdd: RDD[(K,V)], - ascending: Boolean = true) + partitions: Int, + @transient rdd: RDD[(K,V)], + private val ascending: Boolean = true) extends Partitioner { private val rangeBounds: Array[K] = { @@ -65,7 +66,7 @@ class RangePartitioner[K <% Ordered[K]: ClassManifest, V]( override def equals(other: Any): Boolean = other match { case r: RangePartitioner[_,_] => - r.rangeBounds.sameElements(rangeBounds) + r.rangeBounds.sameElements(rangeBounds) && r.ascending == ascending case _ => false } diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index fa53d9be2c045de5bd0ba15a0597fdeb75761b74..4c4b2ee30d604b963ebd17a6dccdf9a6dc70a915 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -83,7 +83,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f)) - def flatMap[U: ClassManifest](f: T => Traversable[U]): RDD[U] = + def flatMap[U: ClassManifest](f: T 
=> TraversableOnce[U]): RDD[U] = new FlatMappedRDD(this, sc.clean(f)) def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f)) @@ -275,7 +275,7 @@ class MappedRDD[U: ClassManifest, T: ClassManifest]( class FlatMappedRDD[U: ClassManifest, T: ClassManifest]( prev: RDD[T], - f: T => Traversable[U]) + f: T => TraversableOnce[U]) extends RDD[U](prev.context) { override def splits = prev.splits diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index cfd6dc8b2aa3550e0f47dfdfbcc85732a72cd050..68ccab24db3867af2747529ae0264a37e6905bc3 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -76,6 +76,12 @@ object Utils { } } catch { case e: IOException => ; } } + // Add a shutdown hook to delete the temp dir when the JVM exits + Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dir " + dir) { + override def run() { + Utils.deleteRecursively(dir) + } + }) return dir } diff --git a/core/src/main/scala/spark/broadcast/Broadcast.scala b/core/src/main/scala/spark/broadcast/Broadcast.scala index cdf05fe5de8ba40cee9a522cb055aae9798f1ff2..06049749a91702593aac42f36b46800e9de42f66 100644 --- a/core/src/main/scala/spark/broadcast/Broadcast.scala +++ b/core/src/main/scala/spark/broadcast/Broadcast.scala @@ -33,7 +33,7 @@ object Broadcast extends Logging with Serializable { def initialize (isMaster__ : Boolean): Unit = synchronized { if (!initialized) { val broadcastFactoryClass = System.getProperty( - "spark.broadcast.factory", "spark.broadcast.DfsBroadcastFactory") + "spark.broadcast.factory", "spark.broadcast.HttpBroadcastFactory") broadcastFactory = Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory] @@ -219,4 +219,4 @@ class SpeedTracker extends Serializable { } override def toString = sourceToSpeedMap.toString -} \ No newline at end of file +} diff --git a/core/src/main/scala/spark/broadcast/BroadcastFactory.scala b/core/src/main/scala/spark/broadcast/BroadcastFactory.scala index 341746d18e1c7be8e23da30ed18d2b8406ad6b65..b18908f789fc6c701011c1a09e86a3b6b792b024 100644 --- a/core/src/main/scala/spark/broadcast/BroadcastFactory.scala +++ b/core/src/main/scala/spark/broadcast/BroadcastFactory.scala @@ -7,6 +7,6 @@ package spark.broadcast * entire Spark job. 
*/ trait BroadcastFactory { - def initialize (isMaster: Boolean): Unit - def newBroadcast[T] (value_ : T, isLocal: Boolean): Broadcast[T] -} \ No newline at end of file + def initialize(isMaster: Boolean): Unit + def newBroadcast[T](value_ : T, isLocal: Boolean): Broadcast[T] +} diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala new file mode 100644 index 0000000000000000000000000000000000000000..471481659116e07bea7492bc51c7200198d85747 --- /dev/null +++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala @@ -0,0 +1,110 @@ +package spark.broadcast + +import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} + +import java.io._ +import java.net._ +import java.util.UUID + +import it.unimi.dsi.fastutil.io.FastBufferedInputStream +import it.unimi.dsi.fastutil.io.FastBufferedOutputStream + +import spark._ + +class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean) +extends Broadcast[T] with Logging with Serializable { + + def value = value_ + + HttpBroadcast.synchronized { + HttpBroadcast.values.put(uuid, 0, value_) + } + + if (!isLocal) { + HttpBroadcast.write(uuid, value_) + } + + // Called by JVM when deserializing an object + private def readObject(in: ObjectInputStream): Unit = { + in.defaultReadObject() + HttpBroadcast.synchronized { + val cachedVal = HttpBroadcast.values.get(uuid, 0) + if (cachedVal != null) { + value_ = cachedVal.asInstanceOf[T] + } else { + logInfo("Started reading broadcast variable " + uuid) + val start = System.nanoTime + value_ = HttpBroadcast.read(uuid).asInstanceOf[T] + HttpBroadcast.values.put(uuid, 0, value_) + val time = (System.nanoTime - start) / 1e9 + logInfo("Reading broadcast variable " + uuid + " took " + time + " s") + } + } + } +} + +class HttpBroadcastFactory extends BroadcastFactory { + def initialize(isMaster: Boolean): Unit = HttpBroadcast.initialize(isMaster) + def newBroadcast[T](value_ : T, isLocal: Boolean) = new HttpBroadcast[T](value_, isLocal) +} + +private object HttpBroadcast extends Logging { + val values = SparkEnv.get.cache.newKeySpace() + + private var initialized = false + + private var broadcastDir: File = null + private var compress: Boolean = false + private var bufferSize: Int = 65536 + private var serverUri: String = null + private var server: HttpServer = null + + def initialize(isMaster: Boolean): Unit = { + synchronized { + if (!initialized) { + bufferSize = System.getProperty("spark.buffer.size", "65536").toInt + compress = System.getProperty("spark.compress", "false").toBoolean + if (isMaster) { + createServer() + } + serverUri = System.getProperty("spark.httpBroadcast.uri") + initialized = true + } + } + } + + private def createServer() { + broadcastDir = Utils.createTempDir() + server = new HttpServer(broadcastDir) + server.start() + serverUri = server.uri + System.setProperty("spark.httpBroadcast.uri", serverUri) + logInfo("Broadcast server started at " + serverUri) + } + + def write(uuid: UUID, value: Any) { + val file = new File(broadcastDir, "broadcast-" + uuid) + val out: OutputStream = if (compress) { + new LZFOutputStream(new FileOutputStream(file)) // Does its own buffering + } else { + new FastBufferedOutputStream(new FileOutputStream(file), bufferSize) + } + val ser = SparkEnv.get.serializer.newInstance() + val serOut = ser.outputStream(out) + serOut.writeObject(value) + serOut.close() + } + + def read(uuid: UUID): Any = { + val url = serverUri + "/broadcast-" + uuid + var in = if (compress) { + new 
LZFInputStream(new URL(url).openStream()) // Does its own buffering
+    } else {
+      new FastBufferedInputStream(new URL(url).openStream(), bufferSize)
+    }
+    val ser = SparkEnv.get.serializer.newInstance()
+    val serIn = ser.inputStream(in)
+    val obj: Any = serIn.readObject()
+    serIn.close()
+    obj
+  }
+}
diff --git a/core/src/test/scala/spark/BroadcastSuite.scala b/core/src/test/scala/spark/BroadcastSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..750703de30332b6dfb51051462636be44ed4711d
--- /dev/null
+++ b/core/src/test/scala/spark/BroadcastSuite.scala
@@ -0,0 +1,23 @@
+package spark
+
+import org.scalatest.FunSuite
+
+class BroadcastSuite extends FunSuite {
+  test("basic broadcast") {
+    val sc = new SparkContext("local", "test")
+    val list = List(1, 2, 3, 4)
+    val listBroadcast = sc.broadcast(list)
+    val results = sc.parallelize(1 to 2).map(x => (x, listBroadcast.value.sum))
+    assert(results.collect.toSet === Set((1, 10), (2, 10)))
+    sc.stop()
+  }
+
+  test("broadcast variables accessed in multiple threads") {
+    val sc = new SparkContext("local[10]", "test")
+    val list = List(1, 2, 3, 4)
+    val listBroadcast = sc.broadcast(list)
+    val results = sc.parallelize(1 to 10).map(x => (x, listBroadcast.value.sum))
+    assert(results.collect.toSet === (1 to 10).map(x => (x, 10)).toSet)
+    sc.stop()
+  }
+}
diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/spark/PartitioningSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..7f7f9493dc995d58b9dbb29e7177bc1697997b49
--- /dev/null
+++ b/core/src/test/scala/spark/PartitioningSuite.scala
@@ -0,0 +1,101 @@
+package spark
+
+import org.scalatest.FunSuite
+
+import scala.collection.mutable.ArrayBuffer
+
+import SparkContext._
+
+class PartitioningSuite extends FunSuite {
+  test("HashPartitioner equality") {
+    val p2 = new HashPartitioner(2)
+    val p4 = new HashPartitioner(4)
+    val anotherP4 = new HashPartitioner(4)
+    assert(p2 === p2)
+    assert(p4 === p4)
+    assert(p2 != p4)
+    assert(p4 != p2)
+    assert(p4 === anotherP4)
+    assert(anotherP4 === p4)
+  }
+
+  test("RangePartitioner equality") {
+    val sc = new SparkContext("local", "test")
+
+    // Make an RDD where all the elements are the same so that the partition range bounds
+    // are deterministically all the same.
+ val rdd = sc.parallelize(Seq(1, 1, 1, 1)).map(x => (x, x)) + + val p2 = new RangePartitioner(2, rdd) + val p4 = new RangePartitioner(4, rdd) + val anotherP4 = new RangePartitioner(4, rdd) + val descendingP2 = new RangePartitioner(2, rdd, false) + val descendingP4 = new RangePartitioner(4, rdd, false) + + assert(p2 === p2) + assert(p4 === p4) + assert(p2 != p4) + assert(p4 != p2) + assert(p4 === anotherP4) + assert(anotherP4 === p4) + assert(descendingP2 === descendingP2) + assert(descendingP4 === descendingP4) + assert(descendingP2 != descendingP4) + assert(descendingP4 != descendingP2) + assert(p2 != descendingP2) + assert(p4 != descendingP4) + assert(descendingP2 != p2) + assert(descendingP4 != p4) + + sc.stop() + } + + test("HashPartitioner not equal to RangePartitioner") { + val sc = new SparkContext("local", "test") + val rdd = sc.parallelize(1 to 10).map(x => (x, x)) + val rangeP2 = new RangePartitioner(2, rdd) + val hashP2 = new HashPartitioner(2) + assert(rangeP2 === rangeP2) + assert(hashP2 === hashP2) + assert(hashP2 != rangeP2) + assert(rangeP2 != hashP2) + sc.stop() + } + + test("partitioner preservation") { + val sc = new SparkContext("local", "test") + + val rdd = sc.parallelize(1 to 10, 4).map(x => (x, x)) + + val grouped2 = rdd.groupByKey(2) + val grouped4 = rdd.groupByKey(4) + val reduced2 = rdd.reduceByKey(_ + _, 2) + val reduced4 = rdd.reduceByKey(_ + _, 4) + + assert(rdd.partitioner === None) + + assert(grouped2.partitioner === Some(new HashPartitioner(2))) + assert(grouped4.partitioner === Some(new HashPartitioner(4))) + assert(reduced2.partitioner === Some(new HashPartitioner(2))) + assert(reduced4.partitioner === Some(new HashPartitioner(4))) + + assert(grouped2.groupByKey().partitioner === grouped2.partitioner) + assert(grouped2.groupByKey(3).partitioner != grouped2.partitioner) + assert(grouped2.groupByKey(2).partitioner === grouped2.partitioner) + assert(grouped4.groupByKey().partitioner === grouped4.partitioner) + assert(grouped4.groupByKey(3).partitioner != grouped4.partitioner) + assert(grouped4.groupByKey(4).partitioner === grouped4.partitioner) + + assert(grouped2.join(grouped4).partitioner === grouped2.partitioner) + assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped2.partitioner) + assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped2.partitioner) + assert(grouped2.cogroup(grouped4).partitioner === grouped2.partitioner) + + assert(grouped2.join(reduced2).partitioner === grouped2.partitioner) + assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner) + assert(grouped2.rightOuterJoin(reduced2).partitioner === grouped2.partitioner) + assert(grouped2.cogroup(reduced2).partitioner === grouped2.partitioner) + + sc.stop() + } +}
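
The pair-RDD shuffle methods in this patch now take an explicit Partitioner: the numSplits overloads simply wrap new HashPartitioner(numSplits), and the no-argument forms go through defaultPartitioner, which reuses a partitioner that one of the inputs already has. A minimal, self-contained usage sketch against that API (the object name, app name and sample data are made up, not part of the patch):

import spark.SparkContext
import spark.SparkContext._
import spark.HashPartitioner

object PartitionerApiExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "partitioner-api-example")
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

    // Shuffle both RDDs with the same explicit partitioner; the join below can then
    // reuse it through defaultPartitioner instead of introducing a new partitioning.
    val part = new HashPartitioner(4)
    val counts = pairs.reduceByKey(part, _ + _)
    val grouped = pairs.groupByKey(part)
    val joined = counts.join(grouped)

    // The numSplits overloads still exist and just delegate to new HashPartitioner(n).
    val counts8 = pairs.reduceByKey(_ + _, 8)

    println(joined.collect().toSeq)
    println(counts8.collect().toSeq)
    sc.stop()
  }
}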
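The same mechanism is what PartitioningSuite exercises: reduceByKey and groupByKey record their HashPartitioner, MappedValuesRDD and FlatMappedValuesRDD keep prev.partitioner, and the cogroup-based joins adopt an existing partitioner via defaultPartitioner. A short sketch of that behaviour, assuming a SparkContext named sc (for example in the Spark shell); the data is made up:

import spark.SparkContext._
import spark.HashPartitioner

val pairs = sc.parallelize(1 to 100).map(x => (x % 10, x))
val byKey = pairs.reduceByKey(_ + _, 4)
assert(byKey.partitioner == Some(new HashPartitioner(4)))

// defaultPartitioner picks byKey's partitioner, and the flatMapValues inside join
// preserves it, so the result stays co-partitioned with byKey.
val other = sc.parallelize(0 until 10).map(x => (x, x.toString))
val joined = byKey.join(other)
assert(joined.partitioner == byKey.partitioner)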
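flatMap and flatMapValues now accept TraversableOnce rather than Traversable, which is what lets the new join implementations yield from vs.iterator and ws.iterator without building intermediate collections; user code can do the same. A tiny sketch, again assuming a SparkContext named sc:

// Returning a lazy Iterator is now enough; no intermediate Seq is materialized.
val words = sc.parallelize(Seq("to be", "or not")).flatMap(line => line.split(" ").iterator)
val repeated = words.map(w => (w.length, w)).flatMapValues(w => Iterator.fill(2)(w))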
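On the broadcast side, HttpBroadcastFactory is now the default spark.broadcast.factory: the master serializes each broadcast value to a broadcast-<uuid> file in a temp dir served by an embedded HttpServer, and workers fetch and cache the value on first access. A usage sketch (not part of the patch; the object name, app name and data are made up):

import spark.SparkContext

object HttpBroadcastExample {
  def main(args: Array[String]) {
    // Optional: this is already the default factory after this change.
    System.setProperty("spark.broadcast.factory", "spark.broadcast.HttpBroadcastFactory")

    val sc = new SparkContext("local[4]", "http-broadcast-example")
    val lookup = sc.broadcast(Map(1 -> "one", 2 -> "two", 3 -> "three"))

    // Tasks just read lookup.value; on a real cluster the first access in each
    // worker JVM downloads broadcast-<uuid> from the master and caches it locally.
    val decoded = sc.parallelize(1 to 3).map(i => lookup.value.getOrElse(i, "?"))
    println(decoded.collect().mkString(", "))
    sc.stop()
  }
}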