diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 45b73380806ddba798e75241b56021059a8ddf7b..245d94ac4f8b11f42285bb012eb2f2de8d0c900f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -317,6 +317,9 @@ private[spark] class BlockManager(
 
   /**
    * Put the block locally, using the given storage level.
+   *
+   * '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing
+   * so may corrupt or change the data stored by the `BlockManager`.
    */
   override def putBlockData(
       blockId: BlockId,
@@ -755,6 +758,9 @@ private[spark] class BlockManager(
   /**
    * Put a new block of serialized bytes to the block manager.
    *
+   * '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing
+   * so may corrupt or change the data stored by the `BlockManager`.
+   *
    * @param encrypt If true, asks the block manager to encrypt the data block before storing,
    *                when I/O encryption is enabled. This is required for blocks that have been
    *                read from unencrypted sources, since all the BlockManager read APIs
@@ -773,7 +779,7 @@ private[spark] class BlockManager(
       if (encrypt && securityManager.ioEncryptionKey.isDefined) {
         try {
           val data = bytes.toByteBuffer
-          val in = new ByteBufferInputStream(data, true)
+          val in = new ByteBufferInputStream(data)
           val byteBufOut = new ByteBufferOutputStream(data.remaining())
           val out = CryptoStreamUtils.createCryptoOutputStream(byteBufOut, conf,
             securityManager.ioEncryptionKey.get)
@@ -800,6 +806,9 @@ private[spark] class BlockManager(
    *
    * If the block already exists, this method will not overwrite it.
    *
+   * '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing
+   * so may corrupt or change the data stored by the `BlockManager`.
+   *
    * @param keepReadLock if true, this method will hold the read lock when it returns (even if the
    *                     block already exists). If false, this method will hold no locks when it
    *                     returns.
@@ -843,7 +852,15 @@ private[spark] class BlockManager(
               false
           }
         } else {
-          memoryStore.putBytes(blockId, size, level.memoryMode, () => bytes)
+          val memoryMode = level.memoryMode
+          memoryStore.putBytes(blockId, size, memoryMode, () => {
+            if (memoryMode == MemoryMode.OFF_HEAP &&
+                bytes.chunks.exists(buffer => !buffer.isDirect)) {
+              bytes.copy(Platform.allocateDirectBuffer)
+            } else {
+              bytes
+            }
+          })
         }
         if (!putSucceeded && level.useDisk) {
           logWarning(s"Persisting block $blockId to disk instead.")
@@ -1048,7 +1065,7 @@ private[spark] class BlockManager(
           try {
             replicate(blockId, bytesToReplicate, level, remoteClassTag)
           } finally {
-            bytesToReplicate.dispose()
+            bytesToReplicate.unmap()
           }
           logDebug("Put block %s remotely took %s"
             .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
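// Hedged illustration, not part of the patch: a minimal sketch of the intent behind the
// memoryStore.putBytes change above. When the requested memory mode is OFF_HEAP but some chunk of
// the incoming bytes lives on the JVM heap, the data is first copied into direct buffers so that
// what the memory store holds matches the memory mode. The package, object, and helper names are
// illustrative; ChunkedByteBuffer is private[spark], so the sketch assumes it is compiled under
// the org.apache.spark package.
package org.apache.spark.storage

import java.nio.ByteBuffer

import org.apache.spark.memory.MemoryMode
import org.apache.spark.unsafe.Platform
import org.apache.spark.util.io.ChunkedByteBuffer

object OffHeapCopySketch {

  /** Return `bytes` unchanged, or an off-heap copy if the memory mode requires direct buffers. */
  def ensureMemoryMode(bytes: ChunkedByteBuffer, memoryMode: MemoryMode): ChunkedByteBuffer = {
    if (memoryMode == MemoryMode.OFF_HEAP && bytes.chunks.exists(buffer => !buffer.isDirect)) {
      // ChunkedByteBuffer.copy takes an allocator; Platform.allocateDirectBuffer hands back
      // direct (off-heap) buffers, so the copy satisfies the OFF_HEAP memory mode.
      bytes.copy(Platform.allocateDirectBuffer)
    } else {
      bytes
    }
  }

  def main(args: Array[String]): Unit = {
    // A heap-backed buffer requested under OFF_HEAP ends up copied into direct memory.
    val heapBacked = new ChunkedByteBuffer(Array(ByteBuffer.wrap(Array[Byte](1, 2, 3))))
    val offHeap = ensureMemoryMode(heapBacked, MemoryMode.OFF_HEAP)
    assert(offHeap.chunks.forall(_.isDirect))
  }
}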
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index e12f2e6095d5ad26d9bcf9a6d9d40a95bd8fd9ca..5efdd23f79a21df90292b9eebebe0dc32dcaf162 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -236,22 +236,60 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
 
 /** Helper methods for storage-related objects. */
 private[spark] object StorageUtils extends Logging {
+  // We resort to reflection here; see the unmap method below for justification.
+  private val memoryMappedBufferFileDescriptorField = {
+    val mappedBufferClass = classOf[java.nio.MappedByteBuffer]
+    val fdField = mappedBufferClass.getDeclaredField("fd")
+    fdField.setAccessible(true)
+    fdField
+  }
 
   /**
-   * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
-   * might cause errors if one attempts to read from the unmapped buffer, but it's better than
-   * waiting for the GC to find it because that could lead to huge numbers of open files. There's
-   * unfortunately no standard API to do this.
+   * Attempt to clean up a ByteBuffer if it is direct or memory-mapped. This uses an *unsafe* Sun
+   * API that will cause errors if one attempts to read from the disposed buffer. However, neither
+   * the bytes allocated to direct buffers nor file descriptors opened for memory-mapped buffers put
+   * pressure on the garbage collector. Waiting for garbage collection may lead to the depletion of
+   * off-heap memory or huge numbers of open files. There's unfortunately no standard API to
+   * manually dispose of these kinds of buffers.
+   *
+   * See also [[unmap]]
    */
   def dispose(buffer: ByteBuffer): Unit = {
     if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
-      logTrace(s"Unmapping $buffer")
-      if (buffer.asInstanceOf[DirectBuffer].cleaner() != null) {
-        buffer.asInstanceOf[DirectBuffer].cleaner().clean()
+      logTrace(s"Disposing of $buffer")
+      cleanDirectBuffer(buffer.asInstanceOf[DirectBuffer])
+    }
+  }
+
+  /**
+   * Attempt to unmap a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that will
+   * cause errors if one attempts to read from the unmapped buffer. However, the file descriptors of
+   * memory-mapped buffers do not put pressure on the garbage collector. Waiting for garbage
+   * collection may lead to huge numbers of open files. There's unfortunately no standard API to
+   * manually unmap memory-mapped buffers.
+   *
+   * See also [[dispose]]
+   */
+  def unmap(buffer: ByteBuffer): Unit = {
+    if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
+      // Note that direct buffers are instances of MappedByteBuffer in the JDK's implementation.
+      // As of Java 8, the JDK provides no public API to distinguish a direct buffer from a
+      // memory-mapped buffer, so instead we use reflection to check whether the buffer carries a
+      // non-null file descriptor, which only memory-mapped buffers do.
+      if (memoryMappedBufferFileDescriptorField.get(buffer) != null) {
+        logTrace(s"Unmapping $buffer")
+        cleanDirectBuffer(buffer.asInstanceOf[DirectBuffer])
       }
     }
   }
 
+  private def cleanDirectBuffer(buffer: DirectBuffer): Unit = {
+    val cleaner = buffer.cleaner()
+    if (cleaner != null) {
+      cleaner.clean()
+    }
+  }
+
   /**
    * Update the given list of RDDInfo with the given list of storage statuses.
    * This method overwrites the old values stored in the RDDInfo's.
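// Hedged illustration, not part of the patch: why the reflective `fd` check in unmap() is needed.
// In the Oracle/OpenJDK implementation (as of Java 8), buffers returned by both
// ByteBuffer.allocateDirect and FileChannel.map are instances of MappedByteBuffer, but only a
// genuinely memory-mapped buffer carries a non-null file descriptor. The object and method names
// below are illustrative only.
import java.io.{File, RandomAccessFile}
import java.nio.{ByteBuffer, MappedByteBuffer}
import java.nio.channels.FileChannel

object FileDescriptorCheckSketch {

  private val fdField = {
    val field = classOf[MappedByteBuffer].getDeclaredField("fd")
    field.setAccessible(true)
    field
  }

  /** True only for buffers that are actually backed by a mapped file. */
  def isMemoryMapped(buffer: ByteBuffer): Boolean =
    buffer.isInstanceOf[MappedByteBuffer] && fdField.get(buffer) != null

  def main(args: Array[String]): Unit = {
    // A plain direct buffer is an instance of MappedByteBuffer, but its `fd` field is null.
    println(isMemoryMapped(ByteBuffer.allocateDirect(16)))  // false

    // A buffer produced by FileChannel.map carries the mapped file's descriptor.
    val file = new RandomAccessFile(File.createTempFile("fd-check", ".bin"), "rw")
    try {
      file.setLength(16)
      println(isMemoryMapped(file.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, 16)))  // true
    } finally {
      file.close()
    }
  }
}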
diff --git a/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala b/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala
index dce2ac63a664cbfdc7ac303f40394bb62efc225b..50dc948e6c41030398f1e3a7365b8575b108814c 100644
--- a/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala
+++ b/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala
@@ -23,11 +23,10 @@ import java.nio.ByteBuffer
 import org.apache.spark.storage.StorageUtils
 
 /**
- * Reads data from a ByteBuffer, and optionally cleans it up using StorageUtils.dispose()
- * at the end of the stream (e.g. to close a memory-mapped file).
+ * Reads data from a ByteBuffer.
  */
 private[spark]
-class ByteBufferInputStream(private var buffer: ByteBuffer, dispose: Boolean = false)
+class ByteBufferInputStream(private var buffer: ByteBuffer)
   extends InputStream {
 
   override def read(): Int = {
@@ -72,9 +71,6 @@ class ByteBufferInputStream(private var buffer: ByteBuffer, dispose: Boolean = f
    */
   private def cleanUp() {
     if (buffer != null) {
-      if (dispose) {
-        StorageUtils.dispose(buffer)
-      }
       buffer = null
     }
   }
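// Hedged illustration, not part of the patch: with the `dispose` flag removed, a caller that
// previously relied on `new ByteBufferInputStream(buffer, true)` to clean up a memory-mapped
// buffer at end-of-stream now disposes of the buffer itself once it has finished reading. The
// object and helper names are illustrative; ByteBufferInputStream and StorageUtils are
// private[spark], so the sketch assumes it is compiled under the org.apache.spark package.
package org.apache.spark.util

import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer

import org.apache.spark.storage.StorageUtils

object ExplicitDisposeSketch {

  /** Drain the buffer through a ByteBufferInputStream, then dispose of it explicitly. */
  def readFully(buffer: ByteBuffer): Array[Byte] = {
    val in = new ByteBufferInputStream(buffer)
    try {
      val out = new ByteArrayOutputStream()
      val chunk = new Array[Byte](4096)
      var read = in.read(chunk)
      while (read != -1) {
        out.write(chunk, 0, read)
        read = in.read(chunk)
      }
      out.toByteArray
    } finally {
      // The stream no longer disposes of the buffer at end-of-stream; this caller owns the
      // buffer, so it releases it explicitly.
      StorageUtils.dispose(buffer)
    }
  }
}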
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 7572cac39317c714bb738e3ded7b4681397899db..1667516663b35b0cd5e1bdc4d046cc4c2259dec9 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -86,7 +86,11 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
   }
 
   /**
-   * Copy this buffer into a new ByteBuffer.
+   * Convert this buffer to a ByteBuffer. If this buffer is backed by a single chunk, the
+   * underlying data is not copied; a duplicate of that chunk is returned instead. If this buffer
+   * is backed by multiple chunks, the data is copied into a new byte buffer. Consequently, this
+   * method should only be used when the caller does not need to manage the memory underlying
+   * this buffer.
    *
    * @throws UnsupportedOperationException if this buffer's size exceeds the max ByteBuffer size.
    */
@@ -132,10 +136,10 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
   }
 
   /**
-   * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
-   * might cause errors if one attempts to read from the unmapped buffer, but it's better than
-   * waiting for the GC to find it because that could lead to huge numbers of open files. There's
-   * unfortunately no standard API to do this.
+   * Attempt to clean up every ByteBuffer in this ChunkedByteBuffer that is direct or memory-mapped.
+   * See [[StorageUtils.dispose]] for more information.
+   *
+   * See also [[unmap]]
    */
   def dispose(): Unit = {
     if (!disposed) {
@@ -143,6 +147,19 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
       disposed = true
     }
   }
+
+  /**
+   * Attempt to unmap every ByteBuffer in this ChunkedByteBuffer that is memory-mapped. See
+   * [[StorageUtils.unmap]] for more information.
+   *
+   * See also [[dispose]]
+   */
+  def unmap(): Unit = {
+    if (!disposed) {
+      chunks.foreach(StorageUtils.unmap)
+      disposed = true
+    }
+  }
 }
 
 /**
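// Hedged illustration, not part of the patch: the practical difference between dispose() and
// unmap() on a ChunkedByteBuffer. dispose() frees every direct or memory-mapped chunk, while
// unmap() only releases chunks that are backed by a mapped file and leaves plain direct (off-heap)
// chunks alone. This is why the replication path in BlockManager above now calls unmap(): the
// direct buffers it read back may still be owned by the memory store. The package and object
// names are illustrative; ChunkedByteBuffer is private[spark], so the sketch assumes it is
// compiled under the org.apache.spark package.
package org.apache.spark.util.io

import java.nio.ByteBuffer

object DisposeVersusUnmapSketch {
  def main(args: Array[String]): Unit = {
    val direct = ByteBuffer.allocateDirect(32)
    val buffer = new ChunkedByteBuffer(Array(direct))

    // unmap() is a no-op for a plain direct chunk, so `direct` stays safe to read afterwards.
    buffer.unmap()

    // Calling dispose() instead would have freed the off-heap memory behind `direct`, making
    // further reads from it unsafe. (After unmap(), the disposed flag is set, so a later
    // dispose() call would be skipped anyway.)
    // buffer.dispose()
  }
}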
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 75dc04038debcedcbc74c9277e316e5f17a78f9f..d907add920c8a15ce695a390f78f854acb22b77e 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -374,7 +374,8 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
       // Put the block into one of the stores
       val blockId = new TestBlockId(
         "block-with-" + storageLevel.description.replace(" ", "-").toLowerCase)
-      stores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel)
+      val testValue = Array.fill[Byte](blockSize)(1)
+      stores(0).putSingle(blockId, testValue, storageLevel)
 
       // Assert that the master knows two locations for the block
       val blockLocations = master.getLocations(blockId).map(_.executorId).toSet
@@ -386,12 +387,23 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
         testStore => blockLocations.contains(testStore.blockManagerId.executorId)
       }.foreach { testStore =>
         val testStoreName = testStore.blockManagerId.executorId
-        assert(
-          testStore.getLocalValues(blockId).isDefined, s"$blockId was not found in $testStoreName")
-        testStore.releaseLock(blockId)
+        val blockResultOpt = testStore.getLocalValues(blockId)
+        assert(blockResultOpt.isDefined, s"$blockId was not found in $testStoreName")
+        val localValues = blockResultOpt.get.data.toSeq
+        assert(localValues.size == 1)
+        assert(localValues.head === testValue)
         assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName),
           s"master does not have status for ${blockId.name} in $testStoreName")
 
+        val memoryStore = testStore.memoryStore
+        if (memoryStore.contains(blockId) && !storageLevel.deserialized) {
+          memoryStore.getBytes(blockId).get.chunks.foreach { byteBuffer =>
+            assert(storageLevel.useOffHeap == byteBuffer.isDirect,
+              s"memory mode ${storageLevel.memoryMode} is not compatible with " +
+                byteBuffer.getClass.getSimpleName)
+          }
+        }
+
         val blockStatus = master.getBlockStatus(blockId)(testStore.blockManagerId)
 
         // Assert that block status in the master for this store has expected storage level