diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index 8529ceac30f1ecde5c85c21bc909466a92f70cdc..5a6f9e87f6eaa562843fde5fb224625bc6238dad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -52,6 +52,8 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
   /** Needed to serialize type T into JSON when using Jackson */
   private implicit val manifest = Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass)
 
+  protected val minBatchesToRetain = sparkSession.sessionState.conf.minBatchesToRetain
+
   /**
    * If we delete the old files after compaction at once, there is a race condition in S3: other
    * processes may see the old files are deleted but still cannot see the compaction file using
@@ -152,11 +154,16 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
   }
 
   override def add(batchId: Long, logs: Array[T]): Boolean = {
-    if (isCompactionBatch(batchId, compactInterval)) {
-      compact(batchId, logs)
-    } else {
-      super.add(batchId, logs)
+    val batchAdded =
+      if (isCompactionBatch(batchId, compactInterval)) {
+        compact(batchId, logs)
+      } else {
+        super.add(batchId, logs)
+      }
+    if (batchAdded && isDeletingExpiredLog) {
+      deleteExpiredLog(batchId)
     }
+    batchAdded
   }
 
   /**
@@ -167,9 +174,6 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
     val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
     val allLogs = validBatches.flatMap(batchId => super.get(batchId)).flatten ++ logs
     if (super.add(batchId, compactLogs(allLogs).toArray)) {
-      if (isDeletingExpiredLog) {
-        deleteExpiredLog(batchId)
-      }
       true
     } else {
       // Return false as there is another writer.
@@ -210,26 +214,41 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
   }
 
   /**
-   * Since all logs before `compactionBatchId` are compacted and written into the
-   * `compactionBatchId` log file, they can be removed. However, due to the eventual consistency of
-   * S3, the compaction file may not be seen by other processes at once. So we only delete files
-   * created `fileCleanupDelayMs` milliseconds ago.
+   * Delete expired log entries that precede the given currentBatchId while
+   * retaining at least the minimum number of batches (given by
+   * minBatchesToRetain). This equates to retaining the latest compaction log
+   * that precedes the batch id position currentBatchId + 1 - minBatchesToRetain.
+   * All log entries prior to that compaction log will be removed.
+   * However, due to the eventual consistency of S3, the compaction file may not
+   * be seen by other processes at once. So we only delete files created at
+   * least `fileCleanupDelayMs` milliseconds ago.
   */
-  private def deleteExpiredLog(compactionBatchId: Long): Unit = {
-    val expiredTime = System.currentTimeMillis() - fileCleanupDelayMs
-    fileManager.list(metadataPath, new PathFilter {
-      override def accept(path: Path): Boolean = {
-        try {
-          val batchId = getBatchIdFromFileName(path.getName)
-          batchId < compactionBatchId
-        } catch {
-          case _: NumberFormatException =>
-            false
+  private def deleteExpiredLog(currentBatchId: Long): Unit = {
+    if (compactInterval <= currentBatchId + 1 - minBatchesToRetain) {
+      // Find the first compaction batch id that maintains minBatchesToRetain
+      val minBatchId = currentBatchId + 1 - minBatchesToRetain
+      val minCompactionBatchId = minBatchId - (minBatchId % compactInterval) - 1
+      assert(isCompactionBatch(minCompactionBatchId, compactInterval),
+        s"$minCompactionBatchId is not a compaction batch")
+
+      logInfo(s"Current compact batch id = $currentBatchId " +
+        s"min compaction batch id to delete = $minCompactionBatchId")
+
+      val expiredTime = System.currentTimeMillis() - fileCleanupDelayMs
+      fileManager.list(metadataPath, new PathFilter {
+        override def accept(path: Path): Boolean = {
+          try {
+            val batchId = getBatchIdFromFileName(path.getName)
+            batchId < minCompactionBatchId
+          } catch {
+            case _: NumberFormatException =>
+              false
+          }
+        }
+      }).foreach { f =>
+        if (f.getModificationTime <= expiredTime) {
+          fileManager.delete(f.getPath)
         }
-      }
-    }).foreach { f =>
-      if (f.getModificationTime <= expiredTime) {
-        fileManager.delete(f.getPath)
       }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index b52810da88c3e54276bcfc1d2c118b761d77fcb8..48eee42a290112c087edd560db991230e392159f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -58,6 +58,9 @@ class StreamExecution(
 
   private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
 
+  private val minBatchesToRetain = sparkSession.sessionState.conf.minBatchesToRetain
+  require(minBatchesToRetain > 0, "minBatchesToRetain has to be positive")
+
   /**
    * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
   */
@@ -400,10 +403,11 @@ class StreamExecution(
           }
         }
 
-        // Now that we have logged the new batch, no further processing will happen for
-        // the batch before the previous batch, and it is safe to discard the old metadata.
+        // It is now safe to discard the metadata beyond the minimum number to retain.
         // Note that purge is exclusive, i.e. it purges everything before the target ID.
-        offsetLog.purge(currentBatchId - 1)
+        if (minBatchesToRetain < currentBatchId) {
+          offsetLog.purge(currentBatchId - minBatchesToRetain)
+        }
       }
     } else {
       awaitBatchLock.lock()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 493fdaaec50690a406001d850354d82db8178837..4f3f8181d1f4ee56d9946252d66a1ccffb5d253a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -303,7 +303,6 @@ private[state] class HDFSBackedStateStoreProvider(
       val mapFromFile = readSnapshotFile(version).getOrElse {
         val prevMap = loadMap(version - 1)
         val newMap = new MapType(prevMap)
-        newMap.putAll(prevMap)
         updateFromDeltaFile(version, newMap)
         newMap
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
index de72f1cf2723de6c4f935eaa295243804873ea7d..acfaa8e5eb3c4bcf2bf61e3a771c5efe082c874f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
@@ -26,9 +26,11 @@ private[streaming] class StateStoreConf(@transient private val conf: SQLConf) ex
 
   val minDeltasForSnapshot = conf.stateStoreMinDeltasForSnapshot
 
-  val minVersionsToRetain = conf.stateStoreMinVersionsToRetain
+  val minVersionsToRetain = conf.minBatchesToRetain
 }
 
 private[streaming] object StateStoreConf {
   val empty = new StateStoreConf()
+
+  def apply(conf: SQLConf): StateStoreConf = new StateStoreConf(conf)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 64c373f526f61924954f71d592ab06e3b2757d1b..4d25f54caa1304364d91db158491142ed6a1e850 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -480,18 +480,17 @@ object SQLConf {
       .intConf
       .createWithDefault(10)
 
-  val STATE_STORE_MIN_VERSIONS_TO_RETAIN =
-    SQLConfigBuilder("spark.sql.streaming.stateStore.minBatchesToRetain")
-      .internal()
-      .doc("Minimum number of versions of a state store's data to retain after cleaning.")
-      .intConf
-      .createWithDefault(2)
-
   val CHECKPOINT_LOCATION = SQLConfigBuilder("spark.sql.streaming.checkpointLocation")
     .doc("The default location for storing checkpoint data for streaming queries.")
     .stringConf
     .createOptional
 
+  val MIN_BATCHES_TO_RETAIN = SQLConfigBuilder("spark.sql.streaming.minBatchesToRetain")
+    .internal()
+    .doc("The minimum number of batches that must be retained and made recoverable.")
+    .intConf
+    .createWithDefault(100)
+
   val UNSUPPORTED_OPERATION_CHECK_ENABLED =
     SQLConfigBuilder("spark.sql.streaming.unsupportedOperationCheck")
       .internal()
@@ -668,8 +667,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def stateStoreMinDeltasForSnapshot: Int = getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
 
-  def stateStoreMinVersionsToRetain: Int = getConf(STATE_STORE_MIN_VERSIONS_TO_RETAIN)
-
   def checkpointLocation: Option[String] = getConf(CHECKPOINT_LOCATION)
 
   def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)
@@ -723,6 +720,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def minNumPostShufflePartitions: Int = getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
 
+  def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)
+
   def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)
 
   def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
index e511fda57912cc43b411b94a14c0e9c12dfe957c..435d874d75b92d340e868d24d56c66cd0c78801d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
@@ -104,6 +104,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
     withFakeCompactibleFileStreamLog(
       fileCleanupDelayMs = Long.MaxValue,
       defaultCompactInterval = 3,
+      defaultMinBatchesToRetain = 1,
       compactibleLog => {
         assert("0" === compactibleLog.batchIdToPath(0).getName)
         assert("1" === compactibleLog.batchIdToPath(1).getName)
@@ -118,6 +119,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
     withFakeCompactibleFileStreamLog(
       fileCleanupDelayMs = Long.MaxValue,
       defaultCompactInterval = 3,
+      defaultMinBatchesToRetain = 1,
       compactibleLog => {
         val logs = Array("entry_1", "entry_2", "entry_3")
         val expected = s"""${FakeCompactibleFileStreamLog.VERSION}
@@ -138,6 +140,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
     withFakeCompactibleFileStreamLog(
       fileCleanupDelayMs = Long.MaxValue,
       defaultCompactInterval = 3,
+      defaultMinBatchesToRetain = 1,
       compactibleLog => {
         val logs = s"""${FakeCompactibleFileStreamLog.VERSION}
           |"entry_1"
@@ -157,6 +160,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
     withFakeCompactibleFileStreamLog(
       fileCleanupDelayMs = Long.MaxValue,
       defaultCompactInterval = 3,
+      defaultMinBatchesToRetain = 1,
      compactibleLog => {
         for (batchId <- 0 to 10) {
           compactibleLog.add(batchId, Array("some_path_" + batchId))
@@ -175,6 +179,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
     withFakeCompactibleFileStreamLog(
       fileCleanupDelayMs = 0,
       defaultCompactInterval = 3,
+      defaultMinBatchesToRetain = 1,
       compactibleLog => {
         val fs = compactibleLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf())
 
@@ -194,25 +199,29 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
         compactibleLog.add(1, Array("some_path_1"))
         assert(Set("0", "1") === listBatchFiles())
         compactibleLog.add(2, Array("some_path_2"))
-        assert(Set("2.compact") === listBatchFiles())
+        assert(Set("0", "1", "2.compact") === listBatchFiles())
         compactibleLog.add(3, Array("some_path_3"))
         assert(Set("2.compact", "3") === listBatchFiles())
         compactibleLog.add(4, Array("some_path_4"))
         assert(Set("2.compact", "3", "4") === listBatchFiles())
         compactibleLog.add(5, Array("some_path_5"))
-        assert(Set("5.compact") === listBatchFiles())
+        assert(Set("2.compact", "3", "4", "5.compact") === listBatchFiles())
+        compactibleLog.add(6, Array("some_path_6"))
+        assert(Set("5.compact", "6") === listBatchFiles())
       })
   }
 
   private def withFakeCompactibleFileStreamLog(
       fileCleanupDelayMs: Long,
       defaultCompactInterval: Int,
+      defaultMinBatchesToRetain: Int,
       f: FakeCompactibleFileStreamLog => Unit
     ): Unit = {
     withTempDir { file =>
       val compactibleLog = new FakeCompactibleFileStreamLog(
         fileCleanupDelayMs,
         defaultCompactInterval,
+        defaultMinBatchesToRetain,
         spark,
         file.getCanonicalPath)
       f(compactibleLog)
@@ -227,6 +236,7 @@ object FakeCompactibleFileStreamLog {
 class FakeCompactibleFileStreamLog(
     _fileCleanupDelayMs: Long,
     _defaultCompactInterval: Int,
+    _defaultMinBatchesToRetain: Int,
     sparkSession: SparkSession,
     path: String)
   extends CompactibleFileStreamLog[String](
@@ -241,5 +251,7 @@ class FakeCompactibleFileStreamLog(
 
   override protected def defaultCompactInterval: Int = _defaultCompactInterval
 
+  override protected val minBatchesToRetain: Int = _defaultMinBatchesToRetain
+
   override def compactLogs(logs: Seq[String]): Seq[String] = logs
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index 8a21b76e8f029b4d517ff02347662c0630540c2a..7e0de5e2657be4f23ccae68cab7501d2a883c831 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -151,10 +151,11 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
 
   testWithUninterruptibleThread("delete expired file") {
     // Set FILE_SINK_LOG_CLEANUP_DELAY to 0 so that we can detect the deleting behaviour
-    // deterministically
+    // deterministically, and set the minimum number of batches to retain to one
     withSQLConf(
       SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3",
-      SQLConf.FILE_SINK_LOG_CLEANUP_DELAY.key -> "0") {
+      SQLConf.FILE_SINK_LOG_CLEANUP_DELAY.key -> "0",
+      SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1") {
       withFileStreamSinkLog { sinkLog =>
         val fs = sinkLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf())
 
@@ -174,13 +175,52 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
         sinkLog.add(1, Array(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
         assert(Set("0", "1") === listBatchFiles())
         sinkLog.add(2, Array(newFakeSinkFileStatus("/a/b/2", FileStreamSinkLog.ADD_ACTION)))
-        assert(Set("2.compact") === listBatchFiles())
+        assert(Set("0", "1", "2.compact") === listBatchFiles())
         sinkLog.add(3, Array(newFakeSinkFileStatus("/a/b/3", FileStreamSinkLog.ADD_ACTION)))
         assert(Set("2.compact", "3") === listBatchFiles())
         sinkLog.add(4, Array(newFakeSinkFileStatus("/a/b/4", FileStreamSinkLog.ADD_ACTION)))
         assert(Set("2.compact", "3", "4") === listBatchFiles())
         sinkLog.add(5, Array(newFakeSinkFileStatus("/a/b/5", FileStreamSinkLog.ADD_ACTION)))
-        assert(Set("5.compact") === listBatchFiles())
+        assert(Set("2.compact", "3", "4", "5.compact") === listBatchFiles())
+        sinkLog.add(6, Array(newFakeSinkFileStatus("/a/b/6", FileStreamSinkLog.ADD_ACTION)))
+        assert(Set("5.compact", "6") === listBatchFiles())
+      }
+    }
+
+    withSQLConf(
+      SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3",
+      SQLConf.FILE_SINK_LOG_CLEANUP_DELAY.key -> "0",
+      SQLConf.MIN_BATCHES_TO_RETAIN.key -> "2") {
+      withFileStreamSinkLog { sinkLog =>
+        val fs = sinkLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf())
+
+        def listBatchFiles(): Set[String] = {
+          fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName =>
+            try {
+              getBatchIdFromFileName(fileName)
+              true
+            } catch {
+              case _: NumberFormatException => false
+            }
+          }.toSet
+        }
+
+        sinkLog.add(0, Array(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION)))
+        assert(Set("0") === listBatchFiles())
+        sinkLog.add(1, Array(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
+        assert(Set("0", "1") === listBatchFiles())
+        sinkLog.add(2, Array(newFakeSinkFileStatus("/a/b/2", FileStreamSinkLog.ADD_ACTION)))
+        assert(Set("0", "1", "2.compact") === listBatchFiles())
+        sinkLog.add(3, Array(newFakeSinkFileStatus("/a/b/3", FileStreamSinkLog.ADD_ACTION)))
+        assert(Set("0", "1", "2.compact", "3") === listBatchFiles())
+        sinkLog.add(4, Array(newFakeSinkFileStatus("/a/b/4", FileStreamSinkLog.ADD_ACTION)))
+        assert(Set("2.compact", "3", "4") === listBatchFiles())
+        sinkLog.add(5, Array(newFakeSinkFileStatus("/a/b/5", FileStreamSinkLog.ADD_ACTION)))
+        assert(Set("2.compact", "3", "4", "5.compact") === listBatchFiles())
+        sinkLog.add(6, Array(newFakeSinkFileStatus("/a/b/6", FileStreamSinkLog.ADD_ACTION)))
+        assert(Set("2.compact", "3", "4", "5.compact", "6") === listBatchFiles())
+        sinkLog.add(7, Array(newFakeSinkFileStatus("/a/b/7", FileStreamSinkLog.ADD_ACTION)))
+        assert(Set("5.compact", "6", "7") === listBatchFiles())
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 05fc7345a7daf4166091f1b425c553d2f08290e5..3404b1143bc6297d49fce2aaa4494d10e4bbe051 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -376,7 +376,9 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     val opId = 0
     val dir = Utils.createDirectory(tempDir, Random.nextString(5)).toString
     val storeId = StateStoreId(dir, opId, 0)
-    val storeConf = StateStoreConf.empty
+    val sqlConf = new SQLConf()
+    sqlConf.setConf(SQLConf.MIN_BATCHES_TO_RETAIN, 2)
+    val storeConf = StateStoreConf(sqlConf)
     val hadoopConf = new Configuration()
     val provider = new HDFSBackedStateStoreProvider(
       storeId, keySchema, valueSchema, storeConf, hadoopConf)
@@ -606,6 +608,7 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     ): HDFSBackedStateStoreProvider = {
     val sqlConf = new SQLConf()
     sqlConf.setConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT, minDeltasForSnapshot)
+    sqlConf.setConf(SQLConf.MIN_BATCHES_TO_RETAIN, 2)
     new HDFSBackedStateStoreProvider(
       StateStoreId(dir, opId, partition),
       keySchema,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 7be2f216919b05e65508ae2ef57ba6efc83c14b5..c66d6b1f8d8e614fbde69cd71f951194171f6128 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.types.StructType
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.ManualClock
 
@@ -369,25 +370,52 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
   testQuietly("StreamExecution metadata garbage collection") {
     val inputData = MemoryStream[Int]
     val mapped = inputData.toDS().map(6 / _)
+    withSQLConf(SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1") {
+      // Run 3 batches, and then assert that only 2 metadata files are at the end
+      // since the first should have been purged.
+      testStream(mapped)(
+        AddData(inputData, 1, 2),
+        CheckAnswer(6, 3),
+        AddData(inputData, 1, 2),
+        CheckAnswer(6, 3, 6, 3),
+        AddData(inputData, 4, 6),
+        CheckAnswer(6, 3, 6, 3, 1, 1),
+
+        AssertOnQuery("metadata log should contain only two files") { q =>
+          val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString)
+          val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
+          val toTest = logFileNames.filter(!_.endsWith(".crc")).sorted // Workaround for SPARK-17475
+          assert(toTest.size == 2 && toTest.head == "1")
+          true
+        }
+      )
+    }
 
-    // Run 3 batches, and then assert that only 2 metadata files is are at the end
-    // since the first should have been purged.
-    testStream(mapped)(
-      AddData(inputData, 1, 2),
-      CheckAnswer(6, 3),
-      AddData(inputData, 1, 2),
-      CheckAnswer(6, 3, 6, 3),
-      AddData(inputData, 4, 6),
-      CheckAnswer(6, 3, 6, 3, 1, 1),
-
-      AssertOnQuery("metadata log should contain only two files") { q =>
-        val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString)
-        val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
-        val toTest = logFileNames.filter(! _.endsWith(".crc")).sorted // Workaround for SPARK-17475
-        assert(toTest.size == 2 && toTest.head == "1")
-        true
-      }
-    )
+    val inputData2 = MemoryStream[Int]
+    withSQLConf(SQLConf.MIN_BATCHES_TO_RETAIN.key -> "2") {
+      // Run 5 batches, and then assert that 3 metadata files are at the end
+      // since the first two should have been purged.
+      testStream(inputData2.toDS())(
+        AddData(inputData2, 1, 2),
+        CheckAnswer(1, 2),
+        AddData(inputData2, 1, 2),
+        CheckAnswer(1, 2, 1, 2),
+        AddData(inputData2, 3, 4),
+        CheckAnswer(1, 2, 1, 2, 3, 4),
+        AddData(inputData2, 5, 6),
+        CheckAnswer(1, 2, 1, 2, 3, 4, 5, 6),
+        AddData(inputData2, 7, 8),
+        CheckAnswer(1, 2, 1, 2, 3, 4, 5, 6, 7, 8),
+
+        AssertOnQuery("metadata log should contain three files") { q =>
+          val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString)
+          val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
+          val toTest = logFileNames.filter(!_.endsWith(".crc")).sorted // Workaround for SPARK-17475
+          assert(toTest.size == 3 && toTest.head == "2")
+          true
+        }
+      )
+    }
   }
 
   /** Create a streaming DF that only execute one batch in which it returns the given static DF */
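
For reference, the retention arithmetic introduced in this patch can be reproduced in isolation. The following standalone Scala sketch (not part of the patch; the object name and the main harness are illustrative only) mirrors the boundary computation from the new deleteExpiredLog and the guarded offsetLog.purge call, using the same formulas as the code above:

object RetentionMathSketch {

  // Mirrors CompactibleFileStreamLog.isCompactionBatch: with interval 3 the
  // compaction batches are 2, 5, 8, ... (their metadata files end in ".compact").
  def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
    (batchId + 1) % compactInterval == 0

  // Mirrors the new deleteExpiredLog: returns the compaction batch id below which
  // metadata files become deletable, or None when too few batches exist so far.
  def deletionBoundary(
      currentBatchId: Long,
      minBatchesToRetain: Int,
      compactInterval: Int): Option[Long] = {
    if (compactInterval <= currentBatchId + 1 - minBatchesToRetain) {
      val minBatchId = currentBatchId + 1 - minBatchesToRetain
      // Latest compaction batch id strictly before minBatchId.
      val minCompactionBatchId = minBatchId - (minBatchId % compactInterval) - 1
      assert(isCompactionBatch(minCompactionBatchId, compactInterval))
      Some(minCompactionBatchId)
    } else {
      None
    }
  }

  def main(args: Array[String]): Unit = {
    val compactInterval = 3
    val minBatchesToRetain = 2
    val currentBatchId = 7L

    // File sink/source metadata log: everything before 5.compact becomes deletable,
    // matching the FileStreamSinkLogSuite expectation Set("5.compact", "6", "7").
    deletionBoundary(currentBatchId, minBatchesToRetain, compactInterval).foreach { boundary =>
      println(s"metadata files with batchId < $boundary may be deleted") // boundary = 5
    }

    // Offset log in StreamExecution: purge is exclusive, so purge(5) keeps 5, 6 and 7.
    if (minBatchesToRetain < currentBatchId) {
      println(s"offsetLog.purge(${currentBatchId - minBatchesToRetain})") // purge(5)
    }
  }
}

Running the sketch with the values shown reproduces the final state asserted in FileStreamSinkLogSuite for MIN_BATCHES_TO_RETAIN = 2 (only 5.compact, 6 and 7 remain), while the offset-log side purges everything before batch 5.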