Commit 22fc4e75 authored by Tathagata Das

[SPARK-4482][Streaming] Disable ReceivedBlockTracker's write ahead log by default

The ReceivedBlockTracker's write ahead log currently gets enabled as soon as the checkpoint directory is set. This should not happen: the WAL should be created only when it is explicitly enabled in the Spark configuration.
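For context, the fix makes WAL creation contingent on an explicit flag. A minimal sketch of how an application opts in after this change, assuming an illustrative app name and checkpoint path (neither is part of this commit):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// The flag below is what now gates the ReceivedBlockTracker's WAL;
// a checkpoint directory alone no longer enables it.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("WALExample")  // illustrative name
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(1))
// A checkpoint directory is still required: with the flag set but no
// directory, the tracker now throws a SparkException instead of
// silently writing no log.
ssc.checkpoint("/tmp/checkpoint")  // illustrative path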

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #3358 from tdas/SPARK-4482 and squashes the following commits:

b740136 [Tathagata Das] Fixed bug in ReceivedBlockTracker
parent eacc7883
@@ -70,18 +70,7 @@ private[streaming] class ReceivedBlockTracker(
   private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
   private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
 
-  private val logManagerRollingIntervalSecs = conf.getInt(
-    "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
-  private val logManagerOption = checkpointDirOption.map { checkpointDir =>
-    new WriteAheadLogManager(
-      ReceivedBlockTracker.checkpointDirToLogDir(checkpointDir),
-      hadoopConf,
-      rollingIntervalSecs = logManagerRollingIntervalSecs,
-      callerName = "ReceivedBlockHandlerMaster",
-      clock = clock
-    )
-  }
+  private val logManagerOption = createLogManager()
 
   private var lastAllocatedBatchTime: Time = null
@@ -221,6 +210,30 @@ private[streaming] class ReceivedBlockTracker(
   private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
     streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
   }
+
+  /** Optionally create the write ahead log manager only if the feature is enabled */
+  private def createLogManager(): Option[WriteAheadLogManager] = {
+    if (conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
+      if (checkpointDirOption.isEmpty) {
+        throw new SparkException(
+          "Cannot enable receiver write-ahead log without checkpoint directory set. " +
+            "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
+            "See documentation for more details.")
+      }
+      val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get)
+      val rollingIntervalSecs = conf.getInt(
+        "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
+      val logManager = new WriteAheadLogManager(logDir, hadoopConf,
+        rollingIntervalSecs = rollingIntervalSecs, clock = clock,
+        callerName = "ReceivedBlockHandlerMaster")
+      Some(logManager)
+    } else {
+      None
+    }
+  }
+
+  /** Check if the log manager is enabled. This is only used for testing purposes. */
+  private[streaming] def isLogManagerEnabled: Boolean = logManagerOption.nonEmpty
 }
 
 private[streaming] object ReceivedBlockTracker {
@@ -41,17 +41,16 @@ import org.apache.spark.util.Utils
 class ReceivedBlockTrackerSuite
   extends FunSuite with BeforeAndAfter with Matchers with Logging {
 
-  val conf = new SparkConf().setMaster("local[2]").setAppName("ReceivedBlockTrackerSuite")
-  conf.set("spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", "1")
-
   val hadoopConf = new Configuration()
   val akkaTimeout = 10 seconds
   val streamId = 1
 
   var allReceivedBlockTrackers = new ArrayBuffer[ReceivedBlockTracker]()
   var checkpointDirectory: File = null
+  var conf: SparkConf = null
 
   before {
+    conf = new SparkConf().setMaster("local[2]").setAppName("ReceivedBlockTrackerSuite")
     checkpointDirectory = Files.createTempDir()
   }
@@ -64,7 +63,8 @@ class ReceivedBlockTrackerSuite
   }
 
   test("block addition, and block to batch allocation") {
-    val receivedBlockTracker = createTracker(enableCheckpoint = false)
+    val receivedBlockTracker = createTracker(setCheckpointDir = false)
+    receivedBlockTracker.isLogManagerEnabled should be (false)  // should be disabled by default
     receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
 
     val blockInfos = generateBlockInfos()
@@ -95,13 +95,11 @@ class ReceivedBlockTrackerSuite
 
   test("block addition, block to batch allocation and cleanup with write ahead log") {
     val manualClock = new ManualClock
-    conf.getInt(
-      "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", -1) should be (1)
-
     // Set the time increment level to twice the rotation interval so that every increment creates
     // a new log file
-    val timeIncrementMillis = 2000L
     def incrementTime() {
+      val timeIncrementMillis = 2000L
       manualClock.addToTime(timeIncrementMillis)
     }
@@ -121,7 +119,11 @@ class ReceivedBlockTrackerSuite
     }
 
     // Start tracker and add blocks
-    val tracker1 = createTracker(enableCheckpoint = true, clock = manualClock)
+    conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
+    conf.set("spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", "1")
+    val tracker1 = createTracker(clock = manualClock)
+    tracker1.isLogManagerEnabled should be (true)
     val blockInfos1 = addBlockInfos(tracker1)
     tracker1.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1
@@ -132,7 +134,7 @@ class ReceivedBlockTrackerSuite
 
     // Restart tracker and verify recovered list of unallocated blocks
     incrementTime()
-    val tracker2 = createTracker(enableCheckpoint = true, clock = manualClock)
+    val tracker2 = createTracker(clock = manualClock)
     tracker2.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1
 
     // Allocate blocks to batch and verify whether the unallocated blocks got allocated
@@ -156,7 +158,7 @@ class ReceivedBlockTrackerSuite
 
     // Restart tracker and verify recovered state
     incrementTime()
-    val tracker3 = createTracker(enableCheckpoint = true, clock = manualClock)
+    val tracker3 = createTracker(clock = manualClock)
     tracker3.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual blockInfos1
     tracker3.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
     tracker3.getUnallocatedBlocks(streamId) shouldBe empty
@@ -179,18 +181,38 @@ class ReceivedBlockTrackerSuite
     // Restart tracker and verify recovered state, specifically whether info about the first
     // batch has been removed, but not the second batch
     incrementTime()
-    val tracker4 = createTracker(enableCheckpoint = true, clock = manualClock)
+    val tracker4 = createTracker(clock = manualClock)
     tracker4.getUnallocatedBlocks(streamId) shouldBe empty
     tracker4.getBlocksOfBatchAndStream(batchTime1, streamId) shouldBe empty  // should be cleaned
     tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
   }
 
+  test("enabling write ahead log but not setting checkpoint dir") {
+    conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
+    intercept[SparkException] {
+      createTracker(setCheckpointDir = false)
+    }
+  }
+
+  test("setting checkpoint dir but not enabling write ahead log") {
+    // When WAL config is not set, log manager should not be enabled
+    val tracker1 = createTracker(setCheckpointDir = true)
+    tracker1.isLogManagerEnabled should be (false)
+
+    // When WAL is explicitly disabled, log manager should not be enabled
+    conf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
+    val tracker2 = createTracker(setCheckpointDir = true)
+    tracker2.isLogManagerEnabled should be (false)
+  }
+
   /**
    * Create tracker object with the optional provided clock. Use fake clock if you
    * want to control time by manually incrementing it to test log cleanup.
    */
-  def createTracker(enableCheckpoint: Boolean, clock: Clock = new SystemClock): ReceivedBlockTracker = {
-    val cpDirOption = if (enableCheckpoint) Some(checkpointDirectory.toString) else None
+  def createTracker(
+      setCheckpointDir: Boolean = true,
+      clock: Clock = new SystemClock): ReceivedBlockTracker = {
+    val cpDirOption = if (setCheckpointDir) Some(checkpointDirectory.toString) else None
     val tracker = new ReceivedBlockTracker(conf, hadoopConf, Seq(streamId), clock, cpDirOption)
     allReceivedBlockTrackers += tracker
     tracker