diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index b59191b2917c9aebd28d1feb36d484d2544b54e3..dcf359e3c29fe3e8db242de29a0966d19e23fa00 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -52,6 +52,12 @@ private[spark] class BlockResult(
     val readMethod: DataReadMethod.Value,
     val bytes: Long)
 
+// Class for representing return value of doPut()
+private sealed trait DoPutResult
+private case object DoPutSucceeded extends DoPutResult
+private case object DoPutBytesFailed extends DoPutResult
+private case class DoPutIteratorFailed(iter: Iterator[Any]) extends DoPutResult
+
 /**
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
@@ -432,98 +438,108 @@ private[spark] class BlockManager(
         logDebug(s"Block $blockId was not found")
         None
       case Some(info) =>
-        val level = info.level
-        logDebug(s"Level for block $blockId is $level")
-
-        // Look for the block in memory
-        if (level.useMemory) {
-          logDebug(s"Getting block $blockId from memory")
-          val result = if (asBlockResult) {
-            memoryStore.getValues(blockId).map { iter =>
-              val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
-              new BlockResult(ci, DataReadMethod.Memory, info.size)
-            }
-          } else {
-            memoryStore.getBytes(blockId)
-          }
-          result match {
-            case Some(values) =>
-              return result
-            case None =>
-              logDebug(s"Block $blockId not found in memory")
-          }
+        doGetLocal(blockId, info, asBlockResult)
+    }
+  }
+
+  /**
+   * Get a local block from the block manager.
+   * Assumes that the caller holds a read lock on the block.
+   */
+  private def doGetLocal(
+      blockId: BlockId,
+      info: BlockInfo,
+      asBlockResult: Boolean): Option[Any] = {
+    val level = info.level
+    logDebug(s"Level for block $blockId is $level")
+
+    // Look for the block in memory
+    if (level.useMemory) {
+      logDebug(s"Getting block $blockId from memory")
+      val result = if (asBlockResult) {
+        memoryStore.getValues(blockId).map { iter =>
+          val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
+          new BlockResult(ci, DataReadMethod.Memory, info.size)
         }
+      } else {
+        memoryStore.getBytes(blockId)
+      }
+      result match {
+        case Some(values) =>
+          return result
+        case None =>
+          logDebug(s"Block $blockId not found in memory")
+      }
+    }
 
-        // Look for block on disk, potentially storing it back in memory if required
-        if (level.useDisk) {
-          logDebug(s"Getting block $blockId from disk")
-          val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
-            case Some(b) => b
-            case None =>
-              releaseLock(blockId)
-              throw new BlockException(
-                blockId, s"Block $blockId not found on disk, though it should be")
-          }
-          assert(0 == bytes.position())
-
-          if (!level.useMemory) {
-            // If the block shouldn't be stored in memory, we can just return it
-            if (asBlockResult) {
-              val iter = CompletionIterator[Any, Iterator[Any]](
-                dataDeserialize(blockId, bytes), releaseLock(blockId))
-              return Some(new BlockResult(iter, DataReadMethod.Disk, info.size))
-            } else {
-              return Some(bytes)
-            }
-          } else {
-            // Otherwise, we also have to store something in the memory store
-            if (!level.deserialized || !asBlockResult) {
-              /* We'll store the bytes in memory if the block's storage level includes
-               * "memory serialized", or if it should be cached as objects in memory
-               * but we only requested its serialized bytes. */
-              memoryStore.putBytes(blockId, bytes.limit, () => {
-                // https://issues.apache.org/jira/browse/SPARK-6076
-                // If the file size is bigger than the free memory, OOM will happen. So if we cannot
-                // put it into MemoryStore, copyForMemory should not be created. That's why this
-                // action is put into a `() => ByteBuffer` and created lazily.
-                val copyForMemory = ByteBuffer.allocate(bytes.limit)
-                copyForMemory.put(bytes)
-              })
-              bytes.rewind()
-            }
-            if (!asBlockResult) {
-              return Some(bytes)
-            } else {
-              val values = dataDeserialize(blockId, bytes)
-              if (level.deserialized) {
-                // Cache the values before returning them
-                val putResult = memoryStore.putIterator(
-                  blockId, values, level, returnValues = true, allowPersistToDisk = false)
-                // The put may or may not have succeeded, depending on whether there was enough
-                // space to unroll the block. Either way, the put here should return an iterator.
-                putResult.data match {
-                  case Left(it) =>
-                    val ci = CompletionIterator[Any, Iterator[Any]](it, releaseLock(blockId))
-                    return Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
-                  case _ =>
-                    // This only happens if we dropped the values back to disk (which is never)
-                    throw new SparkException("Memory store did not return an iterator!")
-                }
-              } else {
-                val ci = CompletionIterator[Any, Iterator[Any]](values, releaseLock(blockId))
-                return Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+    // Look for block on disk, potentially storing it back in memory if required
+    if (level.useDisk) {
+      logDebug(s"Getting block $blockId from disk")
+      val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
+        case Some(b) => b
+        case None =>
+          releaseLock(blockId)
+          throw new BlockException(
+            blockId, s"Block $blockId not found on disk, though it should be")
+      }
+      assert(0 == bytes.position())
+
+      if (!level.useMemory) {
+        // If the block shouldn't be stored in memory, we can just return it
+        if (asBlockResult) {
+          val iter = CompletionIterator[Any, Iterator[Any]](
+            dataDeserialize(blockId, bytes), releaseLock(blockId))
+          return Some(new BlockResult(iter, DataReadMethod.Disk, info.size))
+        } else {
+          return Some(bytes)
+        }
+      } else {
+        // Otherwise, we also have to store something in the memory store
+        if (!level.deserialized || !asBlockResult) {
+          /* We'll store the bytes in memory if the block's storage level includes
+           * "memory serialized", or if it should be cached as objects in memory
+           * but we only requested its serialized bytes. */
+          memoryStore.putBytes(blockId, bytes.limit, () => {
+            // https://issues.apache.org/jira/browse/SPARK-6076
+            // If the file size is bigger than the free memory, OOM will happen. So if we cannot
+            // put it into MemoryStore, copyForMemory should not be created. That's why this
+            // action is put into a `() => ByteBuffer` and created lazily.
+            val copyForMemory = ByteBuffer.allocate(bytes.limit)
+            copyForMemory.put(bytes)
+          })
+          bytes.rewind()
+        }
+        if (!asBlockResult) {
+          return Some(bytes)
+        } else {
+          val values = dataDeserialize(blockId, bytes)
+          val valuesToReturn: Iterator[Any] = {
+            if (level.deserialized) {
+              // Cache the values before returning them
+              memoryStore.putIterator(blockId, values, level, allowPersistToDisk = false) match {
+                case Left(iter) =>
+                  // The memory store put() failed, so it returned the iterator back to us:
+                  iter
+                case Right(_) =>
+                  // The put() succeeded, so we can read the values back:
+                  memoryStore.getValues(blockId).get
               }
+            } else {
+              values
             }
           }
-        } else {
-          // This branch represents a case where the BlockInfoManager contained an entry for
-          // the block but the block could not be found in any of the block stores. This case
-          // should never occur, but for completeness's sake we address it here.
-          logError(
-            s"Block $blockId is supposedly stored locally but was not found in any block store")
-          releaseLock(blockId)
-          None
+          val ci = CompletionIterator[Any, Iterator[Any]](valuesToReturn, releaseLock(blockId))
+          return Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         }
+      }
+    } else {
+      // This branch represents a case where the BlockInfoManager contained an entry for
+      // the block but the block could not be found in any of the block stores. This case
+      // should never occur, but for completeness's sake we address it here.
+      logError(
+        s"Block $blockId is supposedly stored locally but was not found in any block store")
+      releaseLock(blockId)
+      None
     }
   }
 
@@ -659,7 +675,7 @@ private[spark] class BlockManager(
       makeIterator: () => Iterator[Any]): Either[BlockResult, Iterator[Any]] = {
     // Initially we hold no locks on this block.
     doPut(blockId, IteratorValues(makeIterator), level, keepReadLock = true) match {
-      case None =>
+      case DoPutSucceeded =>
         // doPut() didn't hand work back to us, so the block already existed or was successfully
         // stored. Therefore, we now hold a read lock on the block.
         val blockResult = get(blockId).getOrElse {
@@ -669,11 +685,13 @@ private[spark] class BlockManager(
           throw new SparkException(s"get() failed for block $blockId even though we held a lock")
         }
         Left(blockResult)
-      case Some(failedPutResult) =>
+      case DoPutIteratorFailed(iter) =>
         // The put failed, likely because the data was too large to fit in memory and could not be
         // dropped to disk. Therefore, we need to pass the input iterator back to the caller so
         // that they can decide what to do with the values (e.g. process them without caching).
-       Right(failedPutResult.data.left.get)
+       Right(iter)
+      case DoPutBytesFailed =>
+        throw new SparkException("doPut returned an invalid failure response")
     }
   }
 
@@ -687,7 +705,13 @@ private[spark] class BlockManager(
       tellMaster: Boolean = true,
       effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
     require(values != null, "Values is null")
-    doPut(blockId, IteratorValues(() => values), level, tellMaster, effectiveStorageLevel).isEmpty
+    val result = doPut(
+      blockId,
+      IteratorValues(() => values),
+      level,
+      tellMaster,
+      effectiveStorageLevel)
+    result == DoPutSucceeded
   }
 
   /**
@@ -719,7 +743,8 @@ private[spark] class BlockManager(
       tellMaster: Boolean = true,
       effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
     require(bytes != null, "Bytes is null")
-    doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel).isEmpty
+    val result = doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
+    result == DoPutSucceeded
   }
 
   /**
@@ -734,9 +759,9 @@ private[spark] class BlockManager(
    * @param keepReadLock if true, this method will hold the read lock when it returns (even if the
    *                     block already exists). If false, this method will hold no locks when it
    *                     returns.
-   * @return `Some(PutResult)` if the block did not exist and could not be successfully cached,
-   *         or None if the block already existed or was successfully stored (fully consuming
-   *         the input data / input iterator).
+   * @return [[DoPutSucceeded]] if the block was already present or if the put succeeded, or
+   *        [[DoPutBytesFailed]] if the put failed and we were storing bytes, or
+   *        [[DoPutIteratorFailed]] if the put failed and we were storing an iterator.
    */
   private def doPut(
       blockId: BlockId,
@@ -744,7 +769,7 @@ private[spark] class BlockManager(
       level: StorageLevel,
       tellMaster: Boolean = true,
       effectiveStorageLevel: Option[StorageLevel] = None,
-      keepReadLock: Boolean = false): Option[PutResult] = {
+      keepReadLock: Boolean = false): DoPutResult = {
 
     require(blockId != null, "BlockId is null")
     require(level != null && level.isValid, "StorageLevel is null or invalid")
@@ -765,21 +790,12 @@ private[spark] class BlockManager(
           // lockNewBlockForWriting returned a read lock on the existing block, so we must free it:
           releaseLock(blockId)
         }
-        return None
+        return DoPutSucceeded
       }
     }
 
     val startTimeMs = System.currentTimeMillis
 
-    /* If we're storing values and we need to replicate the data, we'll want access to the values,
-     * but because our put will read the whole iterator, there will be no values left. For the
-     * case where the put serializes data, we'll remember the bytes, above; but for the case where
-     * it doesn't, such as deserialized storage, let's rely on the put returning an Iterator. */
-    var valuesAfterPut: Iterator[Any] = null
-
-    // Ditto for the bytes after the put
-    var bytesAfterPut: ByteBuffer = null
-
     // Size of the block in bytes
     var size = 0L
 
@@ -801,43 +817,46 @@ private[spark] class BlockManager(
     }
 
     var blockWasSuccessfullyStored = false
-    var result: PutResult = null
+    var iteratorFromFailedMemoryStorePut: Option[Iterator[Any]] = None
 
     putBlockInfo.synchronized {
       logTrace("Put for block %s took %s to get into synchronized block"
         .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
 
       try {
-        // returnValues - Whether to return the values put
-        // blockStore - The type of storage to put these values into
-        val (returnValues, blockStore: BlockStore) = {
-          if (putLevel.useMemory) {
-            // Put it in memory first, even if it also has useDisk set to true;
-            // We will drop it to disk later if the memory store can't hold it.
-            (true, memoryStore)
-          } else if (putLevel.useDisk) {
-            // Don't get back the bytes from put unless we replicate them
-            (putLevel.replication > 1, diskStore)
-          } else {
-            assert(putLevel == StorageLevel.NONE)
-            throw new BlockException(
-              blockId, s"Attempted to put block $blockId without specifying storage level!")
+        if (putLevel.useMemory) {
+          // Put it in memory first, even if it also has useDisk set to true;
+          // We will drop it to disk later if the memory store can't hold it.
+          data match {
+            case IteratorValues(iterator) =>
+              memoryStore.putIterator(blockId, iterator(), putLevel) match {
+                case Right(s) =>
+                  size = s
+                case Left(iter) =>
+                  iteratorFromFailedMemoryStorePut = Some(iter)
+              }
+            case ByteBufferValues(bytes) =>
+              bytes.rewind()
+              size = bytes.limit()
+              memoryStore.putBytes(blockId, bytes, putLevel)
           }
-        }
-
-        // Actually put the values
-        result = data match {
-          case IteratorValues(iterator) =>
-            blockStore.putIterator(blockId, iterator(), putLevel, returnValues)
-          case ByteBufferValues(bytes) =>
-            bytes.rewind()
-            blockStore.putBytes(blockId, bytes, putLevel)
-        }
-        size = result.size
-        result.data match {
-          case Left (newIterator) if putLevel.useMemory => valuesAfterPut = newIterator
-          case Right (newBytes) => bytesAfterPut = newBytes
-          case _ =>
+        } else if (putLevel.useDisk) {
+          data match {
+            case IteratorValues(iterator) =>
+              diskStore.putIterator(blockId, iterator(), putLevel) match {
+                case Right(s) =>
+                  size = s
+                // putIterator() will never return Left (see its return type).
+              }
+            case ByteBufferValues(bytes) =>
+              bytes.rewind()
+              size = bytes.limit()
+              diskStore.putBytes(blockId, bytes, putLevel)
+          }
+        } else {
+          assert(putLevel == StorageLevel.NONE)
+          throw new BlockException(
+            blockId, s"Attempted to put block $blockId without specifying storage level!")
         }
 
         val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
@@ -868,34 +887,27 @@ private[spark] class BlockManager(
     }
     logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
 
-    // Either we're storing bytes and we asynchronously started replication, or we're storing
-    // values and need to serialize and replicate them now:
-    if (putLevel.replication > 1) {
-      data match {
-        case ByteBufferValues(bytes) =>
-          if (replicationFuture != null) {
-            Await.ready(replicationFuture, Duration.Inf)
-          }
-        case _ =>
-          if (blockWasSuccessfullyStored) {
-            val remoteStartTime = System.currentTimeMillis
-            // Serialize the block if not already done
-            if (bytesAfterPut == null) {
-              if (valuesAfterPut == null) {
-                throw new SparkException(
-                  "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
-              }
-              bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
-            }
-            replicate(blockId, bytesAfterPut, putLevel)
-            logDebug("Put block %s remotely took %s"
-              .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
+    if (replicationFuture != null) {
+      // Wait for asynchronous replication to finish
+      Await.ready(replicationFuture, Duration.Inf)
+    } else if (putLevel.replication > 1 && blockWasSuccessfullyStored) {
+      val remoteStartTime = System.currentTimeMillis
+      val bytesToReplicate: ByteBuffer = {
+        doGetLocal(blockId, putBlockInfo, asBlockResult = false)
+          .map(_.asInstanceOf[ByteBuffer])
+          .getOrElse {
+            throw new SparkException(s"Block $blockId was not found even though it was just stored")
           }
       }
+      try {
+        replicate(blockId, bytesToReplicate, putLevel)
+      } finally {
+        BlockManager.dispose(bytesToReplicate)
+      }
+      logDebug("Put block %s remotely took %s"
+        .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
     }
 
-    BlockManager.dispose(bytesAfterPut)
-
     if (putLevel.replication > 1) {
       logDebug("Putting block %s with replication took %s"
         .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
@@ -905,9 +917,11 @@ private[spark] class BlockManager(
     }
 
     if (blockWasSuccessfullyStored) {
-      None
+      DoPutSucceeded
+    } else if (iteratorFromFailedMemoryStorePut.isDefined) {
+      DoPutIteratorFailed(iteratorFromFailedMemoryStorePut.get)
     } else {
-      Some(result)
+      DoPutBytesFailed
     }
   }
 
@@ -1064,7 +1078,7 @@ private[spark] class BlockManager(
       logInfo(s"Writing block $blockId to disk")
       data() match {
         case Left(elements) =>
-          diskStore.putIterator(blockId, elements.toIterator, level, returnValues = false)
+          diskStore.putIterator(blockId, elements.toIterator, level)
         case Right(bytes) =>
           diskStore.putBytes(blockId, bytes, level)
       }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
index d3af50d974232de2926b3240c0dc276803979b9f..b069918b16106740edc6b492401a063b9f74ef10 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
@@ -26,20 +26,18 @@ import org.apache.spark.Logging
  */
 private[spark] abstract class BlockStore(val blockManager: BlockManager) extends Logging {
 
-  def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult
+  def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): Unit
 
   /**
-   * Put in a block and, possibly, also return its content as either bytes or another Iterator.
-   * This is used to efficiently write the values to multiple locations (e.g. for replication).
+   * Attempt to store an iterator of values.
    *
-   * @return a PutResult that contains the size of the data, as well as the values put if
-   *         returnValues is true (if not, the result's data field can be null)
+   * @return an iterator of values (in case the put failed), or the estimated size of the stored
+   *         values if the put succeeded.
    */
   def putIterator(
-    blockId: BlockId,
-    values: Iterator[Any],
-    level: StorageLevel,
-    returnValues: Boolean): PutResult
+      blockId: BlockId,
+      values: Iterator[Any],
+      level: StorageLevel): Either[Iterator[Any], Long]
 
   /**
    * Return the size of a block in bytes.
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index db12a4a1b999a5b7bfe08af5f9766a5d1b697a69..e35aa1b0684da15678c8757a0ed453bb177ac0bd 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -36,7 +36,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     diskManager.getFile(blockId.name).length
   }
 
-  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
+  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): Unit = {
     // So that we do not modify the input offsets !
     // duplicate does not copy buffer, so inexpensive
     val bytes = _bytes.duplicate()
@@ -54,15 +54,12 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     val finishTime = System.currentTimeMillis
     logDebug("Block %s stored as %s file on disk in %d ms".format(
       file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))
-    PutResult(bytes.limit(), Right(bytes.duplicate()))
   }
 
   override def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
-      level: StorageLevel,
-      returnValues: Boolean): PutResult = {
-
+      level: StorageLevel): Right[Iterator[Any], Long] = {
     logDebug(s"Attempting to write values for block $blockId")
     val startTime = System.currentTimeMillis
     val file = diskManager.getFile(blockId)
@@ -90,13 +87,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     logDebug("Block %s stored as %s file on disk in %d ms".format(
       file.getName, Utils.bytesToString(length), timeTaken))
 
-    if (returnValues) {
-      // Return a byte buffer for the contents of the file
-      val buffer = getBytes(blockId).get
-      PutResult(length, Right(buffer))
-    } else {
-      PutResult(length, null)
-    }
+    Right(length)
   }
 
   private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
@@ -127,10 +118,6 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     getBytes(file, 0, file.length)
   }
 
-  def getBytes(segment: FileSegment): Option[ByteBuffer] = {
-    getBytes(segment.file, segment.offset, segment.length)
-  }
-
   override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
     getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 317d73abba4c6dbaa8f544cec503d403679316a9..12b70d1807994ebffdbada381618eb36623b1d78 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -87,16 +87,15 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
     }
   }
 
-  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
+  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): Unit = {
     // Work on a duplicate - since the original input might be used elsewhere.
     val bytes = _bytes.duplicate()
     bytes.rewind()
     if (level.deserialized) {
       val values = blockManager.dataDeserialize(blockId, bytes)
-      putIterator(blockId, values, level, returnValues = true)
+      putIterator(blockId, values, level)
     } else {
       tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
-      PutResult(bytes.limit(), Right(bytes.duplicate()))
     }
   }
 
@@ -106,26 +105,20 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
    *
    * The caller should guarantee that `size` is correct.
    */
-  def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): PutResult = {
+  def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): Unit = {
     // Work on a duplicate - since the original input might be used elsewhere.
     lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
     val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false)
-    val data =
-      if (putSuccess) {
-        assert(bytes.limit == size)
-        Right(bytes.duplicate())
-      } else {
-        null
-      }
-    PutResult(size, data)
+    if (putSuccess) {
+      assert(bytes.limit == size)
+    }
   }
 
   override def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
-      level: StorageLevel,
-      returnValues: Boolean): PutResult = {
-    putIterator(blockId, values, level, returnValues, allowPersistToDisk = true)
+      level: StorageLevel): Either[Iterator[Any], Long] = {
+    putIterator(blockId, values, level, allowPersistToDisk = true)
   }
 
   /**
@@ -144,32 +137,30 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel,
-      returnValues: Boolean,
-      allowPersistToDisk: Boolean): PutResult = {
+      allowPersistToDisk: Boolean): Either[Iterator[Any], Long] = {
     val unrolledValues = unrollSafely(blockId, values)
     unrolledValues match {
       case Left(arrayValues) =>
         // Values are fully unrolled in memory, so store them as an array
-        val res = {
+        val size = {
           if (level.deserialized) {
             val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef])
             tryToPut(blockId, () => arrayValues, sizeEstimate, deserialized = true)
-            PutResult(sizeEstimate, Left(arrayValues.iterator))
+            sizeEstimate
           } else {
             val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)
             tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
-            PutResult(bytes.limit(), Right(bytes.duplicate()))
+            bytes.limit()
           }
         }
-        PutResult(res.size, res.data)
+        Right(size)
       case Right(iteratorValues) =>
         // Not enough space to unroll this block; drop to disk if applicable
         if (level.useDisk && allowPersistToDisk) {
           logWarning(s"Persisting block $blockId to disk instead.")
-          val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
-          PutResult(res.size, res.data)
+          blockManager.diskStore.putIterator(blockId, iteratorValues, level)
         } else {
-          PutResult(0, Left(iteratorValues))
+          Left(iteratorValues)
         }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/PutResult.scala b/core/src/main/scala/org/apache/spark/storage/PutResult.scala
deleted file mode 100644
index f0eac7594ecf6ec194e94973a686462edc2853c1..0000000000000000000000000000000000000000
--- a/core/src/main/scala/org/apache/spark/storage/PutResult.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.nio.ByteBuffer
-
-/**
- * Result of adding a block into a BlockStore. This case class contains a few things:
- *   (1) The estimated size of the put,
- *   (2) The values put if the caller asked for them to be returned (e.g. for chaining
- *       replication), and
- *   (3) A list of blocks dropped as a result of this put. This is always empty for DiskStore.
- */
-private[spark] case class PutResult(
-    size: Long,
-    data: Either[Iterator[_], ByteBuffer],
-    droppedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 89b427049b5488ba2327fc8c88adc970b8deb459..cfcbf1745d1b144ff5cdbb079ebf3c43ef020cc1 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1156,14 +1156,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
     // Unroll with plenty of space. This should succeed and cache both blocks.
-    val result1 = memoryStore.putIterator("b1", smallIterator, memOnly, returnValues = true)
-    val result2 = memoryStore.putIterator("b2", smallIterator, memOnly, returnValues = true)
+    val result1 = memoryStore.putIterator("b1", smallIterator, memOnly)
+    val result2 = memoryStore.putIterator("b2", smallIterator, memOnly)
     assert(memoryStore.contains("b1"))
     assert(memoryStore.contains("b2"))
-    assert(result1.size > 0) // unroll was successful
-    assert(result2.size > 0)
-    assert(result1.data.isLeft) // unroll did not drop this block to disk
-    assert(result2.data.isLeft)
+    assert(result1.isRight) // unroll was successful
+    assert(result2.isRight)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
     // Re-put these two blocks so block manager knows about them too. Otherwise, block manager
@@ -1174,9 +1172,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store.putIterator("b2", smallIterator, memOnly)
 
     // Unroll with not enough space. This should succeed but kick out b1 in the process.
-    val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true)
-    assert(result3.size > 0)
-    assert(result3.data.isLeft)
+    val result3 = memoryStore.putIterator("b3", smallIterator, memOnly)
+    assert(result3.isRight)
     assert(!memoryStore.contains("b1"))
     assert(memoryStore.contains("b2"))
     assert(memoryStore.contains("b3"))
@@ -1185,9 +1182,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store.putIterator("b3", smallIterator, memOnly)
 
     // Unroll huge block with not enough space. This should fail and kick out b2 in the process.
-    val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, returnValues = true)
-    assert(result4.size === 0) // unroll was unsuccessful
-    assert(result4.data.isLeft)
+    val result4 = memoryStore.putIterator("b4", bigIterator, memOnly)
+    assert(result4.isLeft) // unroll was unsuccessful
     assert(!memoryStore.contains("b1"))
     assert(!memoryStore.contains("b2"))
     assert(memoryStore.contains("b3"))
@@ -1214,8 +1210,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // Unroll with not enough space. This should succeed but kick out b1 in the process.
     // Memory store should contain b2 and b3, while disk store should contain only b1
-    val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk, returnValues = true)
-    assert(result3.size > 0)
+    val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk)
+    assert(result3.isRight)
     assert(!memoryStore.contains("b1"))
     assert(memoryStore.contains("b2"))
     assert(memoryStore.contains("b3"))
@@ -1229,9 +1225,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     // Unroll huge block with not enough space. This should fail and drop the new block to disk
     // directly in addition to kicking out b2 in the process. Memory store should contain only
     // b3, while disk store should contain b1, b2 and b4.
-    val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk, returnValues = true)
-    assert(result4.size > 0)
-    assert(result4.data.isRight) // unroll returned bytes from disk
+    val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk)
+    assert(result4.isRight)
     assert(!memoryStore.contains("b1"))
     assert(!memoryStore.contains("b2"))
     assert(memoryStore.contains("b3"))
@@ -1252,28 +1247,28 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
     // All unroll memory used is released because unrollSafely returned an array
-    memoryStore.putIterator("b1", smallIterator, memOnly, returnValues = true)
+    memoryStore.putIterator("b1", smallIterator, memOnly)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
-    memoryStore.putIterator("b2", smallIterator, memOnly, returnValues = true)
+    memoryStore.putIterator("b2", smallIterator, memOnly)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
     // Unroll memory is not released because unrollSafely returned an iterator
     // that still depends on the underlying vector used in the process
-    memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true)
+    memoryStore.putIterator("b3", smallIterator, memOnly)
     val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisTask
     assert(unrollMemoryAfterB3 > 0)
 
     // The unroll memory owned by this thread builds on top of its value after the previous unrolls
-    memoryStore.putIterator("b4", smallIterator, memOnly, returnValues = true)
+    memoryStore.putIterator("b4", smallIterator, memOnly)
     val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisTask
     assert(unrollMemoryAfterB4 > unrollMemoryAfterB3)
 
     // ... but only to a certain extent (until we run out of free space to grant new unroll memory)
-    memoryStore.putIterator("b5", smallIterator, memOnly, returnValues = true)
+    memoryStore.putIterator("b5", smallIterator, memOnly)
     val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisTask
-    memoryStore.putIterator("b6", smallIterator, memOnly, returnValues = true)
+    memoryStore.putIterator("b6", smallIterator, memOnly)
     val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisTask
-    memoryStore.putIterator("b7", smallIterator, memOnly, returnValues = true)
+    memoryStore.putIterator("b7", smallIterator, memOnly)
     val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisTask
     assert(unrollMemoryAfterB5 === unrollMemoryAfterB4)
     assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
@@ -1286,11 +1281,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val blockId = BlockId("rdd_3_10")
     store.blockInfoManager.lockNewBlockForWriting(
       blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, tellMaster = false))
-    val result = memoryStore.putBytes(blockId, 13000, () => {
+    memoryStore.putBytes(blockId, 13000, () => {
       fail("A big ByteBuffer that cannot be put into MemoryStore should not be created")
     })
-    assert(result.size === 13000)
-    assert(result.data === null)
   }
 
   test("put a small ByteBuffer to MemoryStore") {
@@ -1298,12 +1291,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val memoryStore = store.memoryStore
     val blockId = BlockId("rdd_3_10")
     var bytes: ByteBuffer = null
-    val result = memoryStore.putBytes(blockId, 10000, () => {
+    memoryStore.putBytes(blockId, 10000, () => {
       bytes = ByteBuffer.allocate(10000)
       bytes
     })
-    assert(result.size === 10000)
-    assert(result.data === Right(bytes))
+    assert(memoryStore.getSize(blockId) === 10000)
   }
 
   test("read-locked blocks cannot be evicted from the MemoryStore") {