Skip to content
Snippets Groups Projects
Commit e2297b3b authored by skeirik2's avatar skeirik2
Browse files

added missing code from Chinese team to BlockManager

parent 180714a2
No related branches found
No related tags found
No related merge requests found
...@@ -1428,6 +1428,18 @@ private[spark] class BlockManager( ...@@ -1428,6 +1428,18 @@ private[spark] class BlockManager(
var blockIsUpdated = false var blockIsUpdated = false
val level = info.level val level = info.level
// SS
if (blockId.isRDD) {
logInfo("Phoneix: change exist status of " + blockId + " to 0")
val curValue = blockInfoManager.blockExInfos.get(blockId)
curValue.isExist = 0
blockInfoManager.inMemExInfos.synchronized {
logInfo("Phoneix: Remove " + blockId + " from inMemExInfos")
blockInfoManager.inMemExInfos.remove(curValue)
}
}
// SS
// Drop to disk, if storage level requires // Drop to disk, if storage level requires
if (level.useDisk && !diskStore.contains(blockId)) { if (level.useDisk && !diskStore.contains(blockId)) {
logInfo(s"Writing block $blockId to disk") logInfo(s"Writing block $blockId to disk")
...@@ -1474,6 +1486,63 @@ private[spark] class BlockManager( ...@@ -1474,6 +1486,63 @@ private[spark] class BlockManager(
def removeRdd(rddId: Int): Int = { def removeRdd(rddId: Int): Int = {
// TODO: Avoid a linear scan by creating another mapping of RDD.id to blocks. // TODO: Avoid a linear scan by creating another mapping of RDD.id to blocks.
logInfo(s"Removing RDD $rddId") logInfo(s"Removing RDD $rddId")
// SS
logInfo("Phoneix: Now we in Stage: " + currentStage +
" and the depMap is: " + stageExInfos(currentStage).depMap)
var targetRdd: Int = -1
stageExInfos(currentStage).depMap.foreach { cur =>
if (cur._2.contains(rddId)) {
targetRdd = cur._1
}
}
logInfo("Phonex: Now we set targetRDD to: " + targetRdd + " becasue we removeRdd: " + rddId)
val iter = blockInfoManager.blockExInfos.entrySet().iterator()
val blockExInfos = blockInfoManager.blockExInfos
val inMemExInfos = blockInfoManager.inMemExInfos
while (iter.hasNext) {
val cur = iter.next()
val curId = cur.getKey.rddId
val splitIndex = cur.getKey.splitIndex
if (curId == targetRdd) {
inMemExInfos.synchronized {
if (inMemExInfos.contains(cur.getValue)) {
logInfo("Phoneix: Remove " + cur.getKey + " from inMemExInfos")
inMemExInfos.remove(cur.getValue)
cur.getValue.creatCost = cur.getValue.creatCost + blockExInfos.get(
new RDDBlockId(rddId, splitIndex)).creatCost
cur.getValue.norCost = cur.getValue.
creatCost.toDouble / (cur.getValue.size / 1024 / 1024)
logInfo("Phoenix: Add " + cur.getValue.blockId + " to inMemExInfos")
inMemExInfos.add(cur.getValue)
} else {
cur.getValue.creatCost = cur.getValue.creatCost + blockExInfos.get(
new RDDBlockId(rddId, splitIndex)).creatCost
cur.getValue.norCost = cur.getValue.
creatCost.toDouble / (cur.getValue.size / 1024 / 1024)
}
}
logInfo("Phoenix: Due to Removing RDD " + rddId + " we have to change rdd_" + curId + "_"
+ splitIndex + " ctime")
} else if (curId == rddId) {
val curValue = blockExInfos.get(new RDDBlockId(rddId, splitIndex))
curValue.isExist = 0
inMemExInfos.synchronized {
logInfo("Phoenix: Remove " + curValue.blockId + " from inMemExInfos")
inMemExInfos.remove(curValue)
}
logInfo("Phoenix: Due to Removing RDD " + rddId + " we now change " + curId + "_"
+ splitIndex + " to not exist and TreeSet have size " + inMemExInfos.size())
}
}
// TODO update create cost of sonRDD when removing parRDD and sonRDD not at remote
// SS
val blocksToRemove = blockInfoManager.entries.flatMap(_._1.asRDDId).filter(_.rddId == rddId) val blocksToRemove = blockInfoManager.entries.flatMap(_._1.asRDDId).filter(_.rddId == rddId)
blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) } blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) }
blocksToRemove.size blocksToRemove.size
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment