Commit 4f609f79 authored by Tathagata Das

Removed spark.hostPort and other settings from SparkConf before saving to checkpoint.

parent d7ec73ac
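
Context for the change: the Checkpoint object captures the StreamingContext's SparkConf and is serialized to the checkpoint directory. Settings such as spark.hostPort, spark.driver.host, and spark.driver.port describe the host and port of the driver that wrote the checkpoint, so restoring them on recovery would point a restarted driver at a stale address. A minimal standalone sketch of the idea (illustrative code, not part of the patch; SparkConf.remove returns the conf itself, which is why the calls chain):

import org.apache.spark.SparkConf

// Illustrative only: strip driver-specific settings before the conf is
// serialized as part of a checkpoint. remove() returns the SparkConf,
// so the calls can be chained.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("checkpoint-demo")
  .set("spark.driver.host", "10.0.0.5")  // valid only for the current run
  .set("spark.driver.port", "49152")     // valid only for the current run

conf.remove("spark.hostPort").remove("spark.driver.host").remove("spark.driver.port")
assert(conf.getOption("spark.driver.host").isEmpty)  // ephemeral settings gone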
@@ -43,6 +43,9 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   val pendingTimes = ssc.scheduler.getPendingTimes()
   val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
+  val sparkConf = ssc.conf
+  // do not save these configurations
+  sparkConf.remove("spark.hostPort").remove("spark.driver.host").remove("spark.driver.port")
 
   def validate() {
     assert(master != null, "Checkpoint.master is null")
@@ -73,11 +76,7 @@ object Checkpoint extends Logging {
     def sortFunc(path1: Path, path2: Path): Boolean = {
       val (time1, bk1) = path1.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
       val (time2, bk2) = path2.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
-      logInfo("Path 1: " + path1 + " -> " + time1 + ", " + bk1)
-      logInfo("Path 2: " + path2 + " -> " + time2 + ", " + bk2)
-      val precede = (time1 < time2) || (time1 == time2 && bk1)
-      logInfo(precede.toString)
-      precede
+      (time1 < time2) || (time1 == time2 && bk1)
     }
 
     val path = new Path(checkpointDir)
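
For context, sortFunc orders checkpoint files by their embedded timestamp and, on equal timestamps, puts the backup file first. Assuming a filename pattern along the lines of checkpoint-<time> with an optional backup suffix (the actual REGEX is defined elsewhere in Checkpoint.scala and is not shown in this diff), the ordering behaves like this sketch:

// Assumed pattern: "checkpoint-<time>" plus an optional suffix such as ".bk".
val REGEX = """checkpoint-(\d+)([\w.]*)""".r

def precedes(name1: String, name2: String): Boolean = {
  val (time1, bk1) = name1 match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
  val (time2, bk2) = name2 match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
  (time1 < time2) || (time1 == time2 && bk1)
}

precedes("checkpoint-1000.bk", "checkpoint-1000")  // true: backup file first
precedes("checkpoint-1000", "checkpoint-2000")     // true: older time first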
@@ -85,12 +84,8 @@ object Checkpoint extends Logging {
     val statuses = fs.listStatus(path)
     if (statuses != null) {
       val paths = statuses.map(_.getPath)
-      logInfo("Paths = " + paths.map(_.getName).mkString(", "))
       val filtered = paths.filter(p => REGEX.findFirstIn(p.toString).nonEmpty)
-      logInfo("Filtered paths = " + filtered.map(_.getName).mkString(", "))
-      val sorted = filtered.sortWith(sortFunc)
-      logInfo("Sorted paths = " + sorted.map(_.getName).mkString(", "))
-      sorted
+      filtered.sortWith(sortFunc)
     } else {
       logWarning("Listing " + path + " returned null")
       Seq.empty
@@ -112,16 +107,9 @@ class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDi
   val MAX_ATTEMPTS = 3
   val executor = Executors.newFixedThreadPool(1)
   val compressionCodec = CompressionCodec.createCodec(conf)
-  // The file to which we actually write - and then "move" to file
-  // val writeFile = new Path(file.getParent, file.getName + ".next")
-  // The file to which existing checkpoint is backed up (i.e. "moved")
-  // val bakFile = new Path(file.getParent, file.getName + ".bk")
   private var stopped = false
   private var fs_ : FileSystem = _
 
-  // Removed code which validates whether there is only one CheckpointWriter per path 'file' since
-  // I did not notice any errors - reintroduce it ?
   class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
     def run() {
       var attempts = 0
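
The body of run() is truncated above; it retries the write up to MAX_ATTEMPTS times, logging a warning on each failure. A rough sketch of that shape, with writeToFile standing in as a hypothetical helper for the actual write logic:

// Hypothetical sketch of the retry loop in CheckpointWriteHandler.run().
def run() {
  var attempts = 0
  while (attempts < MAX_ATTEMPTS) {
    attempts += 1
    try {
      writeToFile(checkpointTime, bytes)  // hypothetical: write bytes to the checkpoint dir
      return                              // success, stop retrying
    } catch {
      case e: Exception =>
        logWarning("Error writing checkpoint, attempt " + attempts, e)
    }
  }
  logError("Could not write checkpoint for time " + checkpointTime)
}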
@@ -189,7 +177,7 @@ class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDi
     bos.close()
     try {
       executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray))
-      logInfo("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue")
+      logDebug("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue")
     } catch {
       case rej: RejectedExecutionException =>
         logError("Could not submit checkpoint task to the thread pool executor", rej)