Skip to content
Snippets Groups Projects
Commit f9151beb authored by Feng Liu's avatar Feng Liu Committed by Shixiong Zhu
Browse files

[SPARK-21188][CORE] releaseAllLocksForTask should synchronize the whole method

## What changes were proposed in this pull request?

Since the objects `readLocksByTask`, `writeLocksByTask` and `info`s are coupled and supposed to be modified by other threads concurrently, all the read and writes of them in the method `releaseAllLocksForTask` should be protected by a single synchronized block like other similar methods.

## How was this patch tested?

existing tests

Author: Feng Liu <fengliu@databricks.com>

Closes #18400 from liufengdb/synchronize.
parent 18066f2e
No related branches found
No related tags found
No related merge requests found
...@@ -341,15 +341,11 @@ private[storage] class BlockInfoManager extends Logging { ...@@ -341,15 +341,11 @@ private[storage] class BlockInfoManager extends Logging {
* *
* @return the ids of blocks whose pins were released * @return the ids of blocks whose pins were released
*/ */
def releaseAllLocksForTask(taskAttemptId: TaskAttemptId): Seq[BlockId] = { def releaseAllLocksForTask(taskAttemptId: TaskAttemptId): Seq[BlockId] = synchronized {
val blocksWithReleasedLocks = mutable.ArrayBuffer[BlockId]() val blocksWithReleasedLocks = mutable.ArrayBuffer[BlockId]()
val readLocks = synchronized { val readLocks = readLocksByTask.remove(taskAttemptId).getOrElse(ImmutableMultiset.of[BlockId]())
readLocksByTask.remove(taskAttemptId).getOrElse(ImmutableMultiset.of[BlockId]()) val writeLocks = writeLocksByTask.remove(taskAttemptId).getOrElse(Seq.empty)
}
val writeLocks = synchronized {
writeLocksByTask.remove(taskAttemptId).getOrElse(Seq.empty)
}
for (blockId <- writeLocks) { for (blockId <- writeLocks) {
infos.get(blockId).foreach { info => infos.get(blockId).foreach { info =>
...@@ -358,21 +354,19 @@ private[storage] class BlockInfoManager extends Logging { ...@@ -358,21 +354,19 @@ private[storage] class BlockInfoManager extends Logging {
} }
blocksWithReleasedLocks += blockId blocksWithReleasedLocks += blockId
} }
readLocks.entrySet().iterator().asScala.foreach { entry => readLocks.entrySet().iterator().asScala.foreach { entry =>
val blockId = entry.getElement val blockId = entry.getElement
val lockCount = entry.getCount val lockCount = entry.getCount
blocksWithReleasedLocks += blockId blocksWithReleasedLocks += blockId
synchronized { get(blockId).foreach { info =>
get(blockId).foreach { info => info.readerCount -= lockCount
info.readerCount -= lockCount assert(info.readerCount >= 0)
assert(info.readerCount >= 0)
}
} }
} }
synchronized { notifyAll()
notifyAll()
}
blocksWithReleasedLocks blocksWithReleasedLocks
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment