Commit 4f030b9e authored by Burak Yavuz, committed by Tathagata Das

[SPARK-11324][STREAMING] Flag for closing Write Ahead Logs after a write

Currently the Write Ahead Log in Spark Streaming flushes data as writes are made. S3 does not support flushing; data is only persisted once the stream is actually closed.
In case of failure, the data for the last minute (the default rolling interval) would not be properly written. Therefore we need a flag to close the stream after each write, so that we achieve read-after-write consistency.

cc tdas zsxwing

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #9285 from brkyvz/caw-wal.
parent 9dba5fb2
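
The change is opt-in, controlled by the two closeFileAfterWrite configuration keys added in WriteAheadLogUtils (see the diff below). A minimal sketch of turning the flag on for a streaming application; the app name, master, and S3 checkpoint path are hypothetical, and only the two closeFileAfterWrite keys come from this commit (the receiver WAL enable key is pre-existing Spark configuration):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("wal-close-after-write-example")  // hypothetical app name
  .setMaster("local[2]")                        // hypothetical master, for a local run
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")              // pre-existing key: turn the receiver WAL on
  .set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")   // new in this commit: close the driver WAL file after each write
  .set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true") // new in this commit: same for the receiver WAL

val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint("s3a://my-bucket/checkpoints")   // hypothetical S3 checkpoint directory, which cannot flush open streams

With the flag on, FileBasedWriteAheadLog calls resetWriter() after every write (first hunk below), trading per-write file-open overhead for read-after-write visibility on stores like S3.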
@@ -47,7 +47,8 @@ private[streaming] class FileBasedWriteAheadLog(
     logDirectory: String,
     hadoopConf: Configuration,
     rollingIntervalSecs: Int,
-    maxFailures: Int
+    maxFailures: Int,
+    closeFileAfterWrite: Boolean
   ) extends WriteAheadLog with Logging {

   import FileBasedWriteAheadLog._
@@ -80,6 +81,9 @@ private[streaming] class FileBasedWriteAheadLog(
     while (!succeeded && failures < maxFailures) {
       try {
         fileSegment = getLogWriter(time).write(byteBuffer)
+        if (closeFileAfterWrite) {
+          resetWriter()
+        }
         succeeded = true
       } catch {
         case ex: Exception =>
@@ -31,11 +31,15 @@ private[streaming] object WriteAheadLogUtils extends Logging {
   val RECEIVER_WAL_ROLLING_INTERVAL_CONF_KEY =
     "spark.streaming.receiver.writeAheadLog.rollingIntervalSecs"
   val RECEIVER_WAL_MAX_FAILURES_CONF_KEY = "spark.streaming.receiver.writeAheadLog.maxFailures"
+  val RECEIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY =
+    "spark.streaming.receiver.writeAheadLog.closeFileAfterWrite"

   val DRIVER_WAL_CLASS_CONF_KEY = "spark.streaming.driver.writeAheadLog.class"
   val DRIVER_WAL_ROLLING_INTERVAL_CONF_KEY =
     "spark.streaming.driver.writeAheadLog.rollingIntervalSecs"
   val DRIVER_WAL_MAX_FAILURES_CONF_KEY = "spark.streaming.driver.writeAheadLog.maxFailures"
+  val DRIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY =
+    "spark.streaming.driver.writeAheadLog.closeFileAfterWrite"

   val DEFAULT_ROLLING_INTERVAL_SECS = 60
   val DEFAULT_MAX_FAILURES = 3
@@ -60,6 +64,14 @@ private[streaming] object WriteAheadLogUtils extends Logging {
     }
   }

+  def shouldCloseFileAfterWrite(conf: SparkConf, isDriver: Boolean): Boolean = {
+    if (isDriver) {
+      conf.getBoolean(DRIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY, defaultValue = false)
+    } else {
+      conf.getBoolean(RECEIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY, defaultValue = false)
+    }
+  }
+
   /**
    * Create a WriteAheadLog for the driver. If configured with custom WAL class, it will try
    * to create instance of that class, otherwise it will create the default FileBasedWriteAheadLog.
@@ -113,7 +125,8 @@ private[streaming] object WriteAheadLogUtils extends Logging {
       }
     }.getOrElse {
       new FileBasedWriteAheadLog(sparkConf, fileWalLogDirectory, fileWalHadoopConf,
-        getRollingIntervalSecs(sparkConf, isDriver), getMaxFailures(sparkConf, isDriver))
+        getRollingIntervalSecs(sparkConf, isDriver), getMaxFailures(sparkConf, isDriver),
+        shouldCloseFileAfterWrite(sparkConf, isDriver))
     }
   }
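
As a small illustration of how the helper above resolves the flag: WriteAheadLogUtils is private[streaming], so the call below only compiles inside the org.apache.spark.streaming package; treat it as a sketch of the behavior rather than a public API, with illustrative conf values.

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")

// Resolves to true for the driver; the receiver key is unset, so it falls back to the default (false).
val closeOnDriver = WriteAheadLogUtils.shouldCloseFileAfterWrite(sparkConf, isDriver = true)
val closeOnReceiver = WriteAheadLogUtils.shouldCloseFileAfterWrite(sparkConf, isDriver = false)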
@@ -203,6 +203,21 @@ class WriteAheadLogSuite extends SparkFunSuite with BeforeAndAfter {
     assert(writtenData === dataToWrite)
   }

+  test("FileBasedWriteAheadLog - close after write flag") {
+    // Write data with rotation using WriteAheadLog class
+    val numFiles = 3
+    val dataToWrite = Seq.tabulate(numFiles)(_.toString)
+    // total advance time is less than 1000, therefore log shouldn't be rolled, but manually closed
+    writeDataUsingWriteAheadLog(testDir, dataToWrite, closeLog = false, clockAdvanceTime = 100,
+      closeFileAfterWrite = true)
+
+    // Read data manually to verify the written data
+    val logFiles = getLogFilesInDirectory(testDir)
+    assert(logFiles.size === numFiles)
+    val writtenData = logFiles.flatMap { file => readDataManually(file)}
+    assert(writtenData === dataToWrite)
+  }
+
   test("FileBasedWriteAheadLog - read rotating logs") {
     // Write data manually for testing reading through WriteAheadLog
     val writtenData = (1 to 10).map { i =>
@@ -296,8 +311,8 @@ class WriteAheadLogSuite extends SparkFunSuite with BeforeAndAfter {
     assert(!nonexistentTempPath.exists())

     val writtenSegment = writeDataManually(generateRandomData(), testFile)
-    val wal = new FileBasedWriteAheadLog(
-      new SparkConf(), tempDir.getAbsolutePath, new Configuration(), 1, 1)
+    val wal = new FileBasedWriteAheadLog(new SparkConf(), tempDir.getAbsolutePath,
+      new Configuration(), 1, 1, closeFileAfterWrite = false)
     assert(!nonexistentTempPath.exists(), "Directory created just by creating log object")
     wal.read(writtenSegment.head)
     assert(!nonexistentTempPath.exists(), "Directory created just by attempting to read segment")
@@ -356,14 +371,16 @@ object WriteAheadLogSuite {
       logDirectory: String,
       data: Seq[String],
       manualClock: ManualClock = new ManualClock,
-      closeLog: Boolean = true
-    ): FileBasedWriteAheadLog = {
+      closeLog: Boolean = true,
+      clockAdvanceTime: Int = 500,
+      closeFileAfterWrite: Boolean = false): FileBasedWriteAheadLog = {
     if (manualClock.getTimeMillis() < 100000) manualClock.setTime(10000)
-    val wal = new FileBasedWriteAheadLog(new SparkConf(), logDirectory, hadoopConf, 1, 1)
+    val wal = new FileBasedWriteAheadLog(new SparkConf(), logDirectory, hadoopConf, 1, 1,
+      closeFileAfterWrite)
     // Ensure that 500 does not get sorted after 2000, so put a high base value.
     data.foreach { item =>
-      manualClock.advance(500)
+      manualClock.advance(clockAdvanceTime)
       wal.write(item, manualClock.getTimeMillis())
     }
     if (closeLog) wal.close()
@@ -418,7 +435,8 @@ object WriteAheadLogSuite {
   /** Read all the data in the log file in a directory using the WriteAheadLog class. */
   def readDataUsingWriteAheadLog(logDirectory: String): Seq[String] = {
-    val wal = new FileBasedWriteAheadLog(new SparkConf(), logDirectory, hadoopConf, 1, 1)
+    val wal = new FileBasedWriteAheadLog(new SparkConf(), logDirectory, hadoopConf, 1, 1,
+      closeFileAfterWrite = false)
     val data = wal.readAll().asScala.map(byteBufferToString).toSeq
     wal.close()
     data