diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 852666260ead228b041e30dc436c7bae48e83a74..9214fa8cffaf655c825a0fd047e39c980a7fb55d 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -96,7 +96,9 @@ private[spark] class CoarseGrainedExecutorBackend( val taskDesc = TaskDescription.decode(data.value) logInfo("Got assigned task " + taskDesc.taskId) // SS { - val currentStageId = taskDesc.name.substring(taskDesc.name.lastIndexOf(' ') + 1, taskDesc.name.lastIndexOf('.')).toInt + val currentStageId = + taskDesc.name.substring(taskDesc.name.lastIndexOf(' ') + 1, + taskDesc.name.lastIndexOf('.')).toInt // logEarne("Current in Stage: " + currentStageId) env.currentStage = currentStageId env.blockManager.currentStage = currentStageId diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 5738d15627c527ed5084c19a1e2ad948275ddac3..0def96f8cd890bd2f3da9b8abdb6c7979229b9df 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -327,7 +327,9 @@ private[spark] class Executor( } // SS { - // logEarne("Task " + taskId + " is in Stage " + task.stageId + " and the depMap is " + task.depMap) + // logEarne("Task " + taskId + " is in Stage " + task.stageId + " and + // the depMap is " + task.depMap) + // if (!env.blockManager.stageExInfos.contains(task.stageId)) { env.blockManager.stageExInfos.put(task.stageId, new StageExInfo(task.stageId, null, null, task.depMap, task.curRunningRddMap)) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 9f3e3273f9461f96d951f0a314594e7451c73d0f..2ea1a3318b31d66835e5147dd17f67d690e69229 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -392,7 +392,7 @@ abstract class RDD[T: ClassTag]( blockManager.blockExInfo.get(key).creatStartTime = System.currentTimeMillis() } case None => - //logEarne("Some Thing Wrong") + // logEarne("Some Thing Wrong") } // SS } // This method is called on executors, so we need call SparkEnv.get instead of sc.env. diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 77bed0cc62c49ba053c1320b16a25df834f800b5..9f3cc3c48cf542f5b9454e22a78dde722f5e97ff 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1128,7 +1128,8 @@ class DAGScheduler( // SS { new ResultTask(stage.id, stage.latestInfo.attemptNumber, taskBinary, part, locs, id, properties, serializedTaskMetrics, - Option(jobId), Option(sc.applicationId), sc.applicationAttemptId, depMap, curRunningRddMap) + Option(jobId), Option(sc.applicationId), sc.applicationAttemptId, + depMap, curRunningRddMap) // SS } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala index 12f83c83fcda18aefa794b7cbe0f5c245991531f..7f0219af4bdc9ea5f0e42b3d7215254e268c45ca 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala @@ -64,7 +64,7 @@ private[spark] class ResultTask[T, U]( depMap: HashMap[Int, Set[Int]] = null, curRunningRddMap: HashMap[Int, Set[Int]] = null) extends Task[U](stageId, stageAttemptId, partition.index, localProperties, serializedTaskMetrics, - jobId, appId, appAttemptId,depMap,curRunningRddMap) + jobId, appId, appAttemptId, depMap, curRunningRddMap) with Serializable { @transient private[this] val preferredLocs: Seq[TaskLocation] = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageExInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageExInfo.scala index 11b3692ec1ddd3541a56d4b5c34eed608b4a83ae..90389a45fc4267054ccc106c980e608f35afaa1e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StageExInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StageExInfo.scala @@ -19,9 +19,7 @@ package org.apache.spark.storage import scala.collection.mutable -/** - * DS to store info of a stage. - */ +// DS to store info of a stage. class StageExInfo(val stageId: Int, val alreadyPerRddSet: Set[Int], // prs val afterPerRddSet: Set[Int], // aprs