Commit 3027f06b authored by Tathagata Das

[SPARK-5147][Streaming] Delete the received data WAL log periodically

This is a refactored fix based on jerryshao's PR #4037.
It enables deletion of old WAL files containing the received block data.
Improvements over #4037
- Respects the rememberDuration of all receiver streams. In #4037, if there were two receiver streams with different remember durations, deletion would have been based on the shortest one, thus deleting data prematurely for the stream with the longer remember duration (see the sketch after this list).
- Added a unit test covering creation of the receiver WAL, automatic deletion, and respecting of the remember duration.
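
For illustration, here is a minimal sketch (not part of this commit) of the conservative threshold this change computes: with several receiver streams, cleanup is driven by the maximum remember duration, so no stream loses data early. The object name and the concrete durations are placeholders.

import org.apache.spark.streaming.{Duration, Seconds}

object MaxRememberDurationSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical remember durations of two receiver streams: a plain stream
    // and a windowed stream that must remember its data for longer.
    val rememberDurations: Seq[Duration] = Seq(Seconds(2), Seconds(6))

    // Taking the maximum (as DStreamGraph.getMaxInputStreamRememberDuration does)
    // is conservative: no stream's blocks are deleted before it is done with them.
    val maxRemember = rememberDurations.maxBy(_.milliseconds)

    val currentBatchTimeMs = 60000L                                       // hypothetical batch time
    val cleanupThresholdMs = currentBatchTimeMs - maxRemember.milliseconds
    println(s"WAL data older than $cleanupThresholdMs ms can be deleted")  // 54000 ms
  }
}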

jerryshao, I am going to merge this ASAP to make it into 1.2.1. Thanks for the initial draft of this PR; it made my job much easier.

Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: jerryshao <saisai.shao@intel.com>

Closes #4149 from tdas/SPARK-5147 and squashes the following commits:

730798b [Tathagata Das] Added comments.
c4cf067 [Tathagata Das] Minor fixes
2579b27 [Tathagata Das] Refactored the fix to make sure that the cleanup respects the remember duration of all the receiver streams
2736fd1 [jerryshao] Delete the old WAL log periodically
parent fcb3e186
Showing 172 additions and 50 deletions
......@@ -160,6 +160,14 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
}
}
/**
* Get the maximum remember duration across all the input streams. This is a conservative but
* safe remember duration which can be used to perform cleanup operations.
*/
def getMaxInputStreamRememberDuration(): Duration = {
inputStreams.map { _.rememberDuration }.maxBy { _.milliseconds }
}
@throws(classOf[IOException])
private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
logDebug("DStreamGraph.writeObject used")
......
......@@ -94,15 +94,4 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
}
Some(blockRDD)
}
/**
* Clear metadata that are older than `rememberDuration` of this DStream.
* This is an internal method that should not be called directly. This
* implementation overrides the default implementation to clear received
* block information.
*/
private[streaming] override def clearMetadata(time: Time) {
super.clearMetadata(time)
ssc.scheduler.receiverTracker.cleanupOldMetadata(time - rememberDuration)
}
}
......@@ -17,7 +17,10 @@
package org.apache.spark.streaming.receiver
/** Messages sent to the NetworkReceiver. */
import org.apache.spark.streaming.Time
/** Messages sent to the Receiver. */
private[streaming] sealed trait ReceiverMessage extends Serializable
private[streaming] object StopReceiver extends ReceiverMessage
private[streaming] case class CleanupOldBlocks(threshTime: Time) extends ReceiverMessage
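
Illustration only, not part of the commit: because ReceiverMessage is a sealed trait, a receiver-side handler can match on it exhaustively, and the compiler will warn if a newly added message such as CleanupOldBlocks is left unhandled. The object and method below are hypothetical; the real handling lives in ReceiverSupervisorImpl (next hunk).

package org.apache.spark.streaming.receiver

import org.apache.spark.streaming.Time

// Hypothetical sketch: exhaustive handling of the sealed ReceiverMessage trait.
private[streaming] object ReceiverMessageHandlerSketch {
  def handle(message: ReceiverMessage): Unit = message match {
    case StopReceiver =>
      println("stop requested by the driver")
    case CleanupOldBlocks(threshTime: Time) =>
      println(s"delete received blocks older than $threshTime")
  }
}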
......@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.{Logging, SparkEnv, SparkException}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.scheduler._
import org.apache.spark.util.{AkkaUtils, Utils}
......@@ -82,6 +83,9 @@ private[streaming] class ReceiverSupervisorImpl(
case StopReceiver =>
logInfo("Received stop signal")
stop("Stopped by driver", None)
case CleanupOldBlocks(threshTime) =>
logDebug("Received delete old batch signal")
cleanupOldBlocks(threshTime)
}
def ref = self
......@@ -193,4 +197,9 @@ private[streaming] class ReceiverSupervisorImpl(
/** Generate new block ID */
private def nextBlockId = StreamBlockId(streamId, newBlockId.getAndIncrement)
private def cleanupOldBlocks(cleanupThreshTime: Time): Unit = {
logDebug(s"Cleaning up blocks older then $cleanupThreshTime")
receivedBlockHandler.cleanupOldBlocks(cleanupThreshTime.milliseconds)
}
}
......@@ -238,13 +238,17 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
/** Clear DStream metadata for the given `time`. */
private def clearMetadata(time: Time) {
ssc.graph.clearMetadata(time)
jobScheduler.receiverTracker.cleanupOldMetadata(time - graph.batchDuration)
// If checkpointing is enabled, then checkpoint,
// else mark batch to be fully processed
if (shouldCheckpoint) {
eventActor ! DoCheckpoint(time)
} else {
// If checkpointing is not enabled, then delete metadata information about
// received blocks (block data not saved in any case). Otherwise, wait for
// checkpointing of this batch to complete.
val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
markBatchFullyProcessed(time)
}
}
......@@ -252,6 +256,11 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
/** Clear DStream checkpoint data for the given `time`. */
private def clearCheckpointData(time: Time) {
ssc.graph.clearCheckpointData(time)
// All the checkpoint information about which batches have been processed, etc. has
// been saved to checkpoints, so it's safe to delete block metadata and data WAL files
val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
markBatchFullyProcessed(time)
}
......
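
A small worked sketch of the threshold arithmetic used above, assuming a 60000 ms batch time and a 3000 ms maximum remember duration (both values are hypothetical):

import org.apache.spark.streaming.{Milliseconds, Time}

object CleanupThresholdSketch {
  def main(args: Array[String]): Unit = {
    val batchTime = Time(60000)                   // hypothetical current batch time
    val maxRememberDuration = Milliseconds(3000)  // e.g. forced by a 3-second window

    // Same arithmetic as clearMetadata/clearCheckpointData: everything strictly
    // older than this time becomes eligible for WAL and block-metadata deletion.
    val cleanupThreshTime = batchTime - maxRememberDuration
    println(s"cleanup threshold = $cleanupThreshTime")  // prints "cleanup threshold = 57000 ms"
  }
}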
......@@ -150,7 +150,6 @@ private[streaming] class ReceivedBlockTracker(
writeToLog(BatchCleanupEvent(timesToCleanup))
timeToAllocatedBlocks --= timesToCleanup
logManagerOption.foreach(_.cleanupOldLogs(cleanupThreshTime.milliseconds, waitForCompletion))
log
}
/** Stop the block tracker. */
......
......@@ -24,9 +24,8 @@ import scala.language.existentials
import akka.actor._
import org.apache.spark.{Logging, SerializableWritable, SparkEnv, SparkException}
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.receiver.{Receiver, ReceiverSupervisorImpl, StopReceiver}
import org.apache.spark.streaming.receiver.{CleanupOldBlocks, Receiver, ReceiverSupervisorImpl, StopReceiver}
/**
* Messages used by the NetworkReceiver and the ReceiverTracker to communicate
......@@ -119,9 +118,20 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
}
}
/** Clean up metadata older than the given threshold time */
def cleanupOldMetadata(cleanupThreshTime: Time) {
/**
* Clean up the data and metadata of blocks and batches that are strictly
* older than the threshold time. Note that this does not
*/
def cleanupOldBlocksAndBatches(cleanupThreshTime: Time) {
// Clean up old block and batch metadata
receivedBlockTracker.cleanupOldBatches(cleanupThreshTime, waitForCompletion = false)
// Signal the receivers to delete old block data
if (ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
logInfo(s"Cleanup old received batch data: $cleanupThreshTime")
receiverInfo.values.flatMap { info => Option(info.actor) }
.foreach { _ ! CleanupOldBlocks(cleanupThreshTime) }
}
}
/** Register a receiver */
......
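
For context, a minimal sketch of the driver-side setup that activates this signalling path; it mirrors the configuration used by the new test further below. The application name and checkpoint directory are placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WalEnabledAppSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("wal-cleanup-sketch")                               // placeholder name
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")   // turn on the receiver WAL
    val ssc = new StreamingContext(conf, Seconds(1))

    // The WAL files live under the checkpoint directory, so a checkpoint
    // directory must be set for receivers to write (and later delete) them.
    ssc.checkpoint("/tmp/wal-cleanup-sketch-checkpoint")               // placeholder path

    // ... define receiver input streams and output operations here ...
    // ssc.start(); ssc.awaitTermination()
  }
}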
......@@ -63,7 +63,7 @@ private[streaming] object HdfsUtils {
}
def getFileSystemForPath(path: Path, conf: Configuration): FileSystem = {
// For local file systems, return the raw loca file system, such calls to flush()
// For local file systems, return the raw local file system, such calls to flush()
// actually flushes the stream.
val fs = path.getFileSystem(conf)
fs match {
......
......@@ -17,21 +17,26 @@
package org.apache.spark.streaming
import java.io.File
import java.nio.ByteBuffer
import java.util.concurrent.Semaphore
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkConf
import org.apache.spark.storage.{StorageLevel, StreamBlockId}
import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver, ReceiverSupervisor}
import org.scalatest.FunSuite
import com.google.common.io.Files
import org.scalatest.concurrent.Timeouts
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.receiver._
import org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler._
/** Testsuite for testing the network receiver behavior */
class ReceiverSuite extends FunSuite with Timeouts {
class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
test("receiver life cycle") {
......@@ -192,7 +197,6 @@ class ReceiverSuite extends FunSuite with Timeouts {
val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 3
val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 1
val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",")
println(minExpectedMessagesPerBlock, maxExpectedMessagesPerBlock, ":", receivedBlockSizes)
assert(
// the first and last block may be incomplete, so we slice them out
recordedBlocks.drop(1).dropRight(1).forall { block =>
......@@ -203,39 +207,91 @@ class ReceiverSuite extends FunSuite with Timeouts {
)
}
/**
* An implementation of NetworkReceiver that is used for testing a receiver's life cycle.
* Test whether write ahead logs are generated by receivers,
* and automatically cleaned up. The clean up must be aware of the
* remember duration of the input streams. E.g., input streams on which window()
* has been applied must remember the data for longer, and hence corresponding
* WALs should be cleaned later.
*/
class FakeReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
@volatile var otherThread: Thread = null
@volatile var receiving = false
@volatile var onStartCalled = false
@volatile var onStopCalled = false
def onStart() {
otherThread = new Thread() {
override def run() {
receiving = true
while(!isStopped()) {
Thread.sleep(10)
}
test("write ahead log - generating and cleaning") {
val sparkConf = new SparkConf()
.setMaster("local[4]") // must be at least 3 as we are going to start 2 receivers
.setAppName(framework)
.set("spark.ui.enabled", "true")
.set("spark.streaming.receiver.writeAheadLog.enable", "true")
.set("spark.streaming.receiver.writeAheadLog.rollingInterval", "1")
val batchDuration = Milliseconds(500)
val tempDirectory = Files.createTempDir()
val logDirectory1 = new File(checkpointDirToLogDir(tempDirectory.getAbsolutePath, 0))
val logDirectory2 = new File(checkpointDirToLogDir(tempDirectory.getAbsolutePath, 1))
val allLogFiles1 = new mutable.HashSet[String]()
val allLogFiles2 = new mutable.HashSet[String]()
logInfo("Temp checkpoint directory = " + tempDirectory)
def getBothCurrentLogFiles(): (Seq[String], Seq[String]) = {
(getCurrentLogFiles(logDirectory1), getCurrentLogFiles(logDirectory2))
}
def getCurrentLogFiles(logDirectory: File): Seq[String] = {
try {
if (logDirectory.exists()) {
logDirectory.listFiles().filter { _.getName.startsWith("log") }.map { _.toString }
} else {
Seq.empty
}
} catch {
case e: Exception =>
Seq.empty
}
onStartCalled = true
otherThread.start()
}
def onStop() {
onStopCalled = true
otherThread.join()
def printLogFiles(message: String, files: Seq[String]) {
logInfo(s"$message (${files.size} files):\n" + files.mkString("\n"))
}
def reset() {
receiving = false
onStartCalled = false
onStopCalled = false
withStreamingContext(new StreamingContext(sparkConf, batchDuration)) { ssc =>
tempDirectory.deleteOnExit()
val receiver1 = ssc.sparkContext.clean(new FakeReceiver(sendData = true))
val receiver2 = ssc.sparkContext.clean(new FakeReceiver(sendData = true))
val receiverStream1 = ssc.receiverStream(receiver1)
val receiverStream2 = ssc.receiverStream(receiver2)
receiverStream1.register()
receiverStream2.window(batchDuration * 6).register() // 3 second window
ssc.checkpoint(tempDirectory.getAbsolutePath())
ssc.start()
// Run until sufficient WAL files have been generated and
// the first WAL file has been deleted
eventually(timeout(20 seconds), interval(batchDuration.milliseconds millis)) {
val (logFiles1, logFiles2) = getBothCurrentLogFiles()
allLogFiles1 ++= logFiles1
allLogFiles2 ++= logFiles2
if (allLogFiles1.size > 0) {
assert(!logFiles1.contains(allLogFiles1.toSeq.sorted.head))
}
if (allLogFiles2.size > 0) {
assert(!logFiles2.contains(allLogFiles2.toSeq.sorted.head))
}
assert(allLogFiles1.size >= 7)
assert(allLogFiles2.size >= 7)
}
ssc.stop(stopSparkContext = true, stopGracefully = true)
val sortedAllLogFiles1 = allLogFiles1.toSeq.sorted
val sortedAllLogFiles2 = allLogFiles2.toSeq.sorted
val (leftLogFiles1, leftLogFiles2) = getBothCurrentLogFiles()
printLogFiles("Receiver 0: all", sortedAllLogFiles1)
printLogFiles("Receiver 0: left", leftLogFiles1)
printLogFiles("Receiver 1: all", sortedAllLogFiles2)
printLogFiles("Receiver 1: left", leftLogFiles2)
// Verify that necessary latest log files are not deleted
// receiverStream1 needs to retain just the last batch = 1 log file
// receiverStream2 needs to retain 3 seconds (3-seconds window) = 3 log files
assert(sortedAllLogFiles1.takeRight(1).forall(leftLogFiles1.contains))
assert(sortedAllLogFiles2.takeRight(3).forall(leftLogFiles2.contains))
}
}
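
A back-of-the-envelope sketch of the retention expectations asserted above, under the test's settings (500 ms batches, a 1-second log rolling interval, a 6-batch window on the second stream); the helper name is made up:

object WalRetentionSketch {
  def main(args: Array[String]): Unit = {
    val batchMs = 500L
    val rollingIntervalMs = 1000L   // spark.streaming.receiver.writeAheadLog.rollingInterval = 1
    val windowBatches = 6           // receiverStream2.window(batchDuration * 6)

    // Stream 1 remembers roughly one batch; stream 2 must remember the full window.
    val remember1Ms = batchMs
    val remember2Ms = batchMs * windowBatches

    // Rough number of rolled log files each receiver has to keep around.
    def filesToKeep(rememberMs: Long): Long =
      math.max(1L, math.ceil(rememberMs.toDouble / rollingIntervalMs).toLong)

    println(s"receiver 0 keeps ~${filesToKeep(remember1Ms)} log file(s)")  // ~1
    println(s"receiver 1 keeps ~${filesToKeep(remember2Ms)} log file(s)")  // ~3
  }
}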
......@@ -315,3 +371,42 @@ class ReceiverSuite extends FunSuite with Timeouts {
}
}
/**
* An implementation of Receiver that is used for testing a receiver's life cycle.
*/
class FakeReceiver(sendData: Boolean = false) extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
@volatile var otherThread: Thread = null
@volatile var receiving = false
@volatile var onStartCalled = false
@volatile var onStopCalled = false
def onStart() {
otherThread = new Thread() {
override def run() {
receiving = true
var count = 0
while(!isStopped()) {
if (sendData) {
store(count)
count += 1
}
Thread.sleep(10)
}
}
}
onStartCalled = true
otherThread.start()
}
def onStop() {
onStopCalled = true
otherThread.join()
}
def reset() {
receiving = false
onStartCalled = false
onStopCalled = false
}
}
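
A minimal usage sketch of FakeReceiver outside the suite above, assuming it is visible from the caller's package; the durations, output operation, and timeout are placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object FakeReceiverUsageSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("fake-receiver-sketch")
    val ssc = new StreamingContext(conf, Milliseconds(500))

    // sendData = true makes the receiver store an increasing Int every 10 ms.
    val stream = ssc.receiverStream(new FakeReceiver(sendData = true))
    stream.count().print()          // placeholder output operation

    ssc.start()
    ssc.awaitTermination(2000)      // run briefly, then shut everything down
    ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}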