From 112bd9bfc5b9729f6f86518998b5d80c5e79fe5e Mon Sep 17 00:00:00 2001 From: liuxian <liu.xian3@zte.com.cn> Date: Mon, 19 Jun 2017 11:46:58 +0800 Subject: [PATCH] [SPARK-21090][CORE] Optimize the unified memory manager code ## What changes were proposed in this pull request? 1.In `acquireStorageMemory`, when the Memory Mode is OFF_HEAP ,the `maxOffHeapMemory` should be modified to `maxOffHeapStorageMemory`. after this PR,it will same as ON_HEAP Memory Mode. Because when acquire memory is between `maxOffHeapStorageMemory` and `maxOffHeapMemory`,it will fail surely, so if acquire memory is greater than `maxOffHeapStorageMemory`(not greater than `maxOffHeapMemory`),we should fail fast. 2. Borrow memory from execution, `numBytes` modified to `numBytes - storagePool.memoryFree` will be more reasonable. Because we just acquire `(numBytes - storagePool.memoryFree)`, unnecessary borrowed `numBytes` from execution ## How was this patch tested? added unit test case Author: liuxian <liu.xian3@zte.com.cn> Closes #18296 from 10110346/wip-lx-0614. --- .../spark/memory/UnifiedMemoryManager.scala | 5 +-- .../spark/memory/MemoryManagerSuite.scala | 2 +- .../memory/UnifiedMemoryManagerSuite.scala | 32 +++++++++++++++++++ 3 files changed, 36 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala index fea2808218..df193552be 100644 --- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala @@ -160,7 +160,7 @@ private[spark] class UnifiedMemoryManager private[memory] ( case MemoryMode.OFF_HEAP => ( offHeapExecutionMemoryPool, offHeapStorageMemoryPool, - maxOffHeapMemory) + maxOffHeapStorageMemory) } if (numBytes > maxMemory) { // Fail fast if the block simply won't fit @@ -171,7 +171,8 @@ private[spark] class UnifiedMemoryManager private[memory] ( if (numBytes > storagePool.memoryFree) { // There is not enough free memory in the storage pool, so try to borrow free memory from // the execution pool. - val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes) + val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, + numBytes - storagePool.memoryFree) executionPool.decrementPoolSize(memoryBorrowedFromExecution) storagePool.incrementPoolSize(memoryBorrowedFromExecution) } diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala index eb2b3ffd15..85eeb5055a 100644 --- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala @@ -117,7 +117,7 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft evictBlocksToFreeSpaceCalled.set(numBytesToFree) if (numBytesToFree <= mm.storageMemoryUsed) { // We can evict enough blocks to fulfill the request for space - mm.releaseStorageMemory(numBytesToFree, MemoryMode.ON_HEAP) + mm.releaseStorageMemory(numBytesToFree, mm.tungstenMemoryMode) evictedBlocks += Tuple2(null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L)) numBytesToFree } else { diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala index c821054412..02b04cdbb2 100644 --- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala @@ -303,4 +303,36 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes mm.invokePrivate[Unit](assertInvariants()) } + test("not enough free memory in the storage pool --OFF_HEAP") { + val conf = new SparkConf() + .set("spark.memory.offHeap.size", "1000") + .set("spark.testing.memory", "1000") + .set("spark.memory.offHeap.enabled", "true") + val taskAttemptId = 0L + val mm = UnifiedMemoryManager(conf, numCores = 1) + val ms = makeMemoryStore(mm) + val memoryMode = MemoryMode.OFF_HEAP + + assert(mm.acquireExecutionMemory(400L, taskAttemptId, memoryMode) === 400L) + assert(mm.storageMemoryUsed === 0L) + assert(mm.executionMemoryUsed === 400L) + + // Fail fast + assert(!mm.acquireStorageMemory(dummyBlock, 700L, memoryMode)) + assert(mm.storageMemoryUsed === 0L) + + assert(mm.acquireStorageMemory(dummyBlock, 100L, memoryMode)) + assert(mm.storageMemoryUsed === 100L) + assertEvictBlocksToFreeSpaceNotCalled(ms) + + // Borrow 50 from execution memory + assert(mm.acquireStorageMemory(dummyBlock, 450L, memoryMode)) + assertEvictBlocksToFreeSpaceNotCalled(ms) + assert(mm.storageMemoryUsed === 550L) + + // Borrow 50 from execution memory and evict 50 to free space + assert(mm.acquireStorageMemory(dummyBlock, 100L, memoryMode)) + assertEvictBlocksToFreeSpaceCalled(ms, 50) + assert(mm.storageMemoryUsed === 600L) + } } -- GitLab