diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
deleted file mode 100644
index 2b456facd9439afce1095b8c0b9c34e43c5f4ca9..0000000000000000000000000000000000000000
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import scala.collection.mutable
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.storage._
-import org.apache.spark.util.CompletionIterator
-
-/**
- * Spark class responsible for passing RDDs partition contents to the BlockManager and making
- * sure a node doesn't load two copies of an RDD at once.
- */
-private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
-
-  /** Keys of RDD partitions that are being computed/loaded. */
-  private val loading = new mutable.HashSet[RDDBlockId]
-
-  /** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
-  def getOrCompute[T](
-      rdd: RDD[T],
-      partition: Partition,
-      context: TaskContext,
-      storageLevel: StorageLevel): Iterator[T] = {
-
-    val key = RDDBlockId(rdd.id, partition.index)
-    logDebug(s"Looking for partition $key")
-    blockManager.get(key) match {
-      case Some(blockResult) =>
-        // Partition is already materialized, so just return its values
-        val existingMetrics = context.taskMetrics().registerInputMetrics(blockResult.readMethod)
-        existingMetrics.incBytesReadInternal(blockResult.bytes)
-
-        val iter = blockResult.data.asInstanceOf[Iterator[T]]
-
-        new InterruptibleIterator[T](context, iter) {
-          override def next(): T = {
-            existingMetrics.incRecordsReadInternal(1)
-            delegate.next()
-          }
-        }
-      case None =>
-        // Acquire a lock for loading this partition
-        // If another thread already holds the lock, wait for it to finish return its results
-        val storedValues = acquireLockForPartition[T](key)
-        if (storedValues.isDefined) {
-          return new InterruptibleIterator[T](context, storedValues.get)
-        }
-
-        // Otherwise, we have to load the partition ourselves
-        try {
-          logInfo(s"Partition $key not found, computing it")
-          val computedValues = rdd.computeOrReadCheckpoint(partition, context)
-          val cachedValues = putInBlockManager(key, computedValues, storageLevel)
-          new InterruptibleIterator(context, cachedValues)
-        } finally {
-          loading.synchronized {
-            loading.remove(key)
-            loading.notifyAll()
-          }
-        }
-    }
-  }
-
-  /**
-   * Acquire a loading lock for the partition identified by the given block ID.
-   *
-   * If the lock is free, just acquire it and return None. Otherwise, another thread is already
-   * loading the partition, so we wait for it to finish and return the values loaded by the thread.
-   */
-  private def acquireLockForPartition[T](id: RDDBlockId): Option[Iterator[T]] = {
-    loading.synchronized {
-      if (!loading.contains(id)) {
-        // If the partition is free, acquire its lock to compute its value
-        loading.add(id)
-        None
-      } else {
-        // Otherwise, wait for another thread to finish and return its result
-        logInfo(s"Another thread is loading $id, waiting for it to finish...")
-        while (loading.contains(id)) {
-          try {
-            loading.wait()
-          } catch {
-            case e: Exception =>
-              logWarning(s"Exception while waiting for another thread to load $id", e)
-          }
-        }
-        logInfo(s"Finished waiting for $id")
-        val values = blockManager.get(id)
-        if (!values.isDefined) {
-          /* The block is not guaranteed to exist even after the other thread has finished.
-           * For instance, the block could be evicted after it was put, but before our get.
-           * In this case, we still need to load the partition ourselves. */
-          logInfo(s"Whoever was loading $id failed; we'll try it ourselves")
-          loading.add(id)
-        }
-        values.map(_.data.asInstanceOf[Iterator[T]])
-      }
-    }
-  }
-
-  /**
-   * Cache the values of a partition, keeping track of any updates in the storage statuses of
-   * other blocks along the way.
-   *
-   * The effective storage level refers to the level that actually specifies BlockManager put
-   * behavior, not the level originally specified by the user. This is mainly for forcing a
-   * MEMORY_AND_DISK partition to disk if there is not enough room to unroll the partition,
-   * while preserving the original semantics of the RDD as specified by the application.
-   */
-  private def putInBlockManager[T](
-      key: BlockId,
-      values: Iterator[T],
-      level: StorageLevel,
-      effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {
-
-    val putLevel = effectiveStorageLevel.getOrElse(level)
-    if (!putLevel.useMemory) {
-      /*
-       * This RDD is not to be cached in memory, so we can just pass the computed values as an
-       * iterator directly to the BlockManager rather than first fully unrolling it in memory.
-       */
-      blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
-      blockManager.get(key) match {
-        case Some(v) => v.data.asInstanceOf[Iterator[T]]
-        case None =>
-          logInfo(s"Failure to store $key")
-          throw new BlockException(key, s"Block manager failed to return cached value for $key!")
-      }
-    } else {
-      /*
-       * This RDD is to be cached in memory. In this case we cannot pass the computed values
-       * to the BlockManager as an iterator and expect to read it back later. This is because
-       * we may end up dropping a partition from memory store before getting it back.
-       *
-       * In addition, we must be careful to not unroll the entire partition in memory at once.
-       * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
-       * single partition. Instead, we unroll the values cautiously, potentially aborting and
-       * dropping the partition to disk if applicable.
-       */
-      blockManager.memoryStore.unrollSafely(key, values) match {
-        case Left(arr) =>
-          // We have successfully unrolled the entire partition, so cache it in memory
-          blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
-          CompletionIterator[T, Iterator[T]](
-            arr.iterator.asInstanceOf[Iterator[T]],
-            blockManager.releaseLock(key))
-        case Right(it) =>
-          // There is not enough space to cache this partition in memory
-          val returnValues = it.asInstanceOf[Iterator[T]]
-          if (putLevel.useDisk) {
-            logWarning(s"Persisting partition $key to disk instead.")
-            val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
-              useOffHeap = false, deserialized = false, putLevel.replication)
-            putInBlockManager[T](key, returnValues, level, Some(diskOnlyLevel))
-          } else {
-            returnValues
-          }
-      }
-    }
-  }
-
-}
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 204f7356f7ef8ecb5c1757e79b5c43a89e3e3bfd..b3b3729625ad5f9bd89491e1d92467d25999b34c 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -56,7 +56,6 @@ class SparkEnv (
     private[spark] val rpcEnv: RpcEnv,
     val serializer: Serializer,
     val closureSerializer: Serializer,
-    val cacheManager: CacheManager,
     val mapOutputTracker: MapOutputTracker,
     val shuffleManager: ShuffleManager,
     val broadcastManager: BroadcastManager,
@@ -333,8 +332,6 @@ object SparkEnv extends Logging {
 
     val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
 
-    val cacheManager = new CacheManager(blockManager)
-
     val metricsSystem = if (isDriver) {
       // Don't start metrics system right now for Driver.
       // We need to wait for the task scheduler to give us an app ID.
@@ -371,7 +368,6 @@ object SparkEnv extends Logging {
       rpcEnv,
       serializer,
       closureSerializer,
-      cacheManager,
       mapOutputTracker,
       shuffleManager,
       broadcastManager,
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index c08f87a8b45c10ac0438e2bb502ab768f915f001..dabc81001834b35ed2c18bd10689a80b007c9df1 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -99,18 +99,14 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
     // Store a copy of the broadcast variable in the driver so that tasks run on the driver
     // do not create a duplicate copy of the broadcast variable's value.
     val blockManager = SparkEnv.get.blockManager
-    if (blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
-      blockManager.releaseLock(broadcastId)
-    } else {
+    if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
       throw new SparkException(s"Failed to store $broadcastId in BlockManager")
     }
     val blocks =
       TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
     blocks.zipWithIndex.foreach { case (block, i) =>
       val pieceId = BroadcastBlockId(id, "piece" + i)
-      if (blockManager.putBytes(pieceId, block, MEMORY_AND_DISK_SER, tellMaster = true)) {
-        blockManager.releaseLock(pieceId)
-      } else {
+      if (!blockManager.putBytes(pieceId, block, MEMORY_AND_DISK_SER, tellMaster = true)) {
         throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
       }
     }
@@ -130,22 +126,24 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
       // First try getLocalBytes because there is a chance that previous attempts to fetch the
       // broadcast blocks have already fetched some of the blocks. In that case, some blocks
       // would be available locally (on this executor).
-      def getLocal: Option[ByteBuffer] = bm.getLocalBytes(pieceId)
-      def getRemote: Option[ByteBuffer] = bm.getRemoteBytes(pieceId).map { block =>
-        // If we found the block from remote executors/driver's BlockManager, put the block
-        // in this executor's BlockManager.
-        if (!bm.putBytes(pieceId, block, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) {
-          throw new SparkException(
-            s"Failed to store $pieceId of $broadcastId in local BlockManager")
-        }
-        block
+      bm.getLocalBytes(pieceId) match {
+        case Some(block) =>
+          blocks(pid) = block
+          releaseLock(pieceId)
+        case None =>
+          bm.getRemoteBytes(pieceId) match {
+            case Some(b) =>
+              // We found the block from remote executors/driver's BlockManager, so put the block
+              // in this executor's BlockManager.
+              if (!bm.putBytes(pieceId, b, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) {
+                throw new SparkException(
+                  s"Failed to store $pieceId of $broadcastId in local BlockManager")
+              }
+              blocks(pid) = b
+            case None =>
+              throw new SparkException(s"Failed to get $pieceId of $broadcastId")
+          }
       }
-      val block: ByteBuffer = getLocal.orElse(getRemote).getOrElse(
-        throw new SparkException(s"Failed to get $pieceId of $broadcastId"))
-      // At this point we are guaranteed to hold a read lock, since we either got the block locally
-      // or stored the remotely-fetched block and automatically downgraded the write lock.
-      blocks(pid) = block
-      releaseLock(pieceId)
     }
     blocks
   }
@@ -191,9 +189,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
           // Store the merged copy in BlockManager so other tasks on this executor don't
           // need to re-fetch it.
           val storageLevel = StorageLevel.MEMORY_AND_DISK
-          if (blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
-            releaseLock(broadcastId)
-          } else {
+          if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
             throw new SparkException(s"Failed to store $broadcastId in BlockManager")
           }
           obj
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index a959f200d4cc2e71cf38229cf7ecdaf69fb25b68..e88d6cd08998c7ce822a0c52f99be8f3b97e65ab 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -292,11 +292,8 @@ private[spark] class Executor(
             ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
           } else if (resultSize >= maxRpcMessageSize) {
             val blockId = TaskResultBlockId(taskId)
-            val putSucceeded = env.blockManager.putBytes(
+            env.blockManager.putBytes(
               blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
-            if (putSucceeded) {
-              env.blockManager.releaseLock(blockId)
-            }
             logInfo(
               s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
             ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
index e4246df83a6ec24fb9c6836cf6c2d3736c751906..e86933b948966a345e12b0de1dd11c536e884302 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
@@ -66,10 +66,7 @@ class NettyBlockRpcServer(
           serializer.newInstance().deserialize(ByteBuffer.wrap(uploadBlock.metadata))
         val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
         val blockId = BlockId(uploadBlock.blockId)
-        val putSucceeded = blockManager.putBlockData(blockId, data, level)
-        if (putSucceeded) {
-          blockManager.releaseLock(blockId)
-        }
+        blockManager.putBlockData(blockId, data, level)
         responseContext.onSuccess(ByteBuffer.allocate(0))
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 6a6ad2d75a897898613efff1ecda5c85fc4f0ed9..e5fdebc65da8a092fa995e6735b08e937d41ca9e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -37,7 +37,7 @@ import org.apache.spark.partial.BoundedDouble
 import org.apache.spark.partial.CountEvaluator
 import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
-import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.{RDDBlockId, StorageLevel}
 import org.apache.spark.util.{BoundedPriorityQueue, Utils}
 import org.apache.spark.util.collection.OpenHashMap
 import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, PoissonSampler,
@@ -272,7 +272,7 @@ abstract class RDD[T: ClassTag](
    */
   final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
     if (storageLevel != StorageLevel.NONE) {
-      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
+      getOrCompute(split, context)
     } else {
       computeOrReadCheckpoint(split, context)
     }
@@ -314,6 +314,35 @@ abstract class RDD[T: ClassTag](
     }
   }
 
+  /**
+   * Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached.
+   */
+  private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
+    val blockId = RDDBlockId(id, partition.index)
+    var readCachedBlock = true
+    // This method is called on executors, so we need call SparkEnv.get instead of sc.env.
+    SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, () => {
+      readCachedBlock = false
+      computeOrReadCheckpoint(partition, context)
+    }) match {
+      case Left(blockResult) =>
+        if (readCachedBlock) {
+          val existingMetrics = context.taskMetrics().registerInputMetrics(blockResult.readMethod)
+          existingMetrics.incBytesReadInternal(blockResult.bytes)
+          new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
+            override def next(): T = {
+              existingMetrics.incRecordsReadInternal(1)
+              delegate.next()
+            }
+          }
+        } else {
+          new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
+        }
+      case Right(iter) =>
+        new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
+    }
+  }
+
   /**
    * Execute a block of code in a scope such that all new RDDs created in this body will
    * be part of the same scope. For more detail, see {{org.apache.spark.rdd.RDDOperationScope}}.
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index 0eda97e58d451b8d6f3b94069edb0edb6a48890a..b23244ad5187ac9c7defcdd567776011fe3fa113 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -71,27 +71,13 @@ private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolea
     _writerTask = t
     checkInvariants()
   }
-  private[this] var _writerTask: Long = 0
-
-  /**
-   * True if this block has been removed from the BlockManager and false otherwise.
-   * This field is used to communicate block deletion to blocked readers / writers (see its usage
-   * in [[BlockInfoManager]]).
-   */
-  def removed: Boolean = _removed
-  def removed_=(r: Boolean): Unit = {
-    _removed = r
-    checkInvariants()
-  }
-  private[this] var _removed: Boolean = false
+  private[this] var _writerTask: Long = BlockInfo.NO_WRITER
 
   private def checkInvariants(): Unit = {
     // A block's reader count must be non-negative:
     assert(_readerCount >= 0)
     // A block is either locked for reading or for writing, but not for both at the same time:
     assert(_readerCount == 0 || _writerTask == BlockInfo.NO_WRITER)
-    // If a block is removed then it is not locked:
-    assert(!_removed || (_readerCount == 0 && _writerTask == BlockInfo.NO_WRITER))
   }
 
   checkInvariants()
@@ -195,16 +181,22 @@ private[storage] class BlockInfoManager extends Logging {
       blockId: BlockId,
       blocking: Boolean = true): Option[BlockInfo] = synchronized {
     logTrace(s"Task $currentTaskAttemptId trying to acquire read lock for $blockId")
-    infos.get(blockId).map { info =>
-      while (info.writerTask != BlockInfo.NO_WRITER) {
-        if (blocking) wait() else return None
+    do {
+      infos.get(blockId) match {
+        case None => return None
+        case Some(info) =>
+          if (info.writerTask == BlockInfo.NO_WRITER) {
+            info.readerCount += 1
+            readLocksByTask(currentTaskAttemptId).add(blockId)
+            logTrace(s"Task $currentTaskAttemptId acquired read lock for $blockId")
+            return Some(info)
+          }
       }
-      if (info.removed) return None
-      info.readerCount += 1
-      readLocksByTask(currentTaskAttemptId).add(blockId)
-      logTrace(s"Task $currentTaskAttemptId acquired read lock for $blockId")
-      info
-    }
+      if (blocking) {
+        wait()
+      }
+    } while (blocking)
+    None
   }
 
   /**
@@ -226,21 +218,25 @@ private[storage] class BlockInfoManager extends Logging {
       blockId: BlockId,
       blocking: Boolean = true): Option[BlockInfo] = synchronized {
     logTrace(s"Task $currentTaskAttemptId trying to acquire write lock for $blockId")
-    infos.get(blockId).map { info =>
-      if (info.writerTask == currentTaskAttemptId) {
-        throw new IllegalStateException(
-          s"Task $currentTaskAttemptId has already locked $blockId for writing")
-      } else {
-        while (info.writerTask != BlockInfo.NO_WRITER || info.readerCount != 0) {
-          if (blocking) wait() else return None
-        }
-        if (info.removed) return None
+    do {
+      infos.get(blockId) match {
+        case None => return None
+        case Some(info) =>
+          if (info.writerTask == currentTaskAttemptId) {
+            throw new IllegalStateException(
+              s"Task $currentTaskAttemptId has already locked $blockId for writing")
+          } else if (info.writerTask == BlockInfo.NO_WRITER && info.readerCount == 0) {
+            info.writerTask = currentTaskAttemptId
+            writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
+            logTrace(s"Task $currentTaskAttemptId acquired write lock for $blockId")
+            return Some(info)
+          }
       }
-      info.writerTask = currentTaskAttemptId
-      writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
-      logTrace(s"Task $currentTaskAttemptId acquired write lock for $blockId")
-      info
-    }
+      if (blocking) {
+        wait()
+      }
+    } while (blocking)
+    None
   }
 
   /**
@@ -306,29 +302,30 @@ private[storage] class BlockInfoManager extends Logging {
   }
 
   /**
-   * Atomically create metadata for a block and acquire a write lock for it, if it doesn't already
-   * exist.
+   * Attempt to acquire the appropriate lock for writing a new block.
+   *
+   * This enforces the first-writer-wins semantics. If we are the first to write the block,
+   * then just go ahead and acquire the write lock. Otherwise, if another thread is already
+   * writing the block, then we wait for the write to finish before acquiring the read lock.
    *
-   * @param blockId the block id.
-   * @param newBlockInfo the block info for the new block.
    * @return true if the block did not already exist, false otherwise. If this returns false, then
-   *         no new locks are acquired. If this returns true, a write lock on the new block will
-   *         be held.
+   *         a read lock on the existing block will be held. If this returns true, a write lock on
+   *         the new block will be held.
    */
   def lockNewBlockForWriting(
       blockId: BlockId,
       newBlockInfo: BlockInfo): Boolean = synchronized {
     logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
-    if (!infos.contains(blockId)) {
-      infos(blockId) = newBlockInfo
-      newBlockInfo.writerTask = currentTaskAttemptId
-      writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
-      logTrace(s"Task $currentTaskAttemptId successfully locked new block $blockId")
-      true
-    } else {
-      logTrace(s"Task $currentTaskAttemptId did not create and lock block $blockId " +
-        s"because that block already exists")
-      false
+    lockForReading(blockId) match {
+      case Some(info) =>
+        // Block already exists. This could happen if another thread races with us to compute
+        // the same block. In this case, just keep the read lock and return.
+        false
+      case None =>
+        // Block does not yet exist or is removed, so we are free to acquire the write lock
+        infos(blockId) = newBlockInfo
+        lockForWriting(blockId)
+        true
     }
   }
 
@@ -418,7 +415,6 @@ private[storage] class BlockInfoManager extends Logging {
           infos.remove(blockId)
           blockInfo.readerCount = 0
           blockInfo.writerTask = BlockInfo.NO_WRITER
-          blockInfo.removed = true
         }
       case None =>
         throw new IllegalArgumentException(
@@ -434,7 +430,6 @@ private[storage] class BlockInfoManager extends Logging {
     infos.valuesIterator.foreach { blockInfo =>
       blockInfo.readerCount = 0
       blockInfo.writerTask = BlockInfo.NO_WRITER
-      blockInfo.removed = true
     }
     infos.clear()
     readLocksByTask.clear()
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 29124b368e405e06db97c71c536c86640a8668fb..b59191b2917c9aebd28d1feb36d484d2544b54e3 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -44,8 +44,7 @@ import org.apache.spark.util._
 
 private[spark] sealed trait BlockValues
 private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues
-private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockValues
-private[spark] case class ArrayValues(buffer: Array[Any]) extends BlockValues
+private[spark] case class IteratorValues(iterator: () => Iterator[Any]) extends BlockValues
 
 /* Class for returning a fetched block and associated metrics. */
 private[spark] class BlockResult(
@@ -648,8 +647,38 @@ private[spark] class BlockManager(
   }
 
   /**
-   * @return true if the block was stored or false if the block was already stored or an
-   *         error occurred.
+   * Retrieve the given block if it exists, otherwise call the provided `makeIterator` method
+   * to compute the block, persist it, and return its values.
+   *
+   * @return either a BlockResult if the block was successfully cached, or an iterator if the block
+   *         could not be cached.
+   */
+  def getOrElseUpdate(
+      blockId: BlockId,
+      level: StorageLevel,
+      makeIterator: () => Iterator[Any]): Either[BlockResult, Iterator[Any]] = {
+    // Initially we hold no locks on this block.
+    doPut(blockId, IteratorValues(makeIterator), level, keepReadLock = true) match {
+      case None =>
+        // doPut() didn't hand work back to us, so the block already existed or was successfully
+        // stored. Therefore, we now hold a read lock on the block.
+        val blockResult = get(blockId).getOrElse {
+          // Since we held a read lock between the doPut() and get() calls, the block should not
+          // have been evicted, so get() not returning the block indicates some internal error.
+          releaseLock(blockId)
+          throw new SparkException(s"get() failed for block $blockId even though we held a lock")
+        }
+        Left(blockResult)
+      case Some(failedPutResult) =>
+        // The put failed, likely because the data was too large to fit in memory and could not be
+        // dropped to disk. Therefore, we need to pass the input iterator back to the caller so
+        // that they can decide what to do with the values (e.g. process them without caching).
+       Right(failedPutResult.data.left.get)
+    }
+  }
+
+  /**
+   * @return true if the block was stored or false if an error occurred.
    */
   def putIterator(
       blockId: BlockId,
@@ -658,7 +687,7 @@ private[spark] class BlockManager(
       tellMaster: Boolean = true,
       effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
     require(values != null, "Values is null")
-    doPut(blockId, IteratorValues(values), level, tellMaster, effectiveStorageLevel)
+    doPut(blockId, IteratorValues(() => values), level, tellMaster, effectiveStorageLevel).isEmpty
   }
 
   /**
@@ -678,27 +707,10 @@ private[spark] class BlockManager(
       syncWrites, writeMetrics, blockId)
   }
 
-  /**
-   * Put a new block of values to the block manager.
-   *
-   * @return true if the block was stored or false if the block was already stored or an
-   *         error occurred.
-   */
-  def putArray(
-      blockId: BlockId,
-      values: Array[Any],
-      level: StorageLevel,
-      tellMaster: Boolean = true,
-      effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
-    require(values != null, "Values is null")
-    doPut(blockId, ArrayValues(values), level, tellMaster, effectiveStorageLevel)
-  }
-
   /**
    * Put a new block of serialized bytes to the block manager.
    *
-   * @return true if the block was stored or false if the block was already stored or an
-   *         error occurred.
+   * @return true if the block was stored or false if an error occurred.
    */
   def putBytes(
       blockId: BlockId,
@@ -707,26 +719,32 @@ private[spark] class BlockManager(
       tellMaster: Boolean = true,
       effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
     require(bytes != null, "Bytes is null")
-    doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
+    doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel).isEmpty
   }
 
   /**
    * Put the given block according to the given level in one of the block stores, replicating
    * the values if necessary.
    *
-   * The effective storage level refers to the level according to which the block will actually be
-   * handled. This allows the caller to specify an alternate behavior of doPut while preserving
-   * the original level specified by the user.
+   * If the block already exists, this method will not overwrite it.
    *
-   * @return true if the block was stored or false if the block was already stored or an
-   *         error occurred.
+   * @param effectiveStorageLevel the level according to which the block will actually be handled.
+   *                              This allows the caller to specify an alternate behavior of doPut
+   *                              while preserving the original level specified by the user.
+   * @param keepReadLock if true, this method will hold the read lock when it returns (even if the
+   *                     block already exists). If false, this method will hold no locks when it
+   *                     returns.
+   * @return `Some(PutResult)` if the block did not exist and could not be successfully cached,
+   *         or None if the block already existed or was successfully stored (fully consuming
+   *         the input data / input iterator).
    */
   private def doPut(
       blockId: BlockId,
       data: BlockValues,
       level: StorageLevel,
       tellMaster: Boolean = true,
-      effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
+      effectiveStorageLevel: Option[StorageLevel] = None,
+      keepReadLock: Boolean = false): Option[PutResult] = {
 
     require(blockId != null, "BlockId is null")
     require(level != null && level.isValid, "StorageLevel is null or invalid")
@@ -743,7 +761,11 @@ private[spark] class BlockManager(
         newInfo
       } else {
         logWarning(s"Block $blockId already exists on this machine; not re-adding it")
-        return false
+        if (!keepReadLock) {
+          // lockNewBlockForWriting returned a read lock on the existing block, so we must free it:
+          releaseLock(blockId)
+        }
+        return None
       }
     }
 
@@ -779,6 +801,7 @@ private[spark] class BlockManager(
     }
 
     var blockWasSuccessfullyStored = false
+    var result: PutResult = null
 
     putBlockInfo.synchronized {
       logTrace("Put for block %s took %s to get into synchronized block"
@@ -803,11 +826,9 @@ private[spark] class BlockManager(
         }
 
         // Actually put the values
-        val result = data match {
+        result = data match {
           case IteratorValues(iterator) =>
-            blockStore.putIterator(blockId, iterator, putLevel, returnValues)
-          case ArrayValues(array) =>
-            blockStore.putArray(blockId, array, putLevel, returnValues)
+            blockStore.putIterator(blockId, iterator(), putLevel, returnValues)
           case ByteBufferValues(bytes) =>
             bytes.rewind()
             blockStore.putBytes(blockId, bytes, putLevel)
@@ -834,7 +855,11 @@ private[spark] class BlockManager(
         }
       } finally {
         if (blockWasSuccessfullyStored) {
-          blockInfoManager.downgradeLock(blockId)
+          if (keepReadLock) {
+            blockInfoManager.downgradeLock(blockId)
+          } else {
+            blockInfoManager.unlock(blockId)
+          }
         } else {
           blockInfoManager.removeBlock(blockId)
           logWarning(s"Putting block $blockId failed")
@@ -852,18 +877,20 @@ private[spark] class BlockManager(
             Await.ready(replicationFuture, Duration.Inf)
           }
         case _ =>
-          val remoteStartTime = System.currentTimeMillis
-          // Serialize the block if not already done
-          if (bytesAfterPut == null) {
-            if (valuesAfterPut == null) {
-              throw new SparkException(
-                "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
+          if (blockWasSuccessfullyStored) {
+            val remoteStartTime = System.currentTimeMillis
+            // Serialize the block if not already done
+            if (bytesAfterPut == null) {
+              if (valuesAfterPut == null) {
+                throw new SparkException(
+                  "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
+              }
+              bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
             }
-            bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
+            replicate(blockId, bytesAfterPut, putLevel)
+            logDebug("Put block %s remotely took %s"
+              .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
           }
-          replicate(blockId, bytesAfterPut, putLevel)
-          logDebug("Put block %s remotely took %s"
-            .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
       }
     }
 
@@ -877,7 +904,11 @@ private[spark] class BlockManager(
         .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
     }
 
-    blockWasSuccessfullyStored
+    if (blockWasSuccessfullyStored) {
+      None
+    } else {
+      Some(result)
+    }
   }
 
   /**
@@ -1033,7 +1064,7 @@ private[spark] class BlockManager(
       logInfo(s"Writing block $blockId to disk")
       data() match {
         case Left(elements) =>
-          diskStore.putArray(blockId, elements, level, returnValues = false)
+          diskStore.putIterator(blockId, elements.toIterator, level, returnValues = false)
         case Right(bytes) =>
           diskStore.putBytes(blockId, bytes, level)
       }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
index 6f6a6773ba4fdd3adf7e96fb5e986dec645333cc..d3af50d974232de2926b3240c0dc276803979b9f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
@@ -19,8 +19,6 @@ package org.apache.spark.storage
 
 import java.nio.ByteBuffer
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.Logging
 
 /**
@@ -43,12 +41,6 @@ private[spark] abstract class BlockStore(val blockManager: BlockManager) extends
     level: StorageLevel,
     returnValues: Boolean): PutResult
 
-  def putArray(
-    blockId: BlockId,
-    values: Array[Any],
-    level: StorageLevel,
-    returnValues: Boolean): PutResult
-
   /**
    * Return the size of a block in bytes.
    */
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 1f3f193f2ffa22ed53c1a6c14b5d30e9d157855a..bfa6560a724c0e14e14db1125039fe6c897ab2f5 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -58,14 +58,6 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     PutResult(bytes.limit(), Right(bytes.duplicate()))
   }
 
-  override def putArray(
-      blockId: BlockId,
-      values: Array[Any],
-      level: StorageLevel,
-      returnValues: Boolean): PutResult = {
-    putIterator(blockId, values.toIterator, level, returnValues)
-  }
-
   override def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 2f16c8f3d8badf40cc3684f3952553b6c7c0efa1..317d73abba4c6dbaa8f544cec503d403679316a9 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -120,22 +120,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
     PutResult(size, data)
   }
 
-  override def putArray(
-      blockId: BlockId,
-      values: Array[Any],
-      level: StorageLevel,
-      returnValues: Boolean): PutResult = {
-    if (level.deserialized) {
-      val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
-      tryToPut(blockId, () => values, sizeEstimate, deserialized = true)
-      PutResult(sizeEstimate, Left(values.iterator))
-    } else {
-      val bytes = blockManager.dataSerialize(blockId, values.iterator)
-      tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
-      PutResult(bytes.limit(), Right(bytes.duplicate()))
-    }
-  }
-
   override def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
@@ -166,7 +150,17 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
     unrolledValues match {
       case Left(arrayValues) =>
         // Values are fully unrolled in memory, so store them as an array
-        val res = putArray(blockId, arrayValues, level, returnValues)
+        val res = {
+          if (level.deserialized) {
+            val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef])
+            tryToPut(blockId, () => arrayValues, sizeEstimate, deserialized = true)
+            PutResult(sizeEstimate, Left(arrayValues.iterator))
+          } else {
+            val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)
+            tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
+            PutResult(bytes.limit(), Right(bytes.duplicate()))
+          }
+        }
         PutResult(res.size, res.data)
       case Right(iteratorValues) =>
         // Not enough space to unroll this block; drop to disk if applicable
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
deleted file mode 100644
index ffc02bcb011f3ad69106a40c7c5e4230c5613cdd..0000000000000000000000000000000000000000
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import org.mockito.Mockito._
-import org.scalatest.BeforeAndAfter
-import org.scalatest.mock.MockitoSugar
-
-import org.apache.spark.executor.{DataReadMethod, TaskMetrics}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.storage._
-
-// TODO: Test the CacheManager's thread-safety aspects
-class CacheManagerSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfter
-  with MockitoSugar {
-
-  var blockManager: BlockManager = _
-  var cacheManager: CacheManager = _
-  var split: Partition = _
-  /** An RDD which returns the values [1, 2, 3, 4]. */
-  var rdd: RDD[Int] = _
-  var rdd2: RDD[Int] = _
-  var rdd3: RDD[Int] = _
-
-  before {
-    sc = new SparkContext("local", "test")
-    blockManager = mock[BlockManager]
-    cacheManager = new CacheManager(blockManager)
-    split = new Partition { override def index: Int = 0 }
-    rdd = new RDD[Int](sc, Nil) {
-      override def getPartitions: Array[Partition] = Array(split)
-      override val getDependencies = List[Dependency[_]]()
-      override def compute(split: Partition, context: TaskContext): Iterator[Int] =
-        Array(1, 2, 3, 4).iterator
-    }
-    rdd2 = new RDD[Int](sc, List(new OneToOneDependency(rdd))) {
-      override def getPartitions: Array[Partition] = firstParent[Int].partitions
-      override def compute(split: Partition, context: TaskContext): Iterator[Int] =
-        firstParent[Int].iterator(split, context)
-    }.cache()
-    rdd3 = new RDD[Int](sc, List(new OneToOneDependency(rdd2))) {
-      override def getPartitions: Array[Partition] = firstParent[Int].partitions
-      override def compute(split: Partition, context: TaskContext): Iterator[Int] =
-        firstParent[Int].iterator(split, context)
-    }.cache()
-  }
-
-  test("get uncached rdd") {
-    // Do not mock this test, because attempting to match Array[Any], which is not covariant,
-    // in blockManager.put is a losing battle. You have been warned.
-    blockManager = sc.env.blockManager
-    cacheManager = sc.env.cacheManager
-    val context = TaskContext.empty()
-    val computeValue = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
-    val getValue = blockManager.get(RDDBlockId(rdd.id, split.index))
-    assert(computeValue.toList === List(1, 2, 3, 4))
-    assert(getValue.isDefined, "Block cached from getOrCompute is not found!")
-    assert(getValue.get.data.toList === List(1, 2, 3, 4))
-  }
-
-  test("get cached rdd") {
-    val result = new BlockResult(Array(5, 6, 7).iterator, DataReadMethod.Memory, 12)
-    when(blockManager.get(RDDBlockId(0, 0))).thenReturn(Some(result))
-
-    val context = TaskContext.empty()
-    val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
-    assert(value.toList === List(5, 6, 7))
-  }
-
-  test("verify task metrics updated correctly") {
-    cacheManager = sc.env.cacheManager
-    val context = TaskContext.empty()
-    try {
-      TaskContext.setTaskContext(context)
-      sc.env.blockManager.registerTask(0)
-      cacheManager.getOrCompute(rdd3, split, context, StorageLevel.MEMORY_ONLY)
-      assert(context.taskMetrics.updatedBlockStatuses.size === 2)
-    } finally {
-      TaskContext.unset()
-    }
-  }
-}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
index 662b18f667b0d695432cd042c1b27eff0ff9e045..fe83fc722a8e8f48d3a5077dc96085a7b8c8a44d 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
@@ -80,10 +80,18 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
     withTaskId(1) {
       assert(blockInfoManager.lockNewBlockForWriting("block", blockInfo))
       assert(blockInfoManager.get("block").get eq blockInfo)
-      assert(!blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
-      assert(blockInfoManager.get("block").get eq blockInfo)
       assert(blockInfo.readerCount === 0)
       assert(blockInfo.writerTask === 1)
+      // Downgrade lock so that second call doesn't block:
+      blockInfoManager.downgradeLock("block")
+      assert(blockInfo.readerCount === 1)
+      assert(blockInfo.writerTask === BlockInfo.NO_WRITER)
+      assert(!blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+      assert(blockInfo.readerCount === 2)
+      assert(blockInfoManager.get("block").get eq blockInfo)
+      assert(blockInfo.readerCount === 2)
+      assert(blockInfo.writerTask === BlockInfo.NO_WRITER)
+      blockInfoManager.unlock("block")
       blockInfoManager.unlock("block")
       assert(blockInfo.readerCount === 0)
       assert(blockInfo.writerTask === BlockInfo.NO_WRITER)
@@ -92,6 +100,67 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(blockInfoManager.getNumberOfMapEntries === initialNumMapEntries + 1)
   }
 
+  test("lockNewBlockForWriting blocks while write lock is held, then returns false after release") {
+    withTaskId(0) {
+      assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+    }
+    val lock1Future = Future {
+      withTaskId(1) {
+        blockInfoManager.lockNewBlockForWriting("block", newBlockInfo())
+      }
+    }
+    val lock2Future = Future {
+      withTaskId(2) {
+        blockInfoManager.lockNewBlockForWriting("block", newBlockInfo())
+      }
+    }
+    Thread.sleep(300)  // Hack to try to ensure that both future tasks are waiting
+    withTaskId(0) {
+      blockInfoManager.downgradeLock("block")
+    }
+    // After downgrading to a read lock, both threads should wake up and acquire the shared
+    // read lock.
+    assert(!Await.result(lock1Future, 1.seconds))
+    assert(!Await.result(lock2Future, 1.seconds))
+    assert(blockInfoManager.get("block").get.readerCount === 3)
+  }
+
+  test("lockNewBlockForWriting blocks while write lock is held, then returns true after removal") {
+    withTaskId(0) {
+      assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+    }
+    val lock1Future = Future {
+      withTaskId(1) {
+        blockInfoManager.lockNewBlockForWriting("block", newBlockInfo())
+      }
+    }
+    val lock2Future = Future {
+      withTaskId(2) {
+        blockInfoManager.lockNewBlockForWriting("block", newBlockInfo())
+      }
+    }
+    Thread.sleep(300)  // Hack to try to ensure that both future tasks are waiting
+    withTaskId(0) {
+      blockInfoManager.removeBlock("block")
+    }
+    // After removing the block, the write lock is released. Both threads should wake up but only
+    // one should acquire the write lock. The second thread should block until the winner of the
+    // write race releases its lock.
+    val winningFuture: Future[Boolean] =
+      Await.ready(Future.firstCompletedOf(Seq(lock1Future, lock2Future)), 1.seconds)
+    assert(winningFuture.value.get.get)
+    val winningTID = blockInfoManager.get("block").get.writerTask
+    assert(winningTID === 1 || winningTID === 2)
+    val losingFuture: Future[Boolean] = if (winningTID == 1) lock2Future else lock1Future
+    assert(!losingFuture.isCompleted)
+    // Once the writer releases its lock, the blocked future should wake up again and complete.
+    withTaskId(winningTID) {
+      blockInfoManager.unlock("block")
+    }
+    assert(!Await.result(losingFuture, 1.seconds))
+    assert(blockInfoManager.get("block").get.readerCount === 1)
+  }
+
   test("read locks are reentrant") {
     withTaskId(1) {
       assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index a94d8b424d95639c065ecc9515d4638bfbe91a63..ae1faf5d98f38e59611e16c3453bed3e6c05dd2b 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -190,7 +190,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
 
     def putBlockAndGetLocations(blockId: String, level: StorageLevel): Set[BlockManagerId] = {
       stores.head.putSingle(blockId, new Array[Byte](blockSize), level)
-      stores.head.releaseLock(blockId)
       val locations = master.getLocations(blockId).sortBy { _.executorId }.toSet
       stores.foreach { _.removeBlock(blockId) }
       master.removeBlock(blockId)
@@ -252,7 +251,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
     // Insert a block with 2x replication and return the number of copies of the block
     def replicateAndGetNumCopies(blockId: String): Int = {
       store.putSingle(blockId, new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK_2)
-      store.releaseLock(blockId)
       val numLocations = master.getLocations(blockId).size
       allStores.foreach { _.removeBlock(blockId) }
       numLocations
@@ -290,7 +288,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
     def replicateAndGetNumCopies(blockId: String, replicationFactor: Int): Int = {
       val storageLevel = StorageLevel(true, true, false, true, replicationFactor)
       initialStores.head.putSingle(blockId, new Array[Byte](blockSize), storageLevel)
-      initialStores.head.releaseLock(blockId)
       val numLocations = master.getLocations(blockId).size
       allStores.foreach { _.removeBlock(blockId) }
       numLocations
@@ -358,7 +355,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
       val blockId = new TestBlockId(
         "block-with-" + storageLevel.description.replace(" ", "-").toLowerCase)
       stores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel)
-      stores(0).releaseLock(blockId)
 
       // Assert that master know two locations for the block
       val blockLocations = master.getLocations(blockId).map(_.executorId).toSet
@@ -397,7 +393,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
           (1 to 10).foreach {
             i =>
               testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), MEMORY_ONLY_SER)
-              testStore.releaseLock(s"dummy-block-$i")
           }
           (1 to 10).foreach {
             i => testStore.removeBlock(s"dummy-block-$i")
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index e4ab9ee0ebb38b41740d98edf31697e50603e8b6..89b427049b5488ba2327fc8c88adc970b8deb459 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -172,9 +172,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a3 = new Array[Byte](4000)
 
     // Putting a1, a2  and a3 in memory and telling master only about a1 and a2
-    store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
 
     // Checking whether blocks are in memory
     assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
@@ -205,7 +205,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
-    store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY_2)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_2)
     store2.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2)
     assert(master.getLocations("a1").size === 2, "master did not report 2 locations for a1")
     assert(master.getLocations("a2").size === 2, "master did not report 2 locations for a2")
@@ -218,9 +218,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a3 = new Array[Byte](4000)
 
     // Putting a1, a2 and a3 in memory and telling master only about a1 and a2
-    store.putSingleAndReleaseLock("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
+    store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
 
     // Checking whether blocks are in memory and memory size
     val memStatus = master.getMemoryStatus.head._2
@@ -265,9 +265,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
     // Putting a1, a2 and a3 in memory.
-    store.putSingleAndReleaseLock(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
     master.removeRdd(0, blocking = false)
 
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
@@ -283,8 +283,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       master.getLocations("nonrddblock") should have size (1)
     }
 
-    store.putSingleAndReleaseLock(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
     master.removeRdd(0, blocking = true)
     store.getSingleAndReleaseLock(rdd(0, 0)) should be (None)
     master.getLocations(rdd(0, 0)) should have size 0
@@ -308,10 +308,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // insert broadcast blocks in both the stores
     Seq(driverStore, executorStore).foreach { case s =>
-      s.putSingleAndReleaseLock(broadcast0BlockId, a1, StorageLevel.DISK_ONLY)
-      s.putSingleAndReleaseLock(broadcast1BlockId, a2, StorageLevel.DISK_ONLY)
-      s.putSingleAndReleaseLock(broadcast2BlockId, a3, StorageLevel.DISK_ONLY)
-      s.putSingleAndReleaseLock(broadcast2BlockId2, a4, StorageLevel.DISK_ONLY)
+      s.putSingle(broadcast0BlockId, a1, StorageLevel.DISK_ONLY)
+      s.putSingle(broadcast1BlockId, a2, StorageLevel.DISK_ONLY)
+      s.putSingle(broadcast2BlockId, a3, StorageLevel.DISK_ONLY)
+      s.putSingle(broadcast2BlockId2, a4, StorageLevel.DISK_ONLY)
     }
 
     // verify whether the blocks exist in both the stores
@@ -366,7 +366,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store = makeBlockManager(2000)
     val a1 = new Array[Byte](400)
 
-    store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
 
     assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
     assert(master.getLocations("a1").size > 0, "master was not told about a1")
@@ -384,13 +384,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
 
-    store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
     assert(master.getLocations("a1").size > 0, "master was not told about a1")
 
     master.removeExecutor(store.blockManagerId.executorId)
     assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
 
-    store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
     store.waitForAsyncReregister()
 
     assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
@@ -407,13 +407,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       master.removeExecutor(store.blockManagerId.executorId)
       val t1 = new Thread {
         override def run() {
-          store.putIteratorAndReleaseLock(
+          store.putIterator(
             "a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
         }
       }
       val t2 = new Thread {
         override def run() {
-          store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
+          store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
         }
       }
       val t3 = new Thread {
@@ -441,11 +441,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val list2 = List(new Array[Byte](500), new Array[Byte](1000), new Array[Byte](1500))
     val list1SizeEstimate = SizeEstimator.estimate(list1.iterator.toArray)
     val list2SizeEstimate = SizeEstimator.estimate(list2.iterator.toArray)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
     val list1Get = store.get("list1")
     assert(list1Get.isDefined, "list1 expected to be in store")
@@ -486,9 +486,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store2 = makeBlockManager(8000, "executor2")
     store3 = makeBlockManager(8000, "executor3")
     val list1 = List(new Array[Byte](4000))
-    store2.putIteratorAndReleaseLock(
+    store2.putIterator(
       "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store3.putIteratorAndReleaseLock(
+    store3.putIterator(
       "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(store.getRemoteBytes("list1").isDefined, "list1Get expected to be fetched")
     store2.stop()
@@ -515,15 +515,15 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a1 = new Array[Byte](4000)
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
-    store.putSingleAndReleaseLock("a1", a1, storageLevel)
-    store.putSingleAndReleaseLock("a2", a2, storageLevel)
-    store.putSingleAndReleaseLock("a3", a3, storageLevel)
+    store.putSingle("a1", a1, storageLevel)
+    store.putSingle("a2", a2, storageLevel)
+    store.putSingle("a3", a3, storageLevel)
     assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
     assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
     assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store")
     assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
     // At this point a2 was gotten last, so LRU will getSingle rid of a3
-    store.putSingleAndReleaseLock("a1", a1, storageLevel)
+    store.putSingle("a1", a1, storageLevel)
     assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
     assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
     assert(store.getSingleAndReleaseLock("a3") === None, "a3 was in store")
@@ -534,9 +534,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a1 = new Array[Byte](4000)
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
-    store.putSingleAndReleaseLock(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY)
     // Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2
     // from the same RDD
     assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store")
@@ -550,9 +550,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
   test("in-memory LRU for partitions of multiple RDDs") {
     store = makeBlockManager(12000)
-    store.putSingleAndReleaseLock(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // At this point rdd_1_1 should've replaced rdd_0_1
     assert(store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was not in store")
     assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store")
@@ -560,8 +560,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     // Do a get() on rdd_0_2 so that it is the most recently used item
     assert(store.getSingleAndReleaseLock(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
     // Put in more partitions from RDD 0; they should replace rdd_1_1
-    store.putSingleAndReleaseLock(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped
     // when we try to add rdd_0_4.
     assert(!store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was in store")
@@ -576,9 +576,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
-    store.putSingleAndReleaseLock("a1", a1, StorageLevel.DISK_ONLY)
-    store.putSingleAndReleaseLock("a2", a2, StorageLevel.DISK_ONLY)
-    store.putSingleAndReleaseLock("a3", a3, StorageLevel.DISK_ONLY)
+    store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
+    store.putSingle("a2", a2, StorageLevel.DISK_ONLY)
+    store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
     assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was in store")
     assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was in store")
     assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was in store")
@@ -607,9 +607,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a1 = new Array[Byte](4000)
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
-    store.putSingleAndReleaseLock("a1", a1, storageLevel)
-    store.putSingleAndReleaseLock("a2", a2, storageLevel)
-    store.putSingleAndReleaseLock("a3", a3, storageLevel)
+    store.putSingle("a1", a1, storageLevel)
+    store.putSingle("a2", a2, storageLevel)
+    store.putSingle("a3", a3, storageLevel)
     assert(accessMethod(store)("a2").isDefined, "a2 was not in store")
     assert(accessMethod(store)("a3").isDefined, "a3 was not in store")
     assert(store.memoryStore.getValues("a1").isEmpty, "a1 was in memory store")
@@ -624,15 +624,15 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a3 = new Array[Byte](4000)
     val a4 = new Array[Byte](4000)
     // First store a1 and a2, both in memory, and a3, on disk only
-    store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY_SER)
-    store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY_SER)
-    store.putSingleAndReleaseLock("a3", a3, StorageLevel.DISK_ONLY)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
     // At this point LRU should not kick in because a3 is only on disk
     assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
     assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
     assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
     // Now let's add in a4, which uses both disk and memory; a1 should drop out
-    store.putSingleAndReleaseLock("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
+    store.putSingle("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
     assert(store.getSingleAndReleaseLock("a1") == None, "a1 was in store")
     assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
     assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
@@ -644,11 +644,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
     val list2 = List(new Array[Byte](2000), new Array[Byte](2000))
     val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
@@ -658,7 +658,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
     // At this point list2 was gotten last, so LRU will getSingle rid of list3
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(store.getAndReleaseLock("list1").isDefined, "list1 was not in store")
     assert(store.get("list1").get.data.size === 2)
@@ -674,11 +674,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
     val list4 = List(new Array[Byte](2000), new Array[Byte](2000))
     // First store list1 and list2, both in memory, and list3, on disk only
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
     val listForSizeEstimate = new ArrayBuffer[Any]
     listForSizeEstimate ++= list1.iterator
@@ -697,7 +697,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store")
     assert(store.get("list3").get.data.size === 2)
     // Now let's add in list4, which uses both disk and memory; list1 should drop out
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
     assert(store.getAndReleaseLock("list1") === None, "list1 was in store")
     assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
@@ -722,9 +722,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
   test("overly large block") {
     store = makeBlockManager(5000)
-    store.putSingleAndReleaseLock("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
+    store.putSingle("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
     assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store")
-    store.putSingleAndReleaseLock("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
+    store.putSingle("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
     assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store")
     assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
   }
@@ -733,7 +733,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     try {
       conf.set("spark.shuffle.compress", "true")
       store = makeBlockManager(20000, "exec1")
-      store.putSingleAndReleaseLock(
+      store.putSingle(
         ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100,
         "shuffle_0_0_0 was not compressed")
@@ -742,7 +742,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
       conf.set("spark.shuffle.compress", "false")
       store = makeBlockManager(20000, "exec2")
-      store.putSingleAndReleaseLock(
+      store.putSingle(
         ShuffleBlockId(0, 0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 10000,
         "shuffle_0_0_0 was compressed")
@@ -751,7 +751,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
       conf.set("spark.broadcast.compress", "true")
       store = makeBlockManager(20000, "exec3")
-      store.putSingleAndReleaseLock(
+      store.putSingle(
         BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 1000,
         "broadcast_0 was not compressed")
@@ -760,7 +760,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
       conf.set("spark.broadcast.compress", "false")
       store = makeBlockManager(20000, "exec4")
-      store.putSingleAndReleaseLock(
+      store.putSingle(
         BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 10000, "broadcast_0 was compressed")
       store.stop()
@@ -768,21 +768,21 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
       conf.set("spark.rdd.compress", "true")
       store = makeBlockManager(20000, "exec5")
-      store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+      store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(rdd(0, 0)) <= 1000, "rdd_0_0 was not compressed")
       store.stop()
       store = null
 
       conf.set("spark.rdd.compress", "false")
       store = makeBlockManager(20000, "exec6")
-      store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+      store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(rdd(0, 0)) >= 10000, "rdd_0_0 was compressed")
       store.stop()
       store = null
 
       // Check that any other block types are also kept uncompressed
       store = makeBlockManager(20000, "exec7")
-      store.putSingleAndReleaseLock("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
+      store.putSingle("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
       assert(store.memoryStore.getSize("other_block") >= 10000, "other_block was compressed")
       store.stop()
       store = null
@@ -810,7 +810,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     class UnserializableClass
     val a1 = new UnserializableClass
     intercept[java.io.NotSerializableException] {
-      store.putSingleAndReleaseLock("a1", a1, StorageLevel.DISK_ONLY)
+      store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
     }
 
     // Make sure get a1 doesn't hang and returns None.
@@ -882,7 +882,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // 1 updated block (i.e. list1)
     val updatedBlocks1 = getUpdatedBlocks {
-      store.putIteratorAndReleaseLock(
+      store.putIterator(
         "list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     assert(updatedBlocks1.size === 1)
@@ -891,7 +891,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // 1 updated block (i.e. list2)
     val updatedBlocks2 = getUpdatedBlocks {
-      store.putIteratorAndReleaseLock(
+      store.putIterator(
         "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
     }
     assert(updatedBlocks2.size === 1)
@@ -900,7 +900,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // 2 updated blocks - list1 is kicked out of memory while list3 is added
     val updatedBlocks3 = getUpdatedBlocks {
-      store.putIteratorAndReleaseLock(
+      store.putIterator(
         "list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     assert(updatedBlocks3.size === 2)
@@ -915,7 +915,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added
     val updatedBlocks4 = getUpdatedBlocks {
-      store.putIteratorAndReleaseLock(
+      store.putIterator(
         "list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     assert(updatedBlocks4.size === 2)
@@ -931,7 +931,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // No updated blocks - list5 is too big to fit in store and nothing is kicked out
     val updatedBlocks5 = getUpdatedBlocks {
-      store.putIteratorAndReleaseLock(
+      store.putIterator(
         "list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     assert(updatedBlocks5.size === 0)
@@ -956,11 +956,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val list = List.fill(2)(new Array[Byte](2000))
 
     // Tell master. By LRU, only list2 and list3 remains.
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
 
     // getLocations and getBlockStatus should yield the same locations
@@ -975,11 +975,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.master.getBlockStatus("list3", askSlaves = true).size === 1)
 
     // This time don't tell master and see what happens. By LRU, only list5 and list6 remains.
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
 
     // getLocations should return nothing because the master is not informed
@@ -1001,11 +1001,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val list = List.fill(2)(new Array[Byte](100))
 
     // insert some blocks
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
 
     // getLocations and getBlockStatus should yield the same locations
@@ -1015,11 +1015,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       === 1)
 
     // insert some more blocks
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
-    store.putIteratorAndReleaseLock(
+    store.putIterator(
       "newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
 
     // getLocations and getBlockStatus should yield the same locations
@@ -1030,7 +1030,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     val blockIds = Seq(RDDBlockId(1, 0), RDDBlockId(1, 1), RDDBlockId(2, 0))
     blockIds.foreach { blockId =>
-      store.putIteratorAndReleaseLock(
+      store.putIterator(
         blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     val matchedBlockIds = store.master.getMatchingBlockIds(_ match {
@@ -1042,12 +1042,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
   test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
     store = makeBlockManager(12000)
-    store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
-    store.putSingleAndReleaseLock(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // Access rdd_1_0 to ensure it's not least recently used.
     assert(store.getSingleAndReleaseLock(rdd(1, 0)).isDefined, "rdd_1_0 was not in store")
     // According to the same-RDD rule, rdd_1_0 should be replaced here.
-    store.putSingleAndReleaseLock(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // rdd_1_0 should have been replaced, even it's not least recently used.
     assert(store.memoryStore.contains(rdd(0, 0)), "rdd_0_0 was not in store")
     assert(store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was not in store")
@@ -1126,8 +1126,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     memoryStore.releasePendingUnrollMemoryForThisTask()
 
     // Unroll with not enough space. This should succeed after kicking out someBlock1.
-    store.putIteratorAndReleaseLock("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
-    store.putIteratorAndReleaseLock("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)
+    store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
+    store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)
     unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator)
     verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
@@ -1138,7 +1138,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
     // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.
     // In the mean time, however, we kicked out someBlock2 before giving up.
-    store.putIteratorAndReleaseLock("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)
+    store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)
     unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator)
     verifyUnroll(bigList.iterator, unrollResult, shouldBeArray = false)
     assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
@@ -1170,8 +1170,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     // would not know how to drop them from memory later.
     memoryStore.remove("b1")
     memoryStore.remove("b2")
-    store.putIteratorAndReleaseLock("b1", smallIterator, memOnly)
-    store.putIteratorAndReleaseLock("b2", smallIterator, memOnly)
+    store.putIterator("b1", smallIterator, memOnly)
+    store.putIterator("b2", smallIterator, memOnly)
 
     // Unroll with not enough space. This should succeed but kick out b1 in the process.
     val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true)
@@ -1182,7 +1182,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(memoryStore.contains("b3"))
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
     memoryStore.remove("b3")
-    store.putIteratorAndReleaseLock("b3", smallIterator, memOnly)
+    store.putIterator("b3", smallIterator, memOnly)
 
     // Unroll huge block with not enough space. This should fail and kick out b2 in the process.
     val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, returnValues = true)
@@ -1209,8 +1209,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
-    store.putIteratorAndReleaseLock("b1", smallIterator, memAndDisk)
-    store.putIteratorAndReleaseLock("b2", smallIterator, memAndDisk)
+    store.putIterator("b1", smallIterator, memAndDisk)
+    store.putIterator("b2", smallIterator, memAndDisk)
 
     // Unroll with not enough space. This should succeed but kick out b1 in the process.
     // Memory store should contain b2 and b3, while disk store should contain only b1
@@ -1223,7 +1223,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(!diskStore.contains("b2"))
     assert(!diskStore.contains("b3"))
     memoryStore.remove("b3")
-    store.putIteratorAndReleaseLock("b3", smallIterator, StorageLevel.MEMORY_ONLY)
+    store.putIterator("b3", smallIterator, StorageLevel.MEMORY_ONLY)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
     // Unroll huge block with not enough space. This should fail and drop the new block to disk
@@ -1310,12 +1310,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store = makeBlockManager(12000)
     val arr = new Array[Byte](4000)
     // First store a1 and a2, both in memory, and a3, on disk only
-    store.putSingleAndReleaseLock("a1", arr, StorageLevel.MEMORY_ONLY_SER)
-    store.putSingleAndReleaseLock("a2", arr, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingle("a1", arr, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingle("a2", arr, StorageLevel.MEMORY_ONLY_SER)
     assert(store.getSingle("a1").isDefined, "a1 was not in store")
     assert(store.getSingle("a2").isDefined, "a2 was not in store")
     // This put should fail because both a1 and a2 should be read-locked:
-    store.putSingleAndReleaseLock("a3", arr, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingle("a3", arr, StorageLevel.MEMORY_ONLY_SER)
     assert(store.getSingle("a3").isEmpty, "a3 was in store")
     assert(store.getSingle("a1").isDefined, "a1 was not in store")
     assert(store.getSingle("a2").isDefined, "a2 was not in store")
@@ -1324,7 +1324,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store.releaseLock("a2")
     // Block a1 is the least-recently accessed, so an LRU eviction policy would evict it before
     // block a2. However, a1 is still pinned so this put of a3 should evict a2 instead:
-    store.putSingleAndReleaseLock("a3", arr, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingle("a3", arr, StorageLevel.MEMORY_ONLY_SER)
     assert(store.getSingle("a2").isEmpty, "a2 was in store")
     assert(store.getSingle("a1").isDefined, "a1 was not in store")
     assert(store.getSingle("a3").isDefined, "a3 was not in store")
@@ -1335,41 +1335,6 @@ private object BlockManagerSuite {
 
   private implicit class BlockManagerTestUtils(store: BlockManager) {
 
-    def putSingleAndReleaseLock(
-        block: BlockId,
-        value: Any,
-        storageLevel: StorageLevel,
-        tellMaster: Boolean): Unit = {
-      if (store.putSingle(block, value, storageLevel, tellMaster)) {
-        store.releaseLock(block)
-      }
-    }
-
-    def putSingleAndReleaseLock(block: BlockId, value: Any, storageLevel: StorageLevel): Unit = {
-      if (store.putSingle(block, value, storageLevel)) {
-        store.releaseLock(block)
-      }
-    }
-
-    def putIteratorAndReleaseLock(
-        blockId: BlockId,
-        values: Iterator[Any],
-        level: StorageLevel): Unit = {
-      if (store.putIterator(blockId, values, level)) {
-        store.releaseLock(blockId)
-      }
-    }
-
-    def putIteratorAndReleaseLock(
-        blockId: BlockId,
-        values: Iterator[Any],
-        level: StorageLevel,
-        tellMaster: Boolean): Unit = {
-      if (store.putIterator(blockId, values, level, tellMaster)) {
-        store.releaseLock(blockId)
-      }
-    }
-
     def dropFromMemoryIfExists(
         blockId: BlockId,
         data: () => Either[Array[Any], ByteBuffer]): Unit = {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 3d9c085013ff58fc4b0cfee2e2f30058e62133c4..e22e320b171269aa00d3ca67ccba5edf9738bc2d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -91,8 +91,6 @@ private[streaming] class BlockManagerBasedBlockHandler(
     if (!putSucceeded) {
       throw new SparkException(
         s"Could not store $blockId to block manager with storage level $storageLevel")
-    } else {
-      blockManager.releaseLock(blockId)
     }
     BlockManagerBasedStoreResult(blockId, numRecords)
   }
@@ -191,8 +189,6 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
       if (!putSucceeded) {
         throw new SparkException(
           s"Could not store $blockId to block manager with storage level $storageLevel")
-      } else {
-        blockManager.releaseLock(blockId)
       }
     }