Commit 9d3d9c82 authored by Tathagata Das

Refactored graph checkpoint file reading and writing code to make it cleaner and easily debuggable.

parent 4a5558ca
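Per the commit message, the writer no longer overwrites a single "graph" file in place; each checkpoint time now gets its own file, named by the helpers added to the Checkpoint companion object below. A minimal sketch of the resulting layout, assuming a hypothetical checkpoint directory (the Path construction simply mirrors checkpointFile and checkpointBackupFile from this diff):

import org.apache.hadoop.fs.Path

val checkpointDir = "hdfs:///tmp/streaming-checkpoints"   // hypothetical directory
val timeMs = 1389000000000L                               // a batch time in milliseconds
// Current checkpoint for that time:  <dir>/checkpoint-1389000000000
val currentFile = new Path(checkpointDir, "checkpoint-" + timeMs)
// Backup of the previous file for the same time:  <dir>/checkpoint-1389000000000.bk
val backupFile = new Path(checkpointDir, "checkpoint-" + timeMs + ".bk")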
@@ -53,6 +53,55 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
}
}
private[streaming]
object Checkpoint extends Logging {
val PREFIX = "checkpoint-"
val REGEX = (PREFIX + """([\d]+)([\w\.]*)""").r
/** Get the checkpoint file for the given checkpoint time */
def checkpointFile(checkpointDir: String, checkpointTime: Time) = {
new Path(checkpointDir, PREFIX + checkpointTime.milliseconds)
}
/** Get the checkpoint backup file for the given checkpoint time */
def checkpointBackupFile(checkpointDir: String, checkpointTime: Time) = {
new Path(checkpointDir, PREFIX + checkpointTime.milliseconds + ".bk")
}
/** Get checkpoint files present in the given directory, ordered by oldest-first */
def getCheckpointFiles(checkpointDir: String, fs: FileSystem): Seq[Path] = {
def sortFunc(path1: Path, path2: Path): Boolean = {
val (time1, bk1) = path1.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
val (time2, bk2) = path2.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
logInfo("Path 1: " + path1 + " -> " + time1 + ", " + bk1)
logInfo("Path 2: " + path2 + " -> " + time2 + ", " + bk2)
val precede = (time1 < time2) || (time1 == time2 && bk1)
logInfo(precede.toString)
precede
}
val path = new Path(checkpointDir)
if (fs.exists(path)) {
val statuses = fs.listStatus(path)
if (statuses != null) {
val paths = statuses.map(_.getPath)
logInfo("Paths = " + paths.map(_.getName).mkString(", "))
val filtered = paths.filter(p => REGEX.findFirstIn(p.toString).nonEmpty)
logInfo("Filtered paths = " + filtered.map(_.getName).mkString(", "))
val sorted = filtered.sortWith(sortFunc)
logInfo("Sorted paths = " + sorted.map(_.getName).mkString(", "))
sorted
} else {
logWarning("Listing " + path + " returned null")
Seq.empty
}
} else {
logInfo("Checkpoint directory " + path + " does not exist")
Seq.empty
}
}
}
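The oldest-first ordering comes from REGEX and sortFunc above: each file name is parsed into (time, isBackup), earlier times sort first, and for the same time the ".bk" file precedes the regular file, so the newest non-backup checkpoint ends up last. A self-contained illustration of that comparison (the regex and predicate are copied from the code above; the file names are made up):

object CheckpointSortSketch extends App {
  val REGEX = ("checkpoint-" + """([\d]+)([\w\.]*)""").r
  // True if name1 should come before name2 (oldest first, backups before their main file)
  def precedes(name1: String, name2: String): Boolean = {
    val (t1, bk1) = name1 match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
    val (t2, bk2) = name2 match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
    (t1 < t2) || (t1 == t2 && bk1)
  }
  val names = Seq("checkpoint-2000", "checkpoint-1000.bk", "checkpoint-2000.bk", "checkpoint-1000")
  // Prints: checkpoint-1000.bk, checkpoint-1000, checkpoint-2000.bk, checkpoint-2000
  println(names.sortWith(precedes).mkString(", "))
}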
/**
* Convenience class to handle the writing of graph checkpoint to file
@@ -60,14 +109,13 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
private[streaming]
class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDir: String, hadoopConf: Configuration)
extends Logging {
val file = new Path(checkpointDir, "graph")
val MAX_ATTEMPTS = 3
val executor = Executors.newFixedThreadPool(1)
val compressionCodec = CompressionCodec.createCodec(conf)
// The file to which we actually write - and then "move" to file
val writeFile = new Path(file.getParent, file.getName + ".next")
// val writeFile = new Path(file.getParent, file.getName + ".next")
// The file to which existing checkpoint is backed up (i.e. "moved")
val bakFile = new Path(file.getParent, file.getName + ".bk")
// val bakFile = new Path(file.getParent, file.getName + ".bk")
private var stopped = false
private var fs_ : FileSystem = _
@@ -78,40 +126,57 @@ class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDi
def run() {
var attempts = 0
val startTime = System.currentTimeMillis()
val tempFile = new Path(checkpointDir, "temp")
val checkpointFile = Checkpoint.checkpointFile(checkpointDir, checkpointTime)
val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, checkpointTime)
while (attempts < MAX_ATTEMPTS && !stopped) {
attempts += 1
try {
logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'")
// This is inherently thread unsafe, so alleviating it by writing to '.next' and
// then moving it to the final file
val fos = fs.create(writeFile)
logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + checkpointFile + "'")
// Write checkpoint to temp file
fs.delete(tempFile, true) // just in case it exists
val fos = fs.create(tempFile)
fos.write(bytes)
fos.close()
// Back up existing checkpoint if it exists
if (fs.exists(file) && fs.rename(file, bakFile)) {
logDebug("Moved existing checkpoint file to " + bakFile)
// If the checkpoint file exists, back it up
// If the backup exists as well, just delete it, otherwise rename will fail
if (fs.exists(checkpointFile)) {
fs.delete(backupFile, true) // just in case it exists
if (!fs.rename(checkpointFile, backupFile)) {
logWarning("Could not rename " + checkpointFile + " to " + backupFile)
}
}
// Rename temp file to the final checkpoint file
if (!fs.rename(tempFile, checkpointFile)) {
logWarning("Could not rename " + tempFile + " to " + checkpointFile)
}
fs.delete(file, false) // paranoia
// Rename temp written file to the right location
if (fs.rename(writeFile, file)) {
val finishTime = System.currentTimeMillis()
logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file +
"', took " + bytes.length + " bytes and " + (finishTime - startTime) + " ms")
jobGenerator.onCheckpointCompletion(checkpointTime)
} else {
throw new SparkException("Failed to rename checkpoint file from "
+ writeFile + " to " + file)
// Delete old checkpoint files
val allCheckpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs)
if (allCheckpointFiles.size > 4) {
allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach(file => {
logInfo("Deleting " + file)
fs.delete(file, true)
})
}
// All done, print success
val finishTime = System.currentTimeMillis()
logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + checkpointFile +
"', took " + bytes.length + " bytes and " + (finishTime - startTime) + " ms")
jobGenerator.onCheckpointCompletion(checkpointTime)
return
} catch {
case ioe: IOException =>
logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe)
logWarning("Error in attempt " + attempts + " of writing checkpoint to " + checkpointFile, ioe)
reset()
}
}
logError("Could not write checkpoint for time " + checkpointTime + " to file '" + file + "'")
logWarning("Could not write checkpoint for time " + checkpointTime + " to file " + checkpointFile + "'")
}
}
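The rewritten run() follows a write-to-temp-then-rename protocol: serialize to a temp file, move any existing checkpoint for this time to its ".bk" backup, then rename the temp file into place; old files are pruned afterwards (the size > 4 guard together with take(size - 10) means nothing is deleted until more than ten checkpoint files have accumulated). A stripped-down sketch of just that protocol against the Hadoop FileSystem API, with the retry loop, logging, and error handling from the diff omitted and the directory treated as a placeholder:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object CheckpointWriteSketch {
  def writeCheckpoint(dir: String, timeMs: Long, bytes: Array[Byte], hadoopConf: Configuration): Unit = {
    val fs: FileSystem = new Path(dir).getFileSystem(hadoopConf)
    val tempFile = new Path(dir, "temp")
    val checkpointFile = new Path(dir, "checkpoint-" + timeMs)
    val backupFile = new Path(dir, "checkpoint-" + timeMs + ".bk")

    // 1. Write the serialized checkpoint to a temp file (delete any stale one first).
    fs.delete(tempFile, true)
    val out = fs.create(tempFile)
    out.write(bytes)
    out.close()

    // 2. If a checkpoint for this time already exists, move it aside as the backup;
    //    the old backup must be deleted first or the rename would fail.
    if (fs.exists(checkpointFile)) {
      fs.delete(backupFile, true)
      fs.rename(checkpointFile, backupFile)
    }

    // 3. Move the temp file into place as the new checkpoint.
    fs.rename(tempFile, checkpointFile)
  }
}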
@@ -147,7 +212,7 @@ class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDi
}
private def fs = synchronized {
if (fs_ == null) fs_ = file.getFileSystem(hadoopConf)
if (fs_ == null) fs_ = new Path(checkpointDir).getFileSystem(hadoopConf)
fs_
}
@@ -160,36 +225,21 @@ class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDi
private[streaming]
object CheckpointReader extends Logging {
private val graphFileNames = Seq("graph", "graph.bk")
def read(checkpointDir: String, conf: SparkConf, hadoopConf: Configuration): Option[Checkpoint] = {
val checkpointPath = new Path(checkpointDir)
def fs = checkpointPath.getFileSystem(hadoopConf)
// See if the checkpoint directory exists
if (!fs.exists(checkpointPath)) {
logInfo("Could not load checkpoint as path '" + checkpointPath + "' does not exist")
// Try to find the checkpoint files
val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs).reverse
if (checkpointFiles.isEmpty) {
return None
}
// Try to find the checkpoint data
val existingFiles = graphFileNames.map(new Path(checkpointPath, _)).filter(fs.exists)
if (existingFiles.isEmpty) {
logInfo("Could not load checkpoint as checkpoint data was not " +
"found in directory " + checkpointDir + "")
val statuses = fs.listStatus(checkpointPath)
if (statuses!=null) {
logInfo("Checkpoint directory " + checkpointDir + " contains the files:\n" +
statuses.mkString("\n"))
}
return None
}
logInfo("Checkpoint files found: " + existingFiles.mkString(","))
// Try to read the checkpoint data
// Try to read the checkpoint files in order, newest first
logInfo("Checkpoint files found: " + checkpointFiles.mkString(","))
val compressionCodec = CompressionCodec.createCodec(conf)
existingFiles.foreach(file => {
logInfo("Attempting to load checkpoint from file '" + file + "'")
checkpointFiles.foreach(file => {
logInfo("Attempting to load checkpoint from file " + file)
try {
val fis = fs.open(file)
// ObjectInputStream uses the last defined user-defined class loader in the stack
@@ -204,15 +254,17 @@ object CheckpointReader extends Logging {
ois.close()
fs.close()
cp.validate()
logInfo("Checkpoint successfully loaded from file '" + file + "'")
logInfo("Checkpoint successfully loaded from file " + file)
logInfo("Checkpoint was generated at time " + cp.checkpointTime)
return Some(cp)
} catch {
case e: Exception =>
logWarning("Error reading checkpoint from file '" + file + "'", e)
logWarning("Error reading checkpoint from file " + file, e)
}
})
throw new SparkException("Failed to read checkpoint from directory '" + checkpointDir + "'")
// If none of the checkpoint files could be read, throw an exception
throw new SparkException("Failed to read checkpoint from directory " + checkpointPath)
}
}
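CheckpointReader now walks the files from getCheckpointFiles in reverse (newest first) and returns the first one that deserializes and validates. A minimal, hypothetical recovery call under those semantics (the directory is a placeholder, and since Checkpoint and CheckpointReader are private[streaming], this only compiles inside the org.apache.spark.streaming package):

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf

// Returns None when the directory holds no checkpoint files; throws SparkException
// only when checkpoint files exist but none of them could be read.
val recovered: Option[Checkpoint] =
  CheckpointReader.read("hdfs:///tmp/streaming-checkpoints", new SparkConf(), new Configuration())
recovered.foreach(cp => println("Recovered checkpoint generated at time " + cp.checkpointTime))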
@@ -113,6 +113,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
val timeTaken = System.currentTimeMillis - lastNewFileFindingTime
logInfo("Finding new files took " + timeTaken + " ms")
logDebug("# cached file times = " + fileModTimes.size)
if (timeTaken > slideDuration.milliseconds) {
logWarning(
"Time taken to find new files exceeds the batch size. " +