diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala index 61a47ec112656060e3e7f0d5f77913f38b3d8a58..b9c70eaf607ccec4f645a2e2a14127ded09fb7e1 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala @@ -17,6 +17,7 @@ package org.apache.spark.storage +import java.util.TreeSet import javax.annotation.concurrent.GuardedBy import scala.collection.JavaConverters._ @@ -28,7 +29,6 @@ import com.google.common.collect.{ConcurrentHashMultiset, ImmutableMultiset} import org.apache.spark.{SparkException, TaskContext} import org.apache.spark.internal.Logging - /** * Tracks metadata for an individual block. * @@ -112,7 +112,8 @@ object BlockInfo { * * This class is thread-safe. */ -private[storage] class BlockInfoManager extends Logging { +private[storage] class BlockInfoManager(val blockManager: BlockManager) + extends Logging { private type TaskAttemptId = Long @@ -123,6 +124,10 @@ private[storage] class BlockInfoManager extends Logging { */ @GuardedBy("this") private[this] val infos = new mutable.HashMap[BlockId, BlockInfo] + // SS + private[spark] var blockExInfos = new java.util.HashMap[RDDBlockId, BlockExInfo] + private[storage] var inMemExInfos = new java.util.TreeSet[BlockExInfo] + // SS /** * Tracks the set of blocks that each task has locked for writing. diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index f852869123b8e0a62a0f1ffa941301071c7ab7c3..d8532b00bc90419cc7e13abcd5697b164425987c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -45,6 +45,7 @@ import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.{ExternalShuffleClient, TempFileManager} import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo import org.apache.spark.rpc.RpcEnv +import org.apache.spark.scheduler.StageExInfo import org.apache.spark.serializer.{SerializerInstance, SerializerManager} import org.apache.spark.shuffle.ShuffleManager import org.apache.spark.storage.memory._ @@ -138,7 +139,11 @@ private[spark] class BlockManager( } // Visible for testing - private[storage] val blockInfoManager = new BlockInfoManager + private[spark] val blockInfoManager = new BlockInfoManager(this) + // SS + var currentStage: Int = -1 + private[spark] var stageExInfos = new mutable.HashMap[Int, StageExInfo] + // SS private val futureExecutionContext = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128)) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 643c145336657eeb0f0371d9b429704b98dc171f..1b4cc595b8471b34b098d0400b660c21834c0045 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -470,20 +470,20 @@ private[spark] class MemoryStore( } - blockInfoManager.blockManager.inMemBlockExInfo.synchronized { - val bxi_iterator = blockManager.inMemBlockExInfo.iterator() + blockInfoManager.inMemExInfos.synchronized { + val bxi_iterator = blockInfoManager.inMemExInfos.iterator() while (freedMemory < space && bxi_iterator.hasNext) { - val bxi = iterator.next() - if (blockIsEvictable(bxi.blockId, entries(bxi.blockId))) { - blockManager.stageExInfos.get(blockManager.currentStage) match { + val bxi = bxi_iterator.next() + if (blockIsEvictable(bxi.blockId, entries.get(bxi.blockId))) { + blockInfoManager.blockManager.stageExInfos.get(blockManager.currentStage) match { case Some(curStageExInfo) => // cur is this stage's output RDD - if (!curStageExInfo.curRunningRddMap.contains(bxi.blockId.getRddId)) { + if (!curStageExInfo.curRunningRddMap.contains(bxi.blockId.asRDDId.get.rddId)) { selectedBlocks += bxi.blockId freedMemory += bxi.size } case None => - logEarne("Should we get here?") + logInfo("Should we get here?") } } }