Skip to content
Snippets Groups Projects
Commit a250933c authored by Shubham Chopra's avatar Shubham Chopra Committed by Wenchen Fan
Browse files

[SPARK-19803][CORE][TEST] Proactive replication test failures

## What changes were proposed in this pull request?
Executors cache a list of their peers that is refreshed by default every minute. The cached stale references were randomly being used for replication. Since those executors were removed from the master, they did not occur in the block locations as reported by the master. This was fixed by
1. Refreshing peer cache in the block manager before trying to pro-actively replicate. This way the probability of replicating to a failed executor is eliminated.
2. Explicitly stopping the block manager in the tests. This shuts down the RPC endpoint used by the block manager. This way, even if a block manager tries to replicate using a stale reference, the replication logic will refresh the list of peers after the failure.

## How was this patch tested?
Tested manually

Author: Shubham Chopra <schopra31@bloomberg.net>
Author: Kay Ousterhout <kayousterhout@gmail.com>
Author: Shubham Chopra <shubhamchopra@users.noreply.github.com>

Closes #17325 from shubhamchopra/SPARK-19803.
parent 1d00761b
No related branches found
No related tags found
No related merge requests found
......@@ -371,6 +371,12 @@ private[storage] class BlockInfoManager extends Logging {
blocksWithReleasedLocks
}
/** Returns the number of locks held by the given task. Used only for testing. */
private[storage] def getTaskLockCount(taskAttemptId: TaskAttemptId): Int = {
  // Sum the read and write locks tracked for this task attempt; a task with no
  // entry in either map holds zero locks of that kind.
  val heldReadLocks = readLocksByTask.get(taskAttemptId).fold(0)(_.size())
  val heldWriteLocks = writeLocksByTask.get(taskAttemptId).fold(0)(_.size)
  heldReadLocks + heldWriteLocks
}
/**
* Returns the number of blocks tracked.
*/
......
......@@ -1187,7 +1187,7 @@ private[spark] class BlockManager(
blockId: BlockId,
existingReplicas: Set[BlockManagerId],
maxReplicas: Int): Unit = {
logInfo(s"Pro-actively replicating $blockId")
logInfo(s"Using $blockManagerId to pro-actively replicate $blockId")
blockInfoManager.lockForReading(blockId).foreach { info =>
val data = doGetLocalBytes(blockId, info)
val storageLevel = StorageLevel(
......@@ -1196,9 +1196,13 @@ private[spark] class BlockManager(
useOffHeap = info.level.useOffHeap,
deserialized = info.level.deserialized,
replication = maxReplicas)
// we know we are called as a result of an executor removal, so we refresh peer cache
// this way, we won't try to replicate to a missing executor with a stale reference
getPeers(forceFetch = true)
try {
replicate(blockId, data, storageLevel, info.classTag, existingReplicas)
} finally {
logDebug(s"Releasing lock for $blockId")
releaseLock(blockId)
}
}
......
......@@ -493,27 +493,34 @@ class BlockManagerProactiveReplicationSuite extends BlockManagerReplicationBehav
assert(blockLocations.size === replicationFactor)
// remove a random blockManager
val executorsToRemove = blockLocations.take(replicationFactor - 1)
val executorsToRemove = blockLocations.take(replicationFactor - 1).toSet
logInfo(s"Removing $executorsToRemove")
executorsToRemove.foreach{exec =>
master.removeExecutor(exec.executorId)
initialStores.filter(bm => executorsToRemove.contains(bm.blockManagerId)).foreach { bm =>
master.removeExecutor(bm.blockManagerId.executorId)
bm.stop()
// giving enough time for replication to happen and new block be reported to master
Thread.sleep(200)
eventually(timeout(5 seconds), interval(100 millis)) {
val newLocations = master.getLocations(blockId).toSet
assert(newLocations.size === replicationFactor)
}
}
val newLocations = eventually(timeout(5 seconds), interval(10 millis)) {
val newLocations = eventually(timeout(5 seconds), interval(100 millis)) {
val _newLocations = master.getLocations(blockId).toSet
assert(_newLocations.size === replicationFactor)
_newLocations
}
logInfo(s"New locations : $newLocations")
// there should only be one common block manager between initial and new locations
assert(newLocations.intersect(blockLocations.toSet).size === 1)
// check if all the read locks have been released
initialStores.filter(bm => newLocations.contains(bm.blockManagerId)).foreach { bm =>
val locks = bm.releaseAllLocksForTask(BlockInfo.NON_TASK_WRITER)
assert(locks.size === 0, "Read locks unreleased!")
// new locations should not contain stopped block managers
assert(newLocations.forall(bmId => !executorsToRemove.contains(bmId)),
"New locations contain stopped block managers.")
// Make sure all locks have been released.
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
initialStores.filter(bm => newLocations.contains(bm.blockManagerId)).foreach { bm =>
assert(bm.blockInfoManager.getTaskLockCount(BlockInfo.NON_TASK_WRITER) === 0)
}
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment