diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index c172ac2cdc0e30a4118ff61c03f686c65304f182..aa29acfd704613d0e7eade13ea588c2d83b39853 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -283,7 +283,12 @@ private[spark] class BlockManager( } else { getLocalBytes(blockId) match { case Some(buffer) => new BlockManagerManagedBuffer(blockInfoManager, blockId, buffer) - case None => throw new BlockNotFoundException(blockId.toString) + case None => + // If this block manager receives a request for a block that it doesn't have then it's + // likely that the master has outdated block statuses for this block. Therefore, we send + // an RPC so that this block is marked as being unavailable from this block manager. + reportBlockStatus(blockId, BlockStatus.empty) + throw new BlockNotFoundException(blockId.toString) } } } @@ -859,22 +864,38 @@ private[spark] class BlockManager( } val startTimeMs = System.currentTimeMillis - var blockWasSuccessfullyStored: Boolean = false + var exceptionWasThrown: Boolean = true val result: Option[T] = try { val res = putBody(putBlockInfo) - blockWasSuccessfullyStored = res.isEmpty - res - } finally { - if (blockWasSuccessfullyStored) { + exceptionWasThrown = false + if (res.isEmpty) { + // the block was successfully stored if (keepReadLock) { blockInfoManager.downgradeLock(blockId) } else { blockInfoManager.unlock(blockId) } } else { - blockInfoManager.removeBlock(blockId) + removeBlockInternal(blockId, tellMaster = false) logWarning(s"Putting block $blockId failed") } + res + } finally { + // This cleanup is performed in a finally block rather than a `catch` to avoid having to + // catch and properly re-throw InterruptedException. + if (exceptionWasThrown) { + logWarning(s"Putting block $blockId failed due to an exception") + // If an exception was thrown then it's possible that the code in `putBody` has already + // notified the master about the availability of this block, so we need to send an update + // to remove this block location. + removeBlockInternal(blockId, tellMaster = tellMaster) + // The `putBody` code may have also added a new block status to TaskMetrics, so we need + // to cancel that out by overwriting it with an empty block status. We only do this if + // the finally block was entered via an exception because doing this unconditionally would + // cause us to send empty block statuses for every block that failed to be cached due to + // a memory shortage (which is an expected failure, unlike an uncaught exception). + addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus.empty) + } } if (level.replication > 1) { logDebug("Putting block %s with replication took %s" @@ -1173,7 +1194,7 @@ private[spark] class BlockManager( done = true // specified number of peers have been replicated to } } catch { - case e: Exception => + case NonFatal(e) => logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e) failures += 1 replicationFailed = true diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index fdf28b7dcbcf4cf11d1d9c1d2b39ac1abb427027..6d53d2e5f0ca6ecc6aff4826f5b5a199519db9f4 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -861,6 +861,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0) memoryManager.setMemoryStore(store.memoryStore) + store.initialize("app-id") // The put should fail since a1 is not serializable. class UnserializableClass @@ -1206,6 +1207,39 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE verify(mockBlockManagerMaster, times(2)).getLocations("item") } + test("SPARK-17484: block status is properly updated following an exception in put()") { + val mockBlockTransferService = new MockBlockTransferService(maxFailures = 10) { + override def uploadBlock( + hostname: String, + port: Int, execId: String, + blockId: BlockId, + blockData: ManagedBuffer, + level: StorageLevel, + classTag: ClassTag[_]): Future[Unit] = { + throw new InterruptedException("Intentional interrupt") + } + } + store = makeBlockManager(8000, "executor1", transferService = Option(mockBlockTransferService)) + store2 = makeBlockManager(8000, "executor2", transferService = Option(mockBlockTransferService)) + intercept[InterruptedException] { + store.putSingle("item", "value", StorageLevel.MEMORY_ONLY_2, tellMaster = true) + } + assert(store.getLocalBytes("item").isEmpty) + assert(master.getLocations("item").isEmpty) + assert(store2.getRemoteBytes("item").isEmpty) + } + + test("SPARK-17484: master block locations are updated following an invalid remote block fetch") { + store = makeBlockManager(8000, "executor1") + store2 = makeBlockManager(8000, "executor2") + store.putSingle("item", "value", StorageLevel.MEMORY_ONLY, tellMaster = true) + assert(master.getLocations("item").nonEmpty) + store.removeBlock("item", tellMaster = false) + assert(master.getLocations("item").nonEmpty) + assert(store2.getRemoteBytes("item").isEmpty) + assert(master.getLocations("item").isEmpty) + } + class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService { var numCalls = 0