Skip to content
Snippets Groups Projects
Commit 2213a5a4 authored by Tathagata Das's avatar Tathagata Das
Browse files

Merge branch 'driver-test' of github.com:tdas/incubator-spark into driver-test

parents 740730a1 4f609f79
No related branches found
No related tags found
No related merge requests found
......@@ -43,6 +43,9 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
val pendingTimes = ssc.scheduler.getPendingTimes()
val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
val sparkConf = ssc.conf
// do not save these configurations
sparkConf.remove("spark.hostPort").remove("spark.driver.host").remove("spark.driver.port")
def validate() {
assert(master != null, "Checkpoint.master is null")
......@@ -53,6 +56,47 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
}
}
private[streaming]
object Checkpoint extends Logging {
  val PREFIX = "checkpoint-"
  // Captures: group 1 = timestamp digits, group 2 = optional suffix (e.g. ".bk")
  val REGEX = (PREFIX + """([\d]+)([\w\.]*)""").r

  /** Get the checkpoint file for the given checkpoint time */
  def checkpointFile(checkpointDir: String, checkpointTime: Time) = {
    new Path(checkpointDir, PREFIX + checkpointTime.milliseconds)
  }

  /** Get the checkpoint backup file for the given checkpoint time */
  def checkpointBackupFile(checkpointDir: String, checkpointTime: Time) = {
    new Path(checkpointDir, PREFIX + checkpointTime.milliseconds + ".bk")
  }

  /** Get checkpoint files present in the given directory, ordered by oldest-first */
  def getCheckpointFiles(checkpointDir: String, fs: FileSystem): Seq[Path] = {
    // Oldest-first ordering. For two entries with the same timestamp, the
    // backup (".bk") file sorts before the main checkpoint file, since the
    // backup holds the older data.
    def sortFunc(path1: Path, path2: Path): Boolean = {
      val (time1, bk1) = path1.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
      val (time2, bk2) = path2.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
      (time1 < time2) || (time1 == time2 && bk1)
    }

    val path = new Path(checkpointDir)
    if (fs.exists(path)) {
      val statuses = fs.listStatus(path)
      if (statuses != null) {
        val paths = statuses.map(_.getPath)
        // Use an anchored match on the file *name*, mirroring the pattern match
        // in sortFunc. The previous unanchored search over the full path string
        // (REGEX.findFirstIn(p.toString)) could accept a path whose directory
        // component merely contained "checkpoint-<digits>", which would then
        // throw a MatchError inside sortFunc.
        val filtered = paths.filter { p =>
          p.getName match {
            case REGEX(_, _) => true
            case _ => false
          }
        }
        filtered.sortWith(sortFunc)
      } else {
        logWarning("Listing " + path + " returned null")
        Seq.empty
      }
    } else {
      logInfo("Checkpoint directory " + path + " does not exist")
      Seq.empty
    }
  }
}
/**
* Convenience class to handle the writing of graph checkpoint to file
......@@ -60,58 +104,67 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
private[streaming]
class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDir: String, hadoopConf: Configuration)
extends Logging {
val file = new Path(checkpointDir, "graph")
val MAX_ATTEMPTS = 3
val executor = Executors.newFixedThreadPool(1)
val compressionCodec = CompressionCodec.createCodec(conf)
// The file to which we actually write - and then "move" to file
val writeFile = new Path(file.getParent, file.getName + ".next")
// The file to which existing checkpoint is backed up (i.e. "moved")
val bakFile = new Path(file.getParent, file.getName + ".bk")
private var stopped = false
private var fs_ : FileSystem = _
// Removed code which validates whether there is only one CheckpointWriter per path 'file' since
// I did not notice any errors - reintroduce it ?
class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
def run() {
var attempts = 0
val startTime = System.currentTimeMillis()
val tempFile = new Path(checkpointDir, "temp")
val checkpointFile = Checkpoint.checkpointFile(checkpointDir, checkpointTime)
val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, checkpointTime)
while (attempts < MAX_ATTEMPTS && !stopped) {
attempts += 1
try {
logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'")
// This is inherently thread unsafe, so alleviating it by writing to '.next' and
// then moving it to the final file
val fos = fs.create(writeFile)
logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + checkpointFile + "'")
// Write checkpoint to temp file
fs.delete(tempFile, true) // just in case it exists
val fos = fs.create(tempFile)
fos.write(bytes)
fos.close()
// Back up existing checkpoint if it exists
if (fs.exists(file) && fs.rename(file, bakFile)) {
logDebug("Moved existing checkpoint file to " + bakFile)
// If the checkpoint file exists, back it up
// If the backup exists as well, just delete it, otherwise rename will fail
if (fs.exists(checkpointFile)) {
fs.delete(backupFile, true) // just in case it exists
if (!fs.rename(checkpointFile, backupFile)) {
logWarning("Could not rename " + checkpointFile + " to " + backupFile)
}
}
fs.delete(file, false) // paranoia
// Rename temp written file to the right location
if (fs.rename(writeFile, file)) {
val finishTime = System.currentTimeMillis()
logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file +
"', took " + bytes.length + " bytes and " + (finishTime - startTime) + " ms")
jobGenerator.onCheckpointCompletion(checkpointTime)
} else {
throw new SparkException("Failed to rename checkpoint file from "
+ writeFile + " to " + file)
// Rename temp file to the final checkpoint file
if (!fs.rename(tempFile, checkpointFile)) {
logWarning("Could not rename " + tempFile + " to " + checkpointFile)
}
// Delete old checkpoint files
val allCheckpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs)
if (allCheckpointFiles.size > 4) {
allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach(file => {
logInfo("Deleting " + file)
fs.delete(file, true)
})
}
// All done, print success
val finishTime = System.currentTimeMillis()
logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + checkpointFile +
"', took " + bytes.length + " bytes and " + (finishTime - startTime) + " ms")
jobGenerator.onCheckpointCompletion(checkpointTime)
return
} catch {
case ioe: IOException =>
logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe)
logWarning("Error in attempt " + attempts + " of writing checkpoint to " + checkpointFile, ioe)
reset()
}
}
logError("Could not write checkpoint for time " + checkpointTime + " to file '" + file + "'")
logWarning("Could not write checkpoint for time " + checkpointTime + " to file " + checkpointFile + "'")
}
}
......@@ -124,7 +177,7 @@ class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDi
bos.close()
try {
executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray))
logInfo("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue")
logDebug("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue")
} catch {
case rej: RejectedExecutionException =>
logError("Could not submit checkpoint task to the thread pool executor", rej)
......@@ -147,7 +200,7 @@ class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDi
}
private def fs = synchronized {
if (fs_ == null) fs_ = file.getFileSystem(hadoopConf)
if (fs_ == null) fs_ = new Path(checkpointDir).getFileSystem(hadoopConf)
fs_
}
......@@ -160,36 +213,21 @@ class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDi
private[streaming]
object CheckpointReader extends Logging {
private val graphFileNames = Seq("graph", "graph.bk")
def read(checkpointDir: String, conf: SparkConf, hadoopConf: Configuration): Option[Checkpoint] = {
val checkpointPath = new Path(checkpointDir)
def fs = checkpointPath.getFileSystem(hadoopConf)
// See if the checkpoint directory exists
if (!fs.exists(checkpointPath)) {
logInfo("Could not load checkpoint as path '" + checkpointPath + "' does not exist")
return None
}
// Try to find the checkpoint data
val existingFiles = graphFileNames.map(new Path(checkpointPath, _)).filter(fs.exists)
if (existingFiles.isEmpty) {
logInfo("Could not load checkpoint as checkpoint data was not " +
"found in directory " + checkpointDir + "")
val statuses = fs.listStatus(checkpointPath)
if (statuses!=null) {
logInfo("Checkpoint directory " + checkpointDir + " contains the files:\n" +
statuses.mkString("\n"))
}
// Try to find the checkpoint files
val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs).reverse
if (checkpointFiles.isEmpty) {
return None
}
logInfo("Checkpoint files found: " + existingFiles.mkString(","))
// Try to read the checkpoint data
// Try to read the checkpoint files in the order
logInfo("Checkpoint files found: " + checkpointFiles.mkString(","))
val compressionCodec = CompressionCodec.createCodec(conf)
existingFiles.foreach(file => {
logInfo("Attempting to load checkpoint from file '" + file + "'")
checkpointFiles.foreach(file => {
logInfo("Attempting to load checkpoint from file " + file)
try {
val fis = fs.open(file)
// ObjectInputStream uses the last defined user-defined class loader in the stack
......@@ -204,15 +242,17 @@ object CheckpointReader extends Logging {
ois.close()
fs.close()
cp.validate()
logInfo("Checkpoint successfully loaded from file '" + file + "'")
logInfo("Checkpoint successfully loaded from file " + file)
logInfo("Checkpoint was generated at time " + cp.checkpointTime)
return Some(cp)
} catch {
case e: Exception =>
logWarning("Error reading checkpoint from file '" + file + "'", e)
logWarning("Error reading checkpoint from file " + file, e)
}
})
throw new SparkException("Failed to read checkpoint from directory '" + checkpointDir + "'")
// If none of checkpoint files could be read, then throw exception
throw new SparkException("Failed to read checkpoint from directory " + checkpointPath)
}
}
......
......@@ -113,6 +113,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
val timeTaken = System.currentTimeMillis - lastNewFileFindingTime
logInfo("Finding new files took " + timeTaken + " ms")
logDebug("# cached file times = " + fileModTimes.size)
if (timeTaken > slideDuration.milliseconds) {
logWarning(
"Time taken to find new files exceeds the batch size. " +
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment