diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 80d66e59132dacb633bbd3e43fdeaf79a1b07782..1dff09a75d0385581d69bf20f23573374c95fa12 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -535,9 +535,14 @@ private[spark] class BlockManager(
               /* We'll store the bytes in memory if the block's storage level includes
                * "memory serialized", or if it should be cached as objects in memory
                * but we only requested its serialized bytes. */
-              val copyForMemory = ByteBuffer.allocate(bytes.limit)
-              copyForMemory.put(bytes)
-              memoryStore.putBytes(blockId, copyForMemory, level)
+              memoryStore.putBytes(blockId, bytes.limit, () => {
+                // https://issues.apache.org/jira/browse/SPARK-6076
+                // If the file size is bigger than the free memory, OOM will happen. So if the
+                // block cannot be put into MemoryStore, copyForMemory should never be created.
+                // That's why the allocation is wrapped in a `() => ByteBuffer` and run lazily.
+                val copyForMemory = ByteBuffer.allocate(bytes.limit)
+                copyForMemory.put(bytes)
+              })
               bytes.rewind()
             }
             if (!asBlockResult) {
@@ -991,15 +996,23 @@ private[spark] class BlockManager(
     putIterator(blockId, Iterator(value), level, tellMaster)
   }
 
+  def dropFromMemory(
+      blockId: BlockId,
+      data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
+    dropFromMemory(blockId, () => data)
+  }
+
   /**
    * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
    * store reaches its limit and needs to free up space.
    *
+   * If the block is not written to disk, `data` is never called, so its value is not created.
+   *
    * Return the block status if the given block has been updated, else None.
    */
   def dropFromMemory(
       blockId: BlockId,
-      data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
+      data: () => Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
 
     logInfo(s"Dropping block $blockId from memory")
     val info = blockInfo.get(blockId).orNull
@@ -1023,7 +1036,7 @@ private[spark] class BlockManager(
         // Drop to disk, if storage level requires
         if (level.useDisk && !diskStore.contains(blockId)) {
           logInfo(s"Writing block $blockId to disk")
-          data match {
+          data() match {
             case Left(elements) =>
               diskStore.putArray(blockId, elements, level, returnValues = false)
             case Right(bytes) =>
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 1be860aea63d0b83c7bcf28845cbfd6d7dbffab0..ed609772e697994c95fbeed63d744e9f5c1370a2 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -98,6 +98,26 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     }
   }
 
+  /**
+   * Use `size` to test whether there is enough space in MemoryStore. If so, call `_bytes` to
+   * create the ByteBuffer and put it into MemoryStore; otherwise the ByteBuffer is never created.
+   *
+   * The caller must guarantee that `size` matches the size of the ByteBuffer `_bytes` produces.
+   */
+  def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): PutResult = {
+    // Work on a duplicate, since the original input might be used elsewhere.
+    lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
+    val putAttempt = tryToPut(blockId, () => bytes, size, deserialized = false)
+    val data =
+      if (putAttempt.success) {
+        assert(bytes.limit == size)
+        Right(bytes.duplicate())
+      } else {
+        null
+      }
+    PutResult(size, data, putAttempt.droppedBlocks)
+  }
+
   override def putArray(
       blockId: BlockId,
       values: Array[Any],
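
The `lazy val bytes` plus `() => bytes` combination in the new `putBytes` relies on a standard Scala guarantee, sketched below with illustrative names: a `lazy val` captured by a thunk is evaluated at most once, and only when first forced.

import java.nio.ByteBuffer

object LazyValSketch {
  def main(args: Array[String]): Unit = {
    var allocations = 0
    lazy val bytes: ByteBuffer = {
      allocations += 1
      // The cast mirrors the real code: Buffer.rewind() returns Buffer on older JDKs.
      ByteBuffer.allocate(4).duplicate().rewind().asInstanceOf[ByteBuffer]
    }
    val thunk: () => ByteBuffer = () => bytes

    assert(allocations == 0) // nothing forced yet
    thunk(); thunk()
    assert(allocations == 1) // evaluated exactly once, then cached
  }
}
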
@@ -312,11 +332,22 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     blockId.asRDDId.map(_.rddId)
   }
 
+  private def tryToPut(
+      blockId: BlockId,
+      value: Any,
+      size: Long,
+      deserialized: Boolean): ResultWithDroppedBlocks = {
+    tryToPut(blockId, () => value, size, deserialized)
+  }
+
   /**
    * Try to put in a set of values, if we can free up enough space. The value should either be
    * an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
    * must also be passed by the caller.
    *
+   * `value` is created lazily: if the block can be stored neither in MemoryStore nor on disk,
+   * `value` is never materialized, which avoids an OOM when it would be a big ByteBuffer.
+   *
    * Synchronize on `accountingLock` to ensure that all the put requests and their associated
    * block dropping are done by only one thread at a time. Otherwise, while one thread is dropping
    * blocks to free memory for one block, another thread may use up the freed space for
@@ -326,7 +357,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
    */
   private def tryToPut(
       blockId: BlockId,
-      value: Any,
+      value: () => Any,
       size: Long,
       deserialized: Boolean): ResultWithDroppedBlocks = {
 
@@ -345,7 +376,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       droppedBlocks ++= freeSpaceResult.droppedBlocks
 
       if (enoughFreeSpace) {
-        val entry = new MemoryEntry(value, size, deserialized)
+        val entry = new MemoryEntry(value(), size, deserialized)
         entries.synchronized {
           entries.put(blockId, entry)
           currentMemory += size
@@ -357,12 +388,12 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       } else {
         // Tell the block manager that we couldn't put it in memory so that it can drop it to
         // disk if the block allows disk storage.
-        val data = if (deserialized) {
-          Left(value.asInstanceOf[Array[Any]])
+        lazy val data = if (deserialized) {
+          Left(value().asInstanceOf[Array[Any]])
         } else {
-          Right(value.asInstanceOf[ByteBuffer].duplicate())
+          Right(value().asInstanceOf[ByteBuffer].duplicate())
         }
-        val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
+        val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data)
         droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
       }
       // Release the unroll memory used because we no longer need the underlying Array
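
Tying the pieces together, the sketch below (illustrative names, not Spark's classes) shows why `tryToPut` wraps the failure-path `Either` in a `lazy val` and hands `dropFromMemory` a `() => data` thunk: if the drop never touches the data, neither the wrapper nor the buffer inside it is ever built.

import java.nio.ByteBuffer

object LazyEitherSketch {
  def main(args: Array[String]): Unit = {
    var built = 0
    // Mirrors tryToPut's failure path: the Either wrapper itself is lazy.
    lazy val data: Either[Array[Any], ByteBuffer] = {
      built += 1
      Right(ByteBuffer.allocate(8))
    }

    // Stand-in for dropFromMemory (hypothetical): forces `d` only when spilling to disk.
    def drop(d: () => Either[Array[Any], ByteBuffer], useDisk: Boolean): Unit =
      if (useDisk) d() match {
        case Left(elements) => println(s"spilled ${elements.length} elements")
        case Right(bytes)   => println(s"spilled ${bytes.limit} bytes")
      }

    drop(() => data, useDisk = false)
    assert(built == 0) // memory-only: neither the Either nor the buffer was ever built
  }
}
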
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 3fdbe99b5d02bceadff6ae8928f180f97b1de332..ecd1cba5b5abe225321c83e111e3ae8f4db4dd9e 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -170,8 +170,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
     assert(master.getLocations("a3").size === 0, "master was told about a3")
 
     // Drop a1 and a2 from memory; this should be reported back to the master
-    store.dropFromMemory("a1", null)
-    store.dropFromMemory("a2", null)
+    store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer])
+    store.dropFromMemory("a2", null: Either[Array[Any], ByteBuffer])
     assert(store.getSingle("a1") === None, "a1 not removed from store")
     assert(store.getSingle("a2") === None, "a2 not removed from store")
     assert(master.getLocations("a1").size === 0, "master did not remove a1")
@@ -413,8 +413,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
       t2.join()
       t3.join()
 
-      store.dropFromMemory("a1", null)
-      store.dropFromMemory("a2", null)
+      store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer])
+      store.dropFromMemory("a2", null: Either[Array[Any], ByteBuffer])
       store.waitForAsyncReregister()
     }
   }
@@ -1223,4 +1223,30 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
     assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
     assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
   }
+
+  test("lazily create a big ByteBuffer to avoid OOM if it cannot be put into MemoryStore") {
+    store = makeBlockManager(12000)
+    val memoryStore = store.memoryStore
+    val blockId = BlockId("rdd_3_10")
+    val result = memoryStore.putBytes(blockId, 13000, () => {
+      fail("A big ByteBuffer that cannot be put into MemoryStore should not be created")
+    })
+    assert(result.size === 13000)
+    assert(result.data === null)
+    assert(result.droppedBlocks === Nil)
+  }
+
+  test("put a small ByteBuffer to MemoryStore") {
+    store = makeBlockManager(12000)
+    val memoryStore = store.memoryStore
+    val blockId = BlockId("rdd_3_10")
+    var bytes: ByteBuffer = null
+    val result = memoryStore.putBytes(blockId, 10000, () => {
+      bytes = ByteBuffer.allocate(10000)
+      bytes
+    })
+    assert(result.size === 10000)
+    assert(result.data === Right(bytes))
+    assert(result.droppedBlocks === Nil)
+  }
 }