From 154163eddfd08f599aacf4a38aadfd510d94b10b Mon Sep 17 00:00:00 2001
From: Stephen Skeirik <skeirik2@illinois.edu>
Date: Sat, 5 May 2018 23:06:48 -0500
Subject: [PATCH] Fix BlockManager, BlockInfoManager, and MemoryStore to
 correctly reference the data structures shared between the three

---
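Note (not part of the commit): the fragment below only sketches how the three
classes reach the shared structures once this patch is applied. It is an
illustration using the field names and types added in the diff; BlockExInfo
and StageExInfo are classes from this fork, not upstream Spark, and the local
val names are made up for the example.

    // As seen from MemoryStore, which already holds a BlockInfoManager
    // reference, everything is reached through the new back-reference
    // BlockInfoManager.blockManager:
    val inMem    = blockInfoManager.inMemExInfos               // java.util.TreeSet[BlockExInfo]
    val perBlock = blockInfoManager.blockExInfos               // java.util.HashMap[RDDBlockId, BlockExInfo]
    val perStage = blockInfoManager.blockManager.stageExInfos  // mutable.HashMap[Int, StageExInfo]
    val stageId  = blockInfoManager.blockManager.currentStage  // Int
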
 .../apache/spark/storage/BlockInfoManager.scala    |  9 +++++++--
 .../org/apache/spark/storage/BlockManager.scala    |  7 ++++++-
 .../apache/spark/storage/memory/MemoryStore.scala  | 14 +++++++-------
 3 files changed, 20 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index 61a47ec112..b9c70eaf60 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.storage
 
+import java.util.TreeSet
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.JavaConverters._
@@ -28,7 +29,6 @@ import com.google.common.collect.{ConcurrentHashMultiset, ImmutableMultiset}
 import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.internal.Logging
 
-
 /**
  * Tracks metadata for an individual block.
  *
@@ -112,7 +112,8 @@ object BlockInfo {
  *
  * This class is thread-safe.
  */
-private[storage] class BlockInfoManager extends Logging {
+private[storage] class BlockInfoManager(val blockManager: BlockManager)
+  extends Logging {
 
   private type TaskAttemptId = Long
 
@@ -123,6 +124,10 @@ private[storage] class BlockInfoManager extends Logging {
    */
   @GuardedBy("this")
   private[this] val infos = new mutable.HashMap[BlockId, BlockInfo]
+  // Per-block execution info shared with BlockManager and MemoryStore;
+  // inMemExInfos is a sorted set of the entries whose blocks are currently in memory.
+  private[spark] var blockExInfos = new java.util.HashMap[RDDBlockId, BlockExInfo]
+  private[storage] var inMemExInfos = new TreeSet[BlockExInfo]
 
   /**
    * Tracks the set of blocks that each task has locked for writing.
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index f852869123..d8532b00bc 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -45,6 +45,7 @@ import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.{ExternalShuffleClient, TempFileManager}
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
 import org.apache.spark.rpc.RpcEnv
+import org.apache.spark.scheduler.StageExInfo
 import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
 import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.storage.memory._
@@ -138,7 +139,11 @@ private[spark] class BlockManager(
   }
 
   // Visible for testing
-  private[storage] val blockInfoManager = new BlockInfoManager
+  private[spark] val blockInfoManager = new BlockInfoManager(this)
+  // Per-stage execution info shared with BlockInfoManager and MemoryStore;
+  // currentStage holds the id of the running stage (-1 until one is set).
+  var currentStage: Int = -1
+  private[spark] var stageExInfos = new mutable.HashMap[Int, StageExInfo]
 
   private val futureExecutionContext = ExecutionContext.fromExecutorService(
     ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 643c145336..1b4cc595b8 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -470,20 +470,20 @@ private[spark] class MemoryStore(
       }
 
 
-      blockInfoManager.blockManager.inMemBlockExInfo.synchronized {
-        val bxi_iterator = blockManager.inMemBlockExInfo.iterator()
+      blockInfoManager.inMemExInfos.synchronized {
+        val bxi_iterator = blockInfoManager.inMemExInfos.iterator()
         while (freedMemory < space &&  bxi_iterator.hasNext) {
-          val bxi = iterator.next()
-          if (blockIsEvictable(bxi.blockId, entries(bxi.blockId))) {
-            blockManager.stageExInfos.get(blockManager.currentStage) match {
+          val bxi = bxi_iterator.next()
+          if (blockIsEvictable(bxi.blockId, entries.get(bxi.blockId))) {
+            blockInfoManager.blockManager.stageExInfos.get(blockInfoManager.blockManager.currentStage) match {
               case Some(curStageExInfo) =>
                 // cur is this stage's output RDD
-                if (!curStageExInfo.curRunningRddMap.contains(bxi.blockId.getRddId)) {
+                if (!curStageExInfo.curRunningRddMap.contains(bxi.blockId.asRDDId.get.rddId)) {
                   selectedBlocks += bxi.blockId
                   freedMemory += bxi.size
                 }
               case None =>
-                logEarne("Should we get here?")
+                logInfo(s"No StageExInfo for current stage; not evicting ${bxi.blockId}")
             }
           }
         }
-- 
GitLab