Skip to content
Snippets Groups Projects
Commit 64dbf8d3 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Made ShuffleDependency automatically find a shuffle ID for itself

parent 64b52166
No related branches found
No related tags found
No related merge requests found
...@@ -26,11 +26,13 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) { ...@@ -26,11 +26,13 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
* @param partitioner partitioner used to partition the shuffle output * @param partitioner partitioner used to partition the shuffle output
*/ */
class ShuffleDependency[K, V, C]( class ShuffleDependency[K, V, C](
val shuffleId: Int,
@transient rdd: RDD[(K, V)], @transient rdd: RDD[(K, V)],
val aggregator: Option[Aggregator[K, V, C]], val aggregator: Option[Aggregator[K, V, C]],
val partitioner: Partitioner) val partitioner: Partitioner)
extends Dependency(rdd) extends Dependency(rdd) {
val shuffleId: Int = rdd.context.newShuffleId()
}
/** /**
* Represents a one-to-one dependency between partitions of the parent and child RDDs. * Represents a one-to-one dependency between partitions of the parent and child RDDs.
......
...@@ -48,8 +48,7 @@ class CoGroupedRDD[K](@transient rdds: Seq[RDD[(_, _)]], part: Partitioner) ...@@ -48,8 +48,7 @@ class CoGroupedRDD[K](@transient rdds: Seq[RDD[(_, _)]], part: Partitioner)
deps += new OneToOneDependency(rdd) deps += new OneToOneDependency(rdd)
} else { } else {
logInfo("Adding shuffle dependency with " + rdd) logInfo("Adding shuffle dependency with " + rdd)
deps += new ShuffleDependency[Any, Any, ArrayBuffer[Any]]( deps += new ShuffleDependency[Any, Any, ArrayBuffer[Any]](rdd, Some(aggr), part)
context.newShuffleId, rdd, Some(aggr), part)
} }
} }
deps.toList deps.toList
......
...@@ -35,7 +35,7 @@ abstract class ShuffledRDD[K, V, C]( ...@@ -35,7 +35,7 @@ abstract class ShuffledRDD[K, V, C](
override def preferredLocations(split: Split) = Nil override def preferredLocations(split: Split) = Nil
val dep = new ShuffleDependency(context.newShuffleId, parent, aggregator, part) val dep = new ShuffleDependency(parent, aggregator, part)
override val dependencies = List(dep) override val dependencies = List(dep)
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment