From e2297b3baafd61eb1c2cce4715bbc2e213e396cc Mon Sep 17 00:00:00 2001
From: Stephen Skeirik <skeirik2@illinois.edu>
Date: Sun, 6 May 2018 00:47:16 -0500
Subject: [PATCH] added missing code from Chinese team to BlockManager

---
 .../apache/spark/storage/BlockManager.scala   | 69 +++++++++++++++++++
 1 file changed, 69 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index d8532b00bc..242922d279 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1428,6 +1428,18 @@ private[spark] class BlockManager(
     var blockIsUpdated = false
     val level = info.level
 
+    // SS
+    if (blockId.isRDD) {
+      logInfo("Phoneix: change exist status of " + blockId + " to 0")
+      val curValue = blockInfoManager.blockExInfos.get(blockId)
+      curValue.isExist = 0
+      blockInfoManager.inMemExInfos.synchronized {
+        logInfo("Phoneix: Remove " + blockId + " from inMemExInfos")
+        blockInfoManager.inMemExInfos.remove(curValue)
+      }
+    }
+    // SS
+
     // Drop to disk, if storage level requires
     if (level.useDisk && !diskStore.contains(blockId)) {
       logInfo(s"Writing block $blockId to disk")
@@ -1474,6 +1486,63 @@ private[spark] class BlockManager(
   def removeRdd(rddId: Int): Int = {
     // TODO: Avoid a linear scan by creating another mapping of RDD.id to blocks.
     logInfo(s"Removing RDD $rddId")
+    // SS
+    logInfo("Phoneix: Now we in Stage: " + currentStage +
+            " and the depMap is: " + stageExInfos(currentStage).depMap)
+
+    var targetRdd: Int = -1
+    stageExInfos(currentStage).depMap.foreach { cur =>
+      if (cur._2.contains(rddId)) {
+        targetRdd = cur._1
+      }
+    }
+
+    logInfo("Phonex: Now we set targetRDD to: " + targetRdd + " becasue we removeRdd: " + rddId)
+
+    val iter = blockInfoManager.blockExInfos.entrySet().iterator()
+    val blockExInfos = blockInfoManager.blockExInfos
+    val inMemExInfos = blockInfoManager.inMemExInfos
+
+    while (iter.hasNext) {
+      val cur = iter.next()
+      val curId = cur.getKey.rddId
+      val splitIndex = cur.getKey.splitIndex
+      if (curId == targetRdd) {
+        inMemExInfos.synchronized {
+          if (inMemExInfos.contains(cur.getValue)) {
+            logInfo("Phoneix: Remove " + cur.getKey + " from inMemExInfos")
+            inMemExInfos.remove(cur.getValue)
+            cur.getValue.creatCost = cur.getValue.creatCost + blockExInfos.get(
+              new RDDBlockId(rddId, splitIndex)).creatCost
+            cur.getValue.norCost = cur.getValue.
+              creatCost.toDouble / (cur.getValue.size / 1024 / 1024)
+            logInfo("Phoenix: Add " + cur.getValue.blockId + " to inMemExInfos")
+            inMemExInfos.add(cur.getValue)
+
+          } else {
+            cur.getValue.creatCost = cur.getValue.creatCost + blockExInfos.get(
+              new RDDBlockId(rddId, splitIndex)).creatCost
+            cur.getValue.norCost = cur.getValue.
+              creatCost.toDouble / (cur.getValue.size / 1024 / 1024)
+          }
+        }
+
+        logInfo("Phoenix: Due to Removing RDD " + rddId + " we have to change rdd_" + curId + "_"
+          + splitIndex + " ctime")
+      } else if (curId == rddId) {
+        val curValue = blockExInfos.get(new RDDBlockId(rddId, splitIndex))
+        curValue.isExist = 0
+        inMemExInfos.synchronized {
+          logInfo("Phoenix: Remove " + curValue.blockId + " from inMemExInfos")
+          inMemExInfos.remove(curValue)
+        }
+        logInfo("Phoenix: Due to Removing RDD " + rddId + " we now change " + curId + "_"
+          + splitIndex + " to not exist and TreeSet have size " + inMemExInfos.size())
+      }
+    }
+    // TODO update create cost of sonRDD when removing parRDD and sonRDD not at remote
+    // SS
+
     val blocksToRemove = blockInfoManager.entries.flatMap(_._1.asRDDId).filter(_.rddId == rddId)
     blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) }
     blocksToRemove.size
-- 
GitLab