diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 8b97db8dd36f1f0f3bbf32e3be2ae628312f65b3..f7a8ebee8a544b6688b27566ca238dcc5dea20fe 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -42,7 +42,7 @@ private[streaming] trait ReceivedBlockHandler {
   def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): ReceivedBlockStoreResult
 
   /** Cleanup old blocks older than the given threshold time */
-  def cleanupOldBlock(threshTime: Long)
+  def cleanupOldBlocks(threshTime: Long)
 }
 
 
@@ -82,7 +82,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
     BlockManagerBasedStoreResult(blockId)
   }
 
-  def cleanupOldBlock(threshTime: Long) {
+  def cleanupOldBlocks(threshTime: Long) {
     // this is not used as blocks inserted into the BlockManager are cleared by DStream's clearing
     // of BlockRDDs.
   }
@@ -192,8 +192,8 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
     WriteAheadLogBasedStoreResult(blockId, segment)
   }
 
-  def cleanupOldBlock(threshTime: Long) {
-    logManager.cleanupOldLogs(threshTime)
+  def cleanupOldBlocks(threshTime: Long) {
+    logManager.cleanupOldLogs(threshTime, waitForCompletion = false)
   }
 
   def stop() {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index 02758e0bca6c5c5ec6fcfdfc9e45a22c1bdaf315..2ce458cddec1a3621a3c59f1c8b5c738adba3e67 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -139,14 +139,17 @@ private[streaming] class ReceivedBlockTracker(
     getReceivedBlockQueue(streamId).toSeq
   }
 
-  /** Clean up block information of old batches. */
-  def cleanupOldBatches(cleanupThreshTime: Time): Unit = synchronized {
+  /**
+   * Clean up block information of old batches. If waitForCompletion is true, this method
+   * returns only after the files are cleaned up.
+   */
+  def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
     assert(cleanupThreshTime.milliseconds < clock.currentTime())
     val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
     logInfo("Deleting batches " + timesToCleanup)
     writeToLog(BatchCleanupEvent(timesToCleanup))
     timeToAllocatedBlocks --= timesToCleanup
-    logManagerOption.foreach(_.cleanupOldLogs(cleanupThreshTime.milliseconds))
+    logManagerOption.foreach(_.cleanupOldLogs(cleanupThreshTime.milliseconds, waitForCompletion))
     log
   }
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 1f0e442a12283c298512434d189c97e57000eb8a..8dbb42a86e3bdb359888c6ae222f645095ec2b93 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -121,7 +121,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
 
   /** Clean up metadata older than the given threshold time */
   def cleanupOldMetadata(cleanupThreshTime: Time) {
-    receivedBlockTracker.cleanupOldBatches(cleanupThreshTime)
+    receivedBlockTracker.cleanupOldBatches(cleanupThreshTime, waitForCompletion = false)
   }
 
   /** Register a receiver */
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
index 70d234320be7cd07a55c9a757f95d4835704e598..166661b7496dffd5eac86052f6f695f7d43919a2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
@@ -19,11 +19,11 @@ package org.apache.spark.streaming.util
 import java.nio.ByteBuffer
 
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Future}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.fs.permission.FsPermission
 import org.apache.spark.Logging
 import org.apache.spark.util.Utils
 import WriteAheadLogManager._
@@ -124,8 +124,12 @@ private[streaming] class WriteAheadLogManager(
    * files, which is usually based on the local system time. So if there is coordination necessary
    * between the node calculating the threshTime (say, driver node), and the local system time
    * (say, worker node), the caller has to take account of possible time skew.
+   *
+   * If waitForCompletion is set to true, this method will return only after old logs have been
+   * deleted. This should be set to true only for testing. Else the files will be deleted
+   * asynchronously.
    */
-  def cleanupOldLogs(threshTime: Long): Unit = {
+  def cleanupOldLogs(threshTime: Long, waitForCompletion: Boolean): Unit = {
     val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
     logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
       s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
@@ -146,10 +150,15 @@ private[streaming] class WriteAheadLogManager(
       logInfo(s"Cleared log files in $logDirectory older than $threshTime")
     }
     if (!executionContext.isShutdown) {
-      Future { deleteFiles() }
+      val f = Future { deleteFiles() }
+      if (waitForCompletion) {
+        import scala.concurrent.duration._
+        Await.ready(f, 1 second)
+      }
     }
   }
 
 
+  /** Stop the manager, close any open log writer */
   def stop(): Unit = synchronized {
     if (currentLogWriter != null) {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 3661e16a9ef2ff421cb3d491e8ea6f437a092558..132ff2443fc0f3d49264bed34089f02ebdb62809 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -168,7 +168,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
       manualClock.currentTime() shouldEqual 5000L
 
       val cleanupThreshTime = 3000L
-      handler.cleanupOldBlock(cleanupThreshTime)
+      handler.cleanupOldBlocks(cleanupThreshTime)
       eventually(timeout(10000 millis), interval(10 millis)) {
         getWriteAheadLogFiles().size should be < preCleanupLogFiles.size
       }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 01a09b67b99dc8ba6134ae9cb4905d3d7ff936e8..de7e9d624bf6bf5be30252083d945672549d953c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -166,7 +166,7 @@ class ReceivedBlockTrackerSuite
     // Cleanup first batch but not second batch
     val oldestLogFile = getWriteAheadLogFiles().head
     incrementTime()
-    tracker3.cleanupOldBatches(batchTime2)
+    tracker3.cleanupOldBatches(batchTime2, waitForCompletion = true)
 
     // Verify that the batch allocations have been cleaned, and the act has been written to log
     tracker3.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual Seq.empty
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 8f69bcb64279d0c5d4265126f44eac6d6299f826..7ce9499dc614d2971be86267c981a6ae8b0ad637 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -182,15 +182,29 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
   }
 
   test("WriteAheadLogManager - cleanup old logs") {
+    logCleanUpTest(waitForCompletion = false)
+  }
+
+  test("WriteAheadLogManager - cleanup old logs synchronously") {
+    logCleanUpTest(waitForCompletion = true)
+  }
+
+  private def logCleanUpTest(waitForCompletion: Boolean): Unit = {
     // Write data with manager, recover with new manager and verify
     val manualClock = new ManualClock
     val dataToWrite = generateRandomData()
     manager =
       writeDataUsingManager(testDir, dataToWrite, manualClock, stopManager = false)
     val logFiles = getLogFilesInDirectory(testDir)
     assert(logFiles.size > 1)
-    manager.cleanupOldLogs(manualClock.currentTime() / 2)
-    eventually(timeout(1 second), interval(10 milliseconds)) {
+
+    manager.cleanupOldLogs(manualClock.currentTime() / 2, waitForCompletion)
+
+    if (waitForCompletion) {
       assert(getLogFilesInDirectory(testDir).size < logFiles.size)
+    } else {
+      eventually(timeout(1 second), interval(10 milliseconds)) {
+        assert(getLogFilesInDirectory(testDir).size < logFiles.size)
+      }
     }
   }
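
The core pattern of this patch: WriteAheadLogManager.cleanupOldLogs still deletes old log files on its own ExecutionContext, but callers can now opt into blocking on that deletion (the test suites pass waitForCompletion = true; production call sites pass false). Below is a minimal, self-contained Scala sketch of that asynchronous-delete-with-optional-wait pattern. SketchLogManager, FakeLogInfo, and the CleanupSketch harness are hypothetical stand-ins for illustration, not code from the patch.

// Sketch: deletion runs on a dedicated ExecutionContext; the caller may
// optionally block on the returned Future, mirroring waitForCompletion.
import java.util.concurrent.Executors

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

case class FakeLogInfo(startTime: Long, endTime: Long, path: String)

class SketchLogManager {
  // Single worker thread, standing in for the manager's executionContext.
  private implicit val executionContext =
    ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())

  private val pastLogs = new ArrayBuffer[FakeLogInfo]

  def addLog(info: FakeLogInfo): Unit = synchronized { pastLogs += info }

  def numLogs: Int = synchronized { pastLogs.size }

  /** Delete logs older than threshTime; block only if waitForCompletion. */
  def cleanupOldLogs(threshTime: Long, waitForCompletion: Boolean): Unit = {
    val oldLogFiles = synchronized { pastLogs.filter(_.endTime < threshTime) }

    def deleteFiles(): Unit = {
      // The real manager deletes each file from the file system here;
      // the sketch only drops the bookkeeping entries.
      synchronized { pastLogs --= oldLogFiles }
    }

    val f = Future { deleteFiles() } // asynchronous by default
    if (waitForCompletion) {
      Await.ready(f, 1.second)       // deterministic path, used by tests
    }
  }

  def stop(): Unit = executionContext.shutdown()
}

object CleanupSketch extends App {
  val m = new SketchLogManager
  m.addLog(FakeLogInfo(0L, 1000L, "log-1"))
  m.addLog(FakeLogInfo(1000L, 2000L, "log-2"))
  m.cleanupOldLogs(threshTime = 1500L, waitForCompletion = true)
  assert(m.numLogs == 1) // only log-2 survives the synchronous cleanup
  m.stop()
}

The bounded Await.ready mirrors the patch's trade-off: a synchronous caller gets a deterministic result within a short timeout, while every production path stays non-blocking.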