diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala index e36c759a425569e0cd857944ca0992ed2e31358a..12f83c83fcda18aefa794b7cbe0f5c245991531f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala @@ -60,9 +60,11 @@ private[spark] class ResultTask[T, U]( serializedTaskMetrics: Array[Byte], jobId: Option[Int] = None, appId: Option[String] = None, - appAttemptId: Option[String] = None) + appAttemptId: Option[String] = None, + depMap: HashMap[Int, Set[Int]] = null, + curRunningRddMap: HashMap[Int, Set[Int]] = null) extends Task[U](stageId, stageAttemptId, partition.index, localProperties, serializedTaskMetrics, - jobId, appId, appAttemptId) + jobId, appId, appAttemptId,depMap,curRunningRddMap) with Serializable { @transient private[this] val preferredLocs: Seq[TaskLocation] = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index 7a25c47e2cab34554bd44498a1911c8c88456152..7cecbb53511483e185c909ed90f1ca9810f9a390 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -60,7 +60,10 @@ private[spark] class ShuffleMapTask( serializedTaskMetrics: Array[Byte], jobId: Option[Int] = None, appId: Option[String] = None, - appAttemptId: Option[String] = None) + appAttemptId: Option[String] = None, + depMap: HashMap[Int, Set[Int]] = None, + curRunningRddMap: HashMap[Int, Set[Int]] = None + ) extends Task[MapStatus](stageId, stageAttemptId, partition.index, localProperties, serializedTaskMetrics, jobId, appId, appAttemptId) with Logging { diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index f536fc2a5f0a12e2aa983e8dcf6f3775dee5dda9..6cf6b942790cc00bf8ca89736ed00ed4873a0c4b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -60,7 +60,10 @@ private[spark] abstract class Task[T]( SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array(), val jobId: Option[Int] = None, val appId: Option[String] = None, - val appAttemptId: Option[String] = None) extends Serializable { + val appAttemptId: Option[String] = None, + var depMap: HashMap[Int, Set[Int]] = new HashMap[Int, Set[Int]], + var curRunningRddMap: HashMap[Int, Set[Int]] = + new HashMap[Int, Set[Int]]) extends Serializable { @transient lazy val metrics: TaskMetrics = SparkEnv.get.closureSerializer.newInstance().deserialize(ByteBuffer.wrap(serializedTaskMetrics))