Skip to content
Snippets Groups Projects
Commit 8e88db3c authored by Tathagata Das's avatar Tathagata Das
Browse files

Bug fixes to the DriverRunner and minor changes here and there.

parent 3d447433
No related branches found
No related tags found
No related merge requests found
# A Spark Worker will be started on each of the machines listed below.
localhost
\ No newline at end of file
ec2-54-221-59-252.compute-1.amazonaws.com
ec2-67-202-26-243.compute-1.amazonaws.com
ec2-23-22-220-97.compute-1.amazonaws.com
ec2-50-16-98-100.compute-1.amazonaws.com
ec2-54-234-164-206.compute-1.amazonaws.com
......@@ -119,15 +119,15 @@ private[spark] class DriverRunner(
val emptyConf = new Configuration() // TODO: In docs explain it needs to be full HDFS path
val jarFileSystem = jarPath.getFileSystem(emptyConf)
val destPath = new Path(driverDir.getAbsolutePath())
val destFileSystem = destPath.getFileSystem(emptyConf)
val destPath = new File(driverDir.getAbsolutePath(), jarPath.getName())
// val destFileSystem = destPath.getFileSystem(emptyConf)
val jarFileName = jarPath.getName
val localJarFile = new File(driverDir, jarFileName)
val localJarFilename = localJarFile.getAbsolutePath
if (!localJarFile.exists()) { // May already exist if running multiple workers on one node
logInfo(s"Copying user jar $jarPath to $destPath")
FileUtil.copy(jarFileSystem, jarPath, destFileSystem, destPath, false, false, emptyConf)
FileUtil.copy(jarFileSystem, jarPath, destPath, false, emptyConf)
}
if (!localJarFile.exists()) { // Verify copy succeeded
......@@ -161,7 +161,7 @@ private[spark] class DriverRunner(
val stderr = new File(baseDir, "stderr")
val header = "Launch Command: %s\n%s\n\n".format(
command.mkString("\"", "\" \"", "\""), "=" * 40)
Files.write(header, stderr, Charsets.UTF_8)
Files.append(header, stderr, Charsets.UTF_8)
CommandUtils.redirectStream(process.get.getErrorStream, stderr)
}
......
......@@ -175,7 +175,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
override def cleanup() { }
override def restore() {
hadoopFiles.foreach {
hadoopFiles.toSeq.sortBy(_._1)(Time.ordering).foreach {
case (t, f) => {
// Restore the metadata in both files and generatedRDDs
logInfo("Restoring files for time " + t + " - " +
......
......@@ -85,14 +85,14 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
val checkpointTime = ssc.initialCheckpoint.checkpointTime
val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
val downTimes = checkpointTime.until(restartTime, batchDuration)
logInfo("Batches during down time: " + downTimes.mkString(", "))
logInfo("Batches during down time (" + downTimes.size + " batches): " + downTimes.mkString(", "))
// Batches that were unprocessed before failure
val pendingTimes = ssc.initialCheckpoint.pendingTimes
logInfo("Batches pending processing: " + pendingTimes.mkString(", "))
val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering)
logInfo("Batches pending processing (" + pendingTimes.size + " batches): " + pendingTimes.mkString(", "))
// Reschedule jobs for these times
val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)
logInfo("Batches to reschedule: " + timesToReschedule.mkString(", "))
logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " + timesToReschedule.mkString(", "))
timesToReschedule.foreach(time =>
jobScheduler.runJobs(time, graph.generateJobs(time))
)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment