diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index dabc81001834b35ed2c18bd10689a80b007c9df1..550e1ba6d3de0f5fafe054c3cda610d849b437c8 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -173,7 +173,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
     TorrentBroadcast.synchronized {
       setConf(SparkEnv.get.conf)
       val blockManager = SparkEnv.get.blockManager
-      blockManager.getLocal(broadcastId).map(_.data.next()) match {
+      blockManager.getLocalValues(broadcastId).map(_.data.next()) match {
         case Some(x) =>
           releaseLock(broadcastId)
           x.asInstanceOf[T]
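
The hunk above only renames the call site: `getLocal` becomes `getLocalValues`, making explicit that the result is an iterator of deserialized objects rather than serialized bytes. Below is a minimal sketch of the same read pattern, assuming a running SparkEnv and Spark-internal code with access to `BlockManager.releaseLock`; `readCachedBroadcast` and its parameters are illustrative, not part of this patch.

```scala
import org.apache.spark.SparkEnv
import org.apache.spark.storage.BroadcastBlockId

// Sketch only: mirrors the call pattern in the hunk above.
def readCachedBroadcast[T](broadcastId: BroadcastBlockId): Option[T] = {
  val blockManager = SparkEnv.get.blockManager
  blockManager.getLocalValues(broadcastId).map { result =>
    // The broadcast value is stored as a single deserialized object.
    val value = result.data.next().asInstanceOf[T]
    // getLocalValues takes a read lock on the block; release it once the value is extracted.
    blockManager.releaseLock(broadcastId)
    value
  }
}
```
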
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index b5adbd88a2c2388dcc936513028cb80794224546..e89b03e38b468ad51410f5eacadf29540474a853 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -20,7 +20,8 @@ package org.apache.spark.memory
 import javax.annotation.concurrent.GuardedBy
 
 import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.storage.{BlockId, MemoryStore}
+import org.apache.spark.storage.BlockId
+import org.apache.spark.storage.memory.MemoryStore
 import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.unsafe.memory.MemoryAllocator
 
@@ -113,6 +114,7 @@ private[spark] abstract class MemoryManager(
 
   /**
    * Release all memory for the given task and mark it as inactive (e.g. when a task ends).
+   *
    * @return the number of bytes freed.
    */
   private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized {
diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
index 6a88966f60d23f1638f9ddd568f87047d9769847..1d376adf1a12ef9cec5a70057714a7e06d41a438 100644
--- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
@@ -20,7 +20,8 @@ package org.apache.spark.memory
 import javax.annotation.concurrent.GuardedBy
 
 import org.apache.spark.Logging
-import org.apache.spark.storage.{BlockId, MemoryStore}
+import org.apache.spark.storage.BlockId
+import org.apache.spark.storage.memory.MemoryStore
 
 /**
  * Performs bookkeeping for managing an adjustable-size pool of memory that is used for storage
@@ -55,6 +56,7 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
 
   /**
    * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
+   *
    * @return whether all N bytes were successfully granted.
    */
   def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized {
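
The doc fix above touches `acquireMemory`, whose contract is all-or-nothing: either every requested byte is reserved for the block, possibly after evicting other cached blocks, or nothing is reserved. A hedged caller-side sketch (StorageMemoryPool is `private[memory]`, so this assumes code living in the `org.apache.spark.memory` package; `pool`, `blockId`, and `numBytes` are placeholders):

```scala
// Sketch of the all-or-nothing contract documented above.
val granted: Boolean = pool.acquireMemory(blockId, numBytes)
if (granted) {
  // Every byte was reserved (existing blocks may have been evicted to make room),
  // so the caller can record the block's entry in the memory store.
} else {
  // No partial reservation was made; fall back to disk or skip caching entirely.
}
```
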
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index b38e2ec57fe323105d05017142da9f31585c361f..873330e136e224c033c124dbbc8b8e20cc1b7649 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -40,24 +40,15 @@ import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.serializer.{Serializer, SerializerInstance}
 import org.apache.spark.shuffle.ShuffleManager
+import org.apache.spark.storage.memory._
 import org.apache.spark.util._
 
-private[spark] sealed trait BlockValues
-private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues
-private[spark] case class IteratorValues(iterator: () => Iterator[Any]) extends BlockValues
-
 /* Class for returning a fetched block and associated metrics. */
 private[spark] class BlockResult(
     val data: Iterator[Any],
     val readMethod: DataReadMethod.Value,
     val bytes: Long)
 
-// Class for representing return value of doPut()
-private sealed trait DoPutResult
-private case object DoPutSucceeded extends DoPutResult
-private case object DoPutBytesFailed extends DoPutResult
-private case class DoPutIteratorFailed(iter: Iterator[Any]) extends DoPutResult
-
 /**
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
@@ -78,7 +69,15 @@ private[spark] class BlockManager(
     numUsableCores: Int)
   extends BlockDataManager with Logging {
 
-  val diskBlockManager = new DiskBlockManager(this, conf)
+  private[spark] val externalShuffleServiceEnabled =
+    conf.getBoolean("spark.shuffle.service.enabled", false)
+
+  val diskBlockManager = {
+    // Only perform cleanup if an external service is not serving our shuffle files.
+    val deleteFilesOnStop =
+      !externalShuffleServiceEnabled || executorId == SparkContext.DRIVER_IDENTIFIER
+    new DiskBlockManager(conf, deleteFilesOnStop)
+  }
 
   private[storage] val blockInfoManager = new BlockInfoManager
 
@@ -86,8 +85,8 @@ private[spark] class BlockManager(
     ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))
 
   // Actual storage of where blocks are kept
-  private[spark] val memoryStore = new MemoryStore(this, memoryManager)
-  private[spark] val diskStore = new DiskStore(this, diskBlockManager)
+  private[spark] val memoryStore = new MemoryStore(conf, this, memoryManager)
+  private[spark] val diskStore = new DiskStore(conf, diskBlockManager)
   memoryManager.setMemoryStore(memoryStore)
 
   // Note: depending on the memory manager, `maxStorageMemory` may actually vary over time.
@@ -96,9 +95,6 @@ private[spark] class BlockManager(
   // to revisit whether reporting this value as the "max" is intuitive to the user.
   private val maxMemory = memoryManager.maxStorageMemory
 
-  private[spark]
-  val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
-
   // Port used by the external shuffle service. In Yarn mode, this may already be
   // set through the Hadoop configuration as the server is launched in the Yarn NM.
   private val externalShuffleServicePort = {
@@ -285,13 +281,9 @@ private[spark] class BlockManager(
     if (blockId.isShuffle) {
       shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
     } else {
-      val blockBytesOpt = doGetLocal(blockId, asBlockResult = false)
-        .asInstanceOf[Option[ByteBuffer]]
-      if (blockBytesOpt.isDefined) {
-        val buffer = blockBytesOpt.get
-        new BlockManagerManagedBuffer(this, blockId, buffer)
-      } else {
-        throw new BlockNotFoundException(blockId.toString)
+      getLocalBytes(blockId) match {
+        case Some(buffer) => new BlockManagerManagedBuffer(this, blockId, buffer)
+        case None => throw new BlockNotFoundException(blockId.toString)
       }
     }
   }
@@ -407,11 +399,71 @@ private[spark] class BlockManager(
   }
 
   /**
-   * Get block from local block manager.
+   * Get block from local block manager as an iterator of Java objects.
    */
-  def getLocal(blockId: BlockId): Option[BlockResult] = {
+  def getLocalValues(blockId: BlockId): Option[BlockResult] = {
     logDebug(s"Getting local block $blockId")
-    doGetLocal(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
+    blockInfoManager.lockForReading(blockId) match {
+      case None =>
+        logDebug(s"Block $blockId was not found")
+        None
+      case Some(info) =>
+        val level = info.level
+        logDebug(s"Level for block $blockId is $level")
+        if (level.useMemory && memoryStore.contains(blockId)) {
+          val iter: Iterator[Any] = if (level.deserialized) {
+            memoryStore.getValues(blockId).get
+          } else {
+            dataDeserialize(blockId, memoryStore.getBytes(blockId).get)
+          }
+          val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
+          Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
+        } else if (level.useDisk && diskStore.contains(blockId)) {
+          val iterToReturn: Iterator[Any] = {
+            val diskBytes = diskStore.getBytes(blockId)
+            if (level.deserialized) {
+              val diskIterator = dataDeserialize(blockId, diskBytes)
+              if (level.useMemory) {
+                // Cache the values before returning them
+                memoryStore.putIterator(blockId, diskIterator, level) match {
+                  case Left(iter) =>
+                    // The memory store put() failed, so it returned the iterator back to us:
+                    iter
+                  case Right(_) =>
+                    // The put() succeeded, so we can read the values back:
+                    memoryStore.getValues(blockId).get
+                }
+              } else {
+                diskIterator
+              }
+            } else { // storage level is serialized
+              if (level.useMemory) {
+                // Cache the bytes back into memory to speed up subsequent reads.
+                val putSucceeded = memoryStore.putBytes(blockId, diskBytes.limit(), () => {
+                  // https://issues.apache.org/jira/browse/SPARK-6076
+                  // If the file size is bigger than the free memory, OOM will happen. So if we
+                  // cannot put it into MemoryStore, copyForMemory should not be created. That's why
+                  // this action is put into a `() => ByteBuffer` and created lazily.
+                  val copyForMemory = ByteBuffer.allocate(diskBytes.limit)
+                  copyForMemory.put(diskBytes)
+                })
+                if (putSucceeded) {
+                  dataDeserialize(blockId, memoryStore.getBytes(blockId).get)
+                } else {
+                  dataDeserialize(blockId, diskBytes)
+                }
+              } else {
+                dataDeserialize(blockId, diskBytes)
+              }
+            }
+          }
+          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId))
+          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+        } else {
+          releaseLock(blockId)
+          throw new SparkException(s"Block $blockId was not found even though it's read-locked")
+        }
+    }
   }
 
   /**
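
The new `getLocalValues` above keeps the block's read lock until the returned iterator is fully consumed, by wrapping the values in a `CompletionIterator` whose completion callback calls `releaseLock`. A standalone sketch of that idiom (`CompletionIterator` is Spark-internal, so this assumes code compiled inside an `org.apache.spark` package; the `println` stands in for `releaseLock`):

```scala
import org.apache.spark.util.CompletionIterator

// The cleanup action runs exactly once, when the wrapped iterator is exhausted.
val values: Iterator[Int] = Iterator(1, 2, 3)
val withCleanup = CompletionIterator[Int, Iterator[Int]](values, println("read lock released"))
withCleanup.foreach(println)  // prints 1, 2, 3 and then "read lock released"
```
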
@@ -428,77 +480,44 @@ private[spark] class BlockManager(
       Option(
         shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
     } else {
-      doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
-    }
-  }
-
-  private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
-    blockInfoManager.lockForReading(blockId) match {
-      case None =>
-        logDebug(s"Block $blockId was not found")
-        None
-      case Some(info) =>
-        doGetLocal(blockId, info, asBlockResult)
+      blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) }
     }
   }
 
   /**
-   * Get a local block from the block manager.
-   * Assumes that the caller holds a read lock on the block.
+   * Get block from the local block manager as serialized bytes.
+   *
+   * Must be called while holding a read lock on the block.
+   * Releases the read lock upon exception; keeps the read lock upon successful return.
    */
-  private def doGetLocal(
-      blockId: BlockId,
-      info: BlockInfo,
-      asBlockResult: Boolean): Option[Any] = {
+  private def doGetLocalBytes(blockId: BlockId, info: BlockInfo): ByteBuffer = {
     val level = info.level
     logDebug(s"Level for block $blockId is $level")
-
-    // Look for the block in memory
-    if (level.useMemory) {
-      logDebug(s"Getting block $blockId from memory")
-      val result = if (asBlockResult) {
-        memoryStore.getValues(blockId).map { iter =>
-          val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
-          new BlockResult(ci, DataReadMethod.Memory, info.size)
-        }
+    // In order, try to read the serialized bytes from memory, then from disk, then fall back to
+    // serializing in-memory objects, and, finally, throw an exception if the block does not exist.
+    if (level.deserialized) {
+      // Try to avoid expensive serialization by reading a pre-serialized copy from disk:
+      if (level.useDisk && diskStore.contains(blockId)) {
+        // Note: we purposely do not try to put the block back into memory here. Since this branch
+        // handles deserialized blocks, this block may only be cached in memory as objects, not
+        // serialized bytes. Because the caller only requested bytes, it doesn't make sense to
+        // cache the block's deserialized objects since that caching may not have a payoff.
+        diskStore.getBytes(blockId)
+      } else if (level.useMemory && memoryStore.contains(blockId)) {
+        // The block was not found on disk, so serialize an in-memory copy:
+        dataSerialize(blockId, memoryStore.getValues(blockId).get)
       } else {
-        memoryStore.getBytes(blockId)
-      }
-      result match {
-        case Some(values) =>
-          return result
-        case None =>
-          logDebug(s"Block $blockId not found in memory")
-      }
-    }
-
-    // Look for block on disk, potentially storing it back in memory if required
-    if (level.useDisk) {
-      logDebug(s"Getting block $blockId from disk")
-      val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
-        case Some(b) => b
-        case None =>
-          releaseLock(blockId)
-          throw new BlockException(
-            blockId, s"Block $blockId not found on disk, though it should be")
+        releaseLock(blockId)
+        throw new SparkException(s"Block $blockId was not found even though it's read-locked")
       }
-      assert(0 == bytes.position())
-
-      if (!level.useMemory) {
-        // If the block shouldn't be stored in memory, we can just return it
-        if (asBlockResult) {
-          val iter = CompletionIterator[Any, Iterator[Any]](
-            dataDeserialize(blockId, bytes), releaseLock(blockId))
-          return Some(new BlockResult(iter, DataReadMethod.Disk, info.size))
-        } else {
-          return Some(bytes)
-        }
-      } else {
-        // Otherwise, we also have to store something in the memory store
-        if (!level.deserialized && !asBlockResult) {
-          /* We'll store the bytes in memory if the block's storage level includes
-           * "memory serialized" and we requested its serialized bytes. */
-          memoryStore.putBytes(blockId, bytes.limit, () => {
+    } else {  // storage level is serialized
+      if (level.useMemory && memoryStore.contains(blockId)) {
+        memoryStore.getBytes(blockId).get
+      } else if (level.useDisk && diskStore.contains(blockId)) {
+        val bytes = diskStore.getBytes(blockId)
+        if (level.useMemory) {
+          // Cache the bytes back into memory to speed up subsequent reads.
+          val memoryStorePutSucceeded = memoryStore.putBytes(blockId, bytes.limit(), () => {
             // https://issues.apache.org/jira/browse/SPARK-6076
             // If the file size is bigger than the free memory, OOM will happen. So if we cannot
             // put it into MemoryStore, copyForMemory should not be created. That's why this
@@ -506,39 +525,19 @@ private[spark] class BlockManager(
             val copyForMemory = ByteBuffer.allocate(bytes.limit)
             copyForMemory.put(bytes)
           })
-          bytes.rewind()
-        }
-        if (!asBlockResult) {
-          return Some(bytes)
-        } else {
-          val values = dataDeserialize(blockId, bytes)
-          val valuesToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              // Cache the values before returning them
-              memoryStore.putIterator(blockId, values, level, allowPersistToDisk = false) match {
-                case Left(iter) =>
-                  // The memory store put() failed, so it returned the iterator back to us:
-                  iter
-                case Right(_) =>
-                  // The put() succeeded, so we can read the values back:
-                  memoryStore.getValues(blockId).get
-              }
-            } else {
-              values
-            }
+          if (memoryStorePutSucceeded) {
+            memoryStore.getBytes(blockId).get
+          } else {
+            bytes.rewind()
+            bytes
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](valuesToReturn, releaseLock(blockId))
-          return Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+        } else {
+          bytes
         }
+      } else {
+        releaseLock(blockId)
+        throw new SparkException(s"Block $blockId was not found even though it's read-locked")
       }
-    } else {
-      // This branch represents a case where the BlockInfoManager contained an entry for
-      // the block but the block could not be found in any of the block stores. This case
-      // should never occur, but for completeness's sake we address it here.
-      logError(
-        s"Block $blockId is supposedly stored locally but was not found in any block store")
-      releaseLock(blockId)
-      None
     }
   }
 
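
Both read paths above pass the in-memory copy to `memoryStore.putBytes` as a `() => ByteBuffer` rather than an eager buffer, so a copy larger than free memory is never allocated (SPARK-6076). A standalone sketch of that idiom under simplified assumptions; `putBytesLazily`, `tryReserve`, and `freeMemory` are hypothetical stand-ins for the memory manager's reservation step, not Spark APIs:

```scala
import java.nio.ByteBuffer

// Lazy-allocation idiom: the expensive copy is built only after space has been reserved.
def putBytesLazily(size: Long, freeMemory: Long)(makeBytes: () => ByteBuffer): Boolean = {
  def tryReserve(numBytes: Long): Boolean = numBytes <= freeMemory  // simplified reservation
  if (tryReserve(size)) {
    val bytes = makeBytes()        // only now is the copy materialized
    assert(bytes.limit() == size)
    true
  } else {
    false                          // makeBytes() never runs, so no oversized copy is created
  }
}

// Example: a 16-byte block fits in 1 KB of free memory, so the buffer is created.
putBytesLazily(16L, freeMemory = 1024L)(() => ByteBuffer.allocate(16))
```
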
@@ -547,17 +546,10 @@ private[spark] class BlockManager(
    *
    * This does not acquire a lock on this block in this JVM.
    */
-  def getRemote(blockId: BlockId): Option[BlockResult] = {
-    logDebug(s"Getting remote block $blockId")
-    doGetRemote(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
-  }
-
-  /**
-   * Get block from remote block managers as serialized bytes.
-   */
-  def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
-    logDebug(s"Getting remote block $blockId as bytes")
-    doGetRemote(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
+  def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
+    getRemoteBytes(blockId).map { data =>
+      new BlockResult(dataDeserialize(blockId, data), DataReadMethod.Network, data.limit())
+    }
   }
 
   /**
@@ -570,7 +562,11 @@ private[spark] class BlockManager(
     preferredLocs ++ otherLocs
   }
 
-  private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
+  /**
+   * Get block from remote block managers as serialized bytes.
+   */
+  def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
+    logDebug(s"Getting remote block $blockId")
     require(blockId != null, "BlockId is null")
     val locations = getLocations(blockId)
     var numFetchFailures = 0
@@ -595,14 +591,7 @@ private[spark] class BlockManager(
       }
 
       if (data != null) {
-        if (asBlockResult) {
-          return Some(new BlockResult(
-            dataDeserialize(blockId, data),
-            DataReadMethod.Network,
-            data.limit()))
-        } else {
-          return Some(data)
-        }
+        return Some(data)
       }
       logDebug(s"The value of block $blockId is null")
     }
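
With the `asBlockResult` flag gone, `getRemoteBytes` simply walks the block's known locations and returns the first successful fetch, and `getRemoteValues` deserializes on top of it. The failure-counting details live outside this hunk, so the following is only a generic sketch of the loop's shape, not the exact Spark logic; `fetchFromAnyLocation`, `fetch`, and `maxFailures` are illustrative names:

```scala
// Generic shape of a "try each location, tolerate a few failures" fetch loop.
def fetchFromAnyLocation[T](locations: Seq[String], maxFailures: Int)
    (fetch: String => Option[T]): Option[T] = {
  var failures = 0
  locations.foreach { loc =>
    try {
      fetch(loc) match {
        case Some(data) => return Some(data)  // first successful fetch wins
        case None =>                          // not present at this location; try the next one
      }
    } catch {
      case e: Exception =>
        failures += 1
        if (failures > maxFailures) throw e   // give up after too many failed attempts
    }
  }
  None
}
```
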
@@ -618,12 +607,12 @@ private[spark] class BlockManager(
    * automatically be freed once the result's `data` iterator is fully consumed.
    */
   def get(blockId: BlockId): Option[BlockResult] = {
-    val local = getLocal(blockId)
+    val local = getLocalValues(blockId)
     if (local.isDefined) {
       logInfo(s"Found block $blockId locally")
       return local
     }
-    val remote = getRemote(blockId)
+    val remote = getRemoteValues(blockId)
     if (remote.isDefined) {
       logInfo(s"Found block $blockId remotely")
       return remote
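
A small usage sketch for `get`, assuming `blockManager` and `blockId` are in scope: for locally served blocks the read lock taken by `getLocalValues` is released automatically once the result's `data` iterator has been fully consumed, so callers should drain (or explicitly release) it.

```scala
// Sketch of a typical consumer of BlockManager.get().
blockManager.get(blockId) match {
  case Some(result) =>
    val numRecords = result.data.size  // fully consumes the iterator, releasing any read lock
    println(s"block $blockId: $numRecords records read via ${result.readMethod}")
  case None =>
    println(s"block $blockId is not available locally or remotely")
}
```
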
@@ -673,24 +662,26 @@ private[spark] class BlockManager(
       level: StorageLevel,
       makeIterator: () => Iterator[Any]): Either[BlockResult, Iterator[Any]] = {
     // Initially we hold no locks on this block.
-    doPut(blockId, IteratorValues(makeIterator), level, keepReadLock = true) match {
-      case DoPutSucceeded =>
+    doPutIterator(blockId, makeIterator, level, keepReadLock = true) match {
+      case None =>
         // doPut() didn't hand work back to us, so the block already existed or was successfully
         // stored. Therefore, we now hold a read lock on the block.
-        val blockResult = get(blockId).getOrElse {
+        val blockResult = getLocalValues(blockId).getOrElse {
           // Since we held a read lock between the doPut() and get() calls, the block should not
           // have been evicted, so get() not returning the block indicates some internal error.
           releaseLock(blockId)
           throw new SparkException(s"get() failed for block $blockId even though we held a lock")
         }
+        // We already hold a read lock on the block from the doPut() call and getLocalValues()
+        // acquires the lock again, so we need to call releaseLock() here so that the net number
+        // of lock acquisitions is 1 (since the caller will only call release() once).
+        releaseLock(blockId)
         Left(blockResult)
-      case DoPutIteratorFailed(iter) =>
+      case Some(iter) =>
         // The put failed, likely because the data was too large to fit in memory and could not be
         // dropped to disk. Therefore, we need to pass the input iterator back to the caller so
         // that they can decide what to do with the values (e.g. process them without caching).
+        Right(iter)
-      case DoPutBytesFailed =>
-        throw new SparkException("doPut returned an invalid failure response")
     }
   }
 
@@ -701,16 +692,10 @@ private[spark] class BlockManager(
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel,
-      tellMaster: Boolean = true,
-      effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
+      tellMaster: Boolean = true): Boolean = {
     require(values != null, "Values is null")
-    val result = doPut(
-      blockId,
-      IteratorValues(() => values),
-      level,
-      tellMaster,
-      effectiveStorageLevel)
-    result == DoPutSucceeded
+    // If doPut() didn't hand work back to us, the block already existed or was successfully stored
+    doPutIterator(blockId, () => values, level, tellMaster).isEmpty
   }
 
   /**
@@ -739,46 +724,105 @@ private[spark] class BlockManager(
       blockId: BlockId,
       bytes: ByteBuffer,
       level: StorageLevel,
-      tellMaster: Boolean = true,
-      effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
+      tellMaster: Boolean = true): Boolean = {
     require(bytes != null, "Bytes is null")
-    val result = doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
-    result == DoPutSucceeded
+    doPutBytes(blockId, bytes, level, tellMaster)
   }
 
   /**
-   * Put the given block according to the given level in one of the block stores, replicating
+   * Put the given bytes according to the given level in one of the block stores, replicating
    * the values if necessary.
    *
    * If the block already exists, this method will not overwrite it.
    *
-   * @param effectiveStorageLevel the level according to which the block will actually be handled.
-   *                              This allows the caller to specify an alternate behavior of doPut
-   *                              while preserving the original level specified by the user.
    * @param keepReadLock if true, this method will hold the read lock when it returns (even if the
    *                     block already exists). If false, this method will hold no locks when it
    *                     returns.
-   * @return [[DoPutSucceeded]] if the block was already present or if the put succeeded, or
-   *        [[DoPutBytesFailed]] if the put failed and we were storing bytes, or
-   *        [[DoPutIteratorFailed]] if the put failed and we were storing an iterator.
+   * @return true if the block was already present or if the put succeeded, false otherwise.
    */
-  private def doPut(
+  private def doPutBytes(
       blockId: BlockId,
-      data: BlockValues,
+      bytes: ByteBuffer,
       level: StorageLevel,
       tellMaster: Boolean = true,
-      effectiveStorageLevel: Option[StorageLevel] = None,
-      keepReadLock: Boolean = false): DoPutResult = {
+      keepReadLock: Boolean = false): Boolean = {
+    doPut(blockId, level, tellMaster = tellMaster, keepReadLock = keepReadLock) { putBlockInfo =>
+      val startTimeMs = System.currentTimeMillis
+      // Since we're storing bytes, initiate the replication before storing them locally.
+      // This is faster as data is already serialized and ready to send.
+      val replicationFuture = if (level.replication > 1) {
+        // Duplicate doesn't copy the bytes, but just creates a wrapper
+        val bufferView = bytes.duplicate()
+        Future {
+          // This is a blocking action and should run in futureExecutionContext which is a cached
+          // thread pool
+          replicate(blockId, bufferView, level)
+        }(futureExecutionContext)
+      } else {
+        null
+      }
+
+      bytes.rewind()
+      val size = bytes.limit()
+
+      if (level.useMemory) {
+        // Put it in memory first, even if it also has useDisk set to true;
+        // We will drop it to disk later if the memory store can't hold it.
+        val putSucceeded = if (level.deserialized) {
+          val values = dataDeserialize(blockId, bytes.duplicate())
+          memoryStore.putIterator(blockId, values, level).isRight
+        } else {
+          memoryStore.putBytes(blockId, size, () => bytes)
+        }
+        if (!putSucceeded && level.useDisk) {
+          logWarning(s"Persisting block $blockId to disk instead.")
+          diskStore.putBytes(blockId, bytes)
+        }
+      } else if (level.useDisk) {
+        diskStore.putBytes(blockId, bytes)
+      }
+
+      val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
+      val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
+      if (blockWasSuccessfullyStored) {
+        // Now that the block is in either the memory, externalBlockStore, or disk store,
+        // tell the master about it.
+        putBlockInfo.size = size
+        if (tellMaster) {
+          reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
+        }
+        Option(TaskContext.get()).foreach { c =>
+          c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus)))
+        }
+      }
+      logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
+      if (level.replication > 1) {
+        // Wait for asynchronous replication to finish
+        Await.ready(replicationFuture, Duration.Inf)
+      }
+      if (blockWasSuccessfullyStored) {
+        None
+      } else {
+        Some(bytes)
+      }
+    }.isEmpty
+  }
+
+  /**
+   * Helper method used to abstract common code from [[doPutBytes()]] and [[doPutIterator()]].
+   *
+   * @param putBody a function which attempts the actual put() and returns None on success
+   *                or Some on failure.
+   */
+  private def doPut[T](
+      blockId: BlockId,
+      level: StorageLevel,
+      tellMaster: Boolean,
+      keepReadLock: Boolean)(putBody: BlockInfo => Option[T]): Option[T] = {
 
     require(blockId != null, "BlockId is null")
     require(level != null && level.isValid, "StorageLevel is null or invalid")
-    effectiveStorageLevel.foreach { level =>
-      require(level != null && level.isValid, "Effective StorageLevel is null or invalid")
-    }
 
-    /* Remember the block's storage level so that we can correctly drop it to disk if it needs
-     * to be dropped right after it got put into memory. Note, however, that other threads will
-     * not be able to get() this block until we call markReady on its BlockInfo. */
     val putBlockInfo = {
       val newInfo = new BlockInfo(level, tellMaster)
       if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
@@ -789,138 +833,113 @@ private[spark] class BlockManager(
           // lockNewBlockForWriting returned a read lock on the existing block, so we must free it:
           releaseLock(blockId)
         }
-        return DoPutSucceeded
+        return None
       }
     }
 
     val startTimeMs = System.currentTimeMillis
-
-    // Size of the block in bytes
-    var size = 0L
-
-    // The level we actually use to put the block
-    val putLevel = effectiveStorageLevel.getOrElse(level)
-
-    // If we're storing bytes, then initiate the replication before storing them locally.
-    // This is faster as data is already serialized and ready to send.
-    val replicationFuture = data match {
-      case b: ByteBufferValues if putLevel.replication > 1 =>
-        // Duplicate doesn't copy the bytes, but just creates a wrapper
-        val bufferView = b.buffer.duplicate()
-        Future {
-          // This is a blocking action and should run in futureExecutionContext which is a cached
-          // thread pool
-          replicate(blockId, bufferView, putLevel)
-        }(futureExecutionContext)
-      case _ => null
-    }
-
-    var blockWasSuccessfullyStored = false
-    var iteratorFromFailedMemoryStorePut: Option[Iterator[Any]] = None
-
-    putBlockInfo.synchronized {
-      logTrace("Put for block %s took %s to get into synchronized block"
-        .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
-
-      try {
-        if (putLevel.useMemory) {
-          // Put it in memory first, even if it also has useDisk set to true;
-          // We will drop it to disk later if the memory store can't hold it.
-          data match {
-            case IteratorValues(iterator) =>
-              memoryStore.putIterator(blockId, iterator(), putLevel) match {
-                case Right(s) =>
-                  size = s
-                case Left(iter) =>
-                  iteratorFromFailedMemoryStorePut = Some(iter)
-              }
-            case ByteBufferValues(bytes) =>
-              bytes.rewind()
-              size = bytes.limit()
-              memoryStore.putBytes(blockId, bytes, putLevel)
-          }
-        } else if (putLevel.useDisk) {
-          data match {
-            case IteratorValues(iterator) =>
-              diskStore.putIterator(blockId, iterator(), putLevel) match {
-                case Right(s) =>
-                  size = s
-                // putIterator() will never return Left (see its return type).
-              }
-            case ByteBufferValues(bytes) =>
-              bytes.rewind()
-              size = bytes.limit()
-              diskStore.putBytes(blockId, bytes, putLevel)
-          }
+    var blockWasSuccessfullyStored: Boolean = false
+    val result: Option[T] = try {
+      val res = putBody(putBlockInfo)
+      blockWasSuccessfullyStored = res.isEmpty
+      res
+    } finally {
+      if (blockWasSuccessfullyStored) {
+        if (keepReadLock) {
+          blockInfoManager.downgradeLock(blockId)
         } else {
-          assert(putLevel == StorageLevel.NONE)
-          throw new BlockException(
-            blockId, s"Attempted to put block $blockId without specifying storage level!")
+          blockInfoManager.unlock(blockId)
         }
-
-        val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
-        blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
-        if (blockWasSuccessfullyStored) {
-          // Now that the block is in either the memory, externalBlockStore, or disk store,
-          // let other threads read it, and tell the master about it.
-          putBlockInfo.size = size
-          if (tellMaster) {
-            reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
-          }
-          Option(TaskContext.get()).foreach { c =>
-            c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus)))
-          }
-        }
-      } finally {
-        if (blockWasSuccessfullyStored) {
-          if (keepReadLock) {
-            blockInfoManager.downgradeLock(blockId)
-          } else {
-            blockInfoManager.unlock(blockId)
-          }
-        } else {
-          blockInfoManager.removeBlock(blockId)
-          logWarning(s"Putting block $blockId failed")
-        }
-      }
-    }
-    logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
-
-    if (replicationFuture != null) {
-      // Wait for asynchronous replication to finish
-      Await.ready(replicationFuture, Duration.Inf)
-    } else if (putLevel.replication > 1 && blockWasSuccessfullyStored) {
-      val remoteStartTime = System.currentTimeMillis
-      val bytesToReplicate: ByteBuffer = {
-        doGetLocal(blockId, putBlockInfo, asBlockResult = false)
-          .map(_.asInstanceOf[ByteBuffer])
-          .getOrElse {
-            throw new SparkException(s"Block $blockId was not found even though it was just stored")
-          }
-      }
-      try {
-        replicate(blockId, bytesToReplicate, putLevel)
-      } finally {
-        BlockManager.dispose(bytesToReplicate)
+      } else {
+        blockInfoManager.removeBlock(blockId)
+        logWarning(s"Putting block $blockId failed")
       }
-      logDebug("Put block %s remotely took %s"
-        .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
     }
-
-    if (putLevel.replication > 1) {
+    if (level.replication > 1) {
       logDebug("Putting block %s with replication took %s"
         .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
     } else {
       logDebug("Putting block %s without replication took %s"
         .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
     }
+    result
+  }
 
-    if (blockWasSuccessfullyStored) {
-      DoPutSucceeded
-    } else if (iteratorFromFailedMemoryStorePut.isDefined) {
-      DoPutIteratorFailed(iteratorFromFailedMemoryStorePut.get)
-    } else {
-      DoPutBytesFailed
+  /**
+   * Put the given block according to the given level in one of the block stores, replicating
+   * the values if necessary.
+   *
+   * If the block already exists, this method will not overwrite it.
+   *
+   * @param keepReadLock if true, this method will hold the read lock when it returns (even if the
+   *                     block already exists). If false, this method will hold no locks when it
+   *                     returns.
+   * @return None if the block was already present or if the put succeeded, or Some(iterator)
+   *         if the put failed.
+   */
+  private def doPutIterator(
+      blockId: BlockId,
+      iterator: () => Iterator[Any],
+      level: StorageLevel,
+      tellMaster: Boolean = true,
+      keepReadLock: Boolean = false): Option[Iterator[Any]] = {
+    doPut(blockId, level, tellMaster = tellMaster, keepReadLock = keepReadLock) { putBlockInfo =>
+      val startTimeMs = System.currentTimeMillis
+      var iteratorFromFailedMemoryStorePut: Option[Iterator[Any]] = None
+      // Size of the block in bytes
+      var size = 0L
+      if (level.useMemory) {
+        // Put it in memory first, even if it also has useDisk set to true;
+        // We will drop it to disk later if the memory store can't hold it.
+        memoryStore.putIterator(blockId, iterator(), level) match {
+          case Right(s) =>
+            size = s
+          case Left(iter) =>
+            // Not enough space to unroll this block; drop to disk if applicable
+            if (level.useDisk) {
+              logWarning(s"Persisting block $blockId to disk instead.")
+              diskStore.put(blockId) { fileOutputStream =>
+                dataSerializeStream(blockId, fileOutputStream, iter)
+              }
+              size = diskStore.getSize(blockId)
+            } else {
+              iteratorFromFailedMemoryStorePut = Some(iter)
+            }
+        }
+      } else if (level.useDisk) {
+        diskStore.put(blockId) { fileOutputStream =>
+          dataSerializeStream(blockId, fileOutputStream, iterator())
+        }
+        size = diskStore.getSize(blockId)
+      }
+
+      val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
+      val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
+      if (blockWasSuccessfullyStored) {
+        // Now that the block is in either the memory, externalBlockStore, or disk store,
+        // tell the master about it.
+        putBlockInfo.size = size
+        if (tellMaster) {
+          reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
+        }
+        Option(TaskContext.get()).foreach { c =>
+          c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus)))
+        }
+        logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
+        if (level.replication > 1) {
+          val remoteStartTime = System.currentTimeMillis
+          val bytesToReplicate = doGetLocalBytes(blockId, putBlockInfo)
+          try {
+            replicate(blockId, bytesToReplicate, level)
+          } finally {
+            BlockManager.dispose(bytesToReplicate)
+          }
+          logDebug("Put block %s remotely took %s"
+            .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
+        }
+      }
+      assert(blockWasSuccessfullyStored == iteratorFromFailedMemoryStorePut.isEmpty)
+      iteratorFromFailedMemoryStorePut
     }
   }
 
@@ -1077,9 +1096,11 @@ private[spark] class BlockManager(
       logInfo(s"Writing block $blockId to disk")
       data() match {
         case Left(elements) =>
-          diskStore.putIterator(blockId, elements.toIterator, level)
+          diskStore.put(blockId) { fileOutputStream =>
+            dataSerializeStream(blockId, fileOutputStream, elements.toIterator)
+          }
         case Right(bytes) =>
-          diskStore.putBytes(blockId, bytes, level)
+          diskStore.putBytes(blockId, bytes)
       }
       blockIsUpdated = true
     }
@@ -1229,7 +1250,6 @@ private[spark] class BlockManager(
     rpcEnv.stop(slaveEndpoint)
     blockInfoManager.clear()
     memoryStore.clear()
-    diskStore.clear()
     futureExecutionContext.shutdownNow()
     logInfo("BlockManager stopped")
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
deleted file mode 100644
index b069918b16106740edc6b492401a063b9f74ef10..0000000000000000000000000000000000000000
--- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.nio.ByteBuffer
-
-import org.apache.spark.Logging
-
-/**
- * Abstract class to store blocks.
- */
-private[spark] abstract class BlockStore(val blockManager: BlockManager) extends Logging {
-
-  def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): Unit
-
-  /**
-   * Attempt to store an iterator of values.
-   *
-   * @return an iterator of values (in case the put failed), or the estimated size of the stored
-   *         values if the put succeeded.
-   */
-  def putIterator(
-      blockId: BlockId,
-      values: Iterator[Any],
-      level: StorageLevel): Either[Iterator[Any], Long]
-
-  /**
-   * Return the size of a block in bytes.
-   */
-  def getSize(blockId: BlockId): Long
-
-  def getBytes(blockId: BlockId): Option[ByteBuffer]
-
-  def getValues(blockId: BlockId): Option[Iterator[Any]]
-
-  /**
-   * Remove a block, if it exists.
-   *
-   * @param blockId the block to remove.
-   * @return True if the block was found and removed, False otherwise.
-   * @throws IllegalStateException if the block is pinned by a task.
-   */
-  def remove(blockId: BlockId): Boolean
-
-  def contains(blockId: BlockId): Boolean
-
-  def clear() { }
-}
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 4daf22f71415e0a57609f21fc5173da433f1fd68..e51d96e57bc6f39f051ba84cee466b25fcc42d53 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -26,18 +26,14 @@ import org.apache.spark.util.{ShutdownHookManager, Utils}
 
 /**
  * Creates and maintains the logical mapping between logical blocks and physical on-disk
- * locations. By default, one block is mapped to one file with a name given by its BlockId.
- * However, it is also possible to have a block map to only a segment of a file, by calling
- * mapBlockToFileSegment().
+ * locations. One block is mapped to one file with a name given by its BlockId.
  *
  * Block files are hashed among the directories listed in spark.local.dir (or in
  * SPARK_LOCAL_DIRS, if it's set).
  */
-private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkConf)
-  extends Logging {
+private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean) extends Logging {
 
-  private[spark]
-  val subDirsPerLocalDir = blockManager.conf.getInt("spark.diskStore.subDirectories", 64)
+  private[spark] val subDirsPerLocalDir = conf.getInt("spark.diskStore.subDirectories", 64)
 
   /* Create one local directory for each path mentioned in spark.local.dir; then, inside this
    * directory, create multiple subdirectories that we will hash files into, in order to avoid
@@ -163,10 +159,7 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
   }
 
   private def doStop(): Unit = {
-    // Only perform cleanup if an external service is not serving our shuffle files.
-    // Also blockManagerId could be null if block manager is not initialized properly.
-    if (!blockManager.externalShuffleServiceEnabled ||
-      (blockManager.blockManagerId != null && blockManager.blockManagerId.isDriver)) {
+    if (deleteFilesOnStop) {
       localDirs.foreach { localDir =>
         if (localDir.isDirectory() && localDir.exists()) {
           try {
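
With this change, DiskBlockManager no longer reaches back into BlockManager to decide whether to clean up; the owner computes `deleteFilesOnStop` once and passes it in. A sketch of the wiring, mirroring the BlockManager hunk earlier in this diff and assuming the surrounding Spark-internal values (`conf`, `executorId`, `externalShuffleServiceEnabled`) are in scope:

```scala
// Mirrors the construction shown in the BlockManager hunk above.
val deleteFilesOnStop =
  !externalShuffleServiceEnabled || executorId == SparkContext.DRIVER_IDENTIFIER
val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop)
// On stop(), local block directories are deleted only when no external shuffle
// service is still serving files out of them (or when running on the driver).
```
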
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index e35aa1b0684da15678c8757a0ed453bb177ac0bd..caecd97a0b722db8cbe43aa5880e7abdf2565615 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -17,112 +17,100 @@
 
 package org.apache.spark.storage
 
-import java.io.{File, FileOutputStream, IOException, RandomAccessFile}
+import java.io.{FileOutputStream, IOException, RandomAccessFile}
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel.MapMode
 
-import org.apache.spark.Logging
+import com.google.common.io.Closeables
+
+import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.util.Utils
 
 /**
  * Stores BlockManager blocks on disk.
  */
-private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
-  extends BlockStore(blockManager) with Logging {
+private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) extends Logging {
 
-  val minMemoryMapBytes = blockManager.conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
+  private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
 
-  override def getSize(blockId: BlockId): Long = {
+  def getSize(blockId: BlockId): Long = {
     diskManager.getFile(blockId.name).length
   }
 
-  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): Unit = {
-    // So that we do not modify the input offsets !
-    // duplicate does not copy buffer, so inexpensive
-    val bytes = _bytes.duplicate()
+  /**
+   * Invokes the provided callback function to write the specific block.
+   *
+   * @throws IllegalStateException if the block already exists in the disk store.
+   */
+  def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
+    if (contains(blockId)) {
+      throw new IllegalStateException(s"Block $blockId is already present in the disk store")
+    }
     logDebug(s"Attempting to put block $blockId")
     val startTime = System.currentTimeMillis
     val file = diskManager.getFile(blockId)
-    val channel = new FileOutputStream(file).getChannel
-    Utils.tryWithSafeFinally {
-      while (bytes.remaining > 0) {
-        channel.write(bytes)
+    val fileOutputStream = new FileOutputStream(file)
+    var threwException: Boolean = true
+    try {
+      writeFunc(fileOutputStream)
+      threwException = false
+    } finally {
+      try {
+        Closeables.close(fileOutputStream, threwException)
+      } finally {
+        if (threwException) {
+          remove(blockId)
+        }
       }
-    } {
-      channel.close()
     }
     val finishTime = System.currentTimeMillis
     logDebug("Block %s stored as %s file on disk in %d ms".format(
-      file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))
+      file.getName,
+      Utils.bytesToString(file.length()),
+      finishTime - startTime))
   }
 
-  override def putIterator(
-      blockId: BlockId,
-      values: Iterator[Any],
-      level: StorageLevel): Right[Iterator[Any], Long] = {
-    logDebug(s"Attempting to write values for block $blockId")
-    val startTime = System.currentTimeMillis
-    val file = diskManager.getFile(blockId)
-    val outputStream = new FileOutputStream(file)
-    try {
+  def putBytes(blockId: BlockId, _bytes: ByteBuffer): Unit = {
+    // So that we do not modify the input offsets !
+    // duplicate does not copy buffer, so inexpensive
+    val bytes = _bytes.duplicate()
+    put(blockId) { fileOutputStream =>
+      val channel = fileOutputStream.getChannel
       Utils.tryWithSafeFinally {
-        blockManager.dataSerializeStream(blockId, outputStream, values)
+        while (bytes.remaining > 0) {
+          channel.write(bytes)
+        }
       } {
-        // Close outputStream here because it should be closed before file is deleted.
-        outputStream.close()
+        channel.close()
       }
-    } catch {
-      case e: Throwable =>
-        if (file.exists()) {
-          if (!file.delete()) {
-            logWarning(s"Error deleting ${file}")
-          }
-        }
-        throw e
     }
-
-    val length = file.length
-
-    val timeTaken = System.currentTimeMillis - startTime
-    logDebug("Block %s stored as %s file on disk in %d ms".format(
-      file.getName, Utils.bytesToString(length), timeTaken))
-
-    Right(length)
   }
 
-  private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
+  def getBytes(blockId: BlockId): ByteBuffer = {
+    val file = diskManager.getFile(blockId.name)
     val channel = new RandomAccessFile(file, "r").getChannel
     Utils.tryWithSafeFinally {
       // For small files, directly read rather than memory map
-      if (length < minMemoryMapBytes) {
-        val buf = ByteBuffer.allocate(length.toInt)
-        channel.position(offset)
+      if (file.length < minMemoryMapBytes) {
+        val buf = ByteBuffer.allocate(file.length.toInt)
+        channel.position(0)
         while (buf.remaining() != 0) {
           if (channel.read(buf) == -1) {
             throw new IOException("Reached EOF before filling buffer\n" +
-              s"offset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
+              s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
           }
         }
         buf.flip()
-        Some(buf)
+        buf
       } else {
-        Some(channel.map(MapMode.READ_ONLY, offset, length))
+        channel.map(MapMode.READ_ONLY, 0, file.length)
       }
     } {
       channel.close()
     }
   }
 
-  override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
-    val file = diskManager.getFile(blockId.name)
-    getBytes(file, 0, file.length)
-  }
-
-  override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
-    getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
-  }
-
-  override def remove(blockId: BlockId): Boolean = {
+  def remove(blockId: BlockId): Boolean = {
     val file = diskManager.getFile(blockId.name)
     if (file.exists()) {
       val ret = file.delete()
@@ -135,7 +123,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     }
   }
 
-  override def contains(blockId: BlockId): Boolean = {
+  def contains(blockId: BlockId): Boolean = {
     val file = diskManager.getFile(blockId.name)
     file.exists()
   }
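
The new `put(blockId)(writeFunc)` above replaces `putIterator`: the store owns opening and closing the `FileOutputStream` (and removes the partial file on failure), while the caller decides how bytes reach the stream; `putBytes` is now just `put` with a channel-writing callback. A usage sketch, assuming Spark-internal code with a `diskStore` and a `blockId` in scope (the payload is illustrative):

```scala
import java.nio.charset.StandardCharsets

// Write a block through the callback API, then read it back.
if (!diskStore.contains(blockId)) {
  diskStore.put(blockId) { fileOutputStream =>
    // BlockManager passes dataSerializeStream here; any OutputStream-based writer works.
    fileOutputStream.write("example payload".getBytes(StandardCharsets.UTF_8))
  }
}
val bytes = diskStore.getBytes(blockId)  // small files are read directly, larger ones memory-mapped
println(s"block $blockId occupies ${diskStore.getSize(blockId)} bytes on disk")
```
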
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
similarity index 76%
rename from core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
rename to core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index bb72fe4bcafbeb95060f25eef50a6021a441b2f1..a80b2357ff9110abd02daf4d0ba927f79a892956 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.storage
+package org.apache.spark.storage.memory
 
 import java.nio.ByteBuffer
 import java.util.LinkedHashMap
@@ -23,8 +23,9 @@ import java.util.LinkedHashMap
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{Logging, SparkConf, TaskContext}
 import org.apache.spark.memory.MemoryManager
+import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel}
 import org.apache.spark.util.{SizeEstimator, Utils}
 import org.apache.spark.util.collection.SizeTrackingVector
 
@@ -34,13 +35,15 @@ private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)
  * Stores blocks in memory, either as Arrays of deserialized Java objects or as
  * serialized ByteBuffers.
  */
-private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: MemoryManager)
-  extends BlockStore(blockManager) {
+private[spark] class MemoryStore(
+    conf: SparkConf,
+    blockManager: BlockManager,
+    memoryManager: MemoryManager)
+  extends Logging {
 
   // Note: all changes to memory allocations, notably putting blocks, evicting blocks, and
   // acquiring or releasing unroll memory, must be synchronized on `memoryManager`!
 
-  private val conf = blockManager.conf
   private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)
 
   // A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes)
@@ -81,32 +84,21 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
     memoryUsed - currentUnrollMemory
   }
 
-  override def getSize(blockId: BlockId): Long = {
+  def getSize(blockId: BlockId): Long = {
     entries.synchronized {
       entries.get(blockId).size
     }
   }
 
-  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): Unit = {
-    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
-    // Work on a duplicate - since the original input might be used elsewhere.
-    val bytes = _bytes.duplicate()
-    bytes.rewind()
-    if (level.deserialized) {
-      val values = blockManager.dataDeserialize(blockId, bytes)
-      putIterator(blockId, values, level)
-    } else {
-      tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
-    }
-  }
-
   /**
    * Use `size` to test if there is enough space in MemoryStore. If so, create the ByteBuffer and
    * put it into MemoryStore. Otherwise, the ByteBuffer won't be created.
    *
    * The caller should guarantee that `size` is correct.
+   *
+   * @return true if the put() succeeded, false otherwise.
    */
-  def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): Unit = {
+  def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): Boolean = {
     require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
     // Work on a duplicate - since the original input might be used elsewhere.
     lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
@@ -114,89 +106,70 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
     if (putSuccess) {
       assert(bytes.limit == size)
     }
-  }
-
-  override def putIterator(
-      blockId: BlockId,
-      values: Iterator[Any],
-      level: StorageLevel): Either[Iterator[Any], Long] = {
-    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
-    putIterator(blockId, values, level, allowPersistToDisk = true)
+    putSuccess
   }
 
   /**
    * Attempt to put the given block in memory store.
    *
-   * There may not be enough space to fully unroll the iterator in memory, in which case we
-   * optionally drop the values to disk if
-   *   (1) the block's storage level specifies useDisk, and
-   *   (2) `allowPersistToDisk` is true.
-   *
-   * One scenario in which `allowPersistToDisk` is false is when the BlockManager reads a block
-   * back from disk and attempts to cache it in memory. In this case, we should not persist the
-   * block back on disk again, as it is already in disk store.
+   * @return the estimated size of the stored data if the put() succeeded, or an iterator
+   *         in case the put() failed (the returned iterator lets callers fall back to the disk
+   *         store if desired).
    */
   private[storage] def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
-      level: StorageLevel,
-      allowPersistToDisk: Boolean): Either[Iterator[Any], Long] = {
+      level: StorageLevel): Either[Iterator[Any], Long] = {
     require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
     val unrolledValues = unrollSafely(blockId, values)
     unrolledValues match {
       case Left(arrayValues) =>
         // Values are fully unrolled in memory, so store them as an array
-        val size = {
-          if (level.deserialized) {
-            val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef])
-            tryToPut(blockId, () => arrayValues, sizeEstimate, deserialized = true)
-            sizeEstimate
+        if (level.deserialized) {
+          val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef])
+          if (tryToPut(blockId, () => arrayValues, sizeEstimate, deserialized = true)) {
+            Right(sizeEstimate)
           } else {
-            val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)
-            tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
-            bytes.limit()
+            Left(arrayValues.toIterator)
           }
-        }
-        Right(size)
-      case Right(iteratorValues) =>
-        // Not enough space to unroll this block; drop to disk if applicable
-        if (level.useDisk && allowPersistToDisk) {
-          logWarning(s"Persisting block $blockId to disk instead.")
-          blockManager.diskStore.putIterator(blockId, iteratorValues, level)
         } else {
-          Left(iteratorValues)
+          val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)
+          if (tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)) {
+            Right(bytes.limit())
+          } else {
+            Left(arrayValues.toIterator)
+          }
         }
+      case Right(iteratorValues) =>
+        Left(iteratorValues)
     }
   }
 
-  override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
+  def getBytes(blockId: BlockId): Option[ByteBuffer] = {
     val entry = entries.synchronized {
       entries.get(blockId)
     }
     if (entry == null) {
       None
-    } else if (entry.deserialized) {
-      Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[Array[Any]].iterator))
     } else {
+      require(!entry.deserialized, "should only call getBytes on blocks stored in serialized form")
       Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data
     }
   }
 
-  override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
+  def getValues(blockId: BlockId): Option[Iterator[Any]] = {
     val entry = entries.synchronized {
       entries.get(blockId)
     }
     if (entry == null) {
       None
-    } else if (entry.deserialized) {
-      Some(entry.value.asInstanceOf[Array[Any]].iterator)
     } else {
-      val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data
-      Some(blockManager.dataDeserialize(blockId, buffer))
+      require(entry.deserialized, "should only call getValues on deserialized blocks")
+      Some(entry.value.asInstanceOf[Array[Any]].iterator)
     }
   }
 
-  override def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
+  def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
     val entry = entries.synchronized {
       entries.remove(blockId)
     }
@@ -210,7 +183,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
     }
   }
 
-  override def clear(): Unit = memoryManager.synchronized {
+  def clear(): Unit = memoryManager.synchronized {
     entries.synchronized {
       entries.clear()
     }
@@ -323,14 +296,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
    * an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
    * must also be passed by the caller.
    *
-   * `value` will be lazily created. If it cannot be put into MemoryStore or disk, `value` won't be
-   * created to avoid OOM since it may be a big ByteBuffer.
-   *
-   * Synchronize on `memoryManager` to ensure that all the put requests and its associated block
-   * dropping is done by only on thread at a time. Otherwise while one thread is dropping
-   * blocks to free memory for one block, another thread may use up the freed space for
-   * another block.
-   *
    * @return whether put was successful.
    */
   private def tryToPut(
@@ -338,42 +303,33 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
       value: () => Any,
       size: Long,
       deserialized: Boolean): Boolean = {
+    val acquiredEnoughStorageMemory = {
+      // Synchronize on memoryManager so that the pending unroll memory isn't stolen by another
+      // task.
+      memoryManager.synchronized {
+        // Note: if we have previously unrolled this block successfully, then pending unroll
+        // memory should be non-zero. This is the amount that we already reserved during the
+        // unrolling process. In this case, we can just reuse this space to cache our block.
+        // The synchronization on `memoryManager` here guarantees that the release and acquire
+        // happen atomically. This relies on the assumption that all memory acquisitions are
+        // synchronized on the same lock.
+        releasePendingUnrollMemoryForThisTask()
+        memoryManager.acquireStorageMemory(blockId, size)
+      }
+    }
 
-    /* TODO: Its possible to optimize the locking by locking entries only when selecting blocks
-     * to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has
-     * been released, it must be ensured that those to-be-dropped blocks are not double counted
-     * for freeing up more space for another block that needs to be put. Only then the actually
-     * dropping of blocks (and writing to disk if necessary) can proceed in parallel. */
-
-    memoryManager.synchronized {
-      // Note: if we have previously unrolled this block successfully, then pending unroll
-      // memory should be non-zero. This is the amount that we already reserved during the
-      // unrolling process. In this case, we can just reuse this space to cache our block.
-      // The synchronization on `memoryManager` here guarantees that the release and acquire
-      // happen atomically. This relies on the assumption that all memory acquisitions are
-      // synchronized on the same lock.
-      releasePendingUnrollMemoryForThisTask()
-      val enoughMemory = memoryManager.acquireStorageMemory(blockId, size)
-      if (enoughMemory) {
-        // We acquired enough memory for the block, so go ahead and put it
-        val entry = new MemoryEntry(value(), size, deserialized)
-        entries.synchronized {
-          entries.put(blockId, entry)
-        }
-        val valuesOrBytes = if (deserialized) "values" else "bytes"
-        logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
-          blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed)))
-      } else {
-        // Tell the block manager that we couldn't put it in memory so that it can drop it to
-        // disk if the block allows disk storage.
-        lazy val data = if (deserialized) {
-          Left(value().asInstanceOf[Array[Any]])
-        } else {
-          Right(value().asInstanceOf[ByteBuffer].duplicate())
-        }
-        blockManager.dropFromMemory(blockId, () => data)
+    if (acquiredEnoughStorageMemory) {
+      // We acquired enough memory for the block, so go ahead and put it
+      val entry = new MemoryEntry(value(), size, deserialized)
+      entries.synchronized {
+        entries.put(blockId, entry)
       }
-      enoughMemory
+      val valuesOrBytes = if (deserialized) "values" else "bytes"
+      logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
+        blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed)))
+      true
+    } else {
+      false
     }
   }
 
@@ -455,7 +411,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
     }
   }
 
-  override def contains(blockId: BlockId): Boolean = {
+  def contains(blockId: BlockId): Boolean = {
     entries.synchronized { entries.containsKey(blockId) }
   }
 
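A minimal caller-side sketch (not part of the patch) of the new putIterator contract: on success the Right value carries the (possibly estimated) size, and on failure the Left iterator hands the values back so the caller can decide whether to persist them elsewhere (for example to the disk store when the level allows it). Names such as memoryStore, blockId, values and level are assumed to be in scope as in the hunks above.

    val storedInMemory: Boolean =
      memoryStore.putIterator(blockId, values, level) match {
        case Right(size) =>
          // Fully unrolled and cached; `size` is the (possibly estimated) size in bytes.
          true
        case Left(leftoverIterator) =>
          // Not enough memory: the returned iterator lets the caller decide, e.g. to
          // write the values to the disk store when level.useDisk is set.
          false
      }
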
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index 88fdbbdaec90273f9b9390bd66f08fcd0549a287..f97cfbba3265ba4986846df4babc5f270dca4fc7 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -37,7 +37,7 @@ class DummyBroadcastClass(rdd: RDD[Int]) extends Serializable {
     rdd.map { x =>
       val bm = SparkEnv.get.blockManager
       // Check if broadcast block was fetched
-      val isFound = bm.getLocal(BroadcastBlockId(bid)).isDefined
+      val isFound = bm.getLocalValues(BroadcastBlockId(bid)).isDefined
       (x, isFound)
     }.collect().toSet
   }
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index d9764c7c10983b38ab23ba9d129554c6ff1f3e1b..686e948b5df70d55b15d1a05257c57d17f161e19 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -31,7 +31,8 @@ import org.scalatest.BeforeAndAfterEach
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, StorageLevel}
+import org.apache.spark.storage.{BlockId, BlockStatus, StorageLevel}
+import org.apache.spark.storage.memory.MemoryStore
 
 
 /**
diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index eee78d396e14778bb92ecd0ef6d90dc16405b01f..741d4fdf78197e2582478b1da3bcf5a4bf6d8440 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -20,7 +20,8 @@ package org.apache.spark.memory
 import org.mockito.Mockito.when
 
 import org.apache.spark.SparkConf
-import org.apache.spark.storage.{MemoryStore, TestBlockId}
+import org.apache.spark.storage.TestBlockId
+import org.apache.spark.storage.memory.MemoryStore
 
 class StaticMemoryManagerSuite extends MemoryManagerSuite {
   private val conf = new SparkConf().set("spark.storage.unrollFraction", "0.4")
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 9686c6621b465d511b963c6b6a62f2993d047db9..9001a26652c92b953a4499587c7630e14d82b275 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -20,7 +20,8 @@ package org.apache.spark.memory
 import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.SparkConf
-import org.apache.spark.storage.{MemoryStore, TestBlockId}
+import org.apache.spark.storage.TestBlockId
+import org.apache.spark.storage.memory.MemoryStore
 
 class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTester {
   private val dummyBlock = TestBlockId("--")
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index ae1faf5d98f38e59611e16c3453bed3e6c05dd2b..b78a3648cd8bcfccbe5215357a560b64b770836e 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -366,7 +366,8 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
         testStore => blockLocations.contains(testStore.blockManagerId.executorId)
       }.foreach { testStore =>
         val testStoreName = testStore.blockManagerId.executorId
-        assert(testStore.getLocal(blockId).isDefined, s"$blockId was not found in $testStoreName")
+        assert(
+          testStore.getLocalValues(blockId).isDefined, s"$blockId was not found in $testStoreName")
         testStore.releaseLock(blockId)
         assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName),
           s"master does not have status for ${blockId.name} in $testStoreName")
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 0485b0501c0300a62da2c439853ca199773b3cd8..42595c8cf2daa883ace45cf55b45c110af997a28 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -17,8 +17,7 @@
 
 package org.apache.spark.storage
 
-import java.nio.{ByteBuffer, MappedByteBuffer}
-import java.util.Arrays
+import java.nio.ByteBuffer
 
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
@@ -614,11 +613,15 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store.putSingle("a3", a3, storageLevel)
     assert(accessMethod("a2").isDefined, "a2 was not in store")
     assert(accessMethod("a3").isDefined, "a3 was not in store")
-    assert(!store.memoryStore.contains("a1"), "a1 was in memory store")
     assert(accessMethod("a1").isDefined, "a1 was not in store")
     val dataShouldHaveBeenCachedBackIntoMemory = {
-      if (storageLevel.deserialized) !getAsBytes
-      else getAsBytes
+      if (storageLevel.deserialized) {
+        !getAsBytes
+      } else {
+        // If the block's storage level is serialized, then always cache the bytes in memory, even
+        // if the caller requested values.
+        true
+      }
     }
     if (dataShouldHaveBeenCachedBackIntoMemory) {
       assert(store.memoryStore.contains("a1"), "a1 was not in memory store")
@@ -735,7 +738,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store.putSingle("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
     assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store")
     store.putSingle("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
-    assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store")
+    assert(!store.memoryStore.contains("a2"), "a2 was in memory store")
     assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
   }
 
@@ -829,50 +832,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     }
   }
 
-  test("reads of memory-mapped and non memory-mapped files are equivalent") {
-    val confKey = "spark.storage.memoryMapThreshold"
-
-    // Create a non-trivial (not all zeros) byte array
-    var counter = 0.toByte
-    def incr: Byte = {counter = (counter + 1).toByte; counter;}
-    val bytes = Array.fill[Byte](1000)(incr)
-    val byteBuffer = ByteBuffer.wrap(bytes)
-
-    val blockId = BlockId("rdd_1_2")
-
-    // This sequence of mocks makes these tests fairly brittle. It would
-    // be nice to refactor classes involved in disk storage in a way that
-    // allows for easier testing.
-    val blockManager = mock(classOf[BlockManager])
-    when(blockManager.conf).thenReturn(conf.clone.set(confKey, "0"))
-    val diskBlockManager = new DiskBlockManager(blockManager, conf)
-
-    val diskStoreMapped = new DiskStore(blockManager, diskBlockManager)
-    diskStoreMapped.putBytes(blockId, byteBuffer, StorageLevel.DISK_ONLY)
-    val mapped = diskStoreMapped.getBytes(blockId).get
-
-    when(blockManager.conf).thenReturn(conf.clone.set(confKey, "1m"))
-    val diskStoreNotMapped = new DiskStore(blockManager, diskBlockManager)
-    diskStoreNotMapped.putBytes(blockId, byteBuffer, StorageLevel.DISK_ONLY)
-    val notMapped = diskStoreNotMapped.getBytes(blockId).get
-
-    // Not possible to do isInstanceOf due to visibility of HeapByteBuffer
-    assert(notMapped.getClass.getName.endsWith("HeapByteBuffer"),
-      "Expected HeapByteBuffer for un-mapped read")
-    assert(mapped.isInstanceOf[MappedByteBuffer], "Expected MappedByteBuffer for mapped read")
-
-    def arrayFromByteBuffer(in: ByteBuffer): Array[Byte] = {
-      val array = new Array[Byte](in.remaining())
-      in.get(array)
-      array
-    }
-
-    val mappedAsArray = arrayFromByteBuffer(mapped)
-    val notMappedAsArray = arrayFromByteBuffer(notMapped)
-    assert(Arrays.equals(mappedAsArray, bytes))
-    assert(Arrays.equals(notMappedAsArray, bytes))
-  }
-
   test("updated block statuses") {
     store = makeBlockManager(12000)
     store.registerTask(0)
@@ -1232,19 +1191,16 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store.putIterator("b3", smallIterator, StorageLevel.MEMORY_ONLY)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
-    // Unroll huge block with not enough space. This should fail and drop the new block to disk
-    // directly in addition to kicking out b2 in the process. Memory store should contain only
-    // b3, while disk store should contain b1, b2 and b4.
+    // Unroll huge block with not enough space. This should fail and return an iterator so that
+    // the caller can decide whether to store the block on disk. During the unrolling process,
+    // block "b2" should be kicked out, so the memory store should contain only b3.
     val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk)
-    assert(result4.isRight)
+    assert(result4.isLeft)
     assert(!memoryStore.contains("b1"))
     assert(!memoryStore.contains("b2"))
     assert(memoryStore.contains("b3"))
     assert(!memoryStore.contains("b4"))
-    assert(diskStore.contains("b1"))
-    assert(diskStore.contains("b2"))
-    assert(!diskStore.contains("b3"))
-    assert(diskStore.contains("b4"))
     assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
   }
 
@@ -1366,7 +1322,7 @@ private object BlockManagerSuite {
       getLocalAndReleaseLock(blockId).isDefined
     }
 
-    val getLocalAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.getLocal)
+    val getLocalAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.getLocalValues)
     val getAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.get)
     val getSingleAndReleaseLock: (BlockId) => Option[Any] = wrapGet(store.getSingle)
     val getLocalBytesAndReleaseLock: (BlockId) => Option[ByteBuffer] = wrapGet(store.getLocalBytes)
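The *AndReleaseLock helpers above wrap the lock-acquiring getters so that test reads do not hold on to block locks. A minimal sketch of such a wrapper (hypothetical; the real wrapGet body is outside the hunks shown here), assuming a store: BlockManager in scope as in the surrounding helpers:

    def wrapGet[T](getFunc: BlockId => Option[T]): BlockId => Option[T] = (blockId: BlockId) => {
      val result = getFunc(blockId)
      if (result.isDefined) {
        store.releaseLock(blockId)  // a successful read acquires a lock; drop it before returning
      }
      result
    }
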
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index 69e17461df755fcee3477d645e4e9492bb0f8d1b..bbfd6df3b699054312d44accfdcb662e61ae193a 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -21,7 +21,6 @@ import java.io.{File, FileWriter}
 
 import scala.language.reflectiveCalls
 
-import org.mockito.Mockito.{mock, when}
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
@@ -33,8 +32,6 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
   private var rootDir1: File = _
   private var rootDirs: String = _
 
-  val blockManager = mock(classOf[BlockManager])
-  when(blockManager.conf).thenReturn(testConf)
   var diskBlockManager: DiskBlockManager = _
 
   override def beforeAll() {
@@ -57,7 +54,7 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
     super.beforeEach()
     val conf = testConf.clone
     conf.set("spark.local.dir", rootDirs)
-    diskBlockManager = new DiskBlockManager(blockManager, conf)
+    diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true)
   }
 
   override def afterEach() {
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..97e74fe7060029823986b5110063cf0ac34dca2f
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.nio.{ByteBuffer, MappedByteBuffer}
+import java.util.Arrays
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+
+class DiskStoreSuite extends SparkFunSuite {
+
+  test("reads of memory-mapped and non memory-mapped files are equivalent") {
+    val confKey = "spark.storage.memoryMapThreshold"
+
+    // Create a non-trivial (not all zeros) byte array
+    val bytes = Array.tabulate[Byte](1000)(_.toByte)
+    val byteBuffer = ByteBuffer.wrap(bytes)
+
+    val blockId = BlockId("rdd_1_2")
+    val diskBlockManager = new DiskBlockManager(new SparkConf(), deleteFilesOnStop = true)
+
+    val diskStoreMapped = new DiskStore(new SparkConf().set(confKey, "0"), diskBlockManager)
+    diskStoreMapped.putBytes(blockId, byteBuffer)
+    val mapped = diskStoreMapped.getBytes(blockId)
+    assert(diskStoreMapped.remove(blockId))
+
+    val diskStoreNotMapped = new DiskStore(new SparkConf().set(confKey, "1m"), diskBlockManager)
+    diskStoreNotMapped.putBytes(blockId, byteBuffer)
+    val notMapped = diskStoreNotMapped.getBytes(blockId)
+
+    // Not possible to do isInstanceOf due to visibility of HeapByteBuffer
+    assert(notMapped.getClass.getName.endsWith("HeapByteBuffer"),
+      "Expected HeapByteBuffer for un-mapped read")
+    assert(mapped.isInstanceOf[MappedByteBuffer], "Expected MappedByteBuffer for mapped read")
+
+    def arrayFromByteBuffer(in: ByteBuffer): Array[Byte] = {
+      val array = new Array[Byte](in.remaining())
+      in.get(array)
+      array
+    }
+
+    val mappedAsArray = arrayFromByteBuffer(mapped)
+    val notMappedAsArray = arrayFromByteBuffer(notMapped)
+    assert(Arrays.equals(mappedAsArray, bytes))
+    assert(Arrays.equals(notMappedAsArray, bytes))
+  }
+}
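The suite above drives spark.storage.memoryMapThreshold through 0 and 1m to cover both read paths. A minimal configuration sketch (the 2m value is illustrative), using only the constructors exercised in this file:

    // Blocks below the threshold are read into ordinary heap ByteBuffers; larger blocks
    // come back as MappedByteBuffers, mirroring the 0 / 1m cases in the test above.
    val conf = new SparkConf().set("spark.storage.memoryMapThreshold", "2m")
    val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true)
    val diskStore = new DiskStore(conf, diskBlockManager)
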
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index dd16fc3ecaf5d3f4bafdefc6155188fc206ef9c5..45424f9bac05a7559cdc5efab810ded893a72ef8 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -106,7 +106,10 @@ class ReceivedBlockHandlerSuite
       testBlockStoring(handler) { case (data, blockIds, storeResults) =>
         // Verify the data in block manager is correct
         val storedData = blockIds.flatMap { blockId =>
-          blockManager.getLocal(blockId).map(_.data.map(_.toString).toList).getOrElse(List.empty)
+          blockManager
+            .getLocalValues(blockId)
+            .map(_.data.map(_.toString).toList)
+            .getOrElse(List.empty)
         }.toList
         storedData shouldEqual data
 
@@ -130,7 +133,10 @@ class ReceivedBlockHandlerSuite
       testBlockStoring(handler) { case (data, blockIds, storeResults) =>
         // Verify the data in block manager is correct
         val storedData = blockIds.flatMap { blockId =>
-          blockManager.getLocal(blockId).map(_.data.map(_.toString).toList).getOrElse(List.empty)
+          blockManager
+            .getLocalValues(blockId)
+            .map(_.data.map(_.toString).toList)
+            .getOrElse(List.empty)
         }.toList
         storedData shouldEqual data