diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 0ae6a505037a810deceba4fdd2397e46b1aff56b..a1d5e6cf62daa0a5847960b5243a75029cc79992 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -32,10 +32,10 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.StorageLevel
 
 /**
- * <p>A logger class to record runtime information for jobs in Spark. This class outputs one log file
- * for each Spark job, containing RDD graph, tasks start/stop, shuffle information. <br>
+ * A logger class to record runtime information for jobs in Spark. This class outputs one log file
+ * for each Spark job, containing RDD graph, tasks start/stop, shuffle information.
  * JobLogger is a subclass of SparkListener, use addSparkListener to add JobLogger to a SparkContext
- * after the SparkContext is created. <br>
+ * after the SparkContext is created.
  * Note that each JobLogger only works for one SparkContext
  * @param logDirName The base directory for the log files.
  */
@@ -68,10 +68,10 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
   }
 
   /**
-    * Create a log file for one job
-    * @param jobID ID of the job
-    * @return No return
-    * @exception FileNotFoundException Fail to create log file
+   * Create a log file for one job
+   * @param jobID ID of the job
+   * @return No return
+   * @exception FileNotFoundException Fail to create log file
    */
   protected def createLogWriter(jobID: Int) {
     try {
@@ -83,9 +83,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
   }
 
   /**
-    * Close log file, and clean the stage relationship in stageIDToJobID
-    * @param jobID ID of the job
-    * @return No return
+   * Close log file, and clean the stage relationship in stageIDToJobID
+   * @param jobID ID of the job
+   * @return No return
    */
   protected def closeLogWriter(jobID: Int) {
     jobIDToPrintWriter.get(jobID).foreach { fileWriter =>
@@ -99,11 +99,11 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
   }
 
   /**
-    * Write info into log file
-    * @param jobID ID of the job
-    * @param info Info to be recorded
-    * @param withTime Controls whether to record time stamp before the info, default is true
-    * @return No return
+   * Write info into log file
+   * @param jobID ID of the job
+   * @param info Info to be recorded
+   * @param withTime Controls whether to record time stamp before the info, default is true
+   * @return No return
    */
   protected def jobLogInfo(jobID: Int, info: String, withTime: Boolean = true) {
     var writeInfo = info
@@ -115,21 +115,21 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
   }
 
   /**
-    * Write info into log file
-    * @param stageID ID of the stage
-    * @param info Info to be recorded
-    * @param withTime Controls whether to record time stamp before the info, default is true
-    * @return No return
+   * Write info into log file
+   * @param stageID ID of the stage
+   * @param info Info to be recorded
+   * @param withTime Controls whether to record time stamp before the info, default is true
+   * @return No return
    */
   protected def stageLogInfo(stageID: Int, info: String, withTime: Boolean = true) {
     stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime))
   }
 
   /**
-    * Build stage dependency for a job
-    * @param jobID ID of the job
-    * @param stage Root stage of the job
-    * @return No return
+   * Build stage dependency for a job
+   * @param jobID ID of the job
+   * @param stage Root stage of the job
+   * @return No return
    */
   protected def buildJobDep(jobID: Int, stage: Stage) {
     if (stage.jobId == jobID) {
@@ -145,9 +145,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
   }
 
   /**
-    * Record stage dependency and RDD dependency for a stage
-    * @param jobID Job ID of the stage
-    * @return No return
+   * Record stage dependency and RDD dependency for a stage
+   * @param jobID Job ID of the stage
+   * @return No return
    */
   protected def recordStageDep(jobID: Int) {
     def getRddsInStage(rdd: RDD[_]): ListBuffer[RDD[_]] = {
@@ -176,9 +176,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
   }
 
   /**
-    * Generate indents and convert to String
-    * @param indent Number of indents
-    * @return string of indents
+   * Generate indents and convert to String
+   * @param indent Number of indents
+   * @return string of indents
    */
   protected def indentString(indent: Int): String = {
     val sb = new StringBuilder()
@@ -189,9 +189,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
   }
 
   /**
-    * Get RDD's name
-    * @param rdd Input RDD
-    * @return String of RDD's name
+   * Get RDD's name
+   * @param rdd Input RDD
+   * @return String of RDD's name
    */
   protected def getRddName(rdd: RDD[_]): String = {
     var rddName = rdd.getClass.getSimpleName
@@ -202,11 +202,11 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
   }
 
   /**
-    * Record RDD dependency graph in a stage
-    * @param jobID Job ID of the stage
-    * @param rdd Root RDD of the stage
-    * @param indent Indent number before info
-    * @return No return
+   * Record RDD dependency graph in a stage
+   * @param jobID Job ID of the stage
+   * @param rdd Root RDD of the stage
+   * @param indent Indent number before info
+   * @return No return
    */
   protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
     val rddInfo =
@@ -227,11 +227,11 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
   }
 
   /**
-    * Record stage dependency graph of a job
-    * @param jobID Job ID of the stage
-    * @param stage Root stage of the job
-    * @param indent Indent number before info, default is 0
-    * @return No return
+   * Record stage dependency graph of a job
+   * @param jobID Job ID of the stage
+   * @param stage Root stage of the job
+   * @param indent Indent number before info, default is 0
+   * @return No return
    */
   protected def recordStageDepGraph(jobID: Int, stage: Stage, idSet: HashSet[Int], indent: Int = 0)
   {
@@ -253,12 +253,12 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
   }
 
   /**
-    * Record task metrics into job log files, including execution info and shuffle metrics
-    * @param stageID Stage ID of the task
-    * @param status Status info of the task
-    * @param taskInfo Task description info
-    * @param taskMetrics Task running metrics
-    * @return No return
+   * Record task metrics into job log files, including execution info and shuffle metrics
+   * @param stageID Stage ID of the task
+   * @param status Status info of the task
+   * @param taskInfo Task description info
+   * @param taskMetrics Task running metrics
+   * @return No return
    */
   protected def recordTaskMetrics(stageID: Int, status: String,
                                 taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
@@ -285,9 +285,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
   }
 
   /**
-    * When stage is submitted, record stage submit info
-    * @param stageSubmitted Stage submitted event
-    * @return No return
+   * When stage is submitted, record stage submit info
+   * @param stageSubmitted Stage submitted event
+   * @return No return
    */
   override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
     stageLogInfo(stageSubmitted.stage.stageId,"STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format(
@@ -295,9 +295,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
   }
 
   /**
-    * When stage is completed, record stage completion status
-    * @param stageCompleted Stage completed event
-    * @return No return
+   * When stage is completed, record stage completion status
+   * @param stageCompleted Stage completed event
+   * @return No return
    */
   override def onStageCompleted(stageCompleted: StageCompleted) {
     stageLogInfo(stageCompleted.stage.stageId, "STAGE_ID=%d STATUS=COMPLETED".format(
@@ -307,9 +307,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
   override def onTaskStart(taskStart: SparkListenerTaskStart) { }
 
   /**
-    * When task ends, record task completion status and metrics
-    * @param taskEnd Task end event
-    * @return No return
+   * When task ends, record task completion status and metrics
+   * @param taskEnd Task end event
+   * @return No return
    */
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
     val task = taskEnd.task
@@ -340,9 +340,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
   }
 
   /**
-    * When job ends, recording job completion status and close log file
-    * @param jobEnd Job end event
-    * @return No return
+   * When job ends, recording job completion status and close log file
+   * @param jobEnd Job end event
+   * @return No return
    */
   override def onJobEnd(jobEnd: SparkListenerJobEnd) {
     val job = jobEnd.job
@@ -359,10 +359,10 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
   }
 
   /**
-    * Record job properties into job log file
-    * @param jobID ID of the job
-    * @param properties Properties of the job
-    * @return No return
+   * Record job properties into job log file
+   * @param jobID ID of the job
+   * @param properties Properties of the job
+   * @return No return
    */
   protected def recordJobProperties(jobID: Int, properties: Properties) {
     if(properties != null) {
@@ -372,9 +372,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
   }
 
   /**
-    * When job starts, record job property and stage graph
-    * @param jobStart Job start event
-    * @return No return
+   * When job starts, record job property and stage graph
+   * @param jobStart Job start event
+   * @return No return
    */
   override def onJobStart(jobStart: SparkListenerJobStart) {
     val job = jobStart.job
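
For reference, a minimal sketch of the usage the class Scaladoc above describes: JobLogger is a SparkListener, so it is registered on a SparkContext via addSparkListener after the context is created. This is not part of the patch; the master URL, application name, and log directory name are illustrative.

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.JobLogger

object JobLoggerExample {
  def main(args: Array[String]) {
    // Each JobLogger only works for one SparkContext, so create the context first...
    val sc = new SparkContext("local", "JobLoggerExample")
    // ...then attach the listener, as the Scaladoc instructs.
    sc.addSparkListener(new JobLogger("jobLoggerExample"))
    // Every job run from here on is recorded to this JobLogger's per-job log file.
    sc.parallelize(1 to 100).map(_ * 2).count()
    sc.stop()
  }
}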