diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala index 731a369fc92c06db374bef4a25a3db050f66b582..7f9e2c97349706a9e992fbe9378e6887e71fba40 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala @@ -67,7 +67,7 @@ private[streaming] object WriteAheadLogUtils extends Logging { } def isBatchingEnabled(conf: SparkConf, isDriver: Boolean): Boolean = { - isDriver && conf.getBoolean(DRIVER_WAL_BATCHING_CONF_KEY, defaultValue = false) + isDriver && conf.getBoolean(DRIVER_WAL_BATCHING_CONF_KEY, defaultValue = true) } /** diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java index 175b8a496b4e5116c8e6b02a8aaf95c046deb70a..09b5f8ed03279163ea8729f10b5e3c7e208fc766 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java @@ -108,6 +108,7 @@ public class JavaWriteAheadLogSuite extends WriteAheadLog { public void testCustomWAL() { SparkConf conf = new SparkConf(); conf.set("spark.streaming.driver.writeAheadLog.class", JavaWriteAheadLogSuite.class.getName()); + conf.set("spark.streaming.driver.writeAheadLog.allowBatching", "false"); WriteAheadLog wal = WriteAheadLogUtils.createLogForDriver(conf, null, null); String data1 = "data1"; diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index 7db17abb7947c04a6c38d2a4ee2d940177a38990..081f5a1c93e6e5939423e4c4dd7de3982e72576a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -330,8 +330,13 @@ class ReceivedBlockTrackerSuite : Seq[ReceivedBlockTrackerLogEvent] = { logFiles.flatMap { file => new FileBasedWriteAheadLogReader(file, hadoopConf).toSeq - }.map { byteBuffer => - Utils.deserialize[ReceivedBlockTrackerLogEvent](byteBuffer.array) + }.flatMap { byteBuffer => + val validBuffer = if (WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = true)) { + Utils.deserialize[Array[Array[Byte]]](byteBuffer.array()).map(ByteBuffer.wrap) + } else { + Array(byteBuffer) + } + validBuffer.map(b => Utils.deserialize[ReceivedBlockTrackerLogEvent](b.array())) }.toList } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index 4273fd7dda8bec01009077c2fe45bbb9ac2b71c9..7f80d6ecdbbb5612449d96cef1dcc850713e52bd 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -20,7 +20,7 @@ import java.io._ import java.nio.ByteBuffer import java.util.{Iterator => JIterator} import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.{TimeUnit, CountDownLatch, ThreadPoolExecutor} +import java.util.concurrent.{RejectedExecutionException, TimeUnit, CountDownLatch, ThreadPoolExecutor} import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer @@ -190,6 +190,28 @@ abstract class CommonWriteAheadLogTests( } assert(!nonexistentTempPath.exists(), "Directory created just by attempting to read segment") } + + test(testPrefix + "parallel recovery not enabled if closeFileAfterWrite = false") { + // write some data + val writtenData = (1 to 10).map { i => + val data = generateRandomData() + val file = testDir + s"/log-$i-$i" + writeDataManually(data, file, allowBatching) + data + }.flatten + + val wal = createWriteAheadLog(testDir, closeFileAfterWrite, allowBatching) + // create iterator but don't materialize it + val readData = wal.readAll().asScala.map(byteBufferToString) + wal.close() + if (closeFileAfterWrite) { + // the threadpool is shutdown by the wal.close call above, therefore we shouldn't be able + // to materialize the iterator with parallel recovery + intercept[RejectedExecutionException](readData.toArray) + } else { + assert(readData.toSeq === writtenData) + } + } } class FileBasedWriteAheadLogSuite diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogUtilsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogUtilsSuite.scala index 9152728191ea10aed0a097597b03e1207fde6380..bfc5b0cf60fb12f0d4432e3e60314cbeb4913ca8 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogUtilsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogUtilsSuite.scala @@ -56,19 +56,19 @@ class WriteAheadLogUtilsSuite extends SparkFunSuite { test("log selection and creation") { val emptyConf = new SparkConf() // no log configuration - assertDriverLogClass[FileBasedWriteAheadLog](emptyConf) + assertDriverLogClass[FileBasedWriteAheadLog](emptyConf, isBatched = true) assertReceiverLogClass[FileBasedWriteAheadLog](emptyConf) // Verify setting driver WAL class val driverWALConf = new SparkConf().set("spark.streaming.driver.writeAheadLog.class", classOf[MockWriteAheadLog0].getName()) - assertDriverLogClass[MockWriteAheadLog0](driverWALConf) + assertDriverLogClass[MockWriteAheadLog0](driverWALConf, isBatched = true) assertReceiverLogClass[FileBasedWriteAheadLog](driverWALConf) // Verify setting receiver WAL class val receiverWALConf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.class", classOf[MockWriteAheadLog0].getName()) - assertDriverLogClass[FileBasedWriteAheadLog](receiverWALConf) + assertDriverLogClass[FileBasedWriteAheadLog](receiverWALConf, isBatched = true) assertReceiverLogClass[MockWriteAheadLog0](receiverWALConf) // Verify setting receiver WAL class with 1-arg constructor @@ -104,6 +104,19 @@ class WriteAheadLogUtilsSuite extends SparkFunSuite { assertDriverLogClass[FileBasedWriteAheadLog](receiverWALConf, isBatched = true) assertReceiverLogClass[MockWriteAheadLog0](receiverWALConf) } + + test("batching is enabled by default in WriteAheadLog") { + val conf = new SparkConf() + assert(WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = true)) + // batching is not valid for receiver WALs + assert(!WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = false)) + } + + test("closeFileAfterWrite is disabled by default in WriteAheadLog") { + val conf = new SparkConf() + assert(!WriteAheadLogUtils.shouldCloseFileAfterWrite(conf, isDriver = true)) + assert(!WriteAheadLogUtils.shouldCloseFileAfterWrite(conf, isDriver = false)) + } } object WriteAheadLogUtilsSuite {