Skip to content
Snippets Groups Projects
Commit ad182086 authored by Davies Liu's avatar Davies Liu Committed by Andrew Or
Browse files

[SPARK-15300] Fix writer lock conflict when remove a block

## What changes were proposed in this pull request?

A writer lock could be acquired when 1) create a new block 2) remove a block 3) evict a block to disk. 1) and 3) could happen in the same time within the same task, all of them could happen in the same time outside a task. It's OK that when someone try to grab the write block for a block, but the block is acquired by another one that has the same task attempt id.

This PR remove the check.

## How was this patch tested?

Updated existing tests.

Author: Davies Liu <davies@databricks.com>

Closes #13082 from davies/write_lock_conflict.
parent ef7a5e0b
No related branches found
No related tags found
No related merge requests found
...@@ -228,10 +228,7 @@ private[storage] class BlockInfoManager extends Logging { ...@@ -228,10 +228,7 @@ private[storage] class BlockInfoManager extends Logging {
infos.get(blockId) match { infos.get(blockId) match {
case None => return None case None => return None
case Some(info) => case Some(info) =>
if (info.writerTask == currentTaskAttemptId) { if (info.writerTask == BlockInfo.NO_WRITER && info.readerCount == 0) {
throw new IllegalStateException(
s"Task $currentTaskAttemptId has already locked $blockId for writing")
} else if (info.writerTask == BlockInfo.NO_WRITER && info.readerCount == 0) {
info.writerTask = currentTaskAttemptId info.writerTask = currentTaskAttemptId
writeLocksByTask.addBinding(currentTaskAttemptId, blockId) writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
logTrace(s"Task $currentTaskAttemptId acquired write lock for $blockId") logTrace(s"Task $currentTaskAttemptId acquired write lock for $blockId")
......
...@@ -208,16 +208,14 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach { ...@@ -208,16 +208,14 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
} }
} }
test("cannot call lockForWriting while already holding a write lock") { test("cannot grab a writer lock while already holding a write lock") {
withTaskId(0) { withTaskId(0) {
assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo())) assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
blockInfoManager.unlock("block") blockInfoManager.unlock("block")
} }
withTaskId(1) { withTaskId(1) {
assert(blockInfoManager.lockForWriting("block").isDefined) assert(blockInfoManager.lockForWriting("block").isDefined)
intercept[IllegalStateException] { assert(blockInfoManager.lockForWriting("block", false).isEmpty)
blockInfoManager.lockForWriting("block")
}
blockInfoManager.assertBlockIsLockedForWriting("block") blockInfoManager.assertBlockIsLockedForWriting("block")
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment