diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 4336f2f36d2e7f5c9f90ea102881e36ebfbaaa48..e412baa803c5935cead10f6a75d1eeea98229999 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -324,8 +324,7 @@ class DAGScheduler(
       for (job <- activeJobs) {
         val error = new SparkException("Job cancelled because SparkContext was shut down")
         job.listener.jobFailed(error)
-        sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job,
-          JobFailed(error))))
+        sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error))))
       }
       return true
     }
@@ -671,9 +670,9 @@ class DAGScheduler(
       val job = resultStageToJob(resultStage)
       val error = new SparkException("Job failed: " + reason)
       job.listener.jobFailed(error)
+      sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error))))
       activeJobs -= job
       resultStageToJob -= resultStage
-      sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error))))
     }
     if (dependentStages.isEmpty) {
       logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done")
diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala
index 760a0252b7091cdec8f924312b8099055311adc3..178bfaba3d923de7f116656e65902f7d5fa3ee9e 100644
--- a/core/src/main/scala/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/spark/scheduler/JobLogger.scala
@@ -70,7 +70,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }
 
-  // create a log file for one job, the file name is the jobID
+  // Create a log file for one job, the file name is the jobID
   protected def createLogWriter(jobID: Int) {
     try{
       val fileWriter = new PrintWriter(logDir + "/" + logDirName + "/" + jobID)
@@ -80,7 +80,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }
 
-  // close log file for one job, and clean the stage relationship in stageIDToJobID
+  // Close log file, and clean the stage relationship in stageIDToJobID
   protected def closeLogWriter(jobID: Int) =
     jobIDToPrintWriter.get(jobID).foreach { fileWriter =>
       fileWriter.close()
@@ -91,7 +91,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
       jobIDToStages -= jobID
     }
 
-  // write log information to log file, withTime parameter controls whether to recored
+  // Write log information to log file, withTime parameter controls whether to recored
   // time stamp for the information
   protected def jobLogInfo(jobID: Int, info: String, withTime: Boolean = true) {
     var writeInfo = info
@@ -145,7 +145,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }
 
-  // generate indents and convert to String
+  // Generate indents and convert to String
   protected def indentString(indent: Int) = {
     val sb = new StringBuilder()
     for (i <- 1 to indent) {
@@ -190,7 +190,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.priority, false)
   }
 
-  // record task metrics into job log files
+  // Record task metrics into job log files
   protected def recordTaskMetrics(stageID: Int, status: String,
                                   taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
     val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID +