diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index d1bee3d2c033c11d43402967b0d9f0bb07b5988e..3f5d06e1aeee7257129e3376043aa19ae3f10021 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -22,6 +22,7 @@ import java.nio.{ByteBuffer, MappedByteBuffer}
 
 import scala.concurrent.ExecutionContext.Implicits.global
 
+import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.concurrent.{Await, Future}
 import scala.concurrent.duration._
@@ -112,6 +113,11 @@ private[spark] class BlockManager(
   private val broadcastCleaner = new MetadataCleaner(
     MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)
 
+  // Fields related to peer block managers that are necessary for block replication
+  @volatile private var cachedPeers: Seq[BlockManagerId] = _
+  private val peerFetchLock = new Object
+  private var lastPeerFetchTime = 0L
+
   initialize()
 
   /* The compression codec to use. Note that the "lazy" val is necessary because we want to delay
@@ -787,31 +793,111 @@ private[spark] class BlockManager(
   }
 
   /**
-   * Replicate block to another node.
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
+    peerFetchLock.synchronized {
+      val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
+      val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
+      if (cachedPeers == null || forceFetch || timeout) {
+        cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
+        lastPeerFetchTime = System.currentTimeMillis
+        logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
+      }
+      cachedPeers
+    }
+  }
+
+  /**
+   * Replicate block to another node. Note that this is a blocking call that returns only after
+   * the block has been replicated.
    */
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
+    val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
+    val numPeersToReplicateTo = level.replication - 1
+    val peersForReplication = new ArrayBuffer[BlockManagerId]
+    val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
+    val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
     val tLevel = StorageLevel(
       level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
-    if (cachedPeers == null) {
-      cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
+    val startTime = System.currentTimeMillis
+    val random = new Random(blockId.hashCode)
+
+    var replicationFailed = false
+    var failures = 0
+    var done = false
+
+    // Get cached list of peers
+    peersForReplication ++= getPeers(forceFetch = false)
+
+    // Get a random peer. Note that this selection of a peer is deterministic on the block id.
+    // So, assuming the list of peers does not change and there are no replication failures,
+    // multiple attempts on the same node to replicate the same block
+    // will select the same set of peers.
+    def getRandomPeer(): Option[BlockManagerId] = {
+      // If replication has failed, force-update the cached list of peers and remove the peers
+      // that have already been used
+      if (replicationFailed) {
+        peersForReplication.clear()
+        peersForReplication ++= getPeers(forceFetch = true)
+        peersForReplication --= peersReplicatedTo
+        peersForReplication --= peersFailedToReplicateTo
+      }
+      if (peersForReplication.nonEmpty) {
+        Some(peersForReplication(random.nextInt(peersForReplication.size)))
+      } else {
+        None
+      }
     }
-    for (peer: BlockManagerId <- cachedPeers) {
-      val start = System.nanoTime
-      data.rewind()
-      logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} Bytes. " +
-        s"To node: $peer")
 
-      try {
-        blockTransferService.uploadBlockSync(
-          peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel)
-      } catch {
-        case e: Exception =>
-          logError(s"Failed to replicate block to $peer", e)
+    // One by one, choose a random peer and try uploading the block to it.
+    // If replication fails (e.g., the target peer is down), force the cached list of peers
+    // to be re-fetched from the driver, and then pick another random peer for replication. Also
+    // temporarily blacklist the peer for which replication failed.
+    //
+    // This selection of a peer and replication is continued in a loop until one of the
+    // following 3 conditions is fulfilled:
+    // (i) specified number of peers have been replicated to
+    // (ii) too many failures in replicating to peers
+    // (iii) no peer left to replicate to
+    //
+    while (!done) {
+      getRandomPeer() match {
+        case Some(peer) =>
+          try {
+            val onePeerStartTime = System.currentTimeMillis
+            data.rewind()
+            logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
+            blockTransferService.uploadBlockSync(
+              peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel)
+            logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %d ms"
+              .format(System.currentTimeMillis - onePeerStartTime))
+            peersReplicatedTo += peer
+            peersForReplication -= peer
+            replicationFailed = false
+            if (peersReplicatedTo.size == numPeersToReplicateTo) {
+              done = true  // specified number of peers have been replicated to
+            }
+          } catch {
+            case e: Exception =>
+              logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
+              failures += 1
+              replicationFailed = true
+              peersFailedToReplicateTo += peer
+              if (failures > maxReplicationFailures) { // too many failures in replicating to peers
+                done = true
+              }
+          }
+        case None => // no peer left to replicate to
+          done = true
       }
-
-      logDebug("Replicating BlockId %s once used %fs; The size of the data is %d bytes."
-        .format(blockId, (System.nanoTime - start) / 1e6, data.limit()))
+    }
+    val timeTakenMs = System.currentTimeMillis - startTime
+    logDebug(s"Replicating $blockId of ${data.limit()} bytes to " +
+      s"${peersReplicatedTo.size} peer(s) took $timeTakenMs ms")
+    if (peersReplicatedTo.size < numPeersToReplicateTo) {
+      logWarning(s"Block $blockId replicated to only " +
+        s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
     }
   }
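The replication path above reads two configuration keys introduced by this patch: spark.storage.maxReplicationFailures (default 1) and spark.storage.cachedPeersTtl (default 60000 ms). A minimal sketch, assuming a standard SparkConf, of how a job might tune them; the values shown are illustrative, not recommendations:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // tolerate one extra failed upload attempt before giving up on further replicas
      .set("spark.storage.maxReplicationFailures", "2")
      // refresh the cached peer list every 30 s instead of the default 60 s
      .set("spark.storage.cachedPeersTtl", "30000")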
 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index d4487fce49ab681c9cb5f9a592800985bf5741b1..142285094342c006276ea8d01983ab708ba99fb9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -59,6 +59,8 @@ class BlockManagerId private (
 
   def port: Int = port_
 
+  def isDriver: Boolean = (executorId == "<driver>")
+
   override def writeExternal(out: ObjectOutput) {
     out.writeUTF(executorId_)
     out.writeUTF(host_)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 2e262594b353850899cf37f2683c4a9ea9fd46ad..d08e1419e3e419ebca23a76d4bf8f0f31c39c976 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -84,13 +84,8 @@ class BlockManagerMaster(
   }
 
   /** Get ids of other nodes in the cluster from the driver */
-  def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
-    val result = askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
-    if (result.length != numPeers) {
-      throw new SparkException(
-        "Error getting peers, only got " + result.size + " instead of " + numPeers)
-    }
-    result
+  def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
+    askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId))
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 1a6c7cb24f9ac5af7c4a380a43e18c8493818e77..6a06257ed0c080708f2e110feb3b1bd4f1e30f17 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -83,8 +83,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     case GetLocationsMultipleBlockIds(blockIds) =>
       sender ! getLocationsMultipleBlockIds(blockIds)
 
-    case GetPeers(blockManagerId, size) =>
-      sender ! getPeers(blockManagerId, size)
+    case GetPeers(blockManagerId) =>
+      sender ! getPeers(blockManagerId)
 
     case GetMemoryStatus =>
       sender ! memoryStatus
@@ -173,11 +173,10 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
    * from the executors, but not from the driver.
    */
   private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean): Future[Seq[Int]] = {
-    // TODO: Consolidate usages of <driver>
     import context.dispatcher
     val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
     val requiredBlockManagers = blockManagerInfo.values.filter { info =>
-      removeFromDriver || info.blockManagerId.executorId != "<driver>"
+      removeFromDriver || !info.blockManagerId.isDriver
     }
     Future.sequence(
       requiredBlockManagers.map { bm =>
@@ -212,7 +211,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     val minSeenTime = now - slaveTimeout
     val toRemove = new mutable.HashSet[BlockManagerId]
     for (info <- blockManagerInfo.values) {
-      if (info.lastSeenMs < minSeenTime && info.blockManagerId.executorId != "<driver>") {
+      if (info.lastSeenMs < minSeenTime && !info.blockManagerId.isDriver) {
         logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: "
           + (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
         toRemove += info.blockManagerId
@@ -232,7 +231,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
    */
   private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = {
     if (!blockManagerInfo.contains(blockManagerId)) {
-      blockManagerId.executorId == "<driver>" && !isLocal
+      blockManagerId.isDriver && !isLocal
     } else {
       blockManagerInfo(blockManagerId).updateLastSeenMs()
       true
@@ -355,7 +354,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
       tachyonSize: Long) {
 
     if (!blockManagerInfo.contains(blockManagerId)) {
-      if (blockManagerId.executorId == "<driver>" && !isLocal) {
+      if (blockManagerId.isDriver && !isLocal) {
         // We intentionally do not register the master (except in local mode),
         // so we should not indicate failure.
         sender ! true
@@ -403,16 +402,14 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     blockIds.map(blockId => getLocations(blockId))
   }
 
-  private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = {
-    val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
-
-    val selfIndex = peers.indexOf(blockManagerId)
-    if (selfIndex == -1) {
-      throw new SparkException("Self index for " + blockManagerId + " not found")
+  /** Get the list of the peers of the given block manager */
+  private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
+    val blockManagerIds = blockManagerInfo.keySet
+    if (blockManagerIds.contains(blockManagerId)) {
+      blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq
+    } else {
+      Seq.empty
     }
-
-    // Note that this logic will select the same node multiple times if there aren't enough peers
-    Array.tabulate[BlockManagerId](size) { i => peers((selfIndex + i + 1) % peers.length) }.toSeq
   }
 }
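Since BlockManagerMasterActor.getPeers now draws peers from an unordered keySet, the BlockManager side restores determinism by sorting the fetched peers by hashCode and seeding its Random with blockId.hashCode (see the getPeers/replicate changes above); that property is exercised by the "deterministic node selection" test below. A self-contained sketch of the property, using made-up peer and block ids:

    import scala.util.Random

    object DeterministicPeerChoiceSketch extends App {
      val peers = Seq("store1", "store2", "store3", "store4").sortBy(_.hashCode) // hypothetical peers
      val seed = "rdd_0_0".hashCode                                              // hypothetical block id
      // Two Random instances with the same seed make the same first choice,
      // as long as the sorted peer list (and hence its size) is unchanged.
      val first = peers(new Random(seed).nextInt(peers.size))
      val second = peers(new Random(seed).nextInt(peers.size))
      assert(first == second, "same seed and same peer list should pick the same peer")
    }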
 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 2ba16b847660081124c01de8e94246803ab0a2e1..3db5dd9774ae857e8819fe7957b468473e2365d5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -88,7 +88,7 @@ private[spark] object BlockManagerMessages {
 
   case class GetLocationsMultipleBlockIds(blockIds: Array[BlockId]) extends ToBlockManagerMaster
 
-  case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
+  case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
 
   case class RemoveExecutor(execId: String) extends ToBlockManagerMaster
 
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index 978a6ded808297ce1a2325e39a1b782a7c698b97..acaf321de52fb5e3ba661ba3b6824314f5415b20 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -132,7 +132,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
       val statuses = bmm.getBlockStatus(blockId, askSlaves = true)
       assert(statuses.size === 1)
       statuses.head match { case (bm, status) =>
-        assert(bm.executorId === "<driver>", "Block should only be on the driver")
+        assert(bm.isDriver, "Block should only be on the driver")
         assert(status.storageLevel === StorageLevel.MEMORY_AND_DISK)
         assert(status.memSize > 0, "Block should be in memory store on the driver")
         assert(status.diskSize === 0, "Block should not be in disk store on the driver")
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..1f1d53a1ee3b06d1825cc86278690d8ded6816c6
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.language.implicitConversions
+import scala.language.postfixOps
+
+import akka.actor.{ActorSystem, Props}
+import org.mockito.Mockito.{mock, when}
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers, PrivateMethodTester}
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
+import org.apache.spark.network.BlockTransferService
+import org.apache.spark.network.nio.NioBlockTransferService
+import org.apache.spark.scheduler.LiveListenerBus
+import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.shuffle.hash.HashShuffleManager
+import org.apache.spark.storage.StorageLevel._
+import org.apache.spark.util.{AkkaUtils, SizeEstimator}
+
+/** Test suite that tests block replication in BlockManager */
+class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAndAfter {
+
+  private val conf = new SparkConf(false)
+  var actorSystem: ActorSystem = null
+  var master: BlockManagerMaster = null
+  val securityMgr = new SecurityManager(conf)
+  val mapOutputTracker = new MapOutputTrackerMaster(conf)
+  val shuffleManager = new HashShuffleManager(conf)
+
+  // List of block managers created during a unit test, so that all of them can be stopped
+  // after the unit test.
+  val allStores = new ArrayBuffer[BlockManager]
+
+  // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
+  conf.set("spark.kryoserializer.buffer.mb", "1")
+  val serializer = new KryoSerializer(conf)
+
+  // Implicitly convert strings to BlockIds for test clarity.
+  implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
+
+  private def makeBlockManager(maxMem: Long, name: String = "<driver>"): BlockManager = {
+    val transfer = new NioBlockTransferService(conf, securityMgr)
+    val store = new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
+      mapOutputTracker, shuffleManager, transfer)
+    allStores += store
+    store
+  }
+
+  before {
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
+      "test", "localhost", 0, conf = conf, securityManager = securityMgr)
+    this.actorSystem = actorSystem
+
+    conf.set("spark.authenticate", "false")
+    conf.set("spark.driver.port", boundPort.toString)
+    conf.set("spark.storage.unrollFraction", "0.4")
+    conf.set("spark.storage.unrollMemoryThreshold", "512")
+
+    // to make a replication attempt to an inactive store fail fast
+    conf.set("spark.core.connection.ack.wait.timeout", "1")
+    // to make cached peers refresh frequently
+    conf.set("spark.storage.cachedPeersTtl", "10")
+
+    master = new BlockManagerMaster(
+      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
+      conf, true)
+    allStores.clear()
+  }
+
+  after {
+    allStores.foreach { _.stop() }
+    allStores.clear()
+    actorSystem.shutdown()
+    actorSystem.awaitTermination()
+    actorSystem = null
+    master = null
+  }
+
+  test("get peers with addition and removal of block managers") {
+    val numStores = 4
+    val stores = (1 to numStores - 1).map { i => makeBlockManager(1000, s"store$i") }
+    val storeIds = stores.map { _.blockManagerId }.toSet
+    assert(master.getPeers(stores(0).blockManagerId).toSet ===
+      storeIds.filterNot { _ == stores(0).blockManagerId })
+    assert(master.getPeers(stores(1).blockManagerId).toSet ===
+      storeIds.filterNot { _ == stores(1).blockManagerId })
+    assert(master.getPeers(stores(2).blockManagerId).toSet ===
+      storeIds.filterNot { _ == stores(2).blockManagerId })
+
+    // Add a driver store and test whether it is filtered out
+    val driverStore = makeBlockManager(1000, "<driver>")
+    assert(master.getPeers(stores(0).blockManagerId).forall(!_.isDriver))
+    assert(master.getPeers(stores(1).blockManagerId).forall(!_.isDriver))
+    assert(master.getPeers(stores(2).blockManagerId).forall(!_.isDriver))
+
+    // Add a new store and test whether get peers returns it
+    val newStore = makeBlockManager(1000, s"store$numStores")
+    assert(master.getPeers(stores(0).blockManagerId).toSet ===
+      storeIds.filterNot { _ == stores(0).blockManagerId } + newStore.blockManagerId)
+    assert(master.getPeers(stores(1).blockManagerId).toSet ===
+      storeIds.filterNot { _ == stores(1).blockManagerId } + newStore.blockManagerId)
+    assert(master.getPeers(stores(2).blockManagerId).toSet ===
+      storeIds.filterNot { _ == stores(2).blockManagerId } + newStore.blockManagerId)
+    assert(master.getPeers(newStore.blockManagerId).toSet === storeIds)
+
+    // Remove a store and test that get peers does not return it
+    val storeIdToRemove = stores(0).blockManagerId
+    master.removeExecutor(storeIdToRemove.executorId)
+    assert(!master.getPeers(stores(1).blockManagerId).contains(storeIdToRemove))
+    assert(!master.getPeers(stores(2).blockManagerId).contains(storeIdToRemove))
+    assert(!master.getPeers(newStore.blockManagerId).contains(storeIdToRemove))
+
+    // Test whether asking for peers of an unregistered block manager id returns an empty list
+    assert(master.getPeers(stores(0).blockManagerId).isEmpty)
+    assert(master.getPeers(BlockManagerId("", "", 1)).isEmpty)
+  }
+
+  test("block replication - 2x replication") {
+    testReplication(2,
+      Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER_2)
+    )
+  }
+
+  test("block replication - 3x replication") {
+    // Generate storage levels with 3x replication
+    val storageLevels = {
+      Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, MEMORY_AND_DISK_SER).map {
+        level => StorageLevel(
+          level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 3)
+      }
+    }
+    testReplication(3, storageLevels)
+  }
+
+  test("block replication - mixed between 1x to 5x") {
+    // Generate storage levels with varying replication
+    val storageLevels = Seq(
+      MEMORY_ONLY,
+      MEMORY_ONLY_SER_2,
+      StorageLevel(true, false, false, false, 3),
+      StorageLevel(true, true, false, true, 4),
+      StorageLevel(true, true, false, false, 5),
+      StorageLevel(true, true, false, true, 4),
+      StorageLevel(true, false, false, false, 3),
+      MEMORY_ONLY_SER_2,
+      MEMORY_ONLY
+    )
+    testReplication(5, storageLevels)
+  }
+
+  test("block replication - 2x replication without peers") {
+    intercept[org.scalatest.exceptions.TestFailedException] {
+      testReplication(1,
+        Seq(StorageLevel.MEMORY_AND_DISK_2, StorageLevel(true, false, false, false, 3)))
+    }
+  }
+
+  test("block replication - deterministic node selection") {
+    val blockSize = 1000
+    val storeSize = 10000
+    val stores = (1 to 5).map {
+      i => makeBlockManager(storeSize, s"store$i")
+    }
+    val storageLevel2x = StorageLevel.MEMORY_AND_DISK_2
+    val storageLevel3x = StorageLevel(true, true, false, true, 3)
+    val storageLevel4x = StorageLevel(true, true, false, true, 4)
+
+    def putBlockAndGetLocations(blockId: String, level: StorageLevel): Set[BlockManagerId] = {
+      stores.head.putSingle(blockId, new Array[Byte](blockSize), level)
+      val locations = master.getLocations(blockId).sortBy { _.executorId }.toSet
+      stores.foreach { _.removeBlock(blockId) }
+      master.removeBlock(blockId)
+      locations
+    }
+
+    // Test if two attempts to 2x replication returns same set of locations
+    val a1Locs = putBlockAndGetLocations("a1", storageLevel2x)
+    assert(putBlockAndGetLocations("a1", storageLevel2x) === a1Locs,
+      "Inserting a 2x replicated block second time gave different locations from the first")
+
+    // Test if two attempts to 3x replication returns same set of locations
+    val a2Locs3x = putBlockAndGetLocations("a2", storageLevel3x)
+    assert(putBlockAndGetLocations("a2", storageLevel3x) === a2Locs3x,
+      "Inserting a 3x replicated block second time gave different locations from the first")
+
+    // Test if 2x replication of a2 returns a strict subset of the locations of 3x replication
+    val a2Locs2x = putBlockAndGetLocations("a2", storageLevel2x)
+    assert(
+      a2Locs2x.subsetOf(a2Locs3x),
+      "Inserting a2 with 2x replication gave locations that are not a subset of locations" +
+        s" with 3x replication [3x: ${a2Locs3x.mkString(",")}; 2x: ${a2Locs2x.mkString(",")}]"
+    )
+
+    // Test if 4x replication of a2 returns a strict superset of the locations of 3x replication
+    val a2Locs4x = putBlockAndGetLocations("a2", storageLevel4x)
+    assert(
+      a2Locs3x.subsetOf(a2Locs4x),
+      "Inserting a2 with 4x replication gave locations that are not a superset of locations " +
+        s"with 3x replication [3x: ${a2Locs3x.mkString(",")}; 4x: ${a2Locs4x.mkString(",")}]"
+    )
+
+    // Test if 3x replication of two different blocks gives two different sets of locations
+    val a3Locs3x = putBlockAndGetLocations("a3", storageLevel3x)
+    assert(a3Locs3x !== a2Locs3x, "Two blocks gave same locations with 3x replication")
+  }
+
+  test("block replication - replication failures") {
+    /*
+      Create a system of three block managers / stores. One of them (say, failableStore)
+      cannot receive blocks, so attempts to use it as a replication target fail.
+
+            +-----------/fails/-----------> failableStore
+            |
+        normalStore
+            |
+            +-----------/works/-----------> anotherNormalStore
+
+        We are first going to add a normal block manager (i.e. normalStore) and the failable block
+        manager (i.e. failableStore), and test whether 2x replication fails to create two
+        copies of a block. Then we are going to add another normal block manager
+        (i.e., anotherNormalStore), and test that now 2x replication works as the
+        new store will be used for replication.
+     */
+
+    // Add a normal block manager
+    val store = makeBlockManager(10000, "store")
+
+    // Insert a block with 2x replication and return the number of copies of the block
+    def replicateAndGetNumCopies(blockId: String): Int = {
+      store.putSingle(blockId, new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK_2)
+      val numLocations = master.getLocations(blockId).size
+      allStores.foreach { _.removeBlock(blockId) }
+      numLocations
+    }
+
+    // Add a failable block manager with a mock transfer service that does not
+    // allow receiving of blocks. So attempts to use it as a replication target will fail.
+    val failableTransfer = mock(classOf[BlockTransferService]) // this won't actually work
+    when(failableTransfer.hostName).thenReturn("some-hostname")
+    when(failableTransfer.port).thenReturn(1000)
+    val failableStore = new BlockManager("failable-store", actorSystem, master, serializer,
+      10000, conf, mapOutputTracker, shuffleManager, failableTransfer)
+    allStores += failableStore // so that this gets stopped after test
+    assert(master.getPeers(store.blockManagerId).toSet === Set(failableStore.blockManagerId))
+
+    // Test that 2x replication fails by creating only one copy of the block
+    assert(replicateAndGetNumCopies("a1") === 1)
+
+    // Add another normal block manager and test that 2x replication works
+    makeBlockManager(10000, "anotherStore")
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      assert(replicateAndGetNumCopies("a2") === 2)
+    }
+  }
+
+  test("block replication - addition and deletion of block managers") {
+    val blockSize = 1000
+    val storeSize = 10000
+    val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, s"store$i") }
+
+    // Insert a block with the given replication factor and return the number of copies
+    def replicateAndGetNumCopies(blockId: String, replicationFactor: Int): Int = {
+      val storageLevel = StorageLevel(true, true, false, true, replicationFactor)
+      initialStores.head.putSingle(blockId, new Array[Byte](blockSize), storageLevel)
+      val numLocations = master.getLocations(blockId).size
+      allStores.foreach { _.removeBlock(blockId) }
+      numLocations
+    }
+
+    // 2x replication should work, 3x replication should only replicate 2x
+    assert(replicateAndGetNumCopies("a1", 2) === 2)
+    assert(replicateAndGetNumCopies("a2", 3) === 2)
+
+    // Add another store, 3x replication should work now, 4x replication should only replicate 3x
+    val newStore1 = makeBlockManager(storeSize, s"newstore1")
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      assert(replicateAndGetNumCopies("a3", 3) === 3)
+    }
+    assert(replicateAndGetNumCopies("a4", 4) === 3)
+
+    // Add another store, 4x replication should work now
+    val newStore2 = makeBlockManager(storeSize, s"newstore2")
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      assert(replicateAndGetNumCopies("a5", 4) === 4)
+    }
+
+    // Remove all but the 1st store, 2x replication should fail
+    (initialStores.tail ++ Seq(newStore1, newStore2)).foreach {
+      store =>
+        master.removeExecutor(store.blockManagerId.executorId)
+        store.stop()
+    }
+    assert(replicateAndGetNumCopies("a6", 2) === 1)
+
+    // Add new stores, 3x replication should work
+    val newStores = (3 to 5).map {
+      i => makeBlockManager(storeSize, s"newstore$i")
+    }
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      assert(replicateAndGetNumCopies("a7", 3) === 3)
+    }
+  }
+
+  /**
+   * Test replication of blocks with different storage levels (various combinations of
+   * memory, disk & serialization). For each storage level, this function checks every store
+   * for whether the block is present, and also checks that the master's knowledge of the block
+   * is correct. Then it also drops the block from memory of each store (using LRU) and
+   * again checks whether the master's knowledge gets updated.
+   */
+  private def testReplication(maxReplication: Int, storageLevels: Seq[StorageLevel]) {
+    import org.apache.spark.storage.StorageLevel._
+
+    assert(maxReplication > 1,
+      s"Cannot test replication factor $maxReplication")
+
+    // storage levels to test with the given replication factor
+
+    val storeSize = 10000
+    val blockSize = 1000
+
+    // As many stores as the replication factor
+    val stores = (1 to maxReplication).map {
+      i => makeBlockManager(storeSize, s"store$i")
+    }
+
+    storageLevels.foreach { storageLevel =>
+      // Put the block into one of the stores
+      val blockId = new TestBlockId(
+        "block-with-" + storageLevel.description.replace(" ", "-").toLowerCase)
+      stores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel)
+
+      // Assert that the master knows the expected number of locations for the block
+      val blockLocations = master.getLocations(blockId).map(_.executorId).toSet
+      assert(blockLocations.size === storageLevel.replication,
+        s"master did not have ${storageLevel.replication} locations for $blockId")
+
+      // Test state of the stores that contain the block
+      stores.filter {
+        testStore => blockLocations.contains(testStore.blockManagerId.executorId)
+      }.foreach { testStore =>
+        val testStoreName = testStore.blockManagerId.executorId
+        assert(testStore.getLocal(blockId).isDefined, s"$blockId was not found in $testStoreName")
+        assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName),
+          s"master does not have status for ${blockId.name} in $testStoreName")
+
+        val blockStatus = master.getBlockStatus(blockId)(testStore.blockManagerId)
+
+        // Assert that block status in the master for this store has expected storage level
+        assert(
+          blockStatus.storageLevel.useDisk === storageLevel.useDisk &&
+            blockStatus.storageLevel.useMemory === storageLevel.useMemory &&
+            blockStatus.storageLevel.useOffHeap === storageLevel.useOffHeap &&
+            blockStatus.storageLevel.deserialized === storageLevel.deserialized,
+          s"master does not know correct storage level for ${blockId.name} in $testStoreName")
+
+        // Assert that the block status in the master for this store has correct memory usage info
+        assert(!blockStatus.storageLevel.useMemory || blockStatus.memSize >= blockSize,
+          s"master does not know size of ${blockId.name} stored in memory of $testStoreName")
+
+        // If the block is supposed to be in memory, then drop the copy of the block in
+        // this store and test whether the master is updated with zero memory usage for this store
+        if (storageLevel.useMemory) {
+          // Force the block to be dropped by adding a number of dummy blocks
+          (1 to 10).foreach {
+            i =>
+              testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), MEMORY_ONLY_SER)
+          }
+          (1 to 10).foreach {
+            i => testStore.removeBlock(s"dummy-block-$i")
+          }
+
+          val newBlockStatusOption = master.getBlockStatus(blockId).get(testStore.blockManagerId)
+
+          // Assert that the block status in the master either does not exist (block removed
+          // from every store) or has zero memory usage for this store
+          assert(
+            newBlockStatusOption.isEmpty || newBlockStatusOption.get.memSize === 0,
+            s"after dropping, master does not know size of ${blockId.name} " +
+              s"stored in memory of $testStoreName"
+          )
+        }
+
+        // If the block is supposed to be on disk (after dropping or otherwise), then
+        // test whether the master has the correct disk usage for this store
+        if (storageLevel.useDisk) {
+          assert(master.getBlockStatus(blockId)(testStore.blockManagerId).diskSize >= blockSize,
+            s"after dropping, master does not know size of ${blockId.name} " +
+              s"stored in disk of $testStoreName"
+          )
+        }
+      }
+      master.removeBlock(blockId)
+    }
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index e251660dae5de2cdd113635f2c15029817e15134..9d96202a3e7acddce99d0959dd1b1ba9c9138c04 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -21,8 +21,6 @@ import java.nio.{ByteBuffer, MappedByteBuffer}
 import java.util.Arrays
 import java.util.concurrent.TimeUnit
 
-import org.apache.spark.network.nio.NioBlockTransferService
-
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.Await
 import scala.concurrent.duration._
@@ -35,13 +33,13 @@ import akka.util.Timeout
 
 import org.mockito.Mockito.{mock, when}
 
-import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester}
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.concurrent.Timeouts._
-import org.scalatest.Matchers
 
 import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
 import org.apache.spark.executor.DataReadMethod
+import org.apache.spark.network.nio.NioBlockTransferService
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 import org.apache.spark.shuffle.hash.HashShuffleManager
@@ -189,7 +187,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     store = makeBlockManager(2000, "exec1")
     store2 = makeBlockManager(2000, "exec2")
 
-    val peers = master.getPeers(store.blockManagerId, 1)
+    val peers = master.getPeers(store.blockManagerId)
     assert(peers.size === 1, "master did not return the other manager as a peer")
     assert(peers.head === store2.blockManagerId, "peer returned by master is not the other manager")
 
@@ -448,7 +446,6 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     val list2DiskGet = store.get("list2disk")
     assert(list2DiskGet.isDefined, "list2memory expected to be in store")
     assert(list2DiskGet.get.data.size === 3)
-    System.out.println(list2DiskGet)
     // We don't know the exact size of the data on disk, but it should certainly be > 0.
     assert(list2DiskGet.get.inputMetrics.bytesRead > 0)
     assert(list2DiskGet.get.inputMetrics.readMethod === DataReadMethod.Disk)