From 642b246beb7879978d31f2e6e97de7e06c74dcb7 Mon Sep 17 00:00:00 2001
From: "Zhang, Liye" <liye.zhang@intel.com>
Date: Thu, 16 Oct 2014 19:07:37 -0700
Subject: [PATCH] [SPARK-3941][CORE] _remainingmem should not increase twice
 when updateBlockInfo

In BlockManagermasterActor, _remainingMem would increase memSize for twice when updateBlockInfo if new storageLevel is invalid and old storageLevel is "useMemory". Also, _remainingMem should increase with original memory size instead of new memSize.

Author: Zhang, Liye <liye.zhang@intel.com>

Closes #2792 from liyezhang556520/spark-3941-remainMem and squashes the following commits:

3d487cc [Zhang, Liye] make the code concise
0380a32 [Zhang, Liye] [SPARK-3941][CORE] _remainingmem should not increase twice when updateBlockInfo
---
 .../apache/spark/storage/BlockManagerMasterActor.scala   | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 6a06257ed0..088f06e389 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -457,16 +457,18 @@ private[spark] class BlockManagerInfo(
 
     if (_blocks.containsKey(blockId)) {
       // The block exists on the slave already.
-      val originalLevel: StorageLevel = _blocks.get(blockId).storageLevel
+      val blockStatus: BlockStatus = _blocks.get(blockId)
+      val originalLevel: StorageLevel = blockStatus.storageLevel
+      val originalMemSize: Long = blockStatus.memSize
 
       if (originalLevel.useMemory) {
-        _remainingMem += memSize
+        _remainingMem += originalMemSize
       }
     }
 
     if (storageLevel.isValid) {
       /* isValid means it is either stored in-memory, on-disk or on-Tachyon.
-       * But the memSize here indicates the data size in or dropped from memory,
+       * The memSize here indicates the data size in or dropped from memory,
        * tachyonSize here indicates the data size in or dropped from Tachyon,
        * and the diskSize here indicates the data size in or dropped to disk.
        * They can be both larger than 0, when a block is dropped from memory to disk.
@@ -493,7 +495,6 @@ private[spark] class BlockManagerInfo(
       val blockStatus: BlockStatus = _blocks.get(blockId)
       _blocks.remove(blockId)
       if (blockStatus.storageLevel.useMemory) {
-        _remainingMem += blockStatus.memSize
         logInfo("Removed %s on %s in memory (size: %s, free: %s)".format(
           blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.memSize),
           Utils.bytesToString(_remainingMem)))
-- 
GitLab