Skip to content
Snippets Groups Projects
Commit b308182f authored by Gabor Somogyi's avatar Gabor Somogyi Committed by Marcelo Vanzin
Browse files

[SPARK-23438][DSTREAMS] Fix DStreams data loss with WAL when driver crashes

## What changes were proposed in this pull request?

There is a race condition introduced in SPARK-11141 which could cause data loss.
The problem is that the ReceivedBlockTracker.insertAllocatedBatch function assumes that all blocks from streamIdToUnallocatedBlockQueues are allocated to the batch, and therefore clears the entire queue.

In this PR only the allocated blocks will be removed from the queue which will prevent data loss.

## How was this patch tested?

Additional unit test + manually.

Author: Gabor Somogyi <gabor.g.somogyi@gmail.com>

Closes #20620 from gaborgsomogyi/SPARK-23438.
parent 3ca9a2c5
No related branches found
No related tags found
No related merge requests found
......@@ -193,12 +193,15 @@ private[streaming] class ReceivedBlockTracker(
getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
}
// Insert the recovered block-to-batch allocations and remove the allocated blocks from the
// queue of received blocks.
//
// NOTE: only the blocks recorded in `allocatedBlocks` are dequeued. Clearing the whole
// unallocated queue here would silently drop blocks that were received but never allocated
// to this batch, causing data loss on recovery (SPARK-23438).
def insertAllocatedBatch(batchTime: Time, allocatedBlocks: AllocatedBlocks) {
  logTrace(s"Recovery: Inserting allocated batch for time $batchTime to " +
    s"${allocatedBlocks.streamIdToAllocatedBlocks}")
  allocatedBlocks.streamIdToAllocatedBlocks.foreach {
    case (streamId, allocatedBlocksInStream) =>
      // dequeueAll takes a predicate; a Set works because Set[A] extends A => Boolean.
      getReceivedBlockQueue(streamId).dequeueAll(allocatedBlocksInStream.toSet)
  }
  timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
  lastAllocatedBatchTime = batchTime
}
......@@ -227,7 +230,7 @@ private[streaming] class ReceivedBlockTracker(
}
/** Write an update to the tracker to the write ahead log */
private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
private[streaming] def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
if (isWriteAheadLogEnabled) {
logTrace(s"Writing record: $record")
try {
......
......@@ -33,7 +33,7 @@ import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.scheduler.{AllocatedBlocks, _}
import org.apache.spark.streaming.util._
import org.apache.spark.streaming.util.WriteAheadLogSuite._
import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}
......@@ -94,6 +94,27 @@ class ReceivedBlockTrackerSuite
receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
}
test("recovery with write ahead logs should remove only allocated blocks from received queue") {
  // Write a mix of allocated and unallocated blocks to the WAL, then verify that a
  // tracker recovering from that WAL keeps the unallocated blocks in its received queue.
  val clock = new ManualClock
  val allocationTime = clock.getTimeMillis()

  val writerTracker = createTracker(clock = clock)
  writerTracker.isWriteAheadLogEnabled should be (true)

  val toBeAllocated = generateBlockInfos()
  val toStayUnallocated = generateBlockInfos()
  (toBeAllocated ++ toStayUnallocated).foreach { info =>
    writerTracker.writeToLog(BlockAdditionEvent(info))
  }
  val batchAllocation = AllocatedBlocks(Map(streamId -> toBeAllocated))
  writerTracker.writeToLog(BatchAllocationEvent(allocationTime, batchAllocation))
  writerTracker.stop()

  // Recovery must restore the allocated batch, and only the never-allocated blocks
  // may remain in the unallocated queue.
  val recoveredTracker = createTracker(clock = clock, recoverFromWriteAheadLog = true)
  recoveredTracker.getBlocksOfBatch(allocationTime) shouldEqual
    batchAllocation.streamIdToAllocatedBlocks
  recoveredTracker.getUnallocatedBlocks(streamId) shouldEqual toStayUnallocated
  recoveredTracker.stop()
}
test("recovery and cleanup with write ahead logs") {
val manualClock = new ManualClock
// Set the time increment level to twice the rotation interval so that every increment creates
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment