diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala index 19a51dd5b87f900ae11a61039961cface6136a5a..dfc7e292b737983aa6a5317e7c41ab5d06388626 100644 --- a/core/src/main/scala/spark/Dependency.scala +++ b/core/src/main/scala/spark/Dependency.scala @@ -26,11 +26,13 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) { * @param partitioner partitioner used to partition the shuffle output */ class ShuffleDependency[K, V, C]( - val shuffleId: Int, @transient rdd: RDD[(K, V)], val aggregator: Option[Aggregator[K, V, C]], val partitioner: Partitioner) - extends Dependency(rdd) + extends Dependency(rdd) { + + val shuffleId: Int = rdd.context.newShuffleId() +} /** * Represents a one-to-one dependency between partitions of the parent and child RDDs. diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala index f1defbe492a6e89ab3a8c8ac077b98addf511f2d..ace25006270239799ae8ce823887e04df8d8c58b 100644 --- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala @@ -48,8 +48,7 @@ class CoGroupedRDD[K](@transient rdds: Seq[RDD[(_, _)]], part: Partitioner) deps += new OneToOneDependency(rdd) } else { logInfo("Adding shuffle dependency with " + rdd) - deps += new ShuffleDependency[Any, Any, ArrayBuffer[Any]]( - context.newShuffleId, rdd, Some(aggr), part) + deps += new ShuffleDependency[Any, Any, ArrayBuffer[Any]](rdd, Some(aggr), part) } } deps.toList diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala index 7577909b8390dc1c572f61110bd0d08ba8d16dad..be120acc71565ffdc740c369c501a228d9aa8e6d 100644 --- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala +++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala @@ -35,7 +35,7 @@ abstract class ShuffledRDD[K, V, C]( override def preferredLocations(split: Split) = Nil - val dep = new ShuffleDependency(context.newShuffleId, parent, aggregator, part) + val dep = new ShuffleDependency(parent, aggregator, part) override val dependencies = List(dep) }