Commit ed2de029 authored by Josh Rosen, committed by Tathagata Das

[SPARK-14719] WriteAheadLogBasedBlockHandler should ignore BlockManager put errors

WriteAheadLogBasedBlockHandler will currently throw exceptions if its BlockManager `put()` calls fail, even though those calls are only performed as a performance optimization. Instead, it should log and ignore exceptions during that `put()`.

This is a longstanding issue that was masked by an incorrect test case. I think that we haven't noticed this in production because

1. most people probably use a `MEMORY_AND_DISK` storage level, and
2. typically, individual blocks are small enough relative to the total storage memory that the `put()` can succeed by evicting blocks from previous batches, so `put()` failures here may be rare in practice.

This patch fixes the faulty test and fixes the bug.
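
To make the intended behaviour concrete, here is a minimal, self-contained sketch of the log-and-ignore pattern this patch applies. The names (`writeToWAL`, `cachePut`, `storeBlock`) are hypothetical stand-ins for the write-ahead-log write and the BlockManager `put()`; the real handler runs the two stores in parallel futures, whereas this sketch is sequential for brevity.

import scala.util.control.NonFatal

object BestEffortCachePutSketch {

  // Stand-in for the write-ahead-log write: this is the source of truth,
  // so a failure here must still propagate to the caller.
  def writeToWAL(blockId: String, data: Array[Byte]): Unit =
    println(s"wrote $blockId (${data.length} bytes) to the write ahead log")

  // Stand-in for BlockManager.putBytes: a best-effort cache write that can
  // fail, e.g. when the block does not fit in storage memory.
  def cachePut(blockId: String, data: Array[Byte]): Boolean =
    throw new RuntimeException(s"not enough memory to cache $blockId")

  def storeBlock(blockId: String, data: Array[Byte]): Unit = {
    writeToWAL(blockId, data)
    // The cache write is only a performance optimization: log a failed or
    // throwing put and keep going instead of failing the whole store.
    try {
      if (!cachePut(blockId, data)) {
        println(s"WARN: could not cache $blockId")
      }
    } catch {
      case NonFatal(t) =>
        println(s"ERROR: could not cache $blockId: ${t.getMessage}")
    }
  }

  def main(args: Array[String]): Unit = {
    // Completes normally even though the cache put throws.
    storeBlock("input-0-0", new Array[Byte](4096))
  }
}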

/cc tdas

Author: Josh Rosen <joshrosen@databricks.com>

Closes #12484 from JoshRosen/received-block-hadndler-fix.
parent 5e92583d
ReceivedBlockHandler.scala:

@@ -20,6 +20,7 @@ package org.apache.spark.streaming.receiver
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.concurrent.duration._
 import scala.language.{existentials, postfixOps}
+import scala.util.control.NonFatal

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -189,14 +190,19 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
     // Store the block in block manager
     val storeInBlockManagerFuture = Future {
-      val putSucceeded = blockManager.putBytes(
-        blockId,
-        serializedBlock,
-        effectiveStorageLevel,
-        tellMaster = true)
-      if (!putSucceeded) {
-        throw new SparkException(
-          s"Could not store $blockId to block manager with storage level $storageLevel")
-      }
+      try {
+        val putSucceeded = blockManager.putBytes(
+          blockId,
+          serializedBlock,
+          effectiveStorageLevel,
+          tellMaster = true)
+        if (!putSucceeded) {
+          logWarning(
+            s"Could not store $blockId to block manager with storage level $storageLevel")
+        }
+      } catch {
+        case NonFatal(t) =>
+          logError(s"Could not store $blockId to block manager with storage level $storageLevel", t)
+      }
     }
ReceivedBlockHandlerSuite.scala:

@@ -127,7 +127,17 @@ class ReceivedBlockHandlerSuite
   test("BlockManagerBasedBlockHandler - handle errors in storing block") {
     withBlockManagerBasedBlockHandler { handler =>
-      testErrorHandling(handler)
+      // Handle error in iterator (e.g. divide-by-zero error)
+      intercept[Exception] {
+        val iterator = (10 to (-10, -1)).toIterator.map { _ / 0 }
+        handler.storeBlock(StreamBlockId(1, 1), IteratorBlock(iterator))
+      }
+
+      // Handler error in block manager storing (e.g. too big block)
+      intercept[SparkException] {
+        val byteBuffer = ByteBuffer.wrap(new Array[Byte](blockManagerSize + 1))
+        handler.storeBlock(StreamBlockId(1, 1), ByteBufferBlock(byteBuffer))
+      }
     }
   }
@@ -167,7 +177,15 @@ class ReceivedBlockHandlerSuite
   test("WriteAheadLogBasedBlockHandler - handle errors in storing block") {
     withWriteAheadLogBasedBlockHandler { handler =>
-      testErrorHandling(handler)
+      // Handle error in iterator (e.g. divide-by-zero error)
+      intercept[Exception] {
+        val iterator = (10 to (-10, -1)).toIterator.map { _ / 0 }
+        handler.storeBlock(StreamBlockId(1, 1), IteratorBlock(iterator))
+      }
+
+      // Throws no errors when storing blocks that are too large to be cached
+      val byteBuffer = ByteBuffer.wrap(new Array[Byte](blockManagerSize + 1))
+      handler.storeBlock(StreamBlockId(1, 1), ByteBufferBlock(byteBuffer))
     }
   }
@@ -204,26 +222,26 @@ class ReceivedBlockHandlerSuite
     sparkConf.set("spark.storage.unrollFraction", "0.4")
     // Block Manager with 12000 * 0.4 = 4800 bytes of free space for unroll
     blockManager = createBlockManager(12000, sparkConf)

+    // This block is way too large to possibly be cached in memory:
+    def hugeBlock: IteratorBlock = IteratorBlock(List.fill(100)(new Array[Byte](1000)).iterator)
+
     // there is not enough space to store this block in MEMORY,
     // But BlockManager will be able to serialize this block to WAL
     // and hence count returns correct value.
-    testRecordcount(false, StorageLevel.MEMORY_ONLY,
-      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+    testRecordcount(false, StorageLevel.MEMORY_ONLY, hugeBlock, blockManager, Some(100))

     // there is not enough space to store this block in MEMORY,
     // But BlockManager will be able to serialize this block to DISK
     // and hence count returns correct value.
-    testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
-      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+    testRecordcount(true, StorageLevel.MEMORY_AND_DISK, hugeBlock, blockManager, Some(100))

     // there is not enough space to store this block With MEMORY_ONLY StorageLevel.
     // BlockManager will not be able to unroll this block
     // and hence it will not tryToPut this block, resulting the SparkException
     storageLevel = StorageLevel.MEMORY_ONLY
     withBlockManagerBasedBlockHandler { handler =>
-      val thrown = intercept[SparkException] {
-        storeSingleBlock(handler, IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator))
+      intercept[SparkException] {
+        storeSingleBlock(handler, hugeBlock)
       }
     }
@@ -346,21 +364,6 @@ class ReceivedBlockHandlerSuite
     storeAndVerify(blocks.map { b => ByteBufferBlock(dataToByteBuffer(b).toByteBuffer) })
   }

-  /** Test error handling when blocks that cannot be stored */
-  private def testErrorHandling(receivedBlockHandler: ReceivedBlockHandler) {
-    // Handle error in iterator (e.g. divide-by-zero error)
-    intercept[Exception] {
-      val iterator = (10 to (-10, -1)).toIterator.map { _ / 0 }
-      receivedBlockHandler.storeBlock(StreamBlockId(1, 1), IteratorBlock(iterator))
-    }
-    // Handler error in block manager storing (e.g. too big block)
-    intercept[SparkException] {
-      val byteBuffer = ByteBuffer.wrap(new Array[Byte](blockManagerSize + 1))
-      receivedBlockHandler.storeBlock(StreamBlockId(1, 1), ByteBufferBlock(byteBuffer))
-    }
-  }
-
   /** Instantiate a BlockManagerBasedBlockHandler and run a code with it */
   private def withBlockManagerBasedBlockHandler(body: BlockManagerBasedBlockHandler => Unit) {
     body(new BlockManagerBasedBlockHandler(blockManager, storageLevel))