From d8bf00c1e4814d91729aeca688c05e3e6d40621c Mon Sep 17 00:00:00 2001
From: Stephen Skeirik <skeirik2@illinois.edu>
Date: Sat, 5 May 2018 16:22:43 -0500
Subject: [PATCH] modified task data-struct

---
 .../main/scala/org/apache/spark/scheduler/ResultTask.scala  | 6 ++++--
 .../scala/org/apache/spark/scheduler/ShuffleMapTask.scala   | 5 ++++-
 core/src/main/scala/org/apache/spark/scheduler/Task.scala   | 5 ++++-
 3 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index e36c759a42..12f83c83fc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -60,9 +60,11 @@ private[spark] class ResultTask[T, U](
     serializedTaskMetrics: Array[Byte],
     jobId: Option[Int] = None,
     appId: Option[String] = None,
-    appAttemptId: Option[String] = None)
+    appAttemptId: Option[String] = None,
+    depMap: HashMap[Int, Set[Int]] = null,
+    curRunningRddMap: HashMap[Int, Set[Int]] = null)
   extends Task[U](stageId, stageAttemptId, partition.index, localProperties, serializedTaskMetrics,
-    jobId, appId, appAttemptId)
+    jobId, appId, appAttemptId,depMap,curRunningRddMap)
   with Serializable {
 
   @transient private[this] val preferredLocs: Seq[TaskLocation] = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 7a25c47e2c..7cecbb5351 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -60,7 +60,10 @@ private[spark] class ShuffleMapTask(
     serializedTaskMetrics: Array[Byte],
     jobId: Option[Int] = None,
     appId: Option[String] = None,
-    appAttemptId: Option[String] = None)
+    appAttemptId: Option[String] = None,
+    depMap: HashMap[Int, Set[Int]] = None,
+    curRunningRddMap: HashMap[Int, Set[Int]] = None
+    )
   extends Task[MapStatus](stageId, stageAttemptId, partition.index, localProperties,
     serializedTaskMetrics, jobId, appId, appAttemptId)
   with Logging {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index f536fc2a5f..6cf6b94279 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -60,7 +60,10 @@ private[spark] abstract class Task[T](
       SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array(),
     val jobId: Option[Int] = None,
     val appId: Option[String] = None,
-    val appAttemptId: Option[String] = None) extends Serializable {
+    val appAttemptId: Option[String] = None,
+    var depMap: HashMap[Int, Set[Int]] = new HashMap[Int, Set[Int]],
+    var curRunningRddMap: HashMap[Int, Set[Int]] =
+              new HashMap[Int, Set[Int]]) extends Serializable {
 
   @transient lazy val metrics: TaskMetrics =
     SparkEnv.get.closureSerializer.newInstance().deserialize(ByteBuffer.wrap(serializedTaskMetrics))
-- 
GitLab