Commit 518cf22e authored by Reynold Xin

Merge pull request #128 from shimingfei/joblogger-doc

add javadoc to JobLogger, and some small fixes

Against SPARK-941.

Add javadoc to JobLogger, output more info for RDDs, and modify recordStageDepGraph to avoid outputting duplicate stage dependency information.
parents 7e00dee2 7374376f
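For context, a minimal usage sketch based on the new javadoc (the directory name "joblogs" and the app name are illustrative values, not from this commit):

    import org.apache.spark.SparkContext
    import org.apache.spark.scheduler.JobLogger

    val sc = new SparkContext("local", "JobLoggerExample")
    // JobLogger is a SparkListener; register it after the SparkContext is created.
    // Each JobLogger instance works with only one SparkContext.
    sc.addSparkListener(new JobLogger("joblogs"))

The logger then writes one log file per job, named by job ID, under logDir + "/" + logDirName.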
@@ -24,16 +24,19 @@ import java.text.SimpleDateFormat
 import java.util.{Date, Properties}
 import java.util.concurrent.LinkedBlockingQueue

-import scala.collection.mutable.{HashMap, ListBuffer}
+import scala.collection.mutable.{HashMap, HashSet, ListBuffer}

 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.storage.StorageLevel

 /**
  * A logger class to record runtime information for jobs in Spark. This class outputs one log file
- * per Spark job with information such as RDD graph, tasks start/stop, shuffle information.
- *
+ * for each Spark job, containing RDD graph, tasks start/stop, shuffle information.
+ * JobLogger is a subclass of SparkListener; use addSparkListener to add JobLogger to a SparkContext
+ * after the SparkContext is created.
+ * Note that each JobLogger works for only one SparkContext.
+ * @param logDirName The base directory for the log files.
  */
 class JobLogger(val logDirName: String) extends SparkListener with Logging {
@@ -56,7 +59,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
   private[scheduler] def getJobIDToStages = jobIDToStages
   private[scheduler] def getEventQueue = eventQueue

-  // Create a folder for log files, the folder's name is the creation time of the jobLogger
+  /** Create a folder for log files; the folder's name is the creation time of the JobLogger. */
   protected def createLogDir() {
     val dir = new File(logDir + "/" + logDirName + "/")
     if (!dir.exists() && !dir.mkdirs()) {
@@ -64,7 +67,11 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }

-  // Create a log file for one job, the file name is the jobID
+  /**
+   * Create a log file for one job; the file name is the job ID.
+   * @param jobID ID of the job
+   * @exception FileNotFoundException if the log file cannot be created
+   */
   protected def createLogWriter(jobID: Int) {
     try {
       val fileWriter = new PrintWriter(logDir + "/" + logDirName + "/" + jobID)
@@ -74,8 +81,11 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }

-  // Close log file, and clean the stage relationship in stageIDToJobID
-  protected def closeLogWriter(jobID: Int) =
+  /**
+   * Close the log file and clean up the stage relationship in stageIDToJobID.
+   * @param jobID ID of the job
+   */
+  protected def closeLogWriter(jobID: Int) {
     jobIDToPrintWriter.get(jobID).foreach { fileWriter =>
       fileWriter.close()
       jobIDToStages.get(jobID).foreach(_.foreach{ stage =>
@@ -84,9 +94,14 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
       jobIDToPrintWriter -= jobID
       jobIDToStages -= jobID
     }
+  }

-  // Write log information to log file, withTime parameter controls whether to recored
-  // time stamp for the information
+  /**
+   * Write info into the log file.
+   * @param jobID ID of the job
+   * @param info Info to be recorded
+   * @param withTime Controls whether to prepend a time stamp to the info; default is true
+   */
   protected def jobLogInfo(jobID: Int, info: String, withTime: Boolean = true) {
     var writeInfo = info
     if (withTime) {
@@ -96,9 +111,21 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     jobIDToPrintWriter.get(jobID).foreach(_.println(writeInfo))
   }

-  protected def stageLogInfo(stageID: Int, info: String, withTime: Boolean = true) =
+  /**
+   * Write info into the log file of the job that the stage belongs to.
+   * @param stageID ID of the stage
+   * @param info Info to be recorded
+   * @param withTime Controls whether to prepend a time stamp to the info; default is true
+   */
+  protected def stageLogInfo(stageID: Int, info: String, withTime: Boolean = true) {
     stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime))
+  }

+  /**
+   * Build the stage dependency for a job.
+   * @param jobID ID of the job
+   * @param stage Root stage of the job
+   */
   protected def buildJobDep(jobID: Int, stage: Stage) {
     if (stage.jobId == jobID) {
       jobIDToStages.get(jobID) match {
@@ -112,6 +139,10 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }

+  /**
+   * Record stage dependency and RDD dependency for a stage.
+   * @param jobID Job ID of the stage
+   */
   protected def recordStageDep(jobID: Int) {
     def getRddsInStage(rdd: RDD[_]): ListBuffer[RDD[_]] = {
       var rddList = new ListBuffer[RDD[_]]
@@ -138,8 +169,12 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }

-  // Generate indents and convert to String
-  protected def indentString(indent: Int) = {
+  /**
+   * Generate an indent string.
+   * @param indent Number of indents
+   * @return String of indents
+   */
+  protected def indentString(indent: Int): String = {
     val sb = new StringBuilder()
     for (i <- 1 to indent) {
       sb.append(" ")
@@ -147,16 +182,34 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     sb.toString()
   }

-  protected def getRddName(rdd: RDD[_]) = {
-    var rddName = rdd.getClass.getName
+  /**
+   * Get an RDD's name.
+   * @param rdd Input RDD
+   * @return String of the RDD's name
+   */
+  protected def getRddName(rdd: RDD[_]): String = {
+    var rddName = rdd.getClass.getSimpleName
     if (rdd.name != null) {
       rddName = rdd.name
     }
     rddName
   }

+  /**
+   * Record the RDD dependency graph in a stage.
+   * @param jobID Job ID of the stage
+   * @param rdd Root RDD of the stage
+   * @param indent Indent number before info
+   */
   protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
-    val rddInfo = "RDD_ID=" + rdd.id + "(" + getRddName(rdd) + "," + rdd.generator + ")"
+    val rddInfo =
+      if (rdd.getStorageLevel != StorageLevel.NONE) {
+        "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " CACHED" + " " +
+          rdd.origin + " " + rdd.generator
+      } else {
+        "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " NONE" + " " +
+          rdd.origin + " " + rdd.generator
+      }
     jobLogInfo(jobID, indentString(indent) + rddInfo, false)
     rdd.dependencies.foreach {
       case shufDep: ShuffleDependency[_, _] =>
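For reference, this changes each RDD line in the stage-graph log from the old "RDD_ID=<id>(<name>,<generator>)" form to a space-separated form that also reports caching status and origin, roughly "RDD_ID=<id> <name> CACHED|NONE <origin> <generator>", e.g. (values illustrative, placeholders stand for the rdd.origin and rdd.generator strings):

    RDD_ID=2 MyRDD CACHED <origin call site> <generator>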
@@ -166,7 +219,13 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }

-  protected def recordStageDepGraph(jobID: Int, stage: Stage, indent: Int = 0) {
+  /**
+   * Record the stage dependency graph of a job.
+   * @param jobID Job ID of the stage
+   * @param stage Root stage of the job
+   * @param idSet Set of stage IDs already recorded, used to avoid duplicate output
+   * @param indent Indent number before info; default is 0
+   */
+  protected def recordStageDepGraph(jobID: Int, stage: Stage, idSet: HashSet[Int], indent: Int = 0) {
     val stageInfo = if (stage.isShuffleMap) {
       "STAGE_ID=" + stage.id + " MAP_STAGE SHUFFLE_ID=" + stage.shuffleDep.get.shuffleId
     } else {
@@ -174,14 +233,23 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
     if (stage.jobId == jobID) {
       jobLogInfo(jobID, indentString(indent) + stageInfo, false)
-      recordRddInStageGraph(jobID, stage.rdd, indent)
-      stage.parents.foreach(recordStageDepGraph(jobID, _, indent + 2))
+      if (!idSet.contains(stage.id)) {
+        idSet += stage.id
+        recordRddInStageGraph(jobID, stage.rdd, indent)
+        stage.parents.foreach(recordStageDepGraph(jobID, _, idSet, indent + 2))
+      }
     } else {
       jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.jobId, false)
     }
   }

-  // Record task metrics into job log files
+  /**
+   * Record task metrics into job log files, including execution info and shuffle metrics.
+   * @param stageID Stage ID of the task
+   * @param status Status info of the task
+   * @param taskInfo Task description info
+   * @param taskMetrics Task running metrics
+   */
   protected def recordTaskMetrics(stageID: Int, status: String,
       taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
     val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID +
@@ -206,18 +274,30 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     stageLogInfo(stageID, status + info + executorRunTime + readMetrics + writeMetrics)
   }

+  /**
+   * When a stage is submitted, record stage submission info.
+   * @param stageSubmitted Stage submitted event
+   */
   override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
     stageLogInfo(stageSubmitted.stage.stageId,"STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format(
       stageSubmitted.stage.stageId, stageSubmitted.stage.numTasks))
   }

+  /**
+   * When a stage is completed, record stage completion status.
+   * @param stageCompleted Stage completed event
+   */
   override def onStageCompleted(stageCompleted: StageCompleted) {
     stageLogInfo(stageCompleted.stage.stageId, "STAGE_ID=%d STATUS=COMPLETED".format(
       stageCompleted.stage.stageId))
   }

   override def onTaskStart(taskStart: SparkListenerTaskStart) { }

+  /**
+   * When a task ends, record task completion status and metrics.
+   * @param taskEnd Task end event
+   */
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
     val task = taskEnd.task
     val taskInfo = taskEnd.taskInfo
@@ -246,6 +326,10 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }

+  /**
+   * When a job ends, record job completion status and close the log file.
+   * @param jobEnd Job end event
+   */
   override def onJobEnd(jobEnd: SparkListenerJobEnd) {
     val job = jobEnd.job
     var info = "JOB_ID=" + job.jobId
@@ -259,14 +343,23 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     jobLogInfo(job.jobId, info.substring(0, info.length - 1).toUpperCase)
     closeLogWriter(job.jobId)
   }

+  /**
+   * Record job properties into the job log file.
+   * @param jobID ID of the job
+   * @param properties Properties of the job
+   */
   protected def recordJobProperties(jobID: Int, properties: Properties) {
     if(properties != null) {
       val description = properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION, "")
       jobLogInfo(jobID, description, false)
     }
   }

+  /**
+   * When a job starts, record job properties and the stage graph.
+   * @param jobStart Job start event
+   */
   override def onJobStart(jobStart: SparkListenerJobStart) {
     val job = jobStart.job
     val properties = jobStart.properties
@@ -274,7 +367,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     recordJobProperties(job.jobId, properties)
     buildJobDep(job.jobId, job.finalStage)
     recordStageDep(job.jobId)
-    recordStageDepGraph(job.jobId, job.finalStage)
+    recordStageDepGraph(job.jobId, job.finalStage, new HashSet[Int])
     jobLogInfo(job.jobId, "JOB_ID=" + job.jobId + " STATUS=STARTED")
   }
 }
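Why the new idSet parameter matters: stages form a DAG rather than a tree, so two child stages can share a parent, and the old recursion re-printed the shared parent's entire subtree once per child. A standalone sketch of the fixed traversal, with a hypothetical Node class standing in for Stage:

    import scala.collection.mutable.HashSet

    case class Node(id: Int, parents: Seq[Node])

    def record(node: Node, visited: HashSet[Int], indent: Int = 0) {
      // The STAGE_ID line is still printed on every visit, as in recordStageDepGraph...
      println((" " * indent) + "STAGE_ID=" + node.id)
      // ...but the RDD graph and parent recursion happen only once per stage.
      if (!visited.contains(node.id)) {
        visited += node.id
        node.parents.foreach(record(_, visited, indent + 2))
      }
    }

    // Diamond DAG: node 3 depends on nodes 2 and 1, and node 2 also depends on node 1.
    val shared = Node(1, Nil)
    record(Node(3, Seq(Node(2, Seq(shared)), shared)), new HashSet[Int])

Without the visited set, node 1 and its (potentially deep) ancestry would be expanded twice.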
@@ -65,7 +65,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
     val rootStageInfo = new StageInfo(rootStage)

     joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStageInfo, null))
-    joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getName)
+    joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getSimpleName)
     parentRdd.setName("MyRDD")
     joblogger.getRddNameTest(parentRdd) should be ("MyRDD")
     joblogger.createLogWriterTest(jobID)
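The test change mirrors the switch in getRddName from getClass.getName to getClass.getSimpleName: an unnamed RDD is now logged under its short class name rather than the fully qualified one. An illustration, reusing sc from the sketch above (the exact RDD class name depends on the Spark version):

    val rdd = sc.parallelize(1 to 10).map(_ + 1)
    println(rdd.getClass.getName)        // e.g. org.apache.spark.rdd.MappedRDD
    println(rdd.getClass.getSimpleName)  // e.g. MappedRDD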