From 0b26a392dfd3a33a0c6db55ed9392bde2b23e61a Mon Sep 17 00:00:00 2001
From: Reynold Xin <rxin@apache.org>
Date: Mon, 4 Nov 2013 18:21:27 -0800
Subject: [PATCH] Merge pull request #128 from shimingfei/joblogger-doc

add javadoc to JobLogger, and some small fix against Spark-941

add javadoc to JobLogger, output more info for RDD, modify recordStageDepGraph to avoid output duplicate stage dependency information

(cherry picked from commit 518cf22eb2436d019e4f7087a38080ad4a20df58)
Signed-off-by: Reynold Xin <rxin@apache.org>
---
 .../apache/spark/scheduler/JobLogger.scala    | 143 +++++++++++++++---
 .../spark/scheduler/JobLoggerSuite.scala      |   2 +-
 2 files changed, 119 insertions(+), 26 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 12b0d74fb5..94f8b01285 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -24,16 +24,19 @@ import java.text.SimpleDateFormat
 import java.util.{Date, Properties}
 import java.util.concurrent.LinkedBlockingQueue
 
-import scala.collection.mutable.{HashMap, ListBuffer}
+import scala.collection.mutable.{HashMap, HashSet, ListBuffer}
 
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.storage.StorageLevel
 
 /**
  * A logger class to record runtime information for jobs in Spark. This class outputs one log file
- * per Spark job with information such as RDD graph, tasks start/stop, shuffle information.
-
+ * for each Spark job, containing RDD graph, tasks start/stop, shuffle information.
+ * JobLogger is a subclass of SparkListener, use addSparkListener to add JobLogger to a SparkContext
+ * after the SparkContext is created.
+ * Note that each JobLogger only works for one SparkContext
  * @param logDirName The base directory for the log files.
  */
 class JobLogger(val logDirName: String) extends SparkListener with Logging {
@@ -56,7 +59,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
   private[scheduler] def getJobIDToStages = jobIDToStages
   private[scheduler] def getEventQueue = eventQueue
 
-  // Create a folder for log files, the folder's name is the creation time of the jobLogger
+  /** Create a folder for log files, the folder's name is the creation time of jobLogger */
   protected def createLogDir() {
     val dir = new File(logDir + "/" + logDirName + "/")
     if (!dir.exists() && !dir.mkdirs()) {
@@ -64,7 +67,11 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }
 
-  // Create a log file for one job, the file name is the jobID
+  /**
+   * Create a log file for one job
+   * @param jobID ID of the job
+   * @exception FileNotFoundException Fail to create log file
+   */
   protected def createLogWriter(jobID: Int) {
     try {
       val fileWriter = new PrintWriter(logDir + "/" + logDirName + "/" + jobID)
@@ -74,8 +81,11 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }
 
-  // Close log file, and clean the stage relationship in stageIDToJobID
-  protected def closeLogWriter(jobID: Int) =
+  /**
+   * Close log file, and clean the stage relationship in stageIDToJobID
+   * @param jobID ID of the job
+   */
+  protected def closeLogWriter(jobID: Int) {
     jobIDToPrintWriter.get(jobID).foreach { fileWriter =>
       fileWriter.close()
       jobIDToStages.get(jobID).foreach(_.foreach{ stage =>
@@ -84,9 +94,14 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
       jobIDToPrintWriter -= jobID
       jobIDToStages -= jobID
     }
+  }
 
-  // Write log information to log file, withTime parameter controls whether to recored
-  // time stamp for the information
+  /**
+   * Write info into log file
+   * @param jobID ID of the job
+   * @param info Info to be recorded
+   * @param withTime Controls whether to record time stamp before the info, default is true
+   */
   protected def jobLogInfo(jobID: Int, info: String, withTime: Boolean = true) {
     var writeInfo = info
     if (withTime) {
@@ -96,9 +111,21 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     jobIDToPrintWriter.get(jobID).foreach(_.println(writeInfo))
   }
 
-  protected def stageLogInfo(stageID: Int, info: String, withTime: Boolean = true) =
+  /**
+   * Write info into log file
+   * @param stageID ID of the stage
+   * @param info Info to be recorded
+   * @param withTime Controls whether to record time stamp before the info, default is true
+   */
+  protected def stageLogInfo(stageID: Int, info: String, withTime: Boolean = true) {
     stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime))
-
+  }
+
+  /**
+   * Build stage dependency for a job
+   * @param jobID ID of the job
+   * @param stage Root stage of the job
+   */
   protected def buildJobDep(jobID: Int, stage: Stage) {
     if (stage.jobId == jobID) {
       jobIDToStages.get(jobID) match {
@@ -112,6 +139,10 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }
 
+  /**
+   * Record stage dependency and RDD dependency for a stage
+   * @param jobID Job ID of the stage
+   */
   protected def recordStageDep(jobID: Int) {
     def getRddsInStage(rdd: RDD[_]): ListBuffer[RDD[_]] = {
       var rddList = new ListBuffer[RDD[_]]
@@ -138,8 +169,12 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }
 
-  // Generate indents and convert to String
-  protected def indentString(indent: Int) = {
+  /**
+   * Generate indents and convert to String
+   * @param indent Number of indents
+   * @return string of indents
+   */
+  protected def indentString(indent: Int): String = {
     val sb = new StringBuilder()
     for (i <- 1 to indent) {
       sb.append(" ")
@@ -147,16 +182,34 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     sb.toString()
   }
 
-  protected def getRddName(rdd: RDD[_]) = {
-    var rddName = rdd.getClass.getName
+  /**
+   * Get RDD's name
+   * @param rdd Input RDD
+   * @return String of RDD's name
+   */
+  protected def getRddName(rdd: RDD[_]): String = {
+    var rddName = rdd.getClass.getSimpleName
     if (rdd.name != null) {
       rddName = rdd.name
     }
     rddName
   }
 
+  /**
+   * Record RDD dependency graph in a stage
+   * @param jobID Job ID of the stage
+   * @param rdd Root RDD of the stage
+   * @param indent Indent number before info
+   */
   protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
-    val rddInfo = "RDD_ID=" + rdd.id + "(" + getRddName(rdd) + "," + rdd.generator + ")"
+    val rddInfo =
+      if (rdd.getStorageLevel != StorageLevel.NONE) {
+        "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " CACHED" + " " +
+                rdd.origin + " " + rdd.generator
+      } else {
+        "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " NONE" + " " +
+                rdd.origin + " " + rdd.generator
+      }
     jobLogInfo(jobID, indentString(indent) + rddInfo, false)
     rdd.dependencies.foreach {
       case shufDep: ShuffleDependency[_, _] =>
@@ -166,7 +219,13 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }
 
-  protected def recordStageDepGraph(jobID: Int, stage: Stage, indent: Int = 0) {
+  /**
+   * Record stage dependency graph of a job
+   * @param jobID Job ID of the stage
+   * @param stage Root stage of the job
+   * @param indent Indent number before info, default is 0
+   */
+  protected def recordStageDepGraph(jobID: Int, stage: Stage, idSet: HashSet[Int], indent: Int = 0) {
     val stageInfo = if (stage.isShuffleMap) {
       "STAGE_ID=" + stage.id + " MAP_STAGE SHUFFLE_ID=" + stage.shuffleDep.get.shuffleId
     } else {
@@ -174,14 +233,23 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
     if (stage.jobId == jobID) {
       jobLogInfo(jobID, indentString(indent) + stageInfo, false)
-      recordRddInStageGraph(jobID, stage.rdd, indent)
-      stage.parents.foreach(recordStageDepGraph(jobID, _, indent + 2))
+      if (!idSet.contains(stage.id)) {
+        idSet += stage.id
+        recordRddInStageGraph(jobID, stage.rdd, indent)
+        stage.parents.foreach(recordStageDepGraph(jobID, _, idSet, indent + 2))
+      }
     } else {
       jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.jobId, false)
     }
   }
 
-  // Record task metrics into job log files
+  /**
+   * Record task metrics into job log files, including execution info and shuffle metrics
+   * @param stageID Stage ID of the task
+   * @param status Status info of the task
+   * @param taskInfo Task description info
+   * @param taskMetrics Task running metrics
+   */
   protected def recordTaskMetrics(stageID: Int, status: String, taskInfo: TaskInfo,
                                   taskMetrics: TaskMetrics) {
     val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID +
@@ -206,18 +274,30 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     stageLogInfo(stageID, status + info + executorRunTime + readMetrics + writeMetrics)
   }
 
+  /**
+   * When stage is submitted, record stage submit info
+   * @param stageSubmitted Stage submitted event
+   */
   override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
     stageLogInfo(stageSubmitted.stage.stageId,"STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format(
       stageSubmitted.stage.stageId, stageSubmitted.stage.numTasks))
   }
 
+  /**
+   * When stage is completed, record stage completion status
+   * @param stageCompleted Stage completed event
+   */
   override def onStageCompleted(stageCompleted: StageCompleted) {
     stageLogInfo(stageCompleted.stage.stageId, "STAGE_ID=%d STATUS=COMPLETED".format(
       stageCompleted.stage.stageId))
   }
-
+
   override def onTaskStart(taskStart: SparkListenerTaskStart) { }
-
+
+  /**
+   * When task ends, record task completion status and metrics
+   * @param taskEnd Task end event
+   */
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
     val task = taskEnd.task
     val taskInfo = taskEnd.taskInfo
@@ -246,6 +326,10 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }
 
+  /**
+   * When job ends, recording job completion status and close log file
+   * @param jobEnd Job end event
+   */
   override def onJobEnd(jobEnd: SparkListenerJobEnd) {
     val job = jobEnd.job
     var info = "JOB_ID=" + job.jobId
@@ -259,14 +343,23 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     jobLogInfo(job.jobId, info.substring(0, info.length - 1).toUpperCase)
     closeLogWriter(job.jobId)
   }
-
+
+  /**
+   * Record job properties into job log file
+   * @param jobID ID of the job
+   * @param properties Properties of the job
+   */
   protected def recordJobProperties(jobID: Int, properties: Properties) {
     if(properties != null) {
       val description = properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION, "")
       jobLogInfo(jobID, description, false)
     }
   }
-
+
+  /**
+   * When job starts, record job property and stage graph
+   * @param jobStart Job start event
+   */
   override def onJobStart(jobStart: SparkListenerJobStart) {
     val job = jobStart.job
     val properties = jobStart.properties
@@ -274,7 +367,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     recordJobProperties(job.jobId, properties)
     buildJobDep(job.jobId, job.finalStage)
     recordStageDep(job.jobId)
-    recordStageDepGraph(job.jobId, job.finalStage)
+    recordStageDepGraph(job.jobId, job.finalStage, new HashSet[Int])
     jobLogInfo(job.jobId, "JOB_ID=" + job.jobId + " STATUS=STARTED")
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
index 8406093246..7d7ca9ba8c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
@@ -65,7 +65,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
     val rootStageInfo = new StageInfo(rootStage)
 
     joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStageInfo, null))
-    joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getName)
+    joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getSimpleName)
     parentRdd.setName("MyRDD")
     joblogger.getRddNameTest(parentRdd) should be ("MyRDD")
     joblogger.createLogWriterTest(jobID)
-- 
GitLab
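
The new class doc above says to register a JobLogger through SparkContext.addSparkListener once the SparkContext exists. A minimal usage sketch against the 0.8-era API (the application name and log directory name below are arbitrary placeholders, not part of the patch):

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.JobLogger

object JobLoggerExample {
  def main(args: Array[String]) {
    // Local-mode context; any master URL works the same way.
    val sc = new SparkContext("local", "JobLoggerExample")

    // Attach the listener after the SparkContext is created, as the new
    // class-level doc describes. The directory name is an arbitrary example.
    sc.addSparkListener(new JobLogger("joblogger-example"))

    // Run a small job so the logger has stage and task events to record.
    sc.parallelize(1 to 100).map(_ * 2).reduce(_ + _)

    sc.stop()
  }
}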
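
The other behavioral change is in recordStageDepGraph: a HashSet of visited stage ids is threaded through the recursion so a stage shared by several children is expanded only once. A standalone sketch of that visited-set traversal, using a hypothetical Node class in place of the scheduler's private Stage type:

import scala.collection.mutable.HashSet

// Hypothetical stand-in for a scheduler stage: an id plus parent nodes.
case class Node(id: Int, parents: Seq[Node])

object DedupDepGraph {
  // Always print the node's id line, but expand its parents only the first
  // time the id is seen, mirroring how the patched recordStageDepGraph
  // skips stages whose ids are already in idSet.
  def record(node: Node, visited: HashSet[Int], indent: Int = 0) {
    println(" " * indent + "STAGE_ID=" + node.id)
    if (!visited.contains(node.id)) {
      visited += node.id
      node.parents.foreach(record(_, visited, indent + 2))
    }
  }

  def main(args: Array[String]) {
    val shared = Node(1, Nil)
    val diamond = Node(4, Seq(Node(2, Seq(shared)), Node(3, Seq(shared))))
    record(diamond, new HashSet[Int])  // node 1's subtree is expanded once, not twice
  }
}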