diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index d8532b00bc90419cc7e13abcd5697b164425987c..242922d279ecf8c5b7d1c0a6d534d3ad9edfaefb 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1428,6 +1428,18 @@ private[spark] class BlockManager( var blockIsUpdated = false val level = info.level + // SS + if (blockId.isRDD) { + logInfo("Phoneix: change exist status of " + blockId + " to 0") + val curValue = blockInfoManager.blockExInfos.get(blockId) + curValue.isExist = 0 + blockInfoManager.inMemExInfos.synchronized { + logInfo("Phoneix: Remove " + blockId + " from inMemExInfos") + blockInfoManager.inMemExInfos.remove(curValue) + } + } + // SS + // Drop to disk, if storage level requires if (level.useDisk && !diskStore.contains(blockId)) { logInfo(s"Writing block $blockId to disk") @@ -1474,6 +1486,63 @@ private[spark] class BlockManager( def removeRdd(rddId: Int): Int = { // TODO: Avoid a linear scan by creating another mapping of RDD.id to blocks. logInfo(s"Removing RDD $rddId") + // SS + logInfo("Phoneix: Now we in Stage: " + currentStage + + " and the depMap is: " + stageExInfos(currentStage).depMap) + + var targetRdd: Int = -1 + stageExInfos(currentStage).depMap.foreach { cur => + if (cur._2.contains(rddId)) { + targetRdd = cur._1 + } + } + + logInfo("Phonex: Now we set targetRDD to: " + targetRdd + " becasue we removeRdd: " + rddId) + + val iter = blockInfoManager.blockExInfos.entrySet().iterator() + val blockExInfos = blockInfoManager.blockExInfos + val inMemExInfos = blockInfoManager.inMemExInfos + + while (iter.hasNext) { + val cur = iter.next() + val curId = cur.getKey.rddId + val splitIndex = cur.getKey.splitIndex + if (curId == targetRdd) { + inMemExInfos.synchronized { + if (inMemExInfos.contains(cur.getValue)) { + logInfo("Phoneix: Remove " + cur.getKey + " from inMemExInfos") + inMemExInfos.remove(cur.getValue) + cur.getValue.creatCost = cur.getValue.creatCost + blockExInfos.get( + new RDDBlockId(rddId, splitIndex)).creatCost + cur.getValue.norCost = cur.getValue. + creatCost.toDouble / (cur.getValue.size / 1024 / 1024) + logInfo("Phoenix: Add " + cur.getValue.blockId + " to inMemExInfos") + inMemExInfos.add(cur.getValue) + + } else { + cur.getValue.creatCost = cur.getValue.creatCost + blockExInfos.get( + new RDDBlockId(rddId, splitIndex)).creatCost + cur.getValue.norCost = cur.getValue. + creatCost.toDouble / (cur.getValue.size / 1024 / 1024) + } + } + + logInfo("Phoenix: Due to Removing RDD " + rddId + " we have to change rdd_" + curId + "_" + + splitIndex + " ctime") + } else if (curId == rddId) { + val curValue = blockExInfos.get(new RDDBlockId(rddId, splitIndex)) + curValue.isExist = 0 + inMemExInfos.synchronized { + logInfo("Phoenix: Remove " + curValue.blockId + " from inMemExInfos") + inMemExInfos.remove(curValue) + } + logInfo("Phoenix: Due to Removing RDD " + rddId + " we now change " + curId + "_" + + splitIndex + " to not exist and TreeSet have size " + inMemExInfos.size()) + } + } + // TODO update create cost of sonRDD when removing parRDD and sonRDD not at remote + // SS + val blocksToRemove = blockInfoManager.entries.flatMap(_._1.asRDDId).filter(_.rddId == rddId) blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) } blocksToRemove.size