diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 7f4b6e8bd368374807ec6b5f9398e81f5f018d2d..1be860aea63d0b83c7bcf28845cbfd6d7dbffab0 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -46,6 +46,14 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) // A mapping from thread ID to amount of memory used for unrolling a block (in bytes) // All accesses of this map are assumed to have manually synchronized on `accountingLock` private val unrollMemoryMap = mutable.HashMap[Long, Long]() + // Same as `unrollMemoryMap`, but for pending unroll memory as defined below. + // Pending unroll memory refers to the intermediate memory occupied by a thread + // after the unroll but before the actual putting of the block in the cache. + // This chunk of memory is expected to be released *as soon as* we finish + // caching the corresponding block as opposed to until after the task finishes. + // This is only used if a block is successfully unrolled in its entirety in + // memory (SPARK-4777). + private val pendingUnrollMemoryMap = mutable.HashMap[Long, Long]() /** * The amount of space ensured for unrolling values in memory, shared across all cores. @@ -283,12 +291,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) } } finally { - // If we return an array, the values returned do not depend on the underlying vector and - // we can immediately free up space for other threads. Otherwise, if we return an iterator, - // we release the memory claimed by this thread later on when the task finishes. + // If we return an array, the values returned will later be cached in `tryToPut`. + // In this case, we should release the memory after we cache the block there. + // Otherwise, if we return an iterator, we release the memory reserved here + // later when the task finishes. if (keepUnrolling) { - val amountToRelease = currentUnrollMemoryForThisThread - previousMemoryReserved - releaseUnrollMemoryForThisThread(amountToRelease) + accountingLock.synchronized { + val amountToRelease = currentUnrollMemoryForThisThread - previousMemoryReserved + releaseUnrollMemoryForThisThread(amountToRelease) + reservePendingUnrollMemoryForThisThread(amountToRelease) + } } } } @@ -353,6 +365,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) val droppedBlockStatus = blockManager.dropFromMemory(blockId, data) droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) } } + // Release the unroll memory used because we no longer need the underlying Array + releasePendingUnrollMemoryForThisThread() } ResultWithDroppedBlocks(putSuccess, droppedBlocks) } @@ -381,7 +395,10 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) } // Take into account the amount of memory currently occupied by unrolling blocks - val actualFreeMemory = freeMemory - currentUnrollMemory + // and minus the pending unroll memory for that block on current thread. + val threadId = Thread.currentThread().getId + val actualFreeMemory = freeMemory - currentUnrollMemory + + pendingUnrollMemoryMap.getOrElse(threadId, 0L) if (actualFreeMemory < space) { val rddToAdd = getRddId(blockIdToAdd) @@ -468,11 +485,32 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) } } + /** + * Reserve the unroll memory of current unroll successful block used by this thread + * until actually put the block into memory entry. + */ + def reservePendingUnrollMemoryForThisThread(memory: Long): Unit = { + val threadId = Thread.currentThread().getId + accountingLock.synchronized { + pendingUnrollMemoryMap(threadId) = pendingUnrollMemoryMap.getOrElse(threadId, 0L) + memory + } + } + + /** + * Release pending unroll memory of current unroll successful block used by this thread + */ + def releasePendingUnrollMemoryForThisThread(): Unit = { + val threadId = Thread.currentThread().getId + accountingLock.synchronized { + pendingUnrollMemoryMap.remove(threadId) + } + } + /** * Return the amount of memory currently occupied for unrolling blocks across all threads. */ def currentUnrollMemory: Long = accountingLock.synchronized { - unrollMemoryMap.values.sum + unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum } /** diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index ffe6f039145ea2e2e3dd5f5742f0e3dd3eec6e4d..3fdbe99b5d02bceadff6ae8928f180f97b1de332 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1064,6 +1064,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach var unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks) verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true) assert(memoryStore.currentUnrollMemoryForThisThread === 0) + memoryStore.releasePendingUnrollMemoryForThisThread() // Unroll with not enough space. This should succeed after kicking out someBlock1. store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY) @@ -1074,6 +1075,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach assert(droppedBlocks.size === 1) assert(droppedBlocks.head._1 === TestBlockId("someBlock1")) droppedBlocks.clear() + memoryStore.releasePendingUnrollMemoryForThisThread() // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 = // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.