Commit 039cc622 authored by Matei Zaharia

Merge pull request #251 from JoshRosen/docs/internals

Document Dependency classes and make minor interface improvements

Parents: 933e4f51, e10308f5
Dependency.scala

 package spark
 
-abstract class Dependency[T](val rdd: RDD[T], val isShuffle: Boolean) extends Serializable
+/**
+ * Base class for dependencies.
+ */
+abstract class Dependency[T](val rdd: RDD[T]) extends Serializable
 
-abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd, false) {
+/**
+ * Base class for dependencies where each partition of the parent RDD is used by at most one
+ * partition of the child RDD. Narrow dependencies allow for pipelined execution.
+ */
+abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
+  /**
+   * Get the parent partitions for a child partition.
+   * @param outputPartition a partition of the child RDD
+   * @return the partitions of the parent RDD that the child partition depends upon
+   */
   def getParents(outputPartition: Int): Seq[Int]
 }
 
+/**
+ * Represents a dependency on the output of a shuffle stage.
+ * @param shuffleId the shuffle id
+ * @param rdd the parent RDD
+ * @param aggregator optional aggregator; this allows for map-side combining
+ * @param partitioner partitioner used to partition the shuffle output
+ */
 class ShuffleDependency[K, V, C](
     val shuffleId: Int,
     @transient rdd: RDD[(K, V)],
-    val aggregator: Aggregator[K, V, C],
+    val aggregator: Option[Aggregator[K, V, C]],
     val partitioner: Partitioner)
-  extends Dependency(rdd, true)
+  extends Dependency(rdd)
 
+/**
+ * Represents a one-to-one dependency between partitions of the parent and child RDDs.
+ */
 class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
   override def getParents(partitionId: Int) = List(partitionId)
 }
 
+/**
+ * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
+ * @param rdd the parent RDD
+ * @param inStart the start of the range in the parent RDD
+ * @param outStart the start of the range in the child RDD
+ * @param length the length of the range
+ */
 class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
   extends NarrowDependency[T](rdd) {
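The getParents body for RangeDependency lies outside this hunk. For orientation, a minimal sketch of how a range dependency can implement it with index arithmetic, assuming the semantics in the new doc comment above (an illustration, not the file's actual body):

    // Sketch only: child partitions [outStart, outStart + length) map
    // one-to-one onto parent partitions [inStart, inStart + length).
    override def getParents(partitionId: Int): Seq[Int] = {
      if (partitionId >= outStart && partitionId < outStart + length) {
        List(partitionId - outStart + inStart)  // shift into the parent's index range
      } else {
        Nil  // this child partition does not depend on this parent
      }
    }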
CoGroupedRDD.scala

@@ -49,7 +49,7 @@ class CoGroupedRDD[K](@transient rdds: Seq[RDD[(_, _)]], part: Partitioner)
       } else {
         logInfo("Adding shuffle dependency with " + rdd)
         deps += new ShuffleDependency[Any, Any, ArrayBuffer[Any]](
-          context.newShuffleId, rdd, aggr, part)
+          context.newShuffleId, rdd, Some(aggr), part)
       }
     }
     deps.toList
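With the aggregator now Option-typed, call sites that want map-side combining wrap their aggregator in Some(...), as above, while a shuffle with no combining passes None. A hypothetical call site for the no-combining case (ctx and the other names are illustrative, not from this commit):

    // Hypothetical: a plain repartitioning shuffle under the new signature.
    val dep = new ShuffleDependency[Any, Any, Any](
      ctx.newShuffleId, rdd, None, part)  // None = no map-side combining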
ShuffledRDD.scala

@@ -22,7 +22,7 @@ private[spark] class ShuffledRDDSplit(val idx: Int) extends Split {
  */
 abstract class ShuffledRDD[K, V, C](
     @transient parent: RDD[(K, V)],
-    aggregator: Aggregator[K, V, C],
+    aggregator: Option[Aggregator[K, V, C]],
     part: Partitioner)
   extends RDD[(K, C)](parent.context) {

@@ -48,7 +48,7 @@ class RepartitionShuffledRDD[K, V](
     part: Partitioner)
   extends ShuffledRDD[K, V, V](
     parent,
-    Aggregator[K, V, V](null, null, null, false),
+    None,
     part) {

   override def compute(split: Split): Iterator[(K, V)] = {

@@ -95,7 +95,7 @@ class ShuffledAggregatedRDD[K, V, C](
     @transient parent: RDD[(K, V)],
     aggregator: Aggregator[K, V, C],
     part : Partitioner)
-  extends ShuffledRDD[K, V, C](parent, aggregator, part) {
+  extends ShuffledRDD[K, V, C](parent, Some(aggregator), part) {

   override def compute(split: Split): Iterator[(K, C)] = {
     val combiners = new JHashMap[K, C]
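This is the core interface improvement: RepartitionShuffledRDD previously signaled "no combining" with an Aggregator whose function fields were all null, a sentinel every consumer had to remember to check. Option encodes the absence in the type. A standalone sketch of the pattern (the Combine type and values are hypothetical, not from this commit):

    // Before: a null-filled sentinel; a forgotten check means an NPE at runtime.
    // After: Option forces consumers to handle the absent case explicitly.
    case class Combine(merge: (Int, Int) => Int)
    val maybeCombine: Option[Combine] = None
    val merged = maybeCombine match {
      case Some(c) => c.merge(1, 2)  // combining requested
      case None    => 0              // no combining; fall back to a default
    }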
UnionRDD.scala

@@ -43,7 +43,7 @@ class UnionRDD[T: ClassManifest](
   override val dependencies = {
     val deps = new ArrayBuffer[Dependency[_]]
     var pos = 0
-    for ((rdd, index) <- rdds.zipWithIndex) {
+    for (rdd <- rdds) {
       deps += new RangeDependency(rdd, 0, pos, rdd.splits.size)
       pos += rdd.splits.size
     }
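To make the offset bookkeeping concrete: each parent occupies a contiguous range of the union's partitions, starting at the running total pos. A worked trace with hypothetical inputs:

    // Hypothetical: rddA has 3 splits, rddB has 2.
    // Iteration 1: pos = 0 -> RangeDependency(rddA, inStart = 0, outStart = 0, length = 3)
    // Iteration 2: pos = 3 -> RangeDependency(rddB, inStart = 0, outStart = 3, length = 2)
    // Union partitions 0-2 read rddA's partitions 0-2; partitions 3-4 read rddB's 0-1.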
ShuffleMapTask.scala

@@ -111,11 +111,11 @@ private[spark] class ShuffleMapTask(
   override def run(attemptId: Long): MapStatus = {
     val numOutputSplits = dep.partitioner.numPartitions
-    val aggregator = dep.aggregator.asInstanceOf[Aggregator[Any, Any, Any]]
     val partitioner = dep.partitioner
     val bucketIterators =
-      if (aggregator.mapSideCombine) {
+      if (dep.aggregator.isDefined && dep.aggregator.get.mapSideCombine) {
+        val aggregator = dep.aggregator.get.asInstanceOf[Aggregator[Any, Any, Any]]
         // Apply combiners (map-side aggregation) to the map output.
         val buckets = Array.tabulate(numOutputSplits)(_ => new JHashMap[Any, Any])
         for (elem <- rdd.iterator(split)) {
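The new guard checks presence and the mapSideCombine flag in one expression; the get after isDefined is safe because && short-circuits. An equivalent, slightly more idiomatic spelling (a suggestion only, not what this commit does):

    // Option.exists returns false for None, so no isDefined/get pair is needed:
    if (dep.aggregator.exists(_.mapSideCombine)) {
      // map-side combining branch, as above
    }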