diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index dcbbc1853186b0a718b3a5ce2f121e7fcd271ce2..5dd5fd0047c0d8707357bc83fbf00626402d5b77 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -93,7 +93,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, implicit val futureExecContext = ExecutionContext.fromExecutor( Utils.newDaemonCachedThreadPool("Connection manager future execution context")) - private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message]= null + @volatile + private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message] = null private val authEnabled = securityManager.isAuthenticationEnabled() diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 6534095811907c0e052f9a3f1e789d852f303dcd..6e450081dcb11caf0b25bcabe7b038ae1c47bf3b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -772,7 +772,7 @@ private[spark] class BlockManager( /** * Replicate block to another node. */ - var cachedPeers: Seq[BlockManagerId] = null + @volatile var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel) { val tLevel = StorageLevel( level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)