diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 3219969bcd06fa1756a97ef0f8970725badcf766..33ce30c58e1adb205e2108467b888a657af7645b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -612,12 +612,19 @@ private[spark] class BlockManager(
 
   /**
    * Return a list of locations for the given block, prioritizing the local machine since
-   * multiple block managers can share the same host.
+   * multiple block managers can share the same host, followed by hosts on the same rack.
    */
   private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
     val locs = Random.shuffle(master.getLocations(blockId))
     val (preferredLocs, otherLocs) = locs.partition { loc => blockManagerId.host == loc.host }
-    preferredLocs ++ otherLocs
+    blockManagerId.topologyInfo match {
+      case None => preferredLocs ++ otherLocs
+      case Some(_) =>
+        val (sameRackLocs, differentRackLocs) = otherLocs.partition {
+          loc => blockManagerId.topologyInfo == loc.topologyInfo
+        }
+        preferredLocs ++ sameRackLocs ++ differentRackLocs
+    }
   }
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index a8b960489983823e05315df866a37517caedab58..1e7bcdb6740f6487dfee6d2443475e52cc46249b 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -496,8 +496,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(list2DiskGet.get.readMethod === DataReadMethod.Disk)
   }
 
-  test("optimize a location order of blocks") {
-    val localHost = Utils.localHostName()
+  test("optimize a location order of blocks without topology information") {
+    val localHost = "localhost"
     val otherHost = "otherHost"
     val bmMaster = mock(classOf[BlockManagerMaster])
     val bmId1 = BlockManagerId("id1", localHost, 1)
@@ -508,7 +508,32 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val blockManager = makeBlockManager(128, "exec", bmMaster)
     val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations)
     val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0))
-    assert(locations.map(_.host).toSet === Set(localHost, localHost, otherHost))
+    assert(locations.map(_.host) === Seq(localHost, localHost, otherHost))
+  }
+
+  test("optimize a location order of blocks with topology information") {
+    val localHost = "localhost"
+    val otherHost = "otherHost"
+    val localRack = "localRack"
+    val otherRack = "otherRack"
+
+    val bmMaster = mock(classOf[BlockManagerMaster])
+    val bmId1 = BlockManagerId("id1", localHost, 1, Some(localRack))
+    val bmId2 = BlockManagerId("id2", localHost, 2, Some(localRack))
+    val bmId3 = BlockManagerId("id3", otherHost, 3, Some(otherRack))
+    val bmId4 = BlockManagerId("id4", otherHost, 4, Some(otherRack))
+    val bmId5 = BlockManagerId("id5", otherHost, 5, Some(localRack))
+    when(bmMaster.getLocations(mc.any[BlockId]))
+      .thenReturn(Seq(bmId1, bmId2, bmId5, bmId3, bmId4))
+
+    val blockManager = makeBlockManager(128, "exec", bmMaster)
+    blockManager.blockManagerId =
+      BlockManagerId(SparkContext.DRIVER_IDENTIFIER, localHost, 1, Some(localRack))
+    val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations)
+    val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0))
+    assert(locations.map(_.host) === Seq(localHost, localHost, otherHost, otherHost, otherHost))
+    assert(locations.flatMap(_.topologyInfo)
+      === Seq(localRack, localRack, localRack, otherRack, otherRack))
   }
 
   test("SPARK-9591: getRemoteBytes from another location when Exception throw") {