diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 1ec9ba77557254e085e61efb35bdc56a54c03736..2b456facd9439afce1095b8c0b9c34e43c5f4ca9 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage._
+import org.apache.spark.util.CompletionIterator
 
 /**
  * Spark class responsible for passing RDDs partition contents to the BlockManager and making
@@ -47,6 +48,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
         existingMetrics.incBytesReadInternal(blockResult.bytes)
 
         val iter = blockResult.data.asInstanceOf[Iterator[T]]
+
         new InterruptibleIterator[T](context, iter) {
           override def next(): T = {
             existingMetrics.incRecordsReadInternal(1)
@@ -156,7 +158,9 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
         case Left(arr) =>
           // We have successfully unrolled the entire partition, so cache it in memory
           blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
-          arr.iterator.asInstanceOf[Iterator[T]]
+          CompletionIterator[T, Iterator[T]](
+            arr.iterator.asInstanceOf[Iterator[T]],
+            blockManager.releaseLock(key))
         case Right(it) =>
           // There is not enough space to cache this partition in memory
           val returnValues = it.asInstanceOf[Iterator[T]]
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 9bd69727f60864bedcc731a2eb287ecc3ad6d365..c08f87a8b45c10ac0438e2bb502ab768f915f001 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -24,10 +24,10 @@ import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.util.Random
 
-import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
+import org.apache.spark._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
+import org.apache.spark.storage.{BlockId, BroadcastBlockId, StorageLevel}
 import org.apache.spark.util.{ByteBufferInputStream, Utils}
 import org.apache.spark.util.io.ByteArrayChunkOutputStream
 
@@ -90,22 +90,29 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
 
   /**
    * Divide the object into multiple blocks and put those blocks in the block manager.
+   *
    * @param value the object to divide
    * @return number of blocks this broadcast variable is divided into
    */
   private def writeBlocks(value: T): Int = {
+    import StorageLevel._
     // Store a copy of the broadcast variable in the driver so that tasks run on the driver
     // do not create a duplicate copy of the broadcast variable's value.
-    SparkEnv.get.blockManager.putSingle(broadcastId, value, StorageLevel.MEMORY_AND_DISK,
-      tellMaster = false)
+    val blockManager = SparkEnv.get.blockManager
+    if (blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
+      blockManager.releaseLock(broadcastId)
+    } else {
+      throw new SparkException(s"Failed to store $broadcastId in BlockManager")
+    }
     val blocks =
       TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
     blocks.zipWithIndex.foreach { case (block, i) =>
-      SparkEnv.get.blockManager.putBytes(
-        BroadcastBlockId(id, "piece" + i),
-        block,
-        StorageLevel.MEMORY_AND_DISK_SER,
-        tellMaster = true)
+      val pieceId = BroadcastBlockId(id, "piece" + i)
+      if (blockManager.putBytes(pieceId, block, MEMORY_AND_DISK_SER, tellMaster = true)) {
+        blockManager.releaseLock(pieceId)
+      } else {
+        throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
+      }
     }
     blocks.length
   }
@@ -127,16 +134,18 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
       def getRemote: Option[ByteBuffer] = bm.getRemoteBytes(pieceId).map { block =>
         // If we found the block from remote executors/driver's BlockManager, put the block
         // in this executor's BlockManager.
-        SparkEnv.get.blockManager.putBytes(
-          pieceId,
-          block,
-          StorageLevel.MEMORY_AND_DISK_SER,
-          tellMaster = true)
+        if (!bm.putBytes(pieceId, block, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) {
+          throw new SparkException(
+            s"Failed to store $pieceId of $broadcastId in local BlockManager")
+        }
         block
       }
       val block: ByteBuffer = getLocal.orElse(getRemote).getOrElse(
         throw new SparkException(s"Failed to get $pieceId of $broadcastId"))
+      // At this point we are guaranteed to hold a read lock, since we either got the block locally
+      // or stored the remotely-fetched block and automatically downgraded the write lock.
       blocks(pid) = block
+      releaseLock(pieceId)
     }
     blocks
   }
@@ -165,8 +174,10 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
   private def readBroadcastBlock(): T = Utils.tryOrIOException {
     TorrentBroadcast.synchronized {
       setConf(SparkEnv.get.conf)
-      SparkEnv.get.blockManager.getLocal(broadcastId).map(_.data.next()) match {
+      val blockManager = SparkEnv.get.blockManager
+      blockManager.getLocal(broadcastId).map(_.data.next()) match {
         case Some(x) =>
+          releaseLock(broadcastId)
           x.asInstanceOf[T]
 
         case None =>
@@ -179,13 +190,36 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
             blocks, SparkEnv.get.serializer, compressionCodec)
           // Store the merged copy in BlockManager so other tasks on this executor don't
           // need to re-fetch it.
-          SparkEnv.get.blockManager.putSingle(
-            broadcastId, obj, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+          val storageLevel = StorageLevel.MEMORY_AND_DISK
+          if (blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
+            releaseLock(broadcastId)
+          } else {
+            throw new SparkException(s"Failed to store $broadcastId in BlockManager")
+          }
           obj
       }
     }
   }
 
+  /**
+   * If running in a task, register the given block's locks for release upon task completion.
+   * Otherwise, if not running in a task then immediately release the lock.
+   */
+  private def releaseLock(blockId: BlockId): Unit = {
+    val blockManager = SparkEnv.get.blockManager
+    Option(TaskContext.get()) match {
+      case Some(taskContext) =>
+        taskContext.addTaskCompletionListener(_ => blockManager.releaseLock(blockId))
+      case None =>
+        // This should only happen on the driver, where broadcast variables may be accessed
+        // outside of running tasks (e.g. when computing rdd.partitions()). In order to allow
+        // broadcast variables to be garbage collected we need to free the reference here
+        // which is slightly unsafe but is technically okay because broadcast variables aren't
+        // stored off-heap.
+        blockManager.releaseLock(blockId)
+    }
+  }
+
 }
 
 
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 00be3a240dbacf448402fa20d1415ffeb9697c7e..a602fcac68a6b78b10ae4db12441638949b438b8 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -218,7 +218,9 @@ private[spark] class Executor(
           threwException = false
           res
         } finally {
+          val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
           val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
+
           if (freedMemory > 0) {
             val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
             if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false) && !threwException) {
@@ -227,6 +229,17 @@ private[spark] class Executor(
               logError(errMsg)
             }
           }
+
+          if (releasedLocks.nonEmpty) {
+            val errMsg =
+              s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
+                releasedLocks.mkString("[", ", ", "]")
+            if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false) && !threwException) {
+              throw new SparkException(errMsg)
+            } else {
+              logError(errMsg)
+            }
+          }
         }
         val taskFinish = System.currentTimeMillis()
 
@@ -266,8 +279,11 @@ private[spark] class Executor(
             ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
           } else if (resultSize >= maxRpcMessageSize) {
             val blockId = TaskResultBlockId(taskId)
-            env.blockManager.putBytes(
+            val putSucceeded = env.blockManager.putBytes(
               blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
+            if (putSucceeded) {
+              env.blockManager.releaseLock(blockId)
+            }
             logInfo(
               s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
             ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
diff --git a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
index 1745d52c819233aa4b88cbe8ffeb9c5fa7ea27c0..cc5e851c29b325035f31c6311a656c97579d428e 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
@@ -31,6 +31,14 @@ trait BlockDataManager {
 
   /**
    * Put the block locally, using the given storage level.
+   *
+   * Returns true if the block was stored and false if the put operation failed or the block
+   * already existed.
    */
-  def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Unit
+  def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Boolean
+
+  /**
+   * Release locks acquired by [[putBlockData()]] and [[getBlockData()]].
+   */
+  def releaseLock(blockId: BlockId): Unit
 }
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
index df8c21fb837ed6f3fdc0b2812d4dbed1a644ff49..e4246df83a6ec24fb9c6836cf6c2d3736c751906 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
@@ -65,7 +65,11 @@ class NettyBlockRpcServer(
         val level: StorageLevel =
           serializer.newInstance().deserialize(ByteBuffer.wrap(uploadBlock.metadata))
         val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
-        blockManager.putBlockData(BlockId(uploadBlock.blockId), data, level)
+        val blockId = BlockId(uploadBlock.blockId)
+        val putSucceeded = blockManager.putBlockData(blockId, data, level)
+        if (putSucceeded) {
+          blockManager.releaseLock(blockId)
+        }
         responseContext.onSuccess(ByteBuffer.allocate(0))
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index a49f3716e2702e9ca09f205dce24f28fd878f5bd..5c68d001f280f40cb2ab43755644890e2f92deab 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -64,6 +64,7 @@ private[spark] abstract class Task[T](
       taskAttemptId: Long,
       attemptNumber: Int,
       metricsSystem: MetricsSystem): T = {
+    SparkEnv.get.blockManager.registerTask(taskAttemptId)
     context = new TaskContextImpl(
       stageId,
       partitionId,
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
deleted file mode 100644
index 22fdf73e9d1f47270063652ba4bf9bb6069321fc..0000000000000000000000000000000000000000
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.util.concurrent.ConcurrentHashMap
-
-private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
-  // To save space, 'pending' and 'failed' are encoded as special sizes:
-  @volatile var size: Long = BlockInfo.BLOCK_PENDING
-  private def pending: Boolean = size == BlockInfo.BLOCK_PENDING
-  private def failed: Boolean = size == BlockInfo.BLOCK_FAILED
-  private def initThread: Thread = BlockInfo.blockInfoInitThreads.get(this)
-
-  setInitThread()
-
-  private def setInitThread() {
-    /* Set current thread as init thread - waitForReady will not block this thread
-     * (in case there is non trivial initialization which ends up calling waitForReady
-     * as part of initialization itself) */
-    BlockInfo.blockInfoInitThreads.put(this, Thread.currentThread())
-  }
-
-  /**
-   * Wait for this BlockInfo to be marked as ready (i.e. block is finished writing).
-   * Return true if the block is available, false otherwise.
-   */
-  def waitForReady(): Boolean = {
-    if (pending && initThread != Thread.currentThread()) {
-      synchronized {
-        while (pending) {
-          this.wait()
-        }
-      }
-    }
-    !failed
-  }
-
-  /** Mark this BlockInfo as ready (i.e. block is finished writing) */
-  def markReady(sizeInBytes: Long) {
-    require(sizeInBytes >= 0, s"sizeInBytes was negative: $sizeInBytes")
-    assert(pending)
-    size = sizeInBytes
-    BlockInfo.blockInfoInitThreads.remove(this)
-    synchronized {
-      this.notifyAll()
-    }
-  }
-
-  /** Mark this BlockInfo as ready but failed */
-  def markFailure() {
-    assert(pending)
-    size = BlockInfo.BLOCK_FAILED
-    BlockInfo.blockInfoInitThreads.remove(this)
-    synchronized {
-      this.notifyAll()
-    }
-  }
-}
-
-private object BlockInfo {
-  /* initThread is logically a BlockInfo field, but we store it here because
-   * it's only needed while this block is in the 'pending' state and we want
-   * to minimize BlockInfo's memory footprint. */
-  private val blockInfoInitThreads = new ConcurrentHashMap[BlockInfo, Thread]
-
-  private val BLOCK_PENDING: Long = -1L
-  private val BLOCK_FAILED: Long = -2L
-}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
new file mode 100644
index 0000000000000000000000000000000000000000..0eda97e58d451b8d6f3b94069edb0edb6a48890a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import com.google.common.collect.ConcurrentHashMultiset
+
+import org.apache.spark.{Logging, SparkException, TaskContext}
+
+
+/**
+ * Tracks metadata for an individual block.
+ *
+ * Instances of this class are _not_ thread-safe and are protected by locks in the
+ * [[BlockInfoManager]].
+ *
+ * @param level the block's storage level. This is the requested persistence level, not the
+ *              effective storage level of the block (i.e. if this is MEMORY_AND_DISK, then this
+ *              does not imply that the block is actually resident in memory).
+ * @param tellMaster whether state changes for this block should be reported to the master. This
+ *                   is true for most blocks, but is false for broadcast blocks.
+ */
+private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
+
+  /**
+   * The size of the block (in bytes)
+   */
+  def size: Long = _size
+  def size_=(s: Long): Unit = {
+    _size = s
+    checkInvariants()
+  }
+  private[this] var _size: Long = 0
+
+  /**
+   * The number of times that this block has been locked for reading.
+   */
+  def readerCount: Int = _readerCount
+  def readerCount_=(c: Int): Unit = {
+    _readerCount = c
+    checkInvariants()
+  }
+  private[this] var _readerCount: Int = 0
+
+  /**
+   * The task attempt id of the task which currently holds the write lock for this block, or
+   * [[BlockInfo.NON_TASK_WRITER]] if the write lock is held by non-task code, or
+   * [[BlockInfo.NO_WRITER]] if this block is not locked for writing.
+   */
+  def writerTask: Long = _writerTask
+  def writerTask_=(t: Long): Unit = {
+    _writerTask = t
+    checkInvariants()
+  }
+  private[this] var _writerTask: Long = 0
+
+  /**
+   * True if this block has been removed from the BlockManager and false otherwise.
+   * This field is used to communicate block deletion to blocked readers / writers (see its usage
+   * in [[BlockInfoManager]]).
+   */
+  def removed: Boolean = _removed
+  def removed_=(r: Boolean): Unit = {
+    _removed = r
+    checkInvariants()
+  }
+  private[this] var _removed: Boolean = false
+
+  private def checkInvariants(): Unit = {
+    // A block's reader count must be non-negative:
+    assert(_readerCount >= 0)
+    // A block is either locked for reading or for writing, but not for both at the same time:
+    assert(_readerCount == 0 || _writerTask == BlockInfo.NO_WRITER)
+    // If a block is removed then it is not locked:
+    assert(!_removed || (_readerCount == 0 && _writerTask == BlockInfo.NO_WRITER))
+  }
+
+  checkInvariants()
+}
+
+private[storage] object BlockInfo {
+
+  /**
+   * Special task attempt id constant used to mark a block's write lock as being unlocked.
+   */
+  val NO_WRITER: Long = -1
+
+  /**
+   * Special task attempt id constant used to mark a block's write lock as being held by
+   * a non-task thread (e.g. by a driver thread or by unit test code).
+   */
+  val NON_TASK_WRITER: Long = -1024
+}
+
+/**
+ * Component of the [[BlockManager]] which tracks metadata for blocks and manages block locking.
+ *
+ * The locking interface exposed by this class is readers-writer lock. Every lock acquisition is
+ * automatically associated with a running task and locks are automatically released upon task
+ * completion or failure.
+ *
+ * This class is thread-safe.
+ */
+private[storage] class BlockInfoManager extends Logging {
+
+  private type TaskAttemptId = Long
+
+  /**
+   * Used to look up metadata for individual blocks. Entries are added to this map via an atomic
+   * set-if-not-exists operation ([[lockNewBlockForWriting()]]) and are removed
+   * by [[removeBlock()]].
+   */
+  @GuardedBy("this")
+  private[this] val infos = new mutable.HashMap[BlockId, BlockInfo]
+
+  /**
+   * Tracks the set of blocks that each task has locked for writing.
+   */
+  @GuardedBy("this")
+  private[this] val writeLocksByTask =
+    new mutable.HashMap[TaskAttemptId, mutable.Set[BlockId]]
+      with mutable.MultiMap[TaskAttemptId, BlockId]
+
+  /**
+   * Tracks the set of blocks that each task has locked for reading, along with the number of times
+   * that a block has been locked (since our read locks are re-entrant).
+   */
+  @GuardedBy("this")
+  private[this] val readLocksByTask =
+    new mutable.HashMap[TaskAttemptId, ConcurrentHashMultiset[BlockId]]
+
+  // ----------------------------------------------------------------------------------------------
+
+  // Initialization for special task attempt ids:
+  registerTask(BlockInfo.NON_TASK_WRITER)
+
+  // ----------------------------------------------------------------------------------------------
+
+  /**
+   * Called at the start of a task in order to register that task with this [[BlockInfoManager]].
+   * This must be called prior to calling any other BlockInfoManager methods from that task.
+   */
+  def registerTask(taskAttemptId: TaskAttemptId): Unit = synchronized {
+    require(!readLocksByTask.contains(taskAttemptId),
+      s"Task attempt $taskAttemptId is already registered")
+    readLocksByTask(taskAttemptId) = ConcurrentHashMultiset.create()
+  }
+
+  /**
+   * Returns the current task's task attempt id (which uniquely identifies the task), or
+   * [[BlockInfo.NON_TASK_WRITER]] if called by a non-task thread.
+   */
+  private def currentTaskAttemptId: TaskAttemptId = {
+    Option(TaskContext.get()).map(_.taskAttemptId()).getOrElse(BlockInfo.NON_TASK_WRITER)
+  }
+
+  /**
+   * Lock a block for reading and return its metadata.
+   *
+   * If another task has already locked this block for reading, then the read lock will be
+   * immediately granted to the calling task and its lock count will be incremented.
+   *
+   * If another task has locked this block for writing, then this call will block until the write
+   * lock is released or will return immediately if `blocking = false`.
+   *
+   * A single task can lock a block multiple times for reading, in which case each lock will need
+   * to be released separately.
+   *
+   * @param blockId the block to lock.
+   * @param blocking if true (default), this call will block until the lock is acquired. If false,
+   *                 this call will return immediately if the lock acquisition fails.
+   * @return None if the block did not exist or was removed (in which case no lock is held), or
+   *         Some(BlockInfo) (in which case the block is locked for reading).
+   */
+  def lockForReading(
+      blockId: BlockId,
+      blocking: Boolean = true): Option[BlockInfo] = synchronized {
+    logTrace(s"Task $currentTaskAttemptId trying to acquire read lock for $blockId")
+    infos.get(blockId).map { info =>
+      while (info.writerTask != BlockInfo.NO_WRITER) {
+        if (blocking) wait() else return None
+      }
+      if (info.removed) return None
+      info.readerCount += 1
+      readLocksByTask(currentTaskAttemptId).add(blockId)
+      logTrace(s"Task $currentTaskAttemptId acquired read lock for $blockId")
+      info
+    }
+  }
+
+  /**
+   * Lock a block for writing and return its metadata.
+   *
+   * If another task has already locked this block for either reading or writing, then this call
+   * will block until the other locks are released or will return immediately if `blocking = false`.
+   *
+   * If this is called by a task which already holds the block's exclusive write lock, then this
+   * method will throw an exception.
+   *
+   * @param blockId the block to lock.
+   * @param blocking if true (default), this call will block until the lock is acquired. If false,
+   *                 this call will return immediately if the lock acquisition fails.
+   * @return None if the block did not exist or was removed (in which case no lock is held), or
+   *         Some(BlockInfo) (in which case the block is locked for writing).
+   */
+  def lockForWriting(
+      blockId: BlockId,
+      blocking: Boolean = true): Option[BlockInfo] = synchronized {
+    logTrace(s"Task $currentTaskAttemptId trying to acquire write lock for $blockId")
+    infos.get(blockId).map { info =>
+      if (info.writerTask == currentTaskAttemptId) {
+        throw new IllegalStateException(
+          s"Task $currentTaskAttemptId has already locked $blockId for writing")
+      } else {
+        while (info.writerTask != BlockInfo.NO_WRITER || info.readerCount != 0) {
+          if (blocking) wait() else return None
+        }
+        if (info.removed) return None
+      }
+      info.writerTask = currentTaskAttemptId
+      writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
+      logTrace(s"Task $currentTaskAttemptId acquired write lock for $blockId")
+      info
+    }
+  }
+
+  /**
+   * Throws an exception if the current task does not hold a write lock on the given block.
+   * Otherwise, returns the block's BlockInfo.
+   */
+  def assertBlockIsLockedForWriting(blockId: BlockId): BlockInfo = synchronized {
+    infos.get(blockId) match {
+      case Some(info) =>
+        if (info.writerTask != currentTaskAttemptId) {
+          throw new SparkException(
+            s"Task $currentTaskAttemptId has not locked block $blockId for writing")
+        } else {
+          info
+        }
+      case None =>
+        throw new SparkException(s"Block $blockId does not exist")
+    }
+  }
+
+  /**
+   * Get a block's metadata without acquiring any locks. This method is only exposed for use by
+   * [[BlockManager.getStatus()]] and should not be called by other code outside of this class.
+   */
+  private[storage] def get(blockId: BlockId): Option[BlockInfo] = synchronized {
+    infos.get(blockId)
+  }
+
+  /**
+   * Downgrades an exclusive write lock to a shared read lock.
+   */
+  def downgradeLock(blockId: BlockId): Unit = synchronized {
+    logTrace(s"Task $currentTaskAttemptId downgrading write lock for $blockId")
+    val info = get(blockId).get
+    require(info.writerTask == currentTaskAttemptId,
+      s"Task $currentTaskAttemptId tried to downgrade a write lock that it does not hold on" +
+        s" block $blockId")
+    unlock(blockId)
+    val lockOutcome = lockForReading(blockId, blocking = false)
+    assert(lockOutcome.isDefined)
+  }
+
+  /**
+   * Release a lock on the given block.
+   */
+  def unlock(blockId: BlockId): Unit = synchronized {
+    logTrace(s"Task $currentTaskAttemptId releasing lock for $blockId")
+    val info = get(blockId).getOrElse {
+      throw new IllegalStateException(s"Block $blockId not found")
+    }
+    if (info.writerTask != BlockInfo.NO_WRITER) {
+      info.writerTask = BlockInfo.NO_WRITER
+      writeLocksByTask.removeBinding(currentTaskAttemptId, blockId)
+    } else {
+      assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
+      info.readerCount -= 1
+      val countsForTask = readLocksByTask(currentTaskAttemptId)
+      val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1
+      assert(newPinCountForTask >= 0,
+        s"Task $currentTaskAttemptId release lock on block $blockId more times than it acquired it")
+    }
+    notifyAll()
+  }
+
+  /**
+   * Atomically create metadata for a block and acquire a write lock for it, if it doesn't already
+   * exist.
+   *
+   * @param blockId the block id.
+   * @param newBlockInfo the block info for the new block.
+   * @return true if the block did not already exist, false otherwise. If this returns false, then
+   *         no new locks are acquired. If this returns true, a write lock on the new block will
+   *         be held.
+   */
+  def lockNewBlockForWriting(
+      blockId: BlockId,
+      newBlockInfo: BlockInfo): Boolean = synchronized {
+    logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
+    if (!infos.contains(blockId)) {
+      infos(blockId) = newBlockInfo
+      newBlockInfo.writerTask = currentTaskAttemptId
+      writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
+      logTrace(s"Task $currentTaskAttemptId successfully locked new block $blockId")
+      true
+    } else {
+      logTrace(s"Task $currentTaskAttemptId did not create and lock block $blockId " +
+        s"because that block already exists")
+      false
+    }
+  }
+
+  /**
+   * Release all lock held by the given task, clearing that task's pin bookkeeping
+   * structures and updating the global pin counts. This method should be called at the
+   * end of a task (either by a task completion handler or in `TaskRunner.run()`).
+   *
+   * @return the ids of blocks whose pins were released
+   */
+  def releaseAllLocksForTask(taskAttemptId: TaskAttemptId): Seq[BlockId] = {
+    val blocksWithReleasedLocks = mutable.ArrayBuffer[BlockId]()
+
+    val readLocks = synchronized {
+      readLocksByTask.remove(taskAttemptId).get
+    }
+    val writeLocks = synchronized {
+      writeLocksByTask.remove(taskAttemptId).getOrElse(Seq.empty)
+    }
+
+    for (blockId <- writeLocks) {
+      infos.get(blockId).foreach { info =>
+        assert(info.writerTask == taskAttemptId)
+        info.writerTask = BlockInfo.NO_WRITER
+      }
+      blocksWithReleasedLocks += blockId
+    }
+    readLocks.entrySet().iterator().asScala.foreach { entry =>
+      val blockId = entry.getElement
+      val lockCount = entry.getCount
+      blocksWithReleasedLocks += blockId
+      synchronized {
+        get(blockId).foreach { info =>
+          info.readerCount -= lockCount
+          assert(info.readerCount >= 0)
+        }
+      }
+    }
+
+    synchronized {
+      notifyAll()
+    }
+    blocksWithReleasedLocks
+  }
+
+  /**
+   * Returns the number of blocks tracked.
+   */
+  def size: Int = synchronized {
+    infos.size
+  }
+
+  /**
+   * Return the number of map entries in this pin counter's internal data structures.
+   * This is used in unit tests in order to detect memory leaks.
+   */
+  private[storage] def getNumberOfMapEntries: Long = synchronized {
+    size +
+      readLocksByTask.size +
+      readLocksByTask.map(_._2.size()).sum +
+      writeLocksByTask.size +
+      writeLocksByTask.map(_._2.size).sum
+  }
+
+  /**
+   * Returns an iterator over a snapshot of all blocks' metadata. Note that the individual entries
+   * in this iterator are mutable and thus may reflect blocks that are deleted while the iterator
+   * is being traversed.
+   */
+  def entries: Iterator[(BlockId, BlockInfo)] = synchronized {
+    infos.toArray.toIterator
+  }
+
+  /**
+   * Removes the given block and releases the write lock on it.
+   *
+   * This can only be called while holding a write lock on the given block.
+   */
+  def removeBlock(blockId: BlockId): Unit = synchronized {
+    logTrace(s"Task $currentTaskAttemptId trying to remove block $blockId")
+    infos.get(blockId) match {
+      case Some(blockInfo) =>
+        if (blockInfo.writerTask != currentTaskAttemptId) {
+          throw new IllegalStateException(
+            s"Task $currentTaskAttemptId called remove() on block $blockId without a write lock")
+        } else {
+          infos.remove(blockId)
+          blockInfo.readerCount = 0
+          blockInfo.writerTask = BlockInfo.NO_WRITER
+          blockInfo.removed = true
+        }
+      case None =>
+        throw new IllegalArgumentException(
+          s"Task $currentTaskAttemptId called remove() on non-existent block $blockId")
+    }
+    notifyAll()
+  }
+
+  /**
+   * Delete all state. Called during shutdown.
+   */
+  def clear(): Unit = synchronized {
+    infos.valuesIterator.foreach { blockInfo =>
+      blockInfo.readerCount = 0
+      blockInfo.writerTask = BlockInfo.NO_WRITER
+      blockInfo.removed = true
+    }
+    infos.clear()
+    readLocksByTask.clear()
+    writeLocksByTask.clear()
+    notifyAll()
+  }
+
+}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 77fd03a6bcfc5a37abf533a9815d33500826c5f5..29124b368e405e06db97c71c536c86640a8668fb 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -19,9 +19,7 @@ package org.apache.spark.storage
 
 import java.io._
 import java.nio.{ByteBuffer, MappedByteBuffer}
-import java.util.concurrent.ConcurrentHashMap
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.concurrent.duration._
@@ -77,7 +75,7 @@ private[spark] class BlockManager(
 
   val diskBlockManager = new DiskBlockManager(this, conf)
 
-  private val blockInfo = new ConcurrentHashMap[BlockId, BlockInfo]
+  private[storage] val blockInfoManager = new BlockInfoManager
 
   private val futureExecutionContext = ExecutionContext.fromExecutorService(
     ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))
@@ -223,8 +221,8 @@ private[spark] class BlockManager(
    * will be made then.
    */
   private def reportAllBlocks(): Unit = {
-    logInfo(s"Reporting ${blockInfo.size} blocks to the master.")
-    for ((blockId, info) <- blockInfo.asScala) {
+    logInfo(s"Reporting ${blockInfoManager.size} blocks to the master.")
+    for ((blockId, info) <- blockInfoManager.entries) {
       val status = getCurrentBlockStatus(blockId, info)
       if (!tryToReportBlockStatus(blockId, info, status)) {
         logError(s"Failed to report $blockId to master; giving up.")
@@ -286,7 +284,7 @@ private[spark] class BlockManager(
         .asInstanceOf[Option[ByteBuffer]]
       if (blockBytesOpt.isDefined) {
         val buffer = blockBytesOpt.get
-        new NioManagedBuffer(buffer)
+        new BlockManagerManagedBuffer(this, blockId, buffer)
       } else {
         throw new BlockNotFoundException(blockId.toString)
       }
@@ -296,7 +294,7 @@ private[spark] class BlockManager(
   /**
    * Put the block locally, using the given storage level.
    */
-  override def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Unit = {
+  override def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Boolean = {
     putBytes(blockId, data.nioByteBuffer(), level)
   }
 
@@ -305,7 +303,7 @@ private[spark] class BlockManager(
    * NOTE: This is mainly for testing, and it doesn't fetch information from external block store.
    */
   def getStatus(blockId: BlockId): Option[BlockStatus] = {
-    blockInfo.asScala.get(blockId).map { info =>
+    blockInfoManager.get(blockId).map { info =>
       val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
       val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L
       BlockStatus(info.level, memSize = memSize, diskSize = diskSize)
@@ -318,7 +316,12 @@ private[spark] class BlockManager(
    * may not know of).
    */
   def getMatchingBlockIds(filter: BlockId => Boolean): Seq[BlockId] = {
-    (blockInfo.asScala.keys ++ diskBlockManager.getAllBlocks()).filter(filter).toSeq
+    // The `toArray` is necessary here in order to force the list to be materialized so that we
+    // don't try to serialize a lazy iterator when responding to client requests.
+    (blockInfoManager.entries.map(_._1) ++ diskBlockManager.getAllBlocks())
+      .filter(filter)
+      .toArray
+      .toSeq
   }
 
   /**
@@ -425,26 +428,11 @@ private[spark] class BlockManager(
   }
 
   private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
-    val info = blockInfo.get(blockId)
-    if (info != null) {
-      info.synchronized {
-        // Double check to make sure the block is still there. There is a small chance that the
-        // block has been removed by removeBlock (which also synchronizes on the blockInfo object).
-        // Note that this only checks metadata tracking. If user intentionally deleted the block
-        // on disk or from off heap storage without using removeBlock, this conditional check will
-        // still pass but eventually we will get an exception because we can't find the block.
-        if (blockInfo.asScala.get(blockId).isEmpty) {
-          logWarning(s"Block $blockId had been removed")
-          return None
-        }
-
-        // If another thread is writing the block, wait for it to become ready.
-        if (!info.waitForReady()) {
-          // If we get here, the block write failed.
-          logWarning(s"Block $blockId was marked as failure.")
-          return None
-        }
-
+    blockInfoManager.lockForReading(blockId) match {
+      case None =>
+        logDebug(s"Block $blockId was not found")
+        None
+      case Some(info) =>
         val level = info.level
         logDebug(s"Level for block $blockId is $level")
 
@@ -452,7 +440,10 @@ private[spark] class BlockManager(
         if (level.useMemory) {
           logDebug(s"Getting block $blockId from memory")
           val result = if (asBlockResult) {
-            memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))
+            memoryStore.getValues(blockId).map { iter =>
+              val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
+              new BlockResult(ci, DataReadMethod.Memory, info.size)
+            }
           } else {
             memoryStore.getBytes(blockId)
           }
@@ -470,6 +461,7 @@ private[spark] class BlockManager(
           val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
             case Some(b) => b
             case None =>
+              releaseLock(blockId)
               throw new BlockException(
                 blockId, s"Block $blockId not found on disk, though it should be")
           }
@@ -478,8 +470,9 @@ private[spark] class BlockManager(
           if (!level.useMemory) {
             // If the block shouldn't be stored in memory, we can just return it
             if (asBlockResult) {
-              return Some(new BlockResult(dataDeserialize(blockId, bytes), DataReadMethod.Disk,
-                info.size))
+              val iter = CompletionIterator[Any, Iterator[Any]](
+                dataDeserialize(blockId, bytes), releaseLock(blockId))
+              return Some(new BlockResult(iter, DataReadMethod.Disk, info.size))
             } else {
               return Some(bytes)
             }
@@ -511,26 +504,34 @@ private[spark] class BlockManager(
                 // space to unroll the block. Either way, the put here should return an iterator.
                 putResult.data match {
                   case Left(it) =>
-                    return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
+                    val ci = CompletionIterator[Any, Iterator[Any]](it, releaseLock(blockId))
+                    return Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
                   case _ =>
                     // This only happens if we dropped the values back to disk (which is never)
                     throw new SparkException("Memory store did not return an iterator!")
                 }
               } else {
-                return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
+                val ci = CompletionIterator[Any, Iterator[Any]](values, releaseLock(blockId))
+                return Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
               }
             }
           }
+        } else {
+          // This branch represents a case where the BlockInfoManager contained an entry for
+          // the block but the block could not be found in any of the block stores. This case
+          // should never occur, but for completeness's sake we address it here.
+          logError(
+            s"Block $blockId is supposedly stored locally but was not found in any block store")
+          releaseLock(blockId)
+          None
         }
-      }
-    } else {
-      logDebug(s"Block $blockId not registered locally")
     }
-    None
   }
 
   /**
    * Get block from remote block managers.
+   *
+   * This does not acquire a lock on this block in this JVM.
    */
   def getRemote(blockId: BlockId): Option[BlockResult] = {
     logDebug(s"Getting remote block $blockId")
@@ -597,6 +598,10 @@ private[spark] class BlockManager(
 
   /**
    * Get a block from the block manager (either local or remote).
+   *
+   * This acquires a read lock on the block if the block was stored locally and does not acquire
+   * any locks if the block was fetched from a remote block manager. The read lock will
+   * automatically be freed once the result's `data` iterator is fully consumed.
    */
   def get(blockId: BlockId): Option[BlockResult] = {
     val local = getLocal(blockId)
@@ -612,6 +617,36 @@ private[spark] class BlockManager(
     None
   }
 
+  /**
+   * Downgrades an exclusive write lock to a shared read lock.
+   */
+  def downgradeLock(blockId: BlockId): Unit = {
+    blockInfoManager.downgradeLock(blockId)
+  }
+
+  /**
+   * Release a lock on the given block.
+   */
+  def releaseLock(blockId: BlockId): Unit = {
+    blockInfoManager.unlock(blockId)
+  }
+
+  /**
+   * Registers a task with the BlockManager in order to initialize per-task bookkeeping structures.
+   */
+  def registerTask(taskAttemptId: Long): Unit = {
+    blockInfoManager.registerTask(taskAttemptId)
+  }
+
+  /**
+   * Release all locks for the given task.
+   *
+   * @return the blocks whose locks were released.
+   */
+  def releaseAllLocksForTask(taskAttemptId: Long): Seq[BlockId] = {
+    blockInfoManager.releaseAllLocksForTask(taskAttemptId)
+  }
+
   /**
    * @return true if the block was stored or false if the block was already stored or an
    *         error occurred.
@@ -703,19 +738,12 @@ private[spark] class BlockManager(
      * to be dropped right after it got put into memory. Note, however, that other threads will
      * not be able to get() this block until we call markReady on its BlockInfo. */
     val putBlockInfo = {
-      val tinfo = new BlockInfo(level, tellMaster)
-      // Do atomically !
-      val oldBlockOpt = Option(blockInfo.putIfAbsent(blockId, tinfo))
-      if (oldBlockOpt.isDefined) {
-        if (oldBlockOpt.get.waitForReady()) {
-          logWarning(s"Block $blockId already exists on this machine; not re-adding it")
-          return false
-        }
-        // TODO: So the block info exists - but previous attempt to load it (?) failed.
-        // What do we do now ? Retry on it ?
-        oldBlockOpt.get
+      val newInfo = new BlockInfo(level, tellMaster)
+      if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
+        newInfo
       } else {
-        tinfo
+        logWarning(s"Block $blockId already exists on this machine; not re-adding it")
+        return false
       }
     }
 
@@ -750,7 +778,7 @@ private[spark] class BlockManager(
       case _ => null
     }
 
-    var marked = false
+    var blockWasSuccessfullyStored = false
 
     putBlockInfo.synchronized {
       logTrace("Put for block %s took %s to get into synchronized block"
@@ -792,11 +820,11 @@ private[spark] class BlockManager(
         }
 
         val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
-        if (putBlockStatus.storageLevel != StorageLevel.NONE) {
+        blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
+        if (blockWasSuccessfullyStored) {
           // Now that the block is in either the memory, externalBlockStore, or disk store,
           // let other threads read it, and tell the master about it.
-          marked = true
-          putBlockInfo.markReady(size)
+          putBlockInfo.size = size
           if (tellMaster) {
             reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
           }
@@ -805,13 +833,10 @@ private[spark] class BlockManager(
           }
         }
       } finally {
-        // If we failed in putting the block to memory/disk, notify other possible readers
-        // that it has failed, and then remove it from the block info map.
-        if (!marked) {
-          // Note that the remove must happen before markFailure otherwise another thread
-          // could've inserted a new BlockInfo before we remove it.
-          blockInfo.remove(blockId)
-          putBlockInfo.markFailure()
+        if (blockWasSuccessfullyStored) {
+          blockInfoManager.downgradeLock(blockId)
+        } else {
+          blockInfoManager.removeBlock(blockId)
           logWarning(s"Putting block $blockId failed")
         }
       }
@@ -852,7 +877,7 @@ private[spark] class BlockManager(
         .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
     }
 
-    marked
+    blockWasSuccessfullyStored
   }
 
   /**
@@ -989,77 +1014,63 @@ private[spark] class BlockManager(
    * store reaches its limit and needs to free up space.
    *
    * If `data` is not put on disk, it won't be created.
+   *
+   * The caller of this method must hold a write lock on the block before calling this method.
+   * This method does not release the write lock.
+   *
+   * @return the block's new effective StorageLevel.
    */
   def dropFromMemory(
       blockId: BlockId,
-      data: () => Either[Array[Any], ByteBuffer]): Unit = {
-
+      data: () => Either[Array[Any], ByteBuffer]): StorageLevel = {
     logInfo(s"Dropping block $blockId from memory")
-    val info = blockInfo.get(blockId)
-
-    // If the block has not already been dropped
-    if (info != null) {
-      info.synchronized {
-        // required ? As of now, this will be invoked only for blocks which are ready
-        // But in case this changes in future, adding for consistency sake.
-        if (!info.waitForReady()) {
-          // If we get here, the block write failed.
-          logWarning(s"Block $blockId was marked as failure. Nothing to drop")
-          return
-        } else if (blockInfo.asScala.get(blockId).isEmpty) {
-          logWarning(s"Block $blockId was already dropped.")
-          return
-        }
-        var blockIsUpdated = false
-        val level = info.level
-
-        // Drop to disk, if storage level requires
-        if (level.useDisk && !diskStore.contains(blockId)) {
-          logInfo(s"Writing block $blockId to disk")
-          data() match {
-            case Left(elements) =>
-              diskStore.putArray(blockId, elements, level, returnValues = false)
-            case Right(bytes) =>
-              diskStore.putBytes(blockId, bytes, level)
-          }
-          blockIsUpdated = true
-        }
+    val info = blockInfoManager.assertBlockIsLockedForWriting(blockId)
+    var blockIsUpdated = false
+    val level = info.level
+
+    // Drop to disk, if storage level requires
+    if (level.useDisk && !diskStore.contains(blockId)) {
+      logInfo(s"Writing block $blockId to disk")
+      data() match {
+        case Left(elements) =>
+          diskStore.putArray(blockId, elements, level, returnValues = false)
+        case Right(bytes) =>
+          diskStore.putBytes(blockId, bytes, level)
+      }
+      blockIsUpdated = true
+    }
 
-        // Actually drop from memory store
-        val droppedMemorySize =
-          if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
-        val blockIsRemoved = memoryStore.remove(blockId)
-        if (blockIsRemoved) {
-          blockIsUpdated = true
-        } else {
-          logWarning(s"Block $blockId could not be dropped from memory as it does not exist")
-        }
+    // Actually drop from memory store
+    val droppedMemorySize =
+      if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
+    val blockIsRemoved = memoryStore.remove(blockId)
+    if (blockIsRemoved) {
+      blockIsUpdated = true
+    } else {
+      logWarning(s"Block $blockId could not be dropped from memory as it does not exist")
+    }
 
-        val status = getCurrentBlockStatus(blockId, info)
-        if (info.tellMaster) {
-          reportBlockStatus(blockId, info, status, droppedMemorySize)
-        }
-        if (!level.useDisk) {
-          // The block is completely gone from this node; forget it so we can put() it again later.
-          blockInfo.remove(blockId)
-        }
-        if (blockIsUpdated) {
-          Option(TaskContext.get()).foreach { c =>
-            c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, status)))
-          }
-        }
+    val status = getCurrentBlockStatus(blockId, info)
+    if (info.tellMaster) {
+      reportBlockStatus(blockId, info, status, droppedMemorySize)
+    }
+    if (blockIsUpdated) {
+      Option(TaskContext.get()).foreach { c =>
+        c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, status)))
       }
     }
+    status.storageLevel
   }
 
   /**
    * Remove all blocks belonging to the given RDD.
+   *
    * @return The number of blocks removed.
    */
   def removeRdd(rddId: Int): Int = {
     // TODO: Avoid a linear scan by creating another mapping of RDD.id to blocks.
     logInfo(s"Removing RDD $rddId")
-    val blocksToRemove = blockInfo.asScala.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
+    val blocksToRemove = blockInfoManager.entries.flatMap(_._1.asRDDId).filter(_.rddId == rddId)
     blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) }
     blocksToRemove.size
   }
@@ -1069,7 +1080,7 @@ private[spark] class BlockManager(
    */
   def removeBroadcast(broadcastId: Long, tellMaster: Boolean): Int = {
     logDebug(s"Removing broadcast $broadcastId")
-    val blocksToRemove = blockInfo.asScala.keys.collect {
+    val blocksToRemove = blockInfoManager.entries.map(_._1).collect {
       case bid @ BroadcastBlockId(`broadcastId`, _) => bid
     }
     blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster) }
@@ -1081,9 +1092,11 @@ private[spark] class BlockManager(
    */
   def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
     logDebug(s"Removing block $blockId")
-    val info = blockInfo.get(blockId)
-    if (info != null) {
-      info.synchronized {
+    blockInfoManager.lockForWriting(blockId) match {
+      case None =>
+        // The block has already been removed; do nothing.
+        logWarning(s"Asked to remove block $blockId, which does not exist")
+      case Some(info) =>
         // Removals are idempotent in disk store and memory store. At worst, we get a warning.
         val removedFromMemory = memoryStore.remove(blockId)
         val removedFromDisk = diskStore.remove(blockId)
@@ -1091,15 +1104,11 @@ private[spark] class BlockManager(
           logWarning(s"Block $blockId could not be removed as it was not found in either " +
             "the disk, memory, or external block store")
         }
-        blockInfo.remove(blockId)
+        blockInfoManager.removeBlock(blockId)
         if (tellMaster && info.tellMaster) {
           val status = getCurrentBlockStatus(blockId, info)
           reportBlockStatus(blockId, info, status)
         }
-      }
-    } else {
-      // The block has already been removed; do nothing.
-      logWarning(s"Asked to remove block $blockId, which does not exist")
     }
   }
 
@@ -1174,7 +1183,7 @@ private[spark] class BlockManager(
     }
     diskBlockManager.stop()
     rpcEnv.stop(slaveEndpoint)
-    blockInfo.clear()
+    blockInfoManager.clear()
     memoryStore.clear()
     diskStore.clear()
     futureExecutionContext.shutdownNow()
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala
new file mode 100644
index 0000000000000000000000000000000000000000..5886b9c00b557cf97136dfcf23904b7da9d12fcf
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.nio.ByteBuffer
+
+import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
+
+/**
+ * This [[ManagedBuffer]] wraps a [[ByteBuffer]] which was retrieved from the [[BlockManager]]
+ * so that the corresponding block's read lock can be released once this buffer's references
+ * are released.
+ *
+ * This is effectively a wrapper / bridge to connect the BlockManager's notion of read locks
+ * to the network layer's notion of retain / release counts.
+ */
+private[storage] class BlockManagerManagedBuffer(
+    blockManager: BlockManager,
+    blockId: BlockId,
+    buf: ByteBuffer) extends NioManagedBuffer(buf) {
+
+  override def retain(): ManagedBuffer = {
+    super.retain()
+    val locked = blockManager.blockInfoManager.lockForReading(blockId, blocking = false)
+    assert(locked.isDefined)
+    this
+  }
+
+  override def release(): ManagedBuffer = {
+    blockManager.releaseLock(blockId)
+    super.release()
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
index 69985c9759e2d09844dfe61d533adc365468e01e..6f6a6773ba4fdd3adf7e96fb5e986dec645333cc 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
@@ -60,8 +60,10 @@ private[spark] abstract class BlockStore(val blockManager: BlockManager) extends
 
   /**
    * Remove a block, if it exists.
+   *
    * @param blockId the block to remove.
    * @return True if the block was found and removed, False otherwise.
+   * @throws IllegalStateException if the block is pinned by a task.
    */
   def remove(blockId: BlockId): Boolean
 
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 024b660ce6a7b0acafe17417cd942b7aeb645688..2f16c8f3d8badf40cc3684f3952553b6c7c0efa1 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -95,7 +95,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
       val values = blockManager.dataDeserialize(blockId, bytes)
       putIterator(blockId, values, level, returnValues = true)
     } else {
-      tryToPut(blockId, bytes, bytes.limit, deserialized = false)
+      tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
       PutResult(bytes.limit(), Right(bytes.duplicate()))
     }
   }
@@ -127,11 +127,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
       returnValues: Boolean): PutResult = {
     if (level.deserialized) {
       val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
-      tryToPut(blockId, values, sizeEstimate, deserialized = true)
+      tryToPut(blockId, () => values, sizeEstimate, deserialized = true)
       PutResult(sizeEstimate, Left(values.iterator))
     } else {
       val bytes = blockManager.dataSerialize(blockId, values.iterator)
-      tryToPut(blockId, bytes, bytes.limit, deserialized = false)
+      tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
       PutResult(bytes.limit(), Right(bytes.duplicate()))
     }
   }
@@ -208,7 +208,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
   }
 
   override def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
-    val entry = entries.synchronized { entries.remove(blockId) }
+    val entry = entries.synchronized {
+      entries.remove(blockId)
+    }
     if (entry != null) {
       memoryManager.releaseStorageMemory(entry.size)
       logDebug(s"Block $blockId of size ${entry.size} dropped " +
@@ -327,14 +329,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
     blockId.asRDDId.map(_.rddId)
   }
 
-  private def tryToPut(
-      blockId: BlockId,
-      value: Any,
-      size: Long,
-      deserialized: Boolean): Boolean = {
-    tryToPut(blockId, () => value, size, deserialized)
-  }
-
   /**
    * Try to put in a set of values, if we can free up enough space. The value should either be
    * an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
@@ -410,6 +404,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
       var freedMemory = 0L
       val rddToAdd = blockId.flatMap(getRddId)
       val selectedBlocks = new ArrayBuffer[BlockId]
+      def blockIsEvictable(blockId: BlockId): Boolean = {
+        rddToAdd.isEmpty || rddToAdd != getRddId(blockId)
+      }
       // This is synchronized to ensure that the set of entries is not changed
       // (because of getValue or getBytes) while traversing the iterator, as that
       // can lead to exceptions.
@@ -418,9 +415,14 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
         while (freedMemory < space && iterator.hasNext) {
           val pair = iterator.next()
           val blockId = pair.getKey
-          if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
-            selectedBlocks += blockId
-            freedMemory += pair.getValue.size
+          if (blockIsEvictable(blockId)) {
+            // We don't want to evict blocks which are currently being read, so we need to obtain
+            // an exclusive write lock on blocks which are candidates for eviction. We perform a
+            // non-blocking "tryLock" here in order to ignore blocks which are locked for reading:
+            if (blockManager.blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) {
+              selectedBlocks += blockId
+              freedMemory += pair.getValue.size
+            }
           }
         }
       }
@@ -438,7 +440,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
             } else {
               Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
             }
-            blockManager.dropFromMemory(blockId, () => data)
+            val newEffectiveStorageLevel = blockManager.dropFromMemory(blockId, () => data)
+            if (newEffectiveStorageLevel.isValid) {
+              // The block is still present in at least one store, so release the lock
+              // but don't delete the block info
+              blockManager.releaseLock(blockId)
+            } else {
+              // The block isn't present in any store, so delete the block info so that the
+              // block can be stored again
+              blockManager.blockInfoManager.removeBlock(blockId)
+            }
           }
         }
         freedMemory
@@ -447,6 +458,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
           logInfo(s"Will not store $id as it would require dropping another block " +
             "from the same RDD")
         }
+        selectedBlocks.foreach { id =>
+          blockManager.releaseLock(id)
+        }
         0L
       }
     }
@@ -463,6 +477,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
 
   /**
    * Reserve memory for unrolling the given block for this task.
+   *
    * @return whether the request is granted.
    */
   def reserveUnrollMemoryForThisTask(blockId: BlockId, memory: Long): Boolean = {
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index 48a0282b30cf09f7c69055a83ed4d3a57bb613dc..ffc02bcb011f3ad69106a40c7c5e4230c5613cdd 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -87,6 +87,7 @@ class CacheManagerSuite extends SparkFunSuite with LocalSparkContext with Before
     val context = TaskContext.empty()
     try {
       TaskContext.setTaskContext(context)
+      sc.env.blockManager.registerTask(0)
       cacheManager.getOrCompute(rdd3, split, context, StorageLevel.MEMORY_ONLY)
       assert(context.taskMetrics.updatedBlockStatuses.size === 2)
     } finally {
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index 7b0238091730d7983aed46a415996985704ff649..d1e806b2eb80a1cf515f8df1c5c4becbe550c2ec 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -485,7 +485,8 @@ class CleanerTester(
   def assertCleanup()(implicit waitTimeout: PatienceConfiguration.Timeout) {
     try {
       eventually(waitTimeout, interval(100 millis)) {
-        assert(isAllCleanedUp)
+        assert(isAllCleanedUp,
+          "The following resources were not cleaned up:\n" + uncleanedResourcesToString)
       }
       postCleanupValidate()
     } finally {
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
index a0483f648388955573716e48770de9e71fcf4a21..c1484b0afa85fe513ad5a282111127d9418381c2 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark._
 import org.apache.spark.serializer.KryoDistributedTest._
 import org.apache.spark.util.Utils
 
-class KryoSerializerDistributedSuite extends SparkFunSuite {
+class KryoSerializerDistributedSuite extends SparkFunSuite with LocalSparkContext {
 
   test("kryo objects are serialised consistently in different processes") {
     val conf = new SparkConf(false)
@@ -34,7 +34,7 @@ class KryoSerializerDistributedSuite extends SparkFunSuite {
     val jar = TestUtils.createJarWithClasses(List(AppJarRegistrator.customClassName))
     conf.setJars(List(jar.getPath))
 
-    val sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
+    sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
     val original = Thread.currentThread.getContextClassLoader
     val loader = new java.net.URLClassLoader(Array(jar), Utils.getContextOrSparkClassLoader)
     SparkEnv.get.serializer.setDefaultClassLoader(loader)
@@ -47,8 +47,6 @@ class KryoSerializerDistributedSuite extends SparkFunSuite {
 
     // Join the two RDDs, and force evaluation
     assert(shuffledRDD.join(cachedRDD).collect().size == 1)
-
-    LocalSparkContext.stop(sc)
   }
 }
 
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..662b18f667b0d695432cd042c1b27eff0ff9e045
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.language.implicitConversions
+
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.{SparkException, SparkFunSuite, TaskContext, TaskContextImpl}
+
+
+class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
+
+  private implicit val ec = ExecutionContext.global
+  private var blockInfoManager: BlockInfoManager = _
+
+  override protected def beforeEach(): Unit = {
+    super.beforeEach()
+    blockInfoManager = new BlockInfoManager()
+    for (t <- 0 to 4) {
+      blockInfoManager.registerTask(t)
+    }
+  }
+
+  override protected def afterEach(): Unit = {
+    try {
+      blockInfoManager = null
+    } finally {
+      super.afterEach()
+    }
+  }
+
+  private implicit def stringToBlockId(str: String): BlockId = {
+    TestBlockId(str)
+  }
+
+  private def newBlockInfo(): BlockInfo = {
+    new BlockInfo(StorageLevel.MEMORY_ONLY, tellMaster = false)
+  }
+
+  private def withTaskId[T](taskAttemptId: Long)(block: => T): T = {
+    try {
+      TaskContext.setTaskContext(new TaskContextImpl(0, 0, taskAttemptId, 0, null, null))
+      block
+    } finally {
+      TaskContext.unset()
+    }
+  }
+
+  test("initial memory usage") {
+    assert(blockInfoManager.size === 0)
+  }
+
+  test("get non-existent block") {
+    assert(blockInfoManager.get("non-existent-block").isEmpty)
+    assert(blockInfoManager.lockForReading("non-existent-block").isEmpty)
+    assert(blockInfoManager.lockForWriting("non-existent-block").isEmpty)
+  }
+
+  test("basic lockNewBlockForWriting") {
+    val initialNumMapEntries = blockInfoManager.getNumberOfMapEntries
+    val blockInfo = newBlockInfo()
+    withTaskId(1) {
+      assert(blockInfoManager.lockNewBlockForWriting("block", blockInfo))
+      assert(blockInfoManager.get("block").get eq blockInfo)
+      assert(!blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+      assert(blockInfoManager.get("block").get eq blockInfo)
+      assert(blockInfo.readerCount === 0)
+      assert(blockInfo.writerTask === 1)
+      blockInfoManager.unlock("block")
+      assert(blockInfo.readerCount === 0)
+      assert(blockInfo.writerTask === BlockInfo.NO_WRITER)
+    }
+    assert(blockInfoManager.size === 1)
+    assert(blockInfoManager.getNumberOfMapEntries === initialNumMapEntries + 1)
+  }
+
+  test("read locks are reentrant") {
+    withTaskId(1) {
+      assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+      blockInfoManager.unlock("block")
+      assert(blockInfoManager.lockForReading("block").isDefined)
+      assert(blockInfoManager.lockForReading("block").isDefined)
+      assert(blockInfoManager.get("block").get.readerCount === 2)
+      assert(blockInfoManager.get("block").get.writerTask === BlockInfo.NO_WRITER)
+      blockInfoManager.unlock("block")
+      assert(blockInfoManager.get("block").get.readerCount === 1)
+      blockInfoManager.unlock("block")
+      assert(blockInfoManager.get("block").get.readerCount === 0)
+    }
+  }
+
+  test("multiple tasks can hold read locks") {
+    withTaskId(0) {
+      assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+      blockInfoManager.unlock("block")
+    }
+    withTaskId(1) { assert(blockInfoManager.lockForReading("block").isDefined) }
+    withTaskId(2) { assert(blockInfoManager.lockForReading("block").isDefined) }
+    withTaskId(3) { assert(blockInfoManager.lockForReading("block").isDefined) }
+    withTaskId(4) { assert(blockInfoManager.lockForReading("block").isDefined) }
+    assert(blockInfoManager.get("block").get.readerCount === 4)
+  }
+
+  test("single task can hold write lock") {
+    withTaskId(0) {
+      assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+      blockInfoManager.unlock("block")
+    }
+    withTaskId(1) {
+      assert(blockInfoManager.lockForWriting("block").isDefined)
+      assert(blockInfoManager.get("block").get.writerTask === 1)
+    }
+    withTaskId(2) {
+      assert(blockInfoManager.lockForWriting("block", blocking = false).isEmpty)
+      assert(blockInfoManager.get("block").get.writerTask === 1)
+    }
+  }
+
+  test("cannot call lockForWriting while already holding a write lock") {
+    withTaskId(0) {
+      assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+      blockInfoManager.unlock("block")
+    }
+    withTaskId(1) {
+      assert(blockInfoManager.lockForWriting("block").isDefined)
+      intercept[IllegalStateException] {
+        blockInfoManager.lockForWriting("block")
+      }
+      blockInfoManager.assertBlockIsLockedForWriting("block")
+    }
+  }
+
+  test("assertBlockIsLockedForWriting throws exception if block is not locked") {
+    intercept[SparkException] {
+      blockInfoManager.assertBlockIsLockedForWriting("block")
+    }
+    withTaskId(BlockInfo.NON_TASK_WRITER) {
+      intercept[SparkException] {
+        blockInfoManager.assertBlockIsLockedForWriting("block")
+      }
+    }
+  }
+
+  test("downgrade lock") {
+    withTaskId(0) {
+      assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+      blockInfoManager.downgradeLock("block")
+    }
+    withTaskId(1) {
+      assert(blockInfoManager.lockForReading("block").isDefined)
+    }
+    assert(blockInfoManager.get("block").get.readerCount === 2)
+    assert(blockInfoManager.get("block").get.writerTask === BlockInfo.NO_WRITER)
+  }
+
+  test("write lock will block readers") {
+    withTaskId(0) {
+      assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+    }
+    val get1Future = Future {
+      withTaskId(1) {
+        blockInfoManager.lockForReading("block")
+      }
+    }
+    val get2Future = Future {
+      withTaskId(2) {
+        blockInfoManager.lockForReading("block")
+      }
+    }
+    Thread.sleep(300)  // Hack to try to ensure that both future tasks are waiting
+    withTaskId(0) {
+      blockInfoManager.unlock("block")
+    }
+    assert(Await.result(get1Future, 1.seconds).isDefined)
+    assert(Await.result(get2Future, 1.seconds).isDefined)
+    assert(blockInfoManager.get("block").get.readerCount === 2)
+  }
+
+  test("read locks will block writer") {
+    withTaskId(0) {
+      assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+      blockInfoManager.unlock("block")
+      blockInfoManager.lockForReading("block")
+    }
+    val write1Future = Future {
+      withTaskId(1) {
+        blockInfoManager.lockForWriting("block")
+      }
+    }
+    val write2Future = Future {
+      withTaskId(2) {
+        blockInfoManager.lockForWriting("block")
+      }
+    }
+    Thread.sleep(300)  // Hack to try to ensure that both future tasks are waiting
+    withTaskId(0) {
+      blockInfoManager.unlock("block")
+    }
+    assert(
+      Await.result(Future.firstCompletedOf(Seq(write1Future, write2Future)), 1.seconds).isDefined)
+    val firstWriteWinner = if (write1Future.isCompleted) 1 else 2
+    withTaskId(firstWriteWinner) {
+      blockInfoManager.unlock("block")
+    }
+    assert(Await.result(write1Future, 1.seconds).isDefined)
+    assert(Await.result(write2Future, 1.seconds).isDefined)
+  }
+
+  test("removing a non-existent block throws IllegalArgumentException") {
+    withTaskId(0) {
+      intercept[IllegalArgumentException] {
+        blockInfoManager.removeBlock("non-existent-block")
+      }
+    }
+  }
+
+  test("removing a block without holding any locks throws IllegalStateException") {
+    withTaskId(0) {
+      assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+      blockInfoManager.unlock("block")
+      intercept[IllegalStateException] {
+        blockInfoManager.removeBlock("block")
+      }
+    }
+  }
+
+  test("removing a block while holding only a read lock throws IllegalStateException") {
+    withTaskId(0) {
+      assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+      blockInfoManager.unlock("block")
+      assert(blockInfoManager.lockForReading("block").isDefined)
+      intercept[IllegalStateException] {
+        blockInfoManager.removeBlock("block")
+      }
+    }
+  }
+
+  test("removing a block causes blocked callers to receive None") {
+    withTaskId(0) {
+      assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+    }
+    val getFuture = Future {
+      withTaskId(1) {
+        blockInfoManager.lockForReading("block")
+      }
+    }
+    val writeFuture = Future {
+      withTaskId(2) {
+        blockInfoManager.lockForWriting("block")
+      }
+    }
+    Thread.sleep(300)  // Hack to try to ensure that both future tasks are waiting
+    withTaskId(0) {
+      blockInfoManager.removeBlock("block")
+    }
+    assert(Await.result(getFuture, 1.seconds).isEmpty)
+    assert(Await.result(writeFuture, 1.seconds).isEmpty)
+  }
+
+  test("releaseAllLocksForTask releases write locks") {
+    val initialNumMapEntries = blockInfoManager.getNumberOfMapEntries
+    withTaskId(0) {
+      assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+    }
+    assert(blockInfoManager.getNumberOfMapEntries === initialNumMapEntries + 3)
+    blockInfoManager.releaseAllLocksForTask(0)
+    assert(blockInfoManager.getNumberOfMapEntries === initialNumMapEntries)
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 3fd6fb456046577dc902e6b1abcb270b8155c44d..a94d8b424d95639c065ecc9515d4638bfbe91a63 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -190,6 +190,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
 
     def putBlockAndGetLocations(blockId: String, level: StorageLevel): Set[BlockManagerId] = {
       stores.head.putSingle(blockId, new Array[Byte](blockSize), level)
+      stores.head.releaseLock(blockId)
       val locations = master.getLocations(blockId).sortBy { _.executorId }.toSet
       stores.foreach { _.removeBlock(blockId) }
       master.removeBlock(blockId)
@@ -251,6 +252,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
     // Insert a block with 2x replication and return the number of copies of the block
     def replicateAndGetNumCopies(blockId: String): Int = {
       store.putSingle(blockId, new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK_2)
+      store.releaseLock(blockId)
       val numLocations = master.getLocations(blockId).size
       allStores.foreach { _.removeBlock(blockId) }
       numLocations
@@ -288,6 +290,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
     def replicateAndGetNumCopies(blockId: String, replicationFactor: Int): Int = {
       val storageLevel = StorageLevel(true, true, false, true, replicationFactor)
       initialStores.head.putSingle(blockId, new Array[Byte](blockSize), storageLevel)
+      initialStores.head.releaseLock(blockId)
       val numLocations = master.getLocations(blockId).size
       allStores.foreach { _.removeBlock(blockId) }
       numLocations
@@ -355,6 +358,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
       val blockId = new TestBlockId(
         "block-with-" + storageLevel.description.replace(" ", "-").toLowerCase)
       stores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel)
+      stores(0).releaseLock(blockId)
 
       // Assert that master know two locations for the block
       val blockLocations = master.getLocations(blockId).map(_.executorId).toSet
@@ -367,6 +371,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
       }.foreach { testStore =>
         val testStoreName = testStore.blockManagerId.executorId
         assert(testStore.getLocal(blockId).isDefined, s"$blockId was not found in $testStoreName")
+        testStore.releaseLock(blockId)
         assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName),
           s"master does not have status for ${blockId.name} in $testStoreName")
 
@@ -392,6 +397,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
           (1 to 10).foreach {
             i =>
               testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), MEMORY_ONLY_SER)
+              testStore.releaseLock(s"dummy-block-$i")
           }
           (1 to 10).foreach {
             i => testStore.removeBlock(s"dummy-block-$i")
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index e1b2c9633edca958a1a4f98a0293f64c0dd7cb83..e4ab9ee0ebb38b41740d98edf31697e50603e8b6 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -45,6 +45,8 @@ import org.apache.spark.util._
 class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
   with PrivateMethodTester with ResetSystemProperties {
 
+  import BlockManagerSuite._
+
   var conf: SparkConf = null
   var store: BlockManager = null
   var store2: BlockManager = null
@@ -66,6 +68,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       maxMem: Long,
       name: String = SparkContext.DRIVER_IDENTIFIER,
       master: BlockManagerMaster = this.master): BlockManager = {
+    val serializer = new KryoSerializer(conf)
     val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
     val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
     val blockManager = new BlockManager(name, rpcEnv, master, serializer, conf,
@@ -169,14 +172,14 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a3 = new Array[Byte](4000)
 
     // Putting a1, a2  and a3 in memory and telling master only about a1 and a2
-    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
-    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
-    store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
+    store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
 
     // Checking whether blocks are in memory
-    assert(store.getSingle("a1").isDefined, "a1 was not in store")
-    assert(store.getSingle("a2").isDefined, "a2 was not in store")
-    assert(store.getSingle("a3").isDefined, "a3 was not in store")
+    assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
+    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
+    assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
 
     // Checking whether master knows about the blocks or not
     assert(master.getLocations("a1").size > 0, "master was not told about a1")
@@ -184,10 +187,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(master.getLocations("a3").size === 0, "master was told about a3")
 
     // Drop a1 and a2 from memory; this should be reported back to the master
-    store.dropFromMemory("a1", () => null: Either[Array[Any], ByteBuffer])
-    store.dropFromMemory("a2", () => null: Either[Array[Any], ByteBuffer])
-    assert(store.getSingle("a1") === None, "a1 not removed from store")
-    assert(store.getSingle("a2") === None, "a2 not removed from store")
+    store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ByteBuffer])
+    store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ByteBuffer])
+    assert(store.getSingleAndReleaseLock("a1") === None, "a1 not removed from store")
+    assert(store.getSingleAndReleaseLock("a2") === None, "a2 not removed from store")
     assert(master.getLocations("a1").size === 0, "master did not remove a1")
     assert(master.getLocations("a2").size === 0, "master did not remove a2")
   }
@@ -202,7 +205,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
-    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_2)
+    store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY_2)
     store2.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2)
     assert(master.getLocations("a1").size === 2, "master did not report 2 locations for a1")
     assert(master.getLocations("a2").size === 2, "master did not report 2 locations for a2")
@@ -215,17 +218,17 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a3 = new Array[Byte](4000)
 
     // Putting a1, a2 and a3 in memory and telling master only about a1 and a2
-    store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
-    store.putSingle("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
-    store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
+    store.putSingleAndReleaseLock("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
 
     // Checking whether blocks are in memory and memory size
     val memStatus = master.getMemoryStatus.head._2
     assert(memStatus._1 == 20000L, "total memory " + memStatus._1 + " should equal 20000")
     assert(memStatus._2 <= 12000L, "remaining memory " + memStatus._2 + " should <= 12000")
-    assert(store.getSingle("a1-to-remove").isDefined, "a1 was not in store")
-    assert(store.getSingle("a2-to-remove").isDefined, "a2 was not in store")
-    assert(store.getSingle("a3-to-remove").isDefined, "a3 was not in store")
+    assert(store.getSingleAndReleaseLock("a1-to-remove").isDefined, "a1 was not in store")
+    assert(store.getSingleAndReleaseLock("a2-to-remove").isDefined, "a2 was not in store")
+    assert(store.getSingleAndReleaseLock("a3-to-remove").isDefined, "a3 was not in store")
 
     // Checking whether master knows about the blocks or not
     assert(master.getLocations("a1-to-remove").size > 0, "master was not told about a1")
@@ -238,15 +241,15 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     master.removeBlock("a3-to-remove")
 
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
-      store.getSingle("a1-to-remove") should be (None)
+      assert(!store.hasLocalBlock("a1-to-remove"))
       master.getLocations("a1-to-remove") should have size 0
     }
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
-      store.getSingle("a2-to-remove") should be (None)
+      assert(!store.hasLocalBlock("a2-to-remove"))
       master.getLocations("a2-to-remove") should have size 0
     }
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
-      store.getSingle("a3-to-remove") should not be (None)
+      assert(store.hasLocalBlock("a3-to-remove"))
       master.getLocations("a3-to-remove") should have size 0
     }
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
@@ -262,30 +265,30 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
     // Putting a1, a2 and a3 in memory.
-    store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
-    store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
     master.removeRdd(0, blocking = false)
 
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
-      store.getSingle(rdd(0, 0)) should be (None)
+      store.getSingleAndReleaseLock(rdd(0, 0)) should be (None)
       master.getLocations(rdd(0, 0)) should have size 0
     }
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
-      store.getSingle(rdd(0, 1)) should be (None)
+      store.getSingleAndReleaseLock(rdd(0, 1)) should be (None)
       master.getLocations(rdd(0, 1)) should have size 0
     }
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
-      store.getSingle("nonrddblock") should not be (None)
+      store.getSingleAndReleaseLock("nonrddblock") should not be (None)
       master.getLocations("nonrddblock") should have size (1)
     }
 
-    store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
     master.removeRdd(0, blocking = true)
-    store.getSingle(rdd(0, 0)) should be (None)
+    store.getSingleAndReleaseLock(rdd(0, 0)) should be (None)
     master.getLocations(rdd(0, 0)) should have size 0
-    store.getSingle(rdd(0, 1)) should be (None)
+    store.getSingleAndReleaseLock(rdd(0, 1)) should be (None)
     master.getLocations(rdd(0, 1)) should have size 0
   }
 
@@ -305,54 +308,54 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // insert broadcast blocks in both the stores
     Seq(driverStore, executorStore).foreach { case s =>
-      s.putSingle(broadcast0BlockId, a1, StorageLevel.DISK_ONLY)
-      s.putSingle(broadcast1BlockId, a2, StorageLevel.DISK_ONLY)
-      s.putSingle(broadcast2BlockId, a3, StorageLevel.DISK_ONLY)
-      s.putSingle(broadcast2BlockId2, a4, StorageLevel.DISK_ONLY)
+      s.putSingleAndReleaseLock(broadcast0BlockId, a1, StorageLevel.DISK_ONLY)
+      s.putSingleAndReleaseLock(broadcast1BlockId, a2, StorageLevel.DISK_ONLY)
+      s.putSingleAndReleaseLock(broadcast2BlockId, a3, StorageLevel.DISK_ONLY)
+      s.putSingleAndReleaseLock(broadcast2BlockId2, a4, StorageLevel.DISK_ONLY)
     }
 
     // verify whether the blocks exist in both the stores
     Seq(driverStore, executorStore).foreach { case s =>
-      s.getLocal(broadcast0BlockId) should not be (None)
-      s.getLocal(broadcast1BlockId) should not be (None)
-      s.getLocal(broadcast2BlockId) should not be (None)
-      s.getLocal(broadcast2BlockId2) should not be (None)
+      assert(s.hasLocalBlock(broadcast0BlockId))
+      assert(s.hasLocalBlock(broadcast1BlockId))
+      assert(s.hasLocalBlock(broadcast2BlockId))
+      assert(s.hasLocalBlock(broadcast2BlockId2))
     }
 
     // remove broadcast 0 block only from executors
     master.removeBroadcast(0, removeFromMaster = false, blocking = true)
 
     // only broadcast 0 block should be removed from the executor store
-    executorStore.getLocal(broadcast0BlockId) should be (None)
-    executorStore.getLocal(broadcast1BlockId) should not be (None)
-    executorStore.getLocal(broadcast2BlockId) should not be (None)
+    assert(!executorStore.hasLocalBlock(broadcast0BlockId))
+    assert(executorStore.hasLocalBlock(broadcast1BlockId))
+    assert(executorStore.hasLocalBlock(broadcast2BlockId))
 
     // nothing should be removed from the driver store
-    driverStore.getLocal(broadcast0BlockId) should not be (None)
-    driverStore.getLocal(broadcast1BlockId) should not be (None)
-    driverStore.getLocal(broadcast2BlockId) should not be (None)
+    assert(driverStore.hasLocalBlock(broadcast0BlockId))
+    assert(driverStore.hasLocalBlock(broadcast1BlockId))
+    assert(driverStore.hasLocalBlock(broadcast2BlockId))
 
     // remove broadcast 0 block from the driver as well
     master.removeBroadcast(0, removeFromMaster = true, blocking = true)
-    driverStore.getLocal(broadcast0BlockId) should be (None)
-    driverStore.getLocal(broadcast1BlockId) should not be (None)
+    assert(!driverStore.hasLocalBlock(broadcast0BlockId))
+    assert(driverStore.hasLocalBlock(broadcast1BlockId))
 
     // remove broadcast 1 block from both the stores asynchronously
     // and verify all broadcast 1 blocks have been removed
     master.removeBroadcast(1, removeFromMaster = true, blocking = false)
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
-      driverStore.getLocal(broadcast1BlockId) should be (None)
-      executorStore.getLocal(broadcast1BlockId) should be (None)
+      assert(!driverStore.hasLocalBlock(broadcast1BlockId))
+      assert(!executorStore.hasLocalBlock(broadcast1BlockId))
     }
 
     // remove broadcast 2 from both the stores asynchronously
     // and verify all broadcast 2 blocks have been removed
     master.removeBroadcast(2, removeFromMaster = true, blocking = false)
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
-      driverStore.getLocal(broadcast2BlockId) should be (None)
-      driverStore.getLocal(broadcast2BlockId2) should be (None)
-      executorStore.getLocal(broadcast2BlockId) should be (None)
-      executorStore.getLocal(broadcast2BlockId2) should be (None)
+      assert(!driverStore.hasLocalBlock(broadcast2BlockId))
+      assert(!driverStore.hasLocalBlock(broadcast2BlockId2))
+      assert(!executorStore.hasLocalBlock(broadcast2BlockId))
+      assert(!executorStore.hasLocalBlock(broadcast2BlockId2))
     }
     executorStore.stop()
     driverStore.stop()
@@ -363,9 +366,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store = makeBlockManager(2000)
     val a1 = new Array[Byte](400)
 
-    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
 
-    assert(store.getSingle("a1").isDefined, "a1 was not in store")
+    assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
     assert(master.getLocations("a1").size > 0, "master was not told about a1")
 
     master.removeExecutor(store.blockManagerId.executorId)
@@ -381,13 +384,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
 
-    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
     assert(master.getLocations("a1").size > 0, "master was not told about a1")
 
     master.removeExecutor(store.blockManagerId.executorId)
     assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
 
-    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY)
     store.waitForAsyncReregister()
 
     assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
@@ -404,12 +407,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       master.removeExecutor(store.blockManagerId.executorId)
       val t1 = new Thread {
         override def run() {
-          store.putIterator("a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+          store.putIteratorAndReleaseLock(
+            "a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
         }
       }
       val t2 = new Thread {
         override def run() {
-          store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+          store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
         }
       }
       val t3 = new Thread {
@@ -425,8 +429,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       t2.join()
       t3.join()
 
-      store.dropFromMemory("a1", () => null: Either[Array[Any], ByteBuffer])
-      store.dropFromMemory("a2", () => null: Either[Array[Any], ByteBuffer])
+      store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ByteBuffer])
+      store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ByteBuffer])
       store.waitForAsyncReregister()
     }
   }
@@ -437,9 +441,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val list2 = List(new Array[Byte](500), new Array[Byte](1000), new Array[Byte](1500))
     val list1SizeEstimate = SizeEstimator.estimate(list1.iterator.toArray)
     val list2SizeEstimate = SizeEstimator.estimate(list2.iterator.toArray)
-    store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.putIterator("list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.putIterator("list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
     val list1Get = store.get("list1")
     assert(list1Get.isDefined, "list1 expected to be in store")
     assert(list1Get.get.data.size === 2)
@@ -479,8 +486,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store2 = makeBlockManager(8000, "executor2")
     store3 = makeBlockManager(8000, "executor3")
     val list1 = List(new Array[Byte](4000))
-    store2.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store3.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store2.putIteratorAndReleaseLock(
+      "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store3.putIteratorAndReleaseLock(
+      "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(store.getRemoteBytes("list1").isDefined, "list1Get expected to be fetched")
     store2.stop()
     store2 = null
@@ -506,18 +515,18 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a1 = new Array[Byte](4000)
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
-    store.putSingle("a1", a1, storageLevel)
-    store.putSingle("a2", a2, storageLevel)
-    store.putSingle("a3", a3, storageLevel)
-    assert(store.getSingle("a2").isDefined, "a2 was not in store")
-    assert(store.getSingle("a3").isDefined, "a3 was not in store")
-    assert(store.getSingle("a1") === None, "a1 was in store")
-    assert(store.getSingle("a2").isDefined, "a2 was not in store")
+    store.putSingleAndReleaseLock("a1", a1, storageLevel)
+    store.putSingleAndReleaseLock("a2", a2, storageLevel)
+    store.putSingleAndReleaseLock("a3", a3, storageLevel)
+    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
+    assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
+    assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store")
+    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
     // At this point a2 was gotten last, so LRU will getSingle rid of a3
-    store.putSingle("a1", a1, storageLevel)
-    assert(store.getSingle("a1").isDefined, "a1 was not in store")
-    assert(store.getSingle("a2").isDefined, "a2 was not in store")
-    assert(store.getSingle("a3") === None, "a3 was in store")
+    store.putSingleAndReleaseLock("a1", a1, storageLevel)
+    assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
+    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
+    assert(store.getSingleAndReleaseLock("a3") === None, "a3 was in store")
   }
 
   test("in-memory LRU for partitions of same RDD") {
@@ -525,34 +534,34 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a1 = new Array[Byte](4000)
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
-    store.putSingle(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY)
     // Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2
     // from the same RDD
-    assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store")
-    assert(store.getSingle(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
-    assert(store.getSingle(rdd(0, 1)).isDefined, "rdd_0_1 was not in store")
+    assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store")
+    assert(store.getSingleAndReleaseLock(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
+    assert(store.getSingleAndReleaseLock(rdd(0, 1)).isDefined, "rdd_0_1 was not in store")
     // Check that rdd_0_3 doesn't replace them even after further accesses
-    assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store")
-    assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store")
-    assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store")
+    assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store")
+    assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store")
+    assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store")
   }
 
   test("in-memory LRU for partitions of multiple RDDs") {
     store = makeBlockManager(12000)
-    store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // At this point rdd_1_1 should've replaced rdd_0_1
     assert(store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was not in store")
     assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store")
     assert(store.memoryStore.contains(rdd(0, 2)), "rdd_0_2 was not in store")
     // Do a get() on rdd_0_2 so that it is the most recently used item
-    assert(store.getSingle(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
+    assert(store.getSingleAndReleaseLock(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
     // Put in more partitions from RDD 0; they should replace rdd_1_1
-    store.putSingle(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped
     // when we try to add rdd_0_4.
     assert(!store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was in store")
@@ -567,28 +576,28 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
-    store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
-    store.putSingle("a2", a2, StorageLevel.DISK_ONLY)
-    store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
-    assert(store.getSingle("a2").isDefined, "a2 was in store")
-    assert(store.getSingle("a3").isDefined, "a3 was in store")
-    assert(store.getSingle("a1").isDefined, "a1 was in store")
+    store.putSingleAndReleaseLock("a1", a1, StorageLevel.DISK_ONLY)
+    store.putSingleAndReleaseLock("a2", a2, StorageLevel.DISK_ONLY)
+    store.putSingleAndReleaseLock("a3", a3, StorageLevel.DISK_ONLY)
+    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was in store")
+    assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was in store")
+    assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was in store")
   }
 
   test("disk and memory storage") {
-    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getSingle)
+    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getSingleAndReleaseLock)
   }
 
   test("disk and memory storage with getLocalBytes") {
-    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getLocalBytes)
+    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getLocalBytesAndReleaseLock)
   }
 
   test("disk and memory storage with serialization") {
-    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getSingle)
+    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getSingleAndReleaseLock)
   }
 
   test("disk and memory storage with serialization and getLocalBytes") {
-    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getLocalBytes)
+    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getLocalBytesAndReleaseLock)
   }
 
   def testDiskAndMemoryStorage(
@@ -598,9 +607,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a1 = new Array[Byte](4000)
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
-    store.putSingle("a1", a1, storageLevel)
-    store.putSingle("a2", a2, storageLevel)
-    store.putSingle("a3", a3, storageLevel)
+    store.putSingleAndReleaseLock("a1", a1, storageLevel)
+    store.putSingleAndReleaseLock("a2", a2, storageLevel)
+    store.putSingleAndReleaseLock("a3", a3, storageLevel)
     assert(accessMethod(store)("a2").isDefined, "a2 was not in store")
     assert(accessMethod(store)("a3").isDefined, "a3 was not in store")
     assert(store.memoryStore.getValues("a1").isEmpty, "a1 was in memory store")
@@ -615,19 +624,19 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val a3 = new Array[Byte](4000)
     val a4 = new Array[Byte](4000)
     // First store a1 and a2, both in memory, and a3, on disk only
-    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
-    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
-    store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
+    store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingleAndReleaseLock("a3", a3, StorageLevel.DISK_ONLY)
     // At this point LRU should not kick in because a3 is only on disk
-    assert(store.getSingle("a1").isDefined, "a1 was not in store")
-    assert(store.getSingle("a2").isDefined, "a2 was not in store")
-    assert(store.getSingle("a3").isDefined, "a3 was not in store")
+    assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
+    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
+    assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
     // Now let's add in a4, which uses both disk and memory; a1 should drop out
-    store.putSingle("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
-    assert(store.getSingle("a1") == None, "a1 was in store")
-    assert(store.getSingle("a2").isDefined, "a2 was not in store")
-    assert(store.getSingle("a3").isDefined, "a3 was not in store")
-    assert(store.getSingle("a4").isDefined, "a4 was not in store")
+    store.putSingleAndReleaseLock("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
+    assert(store.getSingleAndReleaseLock("a1") == None, "a1 was in store")
+    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
+    assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
+    assert(store.getSingleAndReleaseLock("a4").isDefined, "a4 was not in store")
   }
 
   test("in-memory LRU with streams") {
@@ -635,23 +644,27 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
     val list2 = List(new Array[Byte](2000), new Array[Byte](2000))
     val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
-    store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.putIterator("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.putIterator("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    assert(store.get("list2").isDefined, "list2 was not in store")
+    store.putIteratorAndReleaseLock(
+      "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
-    assert(store.get("list3").isDefined, "list3 was not in store")
+    assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store")
     assert(store.get("list3").get.data.size === 2)
-    assert(store.get("list1") === None, "list1 was in store")
-    assert(store.get("list2").isDefined, "list2 was not in store")
+    assert(store.getAndReleaseLock("list1") === None, "list1 was in store")
+    assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
     // At this point list2 was gotten last, so LRU will getSingle rid of list3
-    store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    assert(store.get("list1").isDefined, "list1 was not in store")
+    store.putIteratorAndReleaseLock(
+      "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    assert(store.getAndReleaseLock("list1").isDefined, "list1 was not in store")
     assert(store.get("list1").get.data.size === 2)
-    assert(store.get("list2").isDefined, "list2 was not in store")
+    assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
-    assert(store.get("list3") === None, "list1 was in store")
+    assert(store.getAndReleaseLock("list3") === None, "list1 was in store")
   }
 
   test("LRU with mixed storage levels and streams") {
@@ -661,33 +674,37 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
     val list4 = List(new Array[Byte](2000), new Array[Byte](2000))
     // First store list1 and list2, both in memory, and list3, on disk only
-    store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
-    store.putIterator("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
-    store.putIterator("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
     val listForSizeEstimate = new ArrayBuffer[Any]
     listForSizeEstimate ++= list1.iterator
     val listSize = SizeEstimator.estimate(listForSizeEstimate)
     // At this point LRU should not kick in because list3 is only on disk
-    assert(store.get("list1").isDefined, "list1 was not in store")
+    assert(store.getAndReleaseLock("list1").isDefined, "list1 was not in store")
     assert(store.get("list1").get.data.size === 2)
-    assert(store.get("list2").isDefined, "list2 was not in store")
+    assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
-    assert(store.get("list3").isDefined, "list3 was not in store")
+    assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store")
     assert(store.get("list3").get.data.size === 2)
-    assert(store.get("list1").isDefined, "list1 was not in store")
+    assert(store.getAndReleaseLock("list1").isDefined, "list1 was not in store")
     assert(store.get("list1").get.data.size === 2)
-    assert(store.get("list2").isDefined, "list2 was not in store")
+    assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
-    assert(store.get("list3").isDefined, "list3 was not in store")
+    assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store")
     assert(store.get("list3").get.data.size === 2)
     // Now let's add in list4, which uses both disk and memory; list1 should drop out
-    store.putIterator("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
-    assert(store.get("list1") === None, "list1 was in store")
-    assert(store.get("list2").isDefined, "list2 was not in store")
+    store.putIteratorAndReleaseLock(
+      "list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
+    assert(store.getAndReleaseLock("list1") === None, "list1 was in store")
+    assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
-    assert(store.get("list3").isDefined, "list3 was not in store")
+    assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store")
     assert(store.get("list3").get.data.size === 2)
-    assert(store.get("list4").isDefined, "list4 was not in store")
+    assert(store.getAndReleaseLock("list4").isDefined, "list4 was not in store")
     assert(store.get("list4").get.data.size === 2)
   }
 
@@ -705,18 +722,19 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
   test("overly large block") {
     store = makeBlockManager(5000)
-    store.putSingle("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
-    assert(store.getSingle("a1") === None, "a1 was in store")
-    store.putSingle("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
+    store.putSingleAndReleaseLock("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
+    assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store")
+    store.putSingleAndReleaseLock("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
     assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store")
-    assert(store.getSingle("a2").isDefined, "a2 was not in store")
+    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
   }
 
   test("block compression") {
     try {
       conf.set("spark.shuffle.compress", "true")
       store = makeBlockManager(20000, "exec1")
-      store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+      store.putSingleAndReleaseLock(
+        ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100,
         "shuffle_0_0_0 was not compressed")
       store.stop()
@@ -724,7 +742,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
       conf.set("spark.shuffle.compress", "false")
       store = makeBlockManager(20000, "exec2")
-      store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+      store.putSingleAndReleaseLock(
+        ShuffleBlockId(0, 0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 10000,
         "shuffle_0_0_0 was compressed")
       store.stop()
@@ -732,7 +751,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
       conf.set("spark.broadcast.compress", "true")
       store = makeBlockManager(20000, "exec3")
-      store.putSingle(BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+      store.putSingleAndReleaseLock(
+        BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 1000,
         "broadcast_0 was not compressed")
       store.stop()
@@ -740,28 +760,29 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
       conf.set("spark.broadcast.compress", "false")
       store = makeBlockManager(20000, "exec4")
-      store.putSingle(BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+      store.putSingleAndReleaseLock(
+        BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 10000, "broadcast_0 was compressed")
       store.stop()
       store = null
 
       conf.set("spark.rdd.compress", "true")
       store = makeBlockManager(20000, "exec5")
-      store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+      store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(rdd(0, 0)) <= 1000, "rdd_0_0 was not compressed")
       store.stop()
       store = null
 
       conf.set("spark.rdd.compress", "false")
       store = makeBlockManager(20000, "exec6")
-      store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+      store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(rdd(0, 0)) >= 10000, "rdd_0_0 was compressed")
       store.stop()
       store = null
 
       // Check that any other block types are also kept uncompressed
       store = makeBlockManager(20000, "exec7")
-      store.putSingle("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
+      store.putSingleAndReleaseLock("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
       assert(store.memoryStore.getSize("other_block") >= 10000, "other_block was compressed")
       store.stop()
       store = null
@@ -789,12 +810,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     class UnserializableClass
     val a1 = new UnserializableClass
     intercept[java.io.NotSerializableException] {
-      store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
+      store.putSingleAndReleaseLock("a1", a1, StorageLevel.DISK_ONLY)
     }
 
     // Make sure get a1 doesn't hang and returns None.
     failAfter(1 second) {
-      assert(store.getSingle("a1").isEmpty, "a1 should not be in store")
+      assert(store.getSingleAndReleaseLock("a1").isEmpty, "a1 should not be in store")
     }
   }
 
@@ -844,6 +865,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
   test("updated block statuses") {
     store = makeBlockManager(12000)
+    store.registerTask(0)
     val list = List.fill(2)(new Array[Byte](2000))
     val bigList = List.fill(8)(new Array[Byte](2000))
 
@@ -860,7 +882,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // 1 updated block (i.e. list1)
     val updatedBlocks1 = getUpdatedBlocks {
-      store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      store.putIteratorAndReleaseLock(
+        "list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     assert(updatedBlocks1.size === 1)
     assert(updatedBlocks1.head._1 === TestBlockId("list1"))
@@ -868,7 +891,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // 1 updated block (i.e. list2)
     val updatedBlocks2 = getUpdatedBlocks {
-      store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+      store.putIteratorAndReleaseLock(
+        "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
     }
     assert(updatedBlocks2.size === 1)
     assert(updatedBlocks2.head._1 === TestBlockId("list2"))
@@ -876,7 +900,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // 2 updated blocks - list1 is kicked out of memory while list3 is added
     val updatedBlocks3 = getUpdatedBlocks {
-      store.putIterator("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      store.putIteratorAndReleaseLock(
+        "list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     assert(updatedBlocks3.size === 2)
     updatedBlocks3.foreach { case (id, status) =>
@@ -890,7 +915,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added
     val updatedBlocks4 = getUpdatedBlocks {
-      store.putIterator("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      store.putIteratorAndReleaseLock(
+        "list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     assert(updatedBlocks4.size === 2)
     updatedBlocks4.foreach { case (id, status) =>
@@ -905,7 +931,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // No updated blocks - list5 is too big to fit in store and nothing is kicked out
     val updatedBlocks5 = getUpdatedBlocks {
-      store.putIterator("list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      store.putIteratorAndReleaseLock(
+        "list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     assert(updatedBlocks5.size === 0)
 
@@ -929,9 +956,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val list = List.fill(2)(new Array[Byte](2000))
 
     // Tell master. By LRU, only list2 and list3 remains.
-    store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.putIterator("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
 
     // getLocations and getBlockStatus should yield the same locations
     assert(store.master.getLocations("list1").size === 0)
@@ -945,9 +975,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.master.getBlockStatus("list3", askSlaves = true).size === 1)
 
     // This time don't tell master and see what happens. By LRU, only list5 and list6 remains.
-    store.putIterator("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
-    store.putIterator("list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
-    store.putIterator("list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
+    store.putIteratorAndReleaseLock(
+      "list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
+    store.putIteratorAndReleaseLock(
+      "list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+    store.putIteratorAndReleaseLock(
+      "list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
 
     // getLocations should return nothing because the master is not informed
     // getBlockStatus without asking slaves should have the same result
@@ -968,9 +1001,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val list = List.fill(2)(new Array[Byte](100))
 
     // insert some blocks
-    store.putIterator("list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.putIterator("list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
 
     // getLocations and getBlockStatus should yield the same locations
     assert(store.master.getMatchingBlockIds(_.toString.contains("list"), askSlaves = false).size
@@ -979,9 +1015,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       === 1)
 
     // insert some more blocks
-    store.putIterator("newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.putIterator("newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
-    store.putIterator("newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+    store.putIteratorAndReleaseLock(
+      "newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+    store.putIteratorAndReleaseLock(
+      "newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+    store.putIteratorAndReleaseLock(
+      "newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
 
     // getLocations and getBlockStatus should yield the same locations
     assert(store.master.getMatchingBlockIds(_.toString.contains("newlist"), askSlaves = false).size
@@ -991,7 +1030,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     val blockIds = Seq(RDDBlockId(1, 0), RDDBlockId(1, 1), RDDBlockId(2, 0))
     blockIds.foreach { blockId =>
-      store.putIterator(blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      store.putIteratorAndReleaseLock(
+        blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     val matchedBlockIds = store.master.getMatchingBlockIds(_ match {
       case RDDBlockId(1, _) => true
@@ -1002,12 +1042,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
   test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
     store = makeBlockManager(12000)
-    store.putSingle(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // Access rdd_1_0 to ensure it's not least recently used.
-    assert(store.getSingle(rdd(1, 0)).isDefined, "rdd_1_0 was not in store")
+    assert(store.getSingleAndReleaseLock(rdd(1, 0)).isDefined, "rdd_1_0 was not in store")
     // According to the same-RDD rule, rdd_1_0 should be replaced here.
-    store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingleAndReleaseLock(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // rdd_1_0 should have been replaced, even it's not least recently used.
     assert(store.memoryStore.contains(rdd(0, 0)), "rdd_0_0 was not in store")
     assert(store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was not in store")
@@ -1086,8 +1126,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     memoryStore.releasePendingUnrollMemoryForThisTask()
 
     // Unroll with not enough space. This should succeed after kicking out someBlock1.
-    store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
-    store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)
+    store.putIteratorAndReleaseLock("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
+    store.putIteratorAndReleaseLock("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)
     unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator)
     verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
@@ -1098,7 +1138,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
     // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.
     // In the mean time, however, we kicked out someBlock2 before giving up.
-    store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)
+    store.putIteratorAndReleaseLock("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)
     unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator)
     verifyUnroll(bigList.iterator, unrollResult, shouldBeArray = false)
     assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
@@ -1130,8 +1170,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     // would not know how to drop them from memory later.
     memoryStore.remove("b1")
     memoryStore.remove("b2")
-    store.putIterator("b1", smallIterator, memOnly)
-    store.putIterator("b2", smallIterator, memOnly)
+    store.putIteratorAndReleaseLock("b1", smallIterator, memOnly)
+    store.putIteratorAndReleaseLock("b2", smallIterator, memOnly)
 
     // Unroll with not enough space. This should succeed but kick out b1 in the process.
     val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true)
@@ -1142,7 +1182,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(memoryStore.contains("b3"))
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
     memoryStore.remove("b3")
-    store.putIterator("b3", smallIterator, memOnly)
+    store.putIteratorAndReleaseLock("b3", smallIterator, memOnly)
 
     // Unroll huge block with not enough space. This should fail and kick out b2 in the process.
     val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, returnValues = true)
@@ -1169,8 +1209,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
-    store.putIterator("b1", smallIterator, memAndDisk)
-    store.putIterator("b2", smallIterator, memAndDisk)
+    store.putIteratorAndReleaseLock("b1", smallIterator, memAndDisk)
+    store.putIteratorAndReleaseLock("b2", smallIterator, memAndDisk)
 
     // Unroll with not enough space. This should succeed but kick out b1 in the process.
     // Memory store should contain b2 and b3, while disk store should contain only b1
@@ -1183,7 +1223,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(!diskStore.contains("b2"))
     assert(!diskStore.contains("b3"))
     memoryStore.remove("b3")
-    store.putIterator("b3", smallIterator, StorageLevel.MEMORY_ONLY)
+    store.putIteratorAndReleaseLock("b3", smallIterator, StorageLevel.MEMORY_ONLY)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
     // Unroll huge block with not enough space. This should fail and drop the new block to disk
@@ -1244,6 +1284,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store = makeBlockManager(12000)
     val memoryStore = store.memoryStore
     val blockId = BlockId("rdd_3_10")
+    store.blockInfoManager.lockNewBlockForWriting(
+      blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, tellMaster = false))
     val result = memoryStore.putBytes(blockId, 13000, () => {
       fail("A big ByteBuffer that cannot be put into MemoryStore should not be created")
     })
@@ -1263,4 +1305,104 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(result.size === 10000)
     assert(result.data === Right(bytes))
   }
+
+  test("read-locked blocks cannot be evicted from the MemoryStore") {
+    store = makeBlockManager(12000)
+    val arr = new Array[Byte](4000)
+    // First store a1 and a2, both in memory, and a3, on disk only
+    store.putSingleAndReleaseLock("a1", arr, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingleAndReleaseLock("a2", arr, StorageLevel.MEMORY_ONLY_SER)
+    assert(store.getSingle("a1").isDefined, "a1 was not in store")
+    assert(store.getSingle("a2").isDefined, "a2 was not in store")
+    // This put should fail because both a1 and a2 should be read-locked:
+    store.putSingleAndReleaseLock("a3", arr, StorageLevel.MEMORY_ONLY_SER)
+    assert(store.getSingle("a3").isEmpty, "a3 was in store")
+    assert(store.getSingle("a1").isDefined, "a1 was not in store")
+    assert(store.getSingle("a2").isDefined, "a2 was not in store")
+    // Release both pins of block a2:
+    store.releaseLock("a2")
+    store.releaseLock("a2")
+    // Block a1 is the least-recently accessed, so an LRU eviction policy would evict it before
+    // block a2. However, a1 is still pinned so this put of a3 should evict a2 instead:
+    store.putSingleAndReleaseLock("a3", arr, StorageLevel.MEMORY_ONLY_SER)
+    assert(store.getSingle("a2").isEmpty, "a2 was in store")
+    assert(store.getSingle("a1").isDefined, "a1 was not in store")
+    assert(store.getSingle("a3").isDefined, "a3 was not in store")
+  }
+}
+
+private object BlockManagerSuite {
+
+  private implicit class BlockManagerTestUtils(store: BlockManager) {
+
+    def putSingleAndReleaseLock(
+        block: BlockId,
+        value: Any,
+        storageLevel: StorageLevel,
+        tellMaster: Boolean): Unit = {
+      if (store.putSingle(block, value, storageLevel, tellMaster)) {
+        store.releaseLock(block)
+      }
+    }
+
+    def putSingleAndReleaseLock(block: BlockId, value: Any, storageLevel: StorageLevel): Unit = {
+      if (store.putSingle(block, value, storageLevel)) {
+        store.releaseLock(block)
+      }
+    }
+
+    def putIteratorAndReleaseLock(
+        blockId: BlockId,
+        values: Iterator[Any],
+        level: StorageLevel): Unit = {
+      if (store.putIterator(blockId, values, level)) {
+        store.releaseLock(blockId)
+      }
+    }
+
+    def putIteratorAndReleaseLock(
+        blockId: BlockId,
+        values: Iterator[Any],
+        level: StorageLevel,
+        tellMaster: Boolean): Unit = {
+      if (store.putIterator(blockId, values, level, tellMaster)) {
+        store.releaseLock(blockId)
+      }
+    }
+
+    def dropFromMemoryIfExists(
+        blockId: BlockId,
+        data: () => Either[Array[Any], ByteBuffer]): Unit = {
+      store.blockInfoManager.lockForWriting(blockId).foreach { info =>
+        val newEffectiveStorageLevel = store.dropFromMemory(blockId, data)
+        if (newEffectiveStorageLevel.isValid) {
+          // The block is still present in at least one store, so release the lock
+          // but don't delete the block info
+          store.releaseLock(blockId)
+        } else {
+          // The block isn't present in any store, so delete the block info so that the
+          // block can be stored again
+          store.blockInfoManager.removeBlock(blockId)
+        }
+      }
+    }
+
+    private def wrapGet[T](f: BlockId => Option[T]): BlockId => Option[T] = (blockId: BlockId) => {
+      val result = f(blockId)
+      if (result.isDefined) {
+        store.releaseLock(blockId)
+      }
+      result
+    }
+
+    def hasLocalBlock(blockId: BlockId): Boolean = {
+      getLocalAndReleaseLock(blockId).isDefined
+    }
+
+    val getLocalAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.getLocal)
+    val getAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.get)
+    val getSingleAndReleaseLock: (BlockId) => Option[Any] = wrapGet(store.getSingle)
+    val getLocalBytesAndReleaseLock: (BlockId) => Option[ByteBuffer] = wrapGet(store.getLocalBytes)
+  }
+
 }
diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java b/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java
index f55b884bc45cee4f51bd7c54f4e7a6911bc2a4ff..631d7677152561a0bd70a43189b06377e83ed1c6 100644
--- a/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java
+++ b/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java
@@ -28,7 +28,7 @@ import io.netty.buffer.Unpooled;
 /**
  * A {@link ManagedBuffer} backed by {@link ByteBuffer}.
  */
-public final class NioManagedBuffer extends ManagedBuffer {
+public class NioManagedBuffer extends ManagedBuffer {
   private final ByteBuffer buf;
 
   public NioManagedBuffer(ByteBuffer buf) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 83d7953aaf7001a24ecafda70de17274bfd44f6b..efa2eeaf4d75107a928a64cc95cc67b34edffa84 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -46,7 +46,9 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
   }
 
   def isMaterialized(rddId: Int): Boolean = {
-    sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)).nonEmpty
+    val maybeBlock = sparkContext.env.blockManager.get(RDDBlockId(rddId, 0))
+    maybeBlock.foreach(_ => sparkContext.env.blockManager.releaseLock(RDDBlockId(rddId, 0)))
+    maybeBlock.nonEmpty
   }
 
   test("withColumn doesn't invalidate cached dataframe") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 11863caffed7559c363c897a04bf60036ee7f89e..86f02e68e5b0e41f7e453157d718326cc29f266c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -40,7 +40,9 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton {
   }
 
   def isMaterialized(rddId: Int): Boolean = {
-    sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)).nonEmpty
+    val maybeBlock = sparkContext.env.blockManager.get(RDDBlockId(rddId, 0))
+    maybeBlock.foreach(_ => sparkContext.env.blockManager.releaseLock(RDDBlockId(rddId, 0)))
+    maybeBlock.nonEmpty
   }
 
   test("cache table") {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index e22e320b171269aa00d3ca67ccba5edf9738bc2d..3d9c085013ff58fc4b0cfee2e2f30058e62133c4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -91,6 +91,8 @@ private[streaming] class BlockManagerBasedBlockHandler(
     if (!putSucceeded) {
       throw new SparkException(
         s"Could not store $blockId to block manager with storage level $storageLevel")
+    } else {
+      blockManager.releaseLock(blockId)
     }
     BlockManagerBasedStoreResult(blockId, numRecords)
   }
@@ -189,6 +191,8 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
       if (!putSucceeded) {
         throw new SparkException(
           s"Could not store $blockId to block manager with storage level $storageLevel")
+      } else {
+        blockManager.releaseLock(blockId)
       }
     }