Skip to content
Snippets Groups Projects
Commit 214f90ee authored by zsxwing's avatar zsxwing Committed by Aaron Davidson
Browse files

SPARK-1932: Fix race conditions in onReceiveCallback and cachedPeers

`var cachedPeers: Seq[BlockManagerId] = null` is used in `def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel)` without proper protection.

There are two place will call `replicate(blockId, bytesAfterPut, level)`
* https://github.com/apache/spark/blob/17f3075bc4aa8cbed165f7b367f70e84b1bc8db9/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L644 runs in `connectionManager.futureExecContext`
* https://github.com/apache/spark/blob/17f3075bc4aa8cbed165f7b367f70e84b1bc8db9/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L752

 `doPut` runs in `connectionManager.handleMessageExecutor`. `org.apache.spark.storage.BlockManagerWorker` calls `blockManager.putBytes` in `connectionManager.handleMessageExecutor`.

As they run in different `Executor`s, this is a race condition which may cause the memory pointed by `cachedPeers` is not correct even if `cachedPeers != null`.

The race condition of `onReceiveCallback` is that it's set in `BlockManagerWorker` but read in a different thread in `ConnectionManager.handleMessageExecutor`.

Author: zsxwing <zsxwing@gmail.com>

Closes #887 from zsxwing/SPARK-1932 and squashes the following commits:

524f69c [zsxwing] SPARK-1932: Fix race conditions in onReceiveCallback and cachedPeers

(cherry picked from commit 549830b0)
Signed-off-by: default avatarAaron Davidson <aaron@databricks.com>
parent fcb37502
No related branches found
No related tags found
No related merge requests found
......@@ -93,7 +93,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
implicit val futureExecContext = ExecutionContext.fromExecutor(
Utils.newDaemonCachedThreadPool("Connection manager future execution context"))
private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message]= null
@volatile
private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message] = null
private val authEnabled = securityManager.isAuthenticationEnabled()
......
......@@ -772,7 +772,7 @@ private[spark] class BlockManager(
/**
* Replicate block to another node.
*/
var cachedPeers: Seq[BlockManagerId] = null
@volatile var cachedPeers: Seq[BlockManagerId] = null
private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel) {
val tLevel = StorageLevel(
level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment