From 15526653a93a32cde3c9ea0c0e68e35622b0a590 Mon Sep 17 00:00:00 2001
From: Xianyang Liu <xianyang.liu@intel.com>
Date: Mon, 8 May 2017 17:33:47 +0800
Subject: [PATCH] [SPARK-19956][CORE] Optimize a location order of blocks with
 topology information

## What changes were proposed in this pull request?

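When `BlockManager.getLocations` orders the locations of a block, it only compares hosts: locations on the local host come first, and all non-local locations follow in random order. As a result, the non-local location that gets selected may be on a different rack even when a replica exists on the local rack. This patch adds a second ordering step that uses topology information: when the local block manager has topology info, non-local locations on the same rack are placed ahead of locations on other racks.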

## How was this patch tested?

New test case.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Xianyang Liu <xianyang.liu@intel.com>

Closes #17300 from ConeyLiu/blockmanager.
---
 .../apache/spark/storage/BlockManager.scala   | 11 +++++--
 .../spark/storage/BlockManagerSuite.scala     | 31 +++++++++++++++++--
 2 files changed, 37 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 3219969bcd..33ce30c58e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -612,12 +612,19 @@ private[spark] class BlockManager(
 
   /**
    * Return a list of locations for the given block, prioritizing the local machine since
-   * multiple block managers can share the same host.
+   * multiple block managers can share the same host, followed by hosts on the same rack.
    */
   private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
     val locs = Random.shuffle(master.getLocations(blockId))
     val (preferredLocs, otherLocs) = locs.partition { loc => blockManagerId.host == loc.host }
-    preferredLocs ++ otherLocs
+    blockManagerId.topologyInfo match {
+      case None => preferredLocs ++ otherLocs
+      case Some(_) =>
+        val (sameRackLocs, differentRackLocs) = otherLocs.partition {
+          loc => blockManagerId.topologyInfo == loc.topologyInfo
+        }
+        preferredLocs ++ sameRackLocs ++ differentRackLocs
+    }
   }
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index a8b9604899..1e7bcdb674 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -496,8 +496,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(list2DiskGet.get.readMethod === DataReadMethod.Disk)
   }
 
-  test("optimize a location order of blocks") {
-    val localHost = Utils.localHostName()
+  test("optimize a location order of blocks without topology information") {
+    val localHost = "localhost"
     val otherHost = "otherHost"
     val bmMaster = mock(classOf[BlockManagerMaster])
     val bmId1 = BlockManagerId("id1", localHost, 1)
@@ -508,7 +508,32 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val blockManager = makeBlockManager(128, "exec", bmMaster)
     val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations)
     val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0))
-    assert(locations.map(_.host).toSet === Set(localHost, localHost, otherHost))
+    assert(locations.map(_.host) === Seq(localHost, localHost, otherHost))
+  }
+
+  test("optimize a location order of blocks with topology information") {
+    val localHost = "localhost"
+    val otherHost = "otherHost"
+    val localRack = "localRack"
+    val otherRack = "otherRack"
+
+    val bmMaster = mock(classOf[BlockManagerMaster])
+    val bmId1 = BlockManagerId("id1", localHost, 1, Some(localRack))
+    val bmId2 = BlockManagerId("id2", localHost, 2, Some(localRack))
+    val bmId3 = BlockManagerId("id3", otherHost, 3, Some(otherRack))
+    val bmId4 = BlockManagerId("id4", otherHost, 4, Some(otherRack))
+    val bmId5 = BlockManagerId("id5", otherHost, 5, Some(localRack))
+    when(bmMaster.getLocations(mc.any[BlockId]))
+      .thenReturn(Seq(bmId1, bmId2, bmId5, bmId3, bmId4))
+
+    val blockManager = makeBlockManager(128, "exec", bmMaster)
+    blockManager.blockManagerId =
+      BlockManagerId(SparkContext.DRIVER_IDENTIFIER, localHost, 1, Some(localRack))
+    val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations)
+    val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0))
+    assert(locations.map(_.host) === Seq(localHost, localHost, otherHost, otherHost, otherHost))
+    assert(locations.flatMap(_.topologyInfo)
+      === Seq(localRack, localRack, localRack, otherRack, otherRack))
   }
 
   test("SPARK-9591: getRemoteBytes from another location when Exception throw") {
-- 
GitLab