From f9151bebca986d44cdab7699959fec2bc050773a Mon Sep 17 00:00:00 2001
From: Feng Liu <fengliu@databricks.com>
Date: Thu, 29 Jun 2017 16:03:15 -0700
Subject: [PATCH] [SPARK-21188][CORE] releaseAllLocksForTask should synchronize
 the whole method

## What changes were proposed in this pull request?

Since the objects `readLocksByTask`, `writeLocksByTask` and `info`s are coupled and supposed to be modified by other threads concurrently, all the read and writes of them in the method `releaseAllLocksForTask` should be protected by a single synchronized block like other similar methods.

## How was this patch tested?

existing tests

Author: Feng Liu <fengliu@databricks.com>

Closes #18400 from liufengdb/synchronize.
---
 .../spark/storage/BlockInfoManager.scala      | 24 +++++++------------
 1 file changed, 9 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index 7064872ec1..219a0e799c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -341,15 +341,11 @@ private[storage] class BlockInfoManager extends Logging {
    *
    * @return the ids of blocks whose pins were released
    */
-  def releaseAllLocksForTask(taskAttemptId: TaskAttemptId): Seq[BlockId] = {
+  def releaseAllLocksForTask(taskAttemptId: TaskAttemptId): Seq[BlockId] = synchronized {
     val blocksWithReleasedLocks = mutable.ArrayBuffer[BlockId]()
 
-    val readLocks = synchronized {
-      readLocksByTask.remove(taskAttemptId).getOrElse(ImmutableMultiset.of[BlockId]())
-    }
-    val writeLocks = synchronized {
-      writeLocksByTask.remove(taskAttemptId).getOrElse(Seq.empty)
-    }
+    val readLocks = readLocksByTask.remove(taskAttemptId).getOrElse(ImmutableMultiset.of[BlockId]())
+    val writeLocks = writeLocksByTask.remove(taskAttemptId).getOrElse(Seq.empty)
 
     for (blockId <- writeLocks) {
       infos.get(blockId).foreach { info =>
@@ -358,21 +354,19 @@ private[storage] class BlockInfoManager extends Logging {
       }
       blocksWithReleasedLocks += blockId
     }
+
     readLocks.entrySet().iterator().asScala.foreach { entry =>
       val blockId = entry.getElement
       val lockCount = entry.getCount
       blocksWithReleasedLocks += blockId
-      synchronized {
-        get(blockId).foreach { info =>
-          info.readerCount -= lockCount
-          assert(info.readerCount >= 0)
-        }
+      get(blockId).foreach { info =>
+        info.readerCount -= lockCount
+        assert(info.readerCount >= 0)
       }
     }
 
-    synchronized {
-      notifyAll()
-    }
+    notifyAll()
+
     blocksWithReleasedLocks
   }
 
-- 
GitLab