Commit aa7aa587 authored by Mingfei

some format modification

parent 52407951
@@ -324,8 +324,7 @@ class DAGScheduler(
     for (job <- activeJobs) {
       val error = new SparkException("Job cancelled because SparkContext was shut down")
       job.listener.jobFailed(error)
-      sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job,
-        JobFailed(error))))
+      sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error))))
     }
     return true
   }
@@ -671,9 +670,9 @@ class DAGScheduler(
       val job = resultStageToJob(resultStage)
       val error = new SparkException("Job failed: " + reason)
       job.listener.jobFailed(error)
+      sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error))))
       activeJobs -= job
       resultStageToJob -= resultStage
-      sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error))))
     }
     if (dependentStages.isEmpty) {
       logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done")
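
Both DAGScheduler hunks touch the point where registered SparkListeners are told a job has ended; the second one also moves the onJobEnd notification so it fires before the job is dropped from activeJobs and resultStageToJob. A minimal standalone sketch of that notify-then-clean-up ordering (plain Scala with assumed names, not the actual Spark API):

// Sketch only: a tiny listener registry, not Spark's SparkListener API.
trait JobListener {
  def onJobEnd(jobId: Int, error: Exception): Unit
}

object ListenerOrderingSketch {
  private val listeners  = scala.collection.mutable.ArrayBuffer[JobListener]()
  private val activeJobs = scala.collection.mutable.Set[Int]()

  def register(l: JobListener): Unit = listeners += l

  def failJob(jobId: Int, reason: String): Unit = {
    val error = new Exception("Job failed: " + reason)
    // Notify every listener first, while activeJobs still contains the job ...
    listeners.foreach(_.onJobEnd(jobId, error))
    // ... then tear down the scheduler-side bookkeeping for it.
    activeJobs -= jobId
  }
}
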
@@ -70,7 +70,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }
-  // create a log file for one job, the file name is the jobID
+  // Create a log file for one job, the file name is the jobID
   protected def createLogWriter(jobID: Int) {
     try{
       val fileWriter = new PrintWriter(logDir + "/" + logDirName + "/" + jobID)
@@ -80,7 +80,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }
-  // close log file for one job, and clean the stage relationship in stageIDToJobID
+  // Close log file, and clean the stage relationship in stageIDToJobID
   protected def closeLogWriter(jobID: Int) =
     jobIDToPrintWriter.get(jobID).foreach { fileWriter =>
       fileWriter.close()
@@ -91,7 +91,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
       jobIDToStages -= jobID
     }
-  // write log information to log file, withTime parameter controls whether to recored
+  // Write log information to log file, withTime parameter controls whether to recored
   // time stamp for the information
   protected def jobLogInfo(jobID: Int, info: String, withTime: Boolean = true) {
     var writeInfo = info
@@ -145,7 +145,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }
-  // generate indents and convert to String
+  // Generate indents and convert to String
   protected def indentString(indent: Int) = {
     val sb = new StringBuilder()
     for (i <- 1 to indent) {
@@ -190,7 +190,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.priority, false)
   }
-  // record task metrics into job log files
+  // Record task metrics into job log files
   protected def recordTaskMetrics(stageID: Int, status: String,
       taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
     val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID +
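
The JobLogger hunks change only comment capitalization, but the context lines show the shape of the class: one PrintWriter per job ID, created when a job starts, written through jobLogInfo (optionally prefixed with a timestamp), then closed and removed when the job ends. A minimal sketch of that per-job writer bookkeeping, under assumed names and signatures (not the actual JobLogger source):

// Sketch only: per-job log writers keyed by job ID, mirroring the pattern above.
import java.io.PrintWriter
import scala.collection.mutable

object JobLogSketch {
  private val jobIDToPrintWriter = mutable.HashMap[Int, PrintWriter]()

  // Create a log file for one job; the file name is the jobID.
  def createLogWriter(logDir: String, jobID: Int): Unit = {
    try {
      jobIDToPrintWriter(jobID) = new PrintWriter(logDir + "/" + jobID)
    } catch {
      case e: Exception => println("Could not create log file: " + e)
    }
  }

  // Write a line to the job's log; withTime controls the timestamp prefix.
  def jobLogInfo(jobID: Int, info: String, withTime: Boolean = true): Unit = {
    val line = if (withTime) new java.util.Date().toString + ": " + info else info
    jobIDToPrintWriter.get(jobID).foreach(_.println(line))
  }

  // Close the job's log file and forget the writer.
  def closeLogWriter(jobID: Int): Unit =
    jobIDToPrintWriter.remove(jobID).foreach(_.close())
}
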