From aa7aa587beff22e2db50d2afadd95097856a299a Mon Sep 17 00:00:00 2001 From: Mingfei <mingfei.shi@intel.com> Date: Fri, 21 Jun 2013 17:48:41 +0800 Subject: [PATCH] some format modification --- core/src/main/scala/spark/scheduler/DAGScheduler.scala | 5 ++--- core/src/main/scala/spark/scheduler/JobLogger.scala | 10 +++++----- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 4336f2f36d..e412baa803 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -324,8 +324,7 @@ class DAGScheduler( for (job <- activeJobs) { val error = new SparkException("Job cancelled because SparkContext was shut down") job.listener.jobFailed(error) - sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, - JobFailed(error)))) + sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error)))) } return true } @@ -671,9 +670,9 @@ class DAGScheduler( val job = resultStageToJob(resultStage) val error = new SparkException("Job failed: " + reason) job.listener.jobFailed(error) + sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error)))) activeJobs -= job resultStageToJob -= resultStage - sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error)))) } if (dependentStages.isEmpty) { logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done") diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala index 760a0252b7..178bfaba3d 100644 --- a/core/src/main/scala/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/spark/scheduler/JobLogger.scala @@ -70,7 +70,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { } } - // create a log file for one job, the file name is the jobID + // Create a log file for one job, the file name is the jobID protected def createLogWriter(jobID: Int) { try{ val fileWriter = new PrintWriter(logDir + "/" + logDirName + "/" + jobID) @@ -80,7 +80,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { } } - // close log file for one job, and clean the stage relationship in stageIDToJobID + // Close log file, and clean the stage relationship in stageIDToJobID protected def closeLogWriter(jobID: Int) = jobIDToPrintWriter.get(jobID).foreach { fileWriter => fileWriter.close() @@ -91,7 +91,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { jobIDToStages -= jobID } - // write log information to log file, withTime parameter controls whether to recored + // Write log information to log file, withTime parameter controls whether to recored // time stamp for the information protected def jobLogInfo(jobID: Int, info: String, withTime: Boolean = true) { var writeInfo = info @@ -145,7 +145,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { } } - // generate indents and convert to String + // Generate indents and convert to String protected def indentString(indent: Int) = { val sb = new StringBuilder() for (i <- 1 to indent) { @@ -190,7 +190,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.priority, false) } - // record task metrics into job log files + // Record task metrics into job log files protected def recordTaskMetrics(stageID: Int, status: String, taskInfo: TaskInfo, taskMetrics: TaskMetrics) { val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID + -- GitLab