diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index e4170a55b79818eacce968af37d9ee77f3d13263..1021172e6afb448545f089d95c4e116f5be8faa2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -104,7 +104,7 @@ class DAGScheduler( * * All accesses to this map should be guarded by synchronizing on it (see SPARK-4454). */ - private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]] + private val cacheLocs = new HashMap[Int, Seq[Seq[TaskLocation]]] // For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with // every task. When we detect a node failing, we note the current epoch number and failed @@ -188,14 +188,15 @@ class DAGScheduler( eventProcessLoop.post(TaskSetFailed(taskSet, reason)) } - private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = cacheLocs.synchronized { + private[scheduler] + def getCacheLocs(rdd: RDD[_]): Seq[Seq[TaskLocation]] = cacheLocs.synchronized { // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times if (!cacheLocs.contains(rdd.id)) { val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId] - val locs = BlockManager.blockIdsToBlockManagers(blockIds, env, blockManagerMaster) - cacheLocs(rdd.id) = blockIds.map { id => - locs.getOrElse(id, Nil).map(bm => TaskLocation(bm.host, bm.executorId)) + val locs: Seq[Seq[TaskLocation]] = blockManagerMaster.getLocations(blockIds).map { bms => + bms.map(bm => TaskLocation(bm.host, bm.executorId)) } + cacheLocs(rdd.id) = locs } cacheLocs(rdd.id) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index c8b7763f03fb7d7539e9db5d590eb8c2317edada..80d66e59132dacb633bbd3e43fdeaf79a1b07782 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1245,10 +1245,10 @@ private[spark] object BlockManager extends Logging { } } - def blockIdsToBlockManagers( + def blockIdsToHosts( blockIds: Array[BlockId], env: SparkEnv, - blockManagerMaster: BlockManagerMaster = null): Map[BlockId, Seq[BlockManagerId]] = { + blockManagerMaster: BlockManagerMaster = null): Map[BlockId, Seq[String]] = { // blockManagerMaster != null is used in tests assert(env != null || blockManagerMaster != null) @@ -1258,24 +1258,10 @@ private[spark] object BlockManager extends Logging { blockManagerMaster.getLocations(blockIds) } - val blockManagers = new HashMap[BlockId, Seq[BlockManagerId]] + val blockManagers = new HashMap[BlockId, Seq[String]] for (i <- 0 until blockIds.length) { - blockManagers(blockIds(i)) = blockLocations(i) + blockManagers(blockIds(i)) = blockLocations(i).map(_.host) } blockManagers.toMap } - - def blockIdsToExecutorIds( - blockIds: Array[BlockId], - env: SparkEnv, - blockManagerMaster: BlockManagerMaster = null): Map[BlockId, Seq[String]] = { - blockIdsToBlockManagers(blockIds, env, blockManagerMaster).mapValues(s => s.map(_.executorId)) - } - - def blockIdsToHosts( - blockIds: Array[BlockId], - env: SparkEnv, - blockManagerMaster: BlockManagerMaster = null): Map[BlockId, Seq[String]] = { - blockIdsToBlockManagers(blockIds, env, blockManagerMaster).mapValues(s => s.map(_.host)) - } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 30119ce5d4eecd82b00721a35f33e5ec25ffb82a..63360a0f189a3e4db284504044a103e971aea852 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -322,6 +322,18 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar assertDataStructuresEmpty } + test("regression test for getCacheLocs") { + val rdd = new MyRDD(sc, 3, Nil) + cacheLocations(rdd.id -> 0) = + Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")) + cacheLocations(rdd.id -> 1) = + Seq(makeBlockManagerId("hostB"), makeBlockManagerId("hostC")) + cacheLocations(rdd.id -> 2) = + Seq(makeBlockManagerId("hostC"), makeBlockManagerId("hostD")) + val locs = scheduler.getCacheLocs(rdd).map(_.map(_.host)) + assert(locs === Seq(Seq("hostA", "hostB"), Seq("hostB", "hostC"), Seq("hostC", "hostD"))) + } + test("avoid exponential blowup when getting preferred locs list") { // Build up a complex dependency graph with repeated zip operations, without preferred locations. var rdd: RDD[_] = new MyRDD(sc, 1, Nil)