Commit 72ec298c authored by Matei Zaharia

Added reduceByKey, groupByKey and join operations based on combine, as well as versions of the shuffle operations that set the number of splits automatically.
parent d947cb97
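As context for the diff below, a minimal sketch of how the new pair-RDD operations might be used from a driver program. Only the method signatures and the explicit PairRDDExtras wrapper come from this patch; the SparkContext arguments and the sample data are illustrative assumptions.

    // Hypothetical driver program exercising the operations added in this commit.
    val sc = new SparkContext("local[2]", "shuffle-demo")   // constructor arguments assumed
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    val other = sc.parallelize(Seq(("a", "x"), ("b", "y")))

    val extras = new PairRDDExtras(pairs)
    val counts  = extras.reduceByKey(_ + _)     // number of splits chosen automatically (numCores)
    val grouped = extras.groupByKey(4)          // RDD[(String, Seq[Int])] with an explicit split count
    val joined  = extras.join(other, 4)         // RDD[(String, (Int, String))]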
@@ -12,50 +12,16 @@ import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem}
 import mesos.SlaveOffer
-/**
- * An RDD that captures the splits of a parent RDD and gives them unique indexes.
- * This is useful for a variety of shuffle implementations.
- */
-class NumberedSplitRDD[T: ClassManifest](prev: RDD[T])
-extends RDD[(Int, Iterator[T])](prev.sparkContext) {
-  @transient val splits_ = {
-    prev.splits.zipWithIndex.map {
-      case (s, i) => new NumberedSplitRDDSplit(s, i): Split
-    }.toArray
-  }
-  override def splits = splits_
-  override def preferredLocations(split: Split) = {
-    val nsplit = split.asInstanceOf[NumberedSplitRDDSplit]
-    prev.preferredLocations(nsplit.prev)
-  }
-  override def iterator(split: Split) = {
-    val nsplit = split.asInstanceOf[NumberedSplitRDDSplit]
-    Iterator((nsplit.index, prev.iterator(nsplit.prev)))
-  }
-  override def taskStarted(split: Split, slot: SlaveOffer) = {
-    val nsplit = split.asInstanceOf[NumberedSplitRDDSplit]
-    prev.taskStarted(nsplit.prev, slot)
-  }
-}
-class NumberedSplitRDDSplit(val prev: Split, val index: Int) extends Split {
-  override def getId() = "NumberedSplitRDDSplit(%d)".format(index)
-}
 /**
  * A simple implementation of shuffle using a distributed file system.
  *
  * TODO: Add support for compression when spark.compress is set to true.
  */
 @serializable
 class DfsShuffle[K, V, C](
     rdd: RDD[(K, V)],
     numOutputSplits: Int,
-    createCombiner: () => C,
+    createCombiner: V => C,
     mergeValue: (C, V) => C,
     mergeCombiners: (C, C) => C)
 extends Logging
@@ -72,11 +38,12 @@ extends Logging
     numberedSplitRdd.foreach((pair: (Int, Iterator[(K, V)])) => {
       val myIndex = pair._1
       val myIterator = pair._2
-      val combiners = new HashMap[K, C] {
-        override def default(key: K) = createCombiner()
-      }
+      val combiners = new HashMap[K, C]
       for ((k, v) <- myIterator) {
-        combiners(k) = mergeValue(combiners(k), v)
+        combiners(k) = combiners.get(k) match {
+          case Some(c) => mergeValue(c, v)
+          case None => createCombiner(v)
+        }
       }
       val fs = DfsShuffle.getFileSystem()
       val outputStreams = (0 until numOutputSplits).map(i => {
@@ -95,17 +62,18 @@ extends Logging
     // Return an RDD that does each of the merges for a given partition
     return sc.parallelize(0 until numOutputSplits).flatMap((myIndex: Int) => {
-      val combiners = new HashMap[K, C] {
-        override def default(key: K) = createCombiner()
-      }
+      val combiners = new HashMap[K, C]
       val fs = DfsShuffle.getFileSystem()
       for (i <- Utils.shuffle(0 until numInputSplits)) {
         val path = new Path(dir, "%d-to-%d".format(i, myIndex))
         val inputStream = new ObjectInputStream(fs.open(path))
         try {
           while (true) {
-            val pair = inputStream.readObject().asInstanceOf[(K, C)]
-            combiners(pair._1) = mergeCombiners(combiners(pair._1), pair._2)
+            val (k, c) = inputStream.readObject().asInstanceOf[(K, C)]
+            combiners(k) = combiners.get(k) match {
+              case Some(oldC) => mergeCombiners(oldC, c)
+              case None => c
+            }
           }
         } catch {
           case e: EOFException => {}
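Both the map side and the reduce side of DfsShuffle now follow the same pattern: look the key up in a mutable HashMap, create a combiner from the first value, and merge subsequent values (or spilled combiners) into it. The previous HashMap-with-default approach could not distinguish the first occurrence of a key, which matters now that createCombiner takes the initial value (V => C) instead of producing an empty combiner (() => C). A self-contained sketch of that pattern, with an invented word-count example (the names here are illustrative, not part of the patch):

    import scala.collection.mutable.HashMap

    // Generic per-partition aggregation loop in the style DfsShuffle uses after this change.
    def aggregate[K, V, C](
        pairs: Iterator[(K, V)],
        createCombiner: V => C,
        mergeValue: (C, V) => C): HashMap[K, C] = {
      val combiners = new HashMap[K, C]
      for ((k, v) <- pairs) {
        combiners(k) = combiners.get(k) match {
          case Some(c) => mergeValue(c, v)    // key seen before: fold the new value in
          case None    => createCombiner(v)   // first value for this key
        }
      }
      combiners
    }

    // Example: word counting, where the combiner is just a running Int total.
    val counts = aggregate[String, Int, Int](
      Iterator("a" -> 1, "b" -> 1, "a" -> 1), v => v, (c, v) => c + v)
    // counts: HashMap("a" -> 2, "b" -> 1)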
@@ -147,3 +115,39 @@ object DfsShuffle {
     return path
   }
 }
+/**
+ * An RDD that captures the splits of a parent RDD and gives them unique indexes.
+ * This is useful for a variety of shuffle implementations.
+ */
+class NumberedSplitRDD[T: ClassManifest](prev: RDD[T])
+extends RDD[(Int, Iterator[T])](prev.sparkContext) {
+  @transient val splits_ = {
+    prev.splits.zipWithIndex.map {
+      case (s, i) => new NumberedSplitRDDSplit(s, i): Split
+    }.toArray
+  }
+  override def splits = splits_
+  override def preferredLocations(split: Split) = {
+    val nsplit = split.asInstanceOf[NumberedSplitRDDSplit]
+    prev.preferredLocations(nsplit.prev)
+  }
+  override def iterator(split: Split) = {
+    val nsplit = split.asInstanceOf[NumberedSplitRDDSplit]
+    Iterator((nsplit.index, prev.iterator(nsplit.prev)))
+  }
+  override def taskStarted(split: Split, slot: SlaveOffer) = {
+    val nsplit = split.asInstanceOf[NumberedSplitRDDSplit]
+    prev.taskStarted(nsplit.prev, slot)
+  }
+}
+class NumberedSplitRDDSplit(val prev: Split, val index: Int) extends Split {
+  override def getId() = "NumberedSplitRDDSplit(%d)".format(index)
+}
@@ -38,7 +38,7 @@ extends RDD[(K, V)](sc) {
     val conf = new JobConf()
     FileInputFormat.setInputPaths(conf, path)
     val inputFormat = createInputFormat(conf)
-    val inputSplits = inputFormat.getSplits(conf, sc.scheduler.numCores)
+    val inputSplits = inputFormat.getSplits(conf, sc.numCores)
     inputSplits.map(x => new HadoopSplit(x): Split).toArray
   }
@@ -330,8 +330,8 @@ extends RDD[Pair[T, U]](sc) {
   }
 }
-@serializable class PairRDDExtras[K, V](rdd: RDD[(K, V)]) {
-  def reduceByKey(func: (V, V) => V): Map[K, V] = {
+@serializable class PairRDDExtras[K, V](self: RDD[(K, V)]) {
+  def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = {
     def mergeMaps(m1: HashMap[K, V], m2: HashMap[K, V]): HashMap[K, V] = {
       for ((k, v) <- m2) {
         m1.get(k) match {
@@ -341,14 +341,66 @@ extends RDD[Pair[T, U]](sc) {
       }
       return m1
     }
-    rdd.map(pair => HashMap(pair)).reduce(mergeMaps)
+    self.map(pair => HashMap(pair)).reduce(mergeMaps)
   }
-  def combineByKey[C](numSplits: Int,
-                      createCombiner: () => C,
+  def combineByKey[C](createCombiner: V => C,
                       mergeValue: (C, V) => C,
+                      mergeCombiners: (C, C) => C,
+                      numSplits: Int)
+  : RDD[(K, C)] = {
+    new DfsShuffle(self, numSplits, createCombiner, mergeValue, mergeCombiners).compute()
+  }
+  def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = {
+    combineByKey[V]((v: V) => v, func, func, numSplits)
+  }
+  def groupByKey(numSplits: Int): RDD[(K, Seq[V])] = {
+    def createCombiner(v: V) = ArrayBuffer(v)
+    def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
+    def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
+    val bufs = combineByKey[ArrayBuffer[V]](
+      createCombiner _, mergeValue _, mergeCombiners _, numSplits)
+    bufs.asInstanceOf[RDD[(K, Seq[V])]]
+  }
+  def join[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, W))] = {
+    val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) }
+    val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) }
+    new PairRDDExtras(vs ++ ws).groupByKey(numSplits).flatMap {
+      case (k, seq) => {
+        val vbuf = new ArrayBuffer[V]
+        val wbuf = new ArrayBuffer[W]
+        seq.foreach(_ match {
+          case Left(v) => vbuf += v
+          case Right(w) => wbuf += w
+        })
+        for (v <- vbuf; w <- wbuf) yield (k, (v, w))
+      }
+    }
+  }
+  def combineByKey[C](createCombiner: V => C,
+                      mergeValue: (C, V) => C,
                       mergeCombiners: (C, C) => C)
-  : RDD[(K, C)] = {
-    new DfsShuffle(rdd, numSplits, createCombiner, mergeValue, mergeCombiners).compute()
+  : RDD[(K, C)] = {
+    combineByKey(createCombiner, mergeValue, mergeCombiners, numCores)
   }
+  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
+    reduceByKey(func, numCores)
+  }
+  def groupByKey(): RDD[(K, Seq[V])] = {
+    groupByKey(numCores)
+  }
+  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
+    join(other, numCores)
+  }
+  def numCores = self.sparkContext.numCores
+  def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*)
 }
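The join above tags each value with Either, unions the two RDDs, groups by key, and then emits every (v, w) combination found for that key, so a key that never appears on one side produces nothing (an inner join). A local-collections analogue of that logic, with invented data, just to illustrate the semantics:

    // Plain Scala collections standing in for the two RDDs (illustrative only).
    val vs: Seq[(String, Either[Int, String])] = Seq("a" -> Left(1), "a" -> Left(2), "b" -> Left(3))
    val ws: Seq[(String, Either[Int, String])] = Seq("a" -> Right("x"), "a" -> Right("y"))

    val joined = (vs ++ ws).groupBy(_._1).toSeq.flatMap { case (k, tagged) =>
      val leftVals  = tagged.collect { case (_, Left(v))  => v }
      val rightVals = tagged.collect { case (_, Right(w)) => w }
      for (v <- leftVals; w <- rightVals) yield (k, (v, w))
    }
    // joined contains ("a",(1,"x")), ("a",(1,"y")), ("a",(2,"x")), ("a",(2,"y"));
    // "b" contributes nothing because it never appears on the right-hand side.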
@@ -14,7 +14,7 @@ class SparkContext(
     val sparkHome: String = null,
     val jars: Seq[String] = Nil)
 extends Logging {
-  private[spark] var scheduler: Scheduler = {
+  private var scheduler: Scheduler = {
     // Regular expression used for local[N] master format
     val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
     master match {
@@ -41,7 +41,7 @@ extends Logging {
     new ParallelArray[T](this, seq, numSlices)
   def parallelize[T: ClassManifest](seq: Seq[T]): RDD[T] =
-    parallelize(seq, scheduler.numCores)
+    parallelize(seq, numCores)
   def textFile(path: String): RDD[String] =
     new HadoopTextFile(this, path)
@@ -147,6 +147,9 @@ extends Logging {
     ClosureCleaner.clean(f)
     return f
   }
+  // Get the number of cores available to run tasks (as reported by Scheduler)
+  def numCores = scheduler.numCores
 }