diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 2ea1a3318b31d66835e5147dd17f67d690e69229..9daa777074f8efa462b44dc353fb427d73e6e4bb 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -40,7 +40,7 @@ import org.apache.spark.partial.BoundedDouble import org.apache.spark.partial.CountEvaluator import org.apache.spark.partial.GroupedCountEvaluator import org.apache.spark.partial.PartialResult -import org.apache.spark.storage.{RDDBlockId, StorageLevel} +import org.apache.spark.storage.{BlockExInfo, RDDBlockId, StorageLevel} import org.apache.spark.util.{BoundedPriorityQueue, Utils} import org.apache.spark.util.collection.{OpenHashMap, Utils => collectionUtils} import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, PoissonSampler, @@ -361,38 +361,40 @@ abstract class RDD[T: ClassTag]( val blockId = RDDBlockId(id, partition.index) var readCachedBlock = true // SS { + val key = blockId val blockManager = SparkEnv.get.blockManager + var blockExInfos = blockManager.blockInfoManager.blockExInfos - if (!blockManager.blockExInfo.containsKey(key)) { - blockManager.blockExInfo.put(key, new BlockExInfo(key)) + if (!blockExInfos.containsKey(key)) { + blockExInfos.put(key, new BlockExInfo(key)) } blockManager.stageExInfos.get(blockManager.currentStage) match { case Some(curStageExInfo) => var parExist = true - for (par <- curStageExInfo.depMap(rdd.id)) { + for (par <- curStageExInfo.depMap(id)) { val parBlockId = new RDDBlockId(par, partition.index) - if (blockManager.blockExInfo.containsKey(parBlockId) && - blockManager.blockExInfo.get(parBlockId).isExist - == 1) { + if (blockExInfos.containsKey(parBlockId) && + blockExInfos.get(parBlockId).isExist == 1) { // par is exist } else { // par not exist now, add this key to it's par's watching set parExist = false - if (!blockManager.blockExInfo.containsKey(parBlockId)) { - blockManager.blockExInfo.put(parBlockId, new BlockExInfo(parBlockId)) + if (!blockExInfos.containsKey(parBlockId)) { + blockExInfos.put(parBlockId, new BlockExInfo(parBlockId)) } - blockManager.blockExInfo.get(parBlockId).sonSet += key + blockExInfos.get(parBlockId).sonSet += key } } if (parExist) { // par are all exist so we update this rdd's start time // logEarne("par all exist, store start time of " + key) - blockManager.blockExInfo.get(key).creatStartTime = System.currentTimeMillis() + blockExInfos.get(key).creatStartTime = + System.currentTimeMillis() } case None => - // logEarne("Some Thing Wrong") + logInfo("An Error With CS525 Project") } // SS } // This method is called on executors, so we need call SparkEnv.get instead of sc.env.