Skip to content
Snippets Groups Projects
Commit 4ca72c84 authored by Mingfei's avatar Mingfei
Browse files

modify according to comments

parent 079820f7
No related branches found
No related tags found
No related merge requests found
...@@ -32,10 +32,10 @@ import org.apache.spark.executor.TaskMetrics ...@@ -32,10 +32,10 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel
/** /**
* <p>A logger class to record runtime information for jobs in Spark. This class outputs one log file * A logger class to record runtime information for jobs in Spark. This class outputs one log file
* for each Spark job, containing RDD graph, tasks start/stop, shuffle information. <br> * for each Spark job, containing RDD graph, tasks start/stop, shuffle information.
* JobLogger is a subclass of SparkListener, use addSparkListener to add JobLogger to a SparkContext * JobLogger is a subclass of SparkListener, use addSparkListener to add JobLogger to a SparkContext
* after the SparkContext is created. <br> * after the SparkContext is created.
* Note that each JobLogger only works for one SparkContext * Note that each JobLogger only works for one SparkContext
* @param logDirName The base directory for the log files. * @param logDirName The base directory for the log files.
*/ */
...@@ -68,10 +68,10 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { ...@@ -68,10 +68,10 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
} }
/** /**
* Create a log file for one job * Create a log file for one job
* @param jobID ID of the job * @param jobID ID of the job
* @return No return * @return No return
* @exception FileNotFoundException Fail to create log file * @exception FileNotFoundException Fail to create log file
*/ */
protected def createLogWriter(jobID: Int) { protected def createLogWriter(jobID: Int) {
try { try {
...@@ -83,9 +83,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { ...@@ -83,9 +83,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
} }
/** /**
* Close log file, and clean the stage relationship in stageIDToJobID * Close log file, and clean the stage relationship in stageIDToJobID
* @param jobID ID of the job * @param jobID ID of the job
* @return No return * @return No return
*/ */
protected def closeLogWriter(jobID: Int) { protected def closeLogWriter(jobID: Int) {
jobIDToPrintWriter.get(jobID).foreach { fileWriter => jobIDToPrintWriter.get(jobID).foreach { fileWriter =>
...@@ -99,11 +99,11 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { ...@@ -99,11 +99,11 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
} }
/** /**
* Write info into log file * Write info into log file
* @param jobID ID of the job * @param jobID ID of the job
* @param info Info to be recorded * @param info Info to be recorded
* @param withTime Controls whether to record time stamp before the info, default is true * @param withTime Controls whether to record time stamp before the info, default is true
* @return No return * @return No return
*/ */
protected def jobLogInfo(jobID: Int, info: String, withTime: Boolean = true) { protected def jobLogInfo(jobID: Int, info: String, withTime: Boolean = true) {
var writeInfo = info var writeInfo = info
...@@ -115,21 +115,21 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { ...@@ -115,21 +115,21 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
} }
/** /**
* Write info into log file * Write info into log file
* @param stageID ID of the stage * @param stageID ID of the stage
* @param info Info to be recorded * @param info Info to be recorded
* @param withTime Controls whether to record time stamp before the info, default is true * @param withTime Controls whether to record time stamp before the info, default is true
* @return No return * @return No return
*/ */
protected def stageLogInfo(stageID: Int, info: String, withTime: Boolean = true) { protected def stageLogInfo(stageID: Int, info: String, withTime: Boolean = true) {
stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime)) stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime))
} }
/** /**
* Build stage dependency for a job * Build stage dependency for a job
* @param jobID ID of the job * @param jobID ID of the job
* @param stage Root stage of the job * @param stage Root stage of the job
* @return No return * @return No return
*/ */
protected def buildJobDep(jobID: Int, stage: Stage) { protected def buildJobDep(jobID: Int, stage: Stage) {
if (stage.jobId == jobID) { if (stage.jobId == jobID) {
...@@ -145,9 +145,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { ...@@ -145,9 +145,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
} }
/** /**
* Record stage dependency and RDD dependency for a stage * Record stage dependency and RDD dependency for a stage
* @param jobID Job ID of the stage * @param jobID Job ID of the stage
* @return No return * @return No return
*/ */
protected def recordStageDep(jobID: Int) { protected def recordStageDep(jobID: Int) {
def getRddsInStage(rdd: RDD[_]): ListBuffer[RDD[_]] = { def getRddsInStage(rdd: RDD[_]): ListBuffer[RDD[_]] = {
...@@ -176,9 +176,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { ...@@ -176,9 +176,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
} }
/** /**
* Generate indents and convert to String * Generate indents and convert to String
* @param indent Number of indents * @param indent Number of indents
* @return string of indents * @return string of indents
*/ */
protected def indentString(indent: Int): String = { protected def indentString(indent: Int): String = {
val sb = new StringBuilder() val sb = new StringBuilder()
...@@ -189,9 +189,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { ...@@ -189,9 +189,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
} }
/** /**
* Get RDD's name * Get RDD's name
* @param rdd Input RDD * @param rdd Input RDD
* @return String of RDD's name * @return String of RDD's name
*/ */
protected def getRddName(rdd: RDD[_]): String = { protected def getRddName(rdd: RDD[_]): String = {
var rddName = rdd.getClass.getSimpleName var rddName = rdd.getClass.getSimpleName
...@@ -202,11 +202,11 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { ...@@ -202,11 +202,11 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
} }
/** /**
* Record RDD dependency graph in a stage * Record RDD dependency graph in a stage
* @param jobID Job ID of the stage * @param jobID Job ID of the stage
* @param rdd Root RDD of the stage * @param rdd Root RDD of the stage
* @param indent Indent number before info * @param indent Indent number before info
* @return No return * @return No return
*/ */
protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) { protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
val rddInfo = val rddInfo =
...@@ -227,11 +227,11 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { ...@@ -227,11 +227,11 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
} }
/** /**
* Record stage dependency graph of a job * Record stage dependency graph of a job
* @param jobID Job ID of the stage * @param jobID Job ID of the stage
* @param stage Root stage of the job * @param stage Root stage of the job
* @param indent Indent number before info, default is 0 * @param indent Indent number before info, default is 0
* @return No return * @return No return
*/ */
protected def recordStageDepGraph(jobID: Int, stage: Stage, idSet: HashSet[Int], indent: Int = 0) { protected def recordStageDepGraph(jobID: Int, stage: Stage, idSet: HashSet[Int], indent: Int = 0) {
...@@ -253,12 +253,12 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { ...@@ -253,12 +253,12 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
} }
/** /**
* Record task metrics into job log files, including execution info and shuffle metrics * Record task metrics into job log files, including execution info and shuffle metrics
* @param stageID Stage ID of the task * @param stageID Stage ID of the task
* @param status Status info of the task * @param status Status info of the task
* @param taskInfo Task description info * @param taskInfo Task description info
* @param taskMetrics Task running metrics * @param taskMetrics Task running metrics
* @return No return * @return No return
*/ */
protected def recordTaskMetrics(stageID: Int, status: String, protected def recordTaskMetrics(stageID: Int, status: String,
taskInfo: TaskInfo, taskMetrics: TaskMetrics) { taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
...@@ -285,9 +285,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { ...@@ -285,9 +285,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
} }
/** /**
* When stage is submitted, record stage submit info * When stage is submitted, record stage submit info
* @param stageSubmitted Stage submitted event * @param stageSubmitted Stage submitted event
* @return No return * @return No return
*/ */
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
stageLogInfo(stageSubmitted.stage.stageId,"STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format( stageLogInfo(stageSubmitted.stage.stageId,"STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format(
...@@ -295,9 +295,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { ...@@ -295,9 +295,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
} }
/** /**
* When stage is completed, record stage completion status * When stage is completed, record stage completion status
* @param stageCompleted Stage completed event * @param stageCompleted Stage completed event
* @return No return * @return No return
*/ */
override def onStageCompleted(stageCompleted: StageCompleted) { override def onStageCompleted(stageCompleted: StageCompleted) {
stageLogInfo(stageCompleted.stage.stageId, "STAGE_ID=%d STATUS=COMPLETED".format( stageLogInfo(stageCompleted.stage.stageId, "STAGE_ID=%d STATUS=COMPLETED".format(
...@@ -307,9 +307,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { ...@@ -307,9 +307,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
override def onTaskStart(taskStart: SparkListenerTaskStart) { } override def onTaskStart(taskStart: SparkListenerTaskStart) { }
/** /**
* When task ends, record task completion status and metrics * When task ends, record task completion status and metrics
* @param taskEnd Task end event * @param taskEnd Task end event
* @return No return * @return No return
*/ */
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val task = taskEnd.task val task = taskEnd.task
...@@ -340,9 +340,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { ...@@ -340,9 +340,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
} }
/** /**
* When job ends, recording job completion status and close log file * When job ends, recording job completion status and close log file
* @param jobEnd Job end event * @param jobEnd Job end event
* @return No return * @return No return
*/ */
override def onJobEnd(jobEnd: SparkListenerJobEnd) { override def onJobEnd(jobEnd: SparkListenerJobEnd) {
val job = jobEnd.job val job = jobEnd.job
...@@ -359,10 +359,10 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { ...@@ -359,10 +359,10 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
} }
/** /**
* Record job properties into job log file * Record job properties into job log file
* @param jobID ID of the job * @param jobID ID of the job
* @param properties Properties of the job * @param properties Properties of the job
* @return No return * @return No return
*/ */
protected def recordJobProperties(jobID: Int, properties: Properties) { protected def recordJobProperties(jobID: Int, properties: Properties) {
if(properties != null) { if(properties != null) {
...@@ -372,9 +372,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { ...@@ -372,9 +372,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
} }
/** /**
* When job starts, record job property and stage graph * When job starts, record job property and stage graph
* @param jobStart Job start event * @param jobStart Job start event
* @return No return * @return No return
*/ */
override def onJobStart(jobStart: SparkListenerJobStart) { override def onJobStart(jobStart: SparkListenerJobStart) {
val job = jobStart.job val job = jobStart.job
......
0% Loading — or try again.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment