[SPARK-3495] Block replication fails continuously when the replication target node is dead AND [SPARK-3496] Block replication by mistake chooses driver as target

If a block manager (say, A) wants to replicate a block and the node chosen for replication (say, B) is dead, then the attempt to send the block to B fails. However, this continues to fail indefinitely. Even if the driver learns about the demise of B, A continues to try replicating to B and failing miserably.

The reason behind this bug is that A initially fetches a list of peers from the driver (when B was active), but never updates it after B is dead. This affects Spark Streaming, as its receiver uses block replication.

The solution in this patch adds the following.
- Changed BlockManagerMaster to return all the peers of a block manager, rather than the requested number. It also filters out the driver BlockManager.
- Refactored BlockManager's replication code to handle peer caching correctly.
    + The peer for replication is randomly selected. This is different from past behavior where, for a node A, a node B was deterministically chosen for the lifetime of the application.
    + If replication to one node fails, the peers are refetched.
    + The cached peers have a TTL of 1 second to enable discovery of new peers and their use for replication.
- Refactored use of `<driver>` in BlockManager into a new method `BlockManagerId.isDriver`
- Added replication unit tests (replication was not tested till now, duh!)

This should not make a difference in performance of Spark workloads where replication is not used.

@andrewor14 @JoshRosen

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #2366 from tdas/replication-fix and squashes the following commits:

9690f57 [Tathagata Das] Moved replication tests to a new BlockManagerReplicationSuite.
0661773 [Tathagata Das] Minor changes based on PR comments.
a55a65c [Tathagata Das] Added a unit test to test replication behavior.
012afa3 [Tathagata Das] Bug fix
89f91a0 [Tathagata Das] Minor change.
68e2c72 [Tathagata Das] Made replication peer selection logic more efficient.
08afaa9 [Tathagata Das] Made peer selection for replication deterministic to block id
3821ab9 [Tathagata Das] Fixes based on PR comments.
08e5646 [Tathagata Das] More minor changes.
d402506 [Tathagata Das] Fixed imports.
4a20531 [Tathagata Das] Filtered driver block manager from peer list, and also consolidated the use of <driver> in BlockManager.
7598f91 [Tathagata Das] Minor changes.
03de02d [Tathagata Das] Change replication logic to correctly refetch peers from master on failure and on new worker addition.
d081bf6 [Tathagata Das] Fixed bug in get peers and unit tests to test get-peers and replication under executor churn.
9f0ac9f [Tathagata Das] Modified replication tests to fail on replication bug.
af0c1da [Tathagata Das] Added replication unit tests to BlockManagerSuite
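
The peer-handling behavior described in the commit message (a peer list cached with a 1-second TTL, the driver filtered out, a randomly chosen target, and a refetch after a failed attempt) can be sketched roughly as below. This is an illustration under stated assumptions, not the code from the patch: `PeerId`, `PeerCache`, `ReplicationSketch`, `fetchPeers`, `send`, and `maxAttempts` are hypothetical names, and treating `"<driver>"` as the driver's executor id is an assumption based on the message's mention of `<driver>` and `BlockManagerId.isDriver`.

```scala
import scala.annotation.tailrec
import scala.util.Random

// Hypothetical stand-in for a block manager identifier; not Spark's BlockManagerId.
final case class PeerId(executorId: String, host: String, port: Int) {
  // Assumption: the driver registers under the executor id "<driver>".
  def isDriver: Boolean = executorId == "<driver>"
}

// Caches the peer list and refetches it when the TTL expires or when forced.
final class PeerCache(
    fetchPeers: () => Seq[PeerId],                      // hypothetical call to the master
    ttlMs: Long = 1000L,                                // 1-second TTL, as in the commit message
    now: () => Long = () => System.currentTimeMillis()) {

  private var cached: Seq[PeerId] = Seq.empty
  private var lastFetchMs: Long = 0L
  private var fetched = false

  def peers(forceFetch: Boolean = false): Seq[PeerId] = synchronized {
    val expired = now() - lastFetchMs > ttlMs
    if (forceFetch || !fetched || expired) {
      // Never offer the driver as a replication target.
      cached = fetchPeers().filterNot(_.isDriver)
      lastFetchMs = now()
      fetched = true
    }
    cached
  }
}

object ReplicationSketch {
  // Picks a random untried peer; after a failed send, refetches the peers and retries.
  def replicate(
      cache: PeerCache,
      send: PeerId => Boolean,                          // hypothetical: true if the block was stored
      maxAttempts: Int = 3,
      random: Random = new Random()): Option[PeerId] = {

    @tailrec
    def loop(attempt: Int, tried: Set[PeerId], forceFetch: Boolean): Option[PeerId] = {
      if (attempt >= maxAttempts) None
      else {
        val candidates = cache.peers(forceFetch).filterNot(tried.contains)
        if (candidates.isEmpty) None
        else {
          val peer = candidates(random.nextInt(candidates.size))
          if (send(peer)) Some(peer)
          else loop(attempt + 1, tried + peer, forceFetch = true)
        }
      }
    }

    loop(0, Set.empty, forceFetch = false)
  }
}
```

With a `fetchPeers` that returns the driver, a dead node, and a live node, `ReplicationSketch.replicate` returns the live node whichever peer is drawn first, because a failed send marks that peer as tried and forces a refetch before the next draw.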
Changed files:
- core/src/main/scala/org/apache/spark/storage/BlockManager.scala: 104 additions, 18 deletions
- core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala: 2 additions, 0 deletions
- core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala: 2 additions, 7 deletions
- core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala: 13 additions, 16 deletions
- core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala: 1 addition, 1 deletion
- core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala: 1 addition, 1 deletion
- core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala: 418 additions, 0 deletions
- core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala: 3 additions, 6 deletions