From 883b7e9030e1a3948acee17608e51dcd9f4d55e1 Mon Sep 17 00:00:00 2001
From: zsxwing <zsxwing@gmail.com>
Date: Wed, 25 Mar 2015 12:09:30 -0700
Subject: [PATCH] [SPARK-6076][Block Manager] Fix a potential OOM issue when
 StorageLevel is MEMORY_AND_DISK_SER

In https://github.com/apache/spark/blob/dcd1e42d6b6ac08d2c0736bf61a15f515a1f222b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L538, when the StorageLevel is `MEMORY_AND_DISK_SER`, the block manager copies the content of the file into memory and then puts it into the MemoryStore:
```scala
              val copyForMemory = ByteBuffer.allocate(bytes.limit)
              copyForMemory.put(bytes)
              memoryStore.putBytes(blockId, copyForMemory, level)
              bytes.rewind()
```
However, if the file is bigger than the free memory, an OOM occurs. A better approach is to first test whether there is enough memory; if there is not, `copyForMemory` should never be created, since caching the bytes in memory is only an optional optimization here.
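
This patch therefore passes the block size together with a `() => ByteBuffer` thunk, so the copy is allocated only after the MemoryStore has confirmed there is room. Below is a minimal, self-contained sketch of that pattern; `hasRoomFor` and `putBytesLazily` are illustrative stand-ins, not the actual MemoryStore API:
```scala
import java.nio.ByteBuffer

object LazyPutSketch extends App {
  // Hypothetical stand-in for the MemoryStore's free-space accounting.
  def hasRoomFor(size: Long): Boolean = size <= 8L * 1024 * 1024

  // The thunk defers allocation: the copy is only created if the size check passes.
  def putBytesLazily(size: Long, makeBytes: () => ByteBuffer): Option[ByteBuffer] =
    if (hasRoomFor(size)) Some(makeBytes()) // allocate only now
    else None                               // never allocated, so no OOM from the copy

  val fromDisk = ByteBuffer.allocate(1024)  // pretend these bytes were read from disk
  val cached = putBytesLazily(fromDisk.limit, () => {
    val copyForMemory = ByteBuffer.allocate(fromDisk.limit)
    copyForMemory.put(fromDisk)             // ByteBuffer.put returns the buffer itself
  })
  println(cached.isDefined)                 // true: 1 KB fits, so the copy was made
}
```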

Author: zsxwing <zsxwing@gmail.com>

Closes #4827 from zsxwing/SPARK-6076 and squashes the following commits:

7d25545 [zsxwing] Add alias for tryToPut and dropFromMemory
1100a54 [zsxwing] Replace call-by-name with () => T
0cc0257 [zsxwing] Fix a potential OOM issue when StorageLevel is MEMORY_AND_DISK_SER
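
A note on the `1100a54` change in the list above: a by-name parameter (`=> T`) is re-evaluated at every reference, while an explicit `() => T` makes the deferral visible at call sites and can be memoized with a `lazy val` to guarantee at-most-once evaluation, which is how `tryToPut` and `dropFromMemory` use it in this patch. A minimal sketch of the difference (names here are illustrative only):
```scala
object ByNameVsThunk extends App {
  var evals = 0
  def expensive(): Int = { evals += 1; 21 }

  // A by-name parameter is re-evaluated at every reference...
  def byName(x: => Int): Int = x + x

  // ...whereas an explicit thunk can be cached in a lazy val,
  // giving at-most-once evaluation, and only if actually needed.
  def explicitThunk(x: () => Int): Int = {
    lazy val v = x()
    v + v
  }

  byName(expensive())
  println(evals)                   // 2: evaluated twice
  evals = 0
  explicitThunk(() => expensive())
  println(evals)                   // 1: evaluated once
}
```
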
---
 .../apache/spark/storage/BlockManager.scala   | 23 +++++++---
 .../apache/spark/storage/MemoryStore.scala    | 43 ++++++++++++++++---
 .../spark/storage/BlockManagerSuite.scala     | 34 +++++++++++++--
 3 files changed, 85 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 80d66e5913..1dff09a75d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -535,9 +535,14 @@ private[spark] class BlockManager(
               /* We'll store the bytes in memory if the block's storage level includes
                * "memory serialized", or if it should be cached as objects in memory
                * but we only requested its serialized bytes. */
-              val copyForMemory = ByteBuffer.allocate(bytes.limit)
-              copyForMemory.put(bytes)
-              memoryStore.putBytes(blockId, copyForMemory, level)
+              memoryStore.putBytes(blockId, bytes.limit, () => {
+                // https://issues.apache.org/jira/browse/SPARK-6076
+                // If the file size is bigger than the free memory, OOM will happen. So if we cannot
+                // put it into MemoryStore, copyForMemory should not be created. That's why this
+                // action is put into a `() => ByteBuffer` and created lazily.
+                val copyForMemory = ByteBuffer.allocate(bytes.limit)
+                copyForMemory.put(bytes)
+              })
               bytes.rewind()
             }
             if (!asBlockResult) {
@@ -991,15 +996,23 @@ private[spark] class BlockManager(
     putIterator(blockId, Iterator(value), level, tellMaster)
   }
 
+  def dropFromMemory(
+      blockId: BlockId,
+      data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
+    dropFromMemory(blockId, () => data)
+  }
+
   /**
    * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
    * store reaches its limit and needs to free up space.
    *
+   * If `data` is not put on disk, it won't be created.
+   *
    * Return the block status if the given block has been updated, else None.
    */
   def dropFromMemory(
       blockId: BlockId,
-      data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
+      data: () => Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
 
     logInfo(s"Dropping block $blockId from memory")
     val info = blockInfo.get(blockId).orNull
@@ -1023,7 +1036,7 @@ private[spark] class BlockManager(
         // Drop to disk, if storage level requires
         if (level.useDisk && !diskStore.contains(blockId)) {
           logInfo(s"Writing block $blockId to disk")
-          data match {
+          data() match {
             case Left(elements) =>
               diskStore.putArray(blockId, elements, level, returnValues = false)
             case Right(bytes) =>
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 1be860aea6..ed609772e6 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -98,6 +98,26 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     }
   }
 
+  /**
+   * Use `size` to test if there is enough space in MemoryStore. If so, create the ByteBuffer and
+   * put it into MemoryStore. Otherwise, the ByteBuffer won't be created.
+   *
+   * The caller should guarantee that `size` is correct.
+   */
+  def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): PutResult = {
+    // Work on a duplicate - since the original input might be used elsewhere.
+    lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
+    val putAttempt = tryToPut(blockId, () => bytes, size, deserialized = false)
+    val data =
+      if (putAttempt.success) {
+        assert(bytes.limit == size)
+        Right(bytes.duplicate())
+      } else {
+        null
+      }
+    PutResult(size, data, putAttempt.droppedBlocks)
+  }
+
   override def putArray(
       blockId: BlockId,
       values: Array[Any],
@@ -312,11 +332,22 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     blockId.asRDDId.map(_.rddId)
   }
 
+  private def tryToPut(
+      blockId: BlockId,
+      value: Any,
+      size: Long,
+      deserialized: Boolean): ResultWithDroppedBlocks = {
+    tryToPut(blockId, () => value, size, deserialized)
+  }
+
   /**
    * Try to put in a set of values, if we can free up enough space. The value should either be
    * an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
    * must also be passed by the caller.
    *
+   * `value` will be lazily created. If it cannot be put into MemoryStore or disk, `value` won't be
+   * created to avoid OOM since it may be a big ByteBuffer.
+   *
    * Synchronize on `accountingLock` to ensure that all the put requests and its associated block
    * dropping is done by only one thread at a time. Otherwise while one thread is dropping
    * blocks to free memory for one block, another thread may use up the freed space for
@@ -326,7 +357,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
    */
   private def tryToPut(
       blockId: BlockId,
-      value: Any,
+      value: () => Any,
       size: Long,
       deserialized: Boolean): ResultWithDroppedBlocks = {
 
@@ -345,7 +376,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       droppedBlocks ++= freeSpaceResult.droppedBlocks
 
       if (enoughFreeSpace) {
-        val entry = new MemoryEntry(value, size, deserialized)
+        val entry = new MemoryEntry(value(), size, deserialized)
         entries.synchronized {
           entries.put(blockId, entry)
           currentMemory += size
@@ -357,12 +388,12 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       } else {
         // Tell the block manager that we couldn't put it in memory so that it can drop it to
         // disk if the block allows disk storage.
-        val data = if (deserialized) {
-          Left(value.asInstanceOf[Array[Any]])
+        lazy val data = if (deserialized) {
+          Left(value().asInstanceOf[Array[Any]])
         } else {
-          Right(value.asInstanceOf[ByteBuffer].duplicate())
+          Right(value().asInstanceOf[ByteBuffer].duplicate())
         }
-        val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
+        val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data)
         droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
       }
       // Release the unroll memory used because we no longer need the underlying Array
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 3fdbe99b5d..ecd1cba5b5 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -170,8 +170,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
     assert(master.getLocations("a3").size === 0, "master was told about a3")
 
     // Drop a1 and a2 from memory; this should be reported back to the master
-    store.dropFromMemory("a1", null)
-    store.dropFromMemory("a2", null)
+    store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer])
+    store.dropFromMemory("a2", null: Either[Array[Any], ByteBuffer])
     assert(store.getSingle("a1") === None, "a1 not removed from store")
     assert(store.getSingle("a2") === None, "a2 not removed from store")
     assert(master.getLocations("a1").size === 0, "master did not remove a1")
@@ -413,8 +413,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
       t2.join()
       t3.join()
 
-      store.dropFromMemory("a1", null)
-      store.dropFromMemory("a2", null)
+      store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer])
+      store.dropFromMemory("a2", null: Either[Array[Any], ByteBuffer])
       store.waitForAsyncReregister()
     }
   }
@@ -1223,4 +1223,30 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
     assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
     assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
   }
+
+  test("lazily create a big ByteBuffer to avoid OOM if it cannot be put into MemoryStore") {
+    store = makeBlockManager(12000)
+    val memoryStore = store.memoryStore
+    val blockId = BlockId("rdd_3_10")
+    val result = memoryStore.putBytes(blockId, 13000, () => {
+      fail("A big ByteBuffer that cannot be put into MemoryStore should not be created")
+    })
+    assert(result.size === 13000)
+    assert(result.data === null)
+    assert(result.droppedBlocks === Nil)
+  }
+
+  test("put a small ByteBuffer to MemoryStore") {
+    store = makeBlockManager(12000)
+    val memoryStore = store.memoryStore
+    val blockId = BlockId("rdd_3_10")
+    var bytes: ByteBuffer = null
+    val result = memoryStore.putBytes(blockId, 10000, () => {
+      bytes = ByteBuffer.allocate(10000)
+      bytes
+    })
+    assert(result.size === 10000)
+    assert(result.data === Right(bytes))
+    assert(result.droppedBlocks === Nil)
+  }
 }
-- 
GitLab