Commit 0b26a392 authored by Reynold Xin

Merge pull request #128 from shimingfei/joblogger-doc


add javadoc to JobLogger, and some small fixes

against SPARK-941

add javadoc to JobLogger, output more info for RDDs, and modify recordStageDepGraph to avoid emitting duplicate stage dependency information

(cherry picked from commit 518cf22e)
Signed-off-by: Reynold Xin <rxin@apache.org>
parent 7a26104a
@@ -24,16 +24,19 @@ import java.text.SimpleDateFormat
 import java.util.{Date, Properties}
 import java.util.concurrent.LinkedBlockingQueue
 
-import scala.collection.mutable.{HashMap, ListBuffer}
+import scala.collection.mutable.{HashMap, HashSet, ListBuffer}
 
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.storage.StorageLevel
 
 /**
  * A logger class to record runtime information for jobs in Spark. This class outputs one log file
- * per Spark job with information such as RDD graph, tasks start/stop, shuffle information.
- *
+ * for each Spark job, containing RDD graph, tasks start/stop, shuffle information.
+ * JobLogger is a subclass of SparkListener, use addSparkListener to add JobLogger to a SparkContext
+ * after the SparkContext is created.
+ * Note that each JobLogger only works for one SparkContext
  * @param logDirName The base directory for the log files.
  */
 class JobLogger(val logDirName: String) extends SparkListener with Logging {
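
As the new class comment says, the logger is attached through SparkContext.addSparkListener once the context exists. A minimal usage sketch (hypothetical app name, log directory name, and job, written against the Spark 0.8-era API this commit targets):

    import org.apache.spark.SparkContext
    import org.apache.spark.scheduler.JobLogger

    object JobLoggerExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "JobLoggerExample")
        // Register after the SparkContext is created;
        // each JobLogger instance serves exactly one SparkContext.
        sc.addSparkListener(new JobLogger("myJobLogs"))
        // Any action triggers a job, which the listener logs to its directory.
        sc.parallelize(1 to 100).map(_ * 2).count()
        sc.stop()
      }
    }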
@@ -56,7 +59,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
   private[scheduler] def getJobIDToStages = jobIDToStages
   private[scheduler] def getEventQueue = eventQueue
 
-  // Create a folder for log files, the folder's name is the creation time of the jobLogger
+  /** Create a folder for log files, the folder's name is the creation time of jobLogger */
   protected def createLogDir() {
     val dir = new File(logDir + "/" + logDirName + "/")
     if (!dir.exists() && !dir.mkdirs()) {
@@ -64,7 +67,11 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }
 
-  // Create a log file for one job, the file name is the jobID
+  /**
+   * Create a log file for one job
+   * @param jobID ID of the job
+   * @exception FileNotFoundException Fail to create log file
+   */
   protected def createLogWriter(jobID: Int) {
     try {
       val fileWriter = new PrintWriter(logDir + "/" + logDirName + "/" + jobID)
@@ -74,8 +81,11 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }
 
-  // Close log file, and clean the stage relationship in stageIDToJobID
-  protected def closeLogWriter(jobID: Int) =
+  /**
+   * Close log file, and clean the stage relationship in stageIDToJobID
+   * @param jobID ID of the job
+   */
+  protected def closeLogWriter(jobID: Int) {
     jobIDToPrintWriter.get(jobID).foreach { fileWriter =>
       fileWriter.close()
       jobIDToStages.get(jobID).foreach(_.foreach{ stage =>
@@ -84,9 +94,14 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
       jobIDToPrintWriter -= jobID
       jobIDToStages -= jobID
     }
+  }
 
-  // Write log information to log file, withTime parameter controls whether to recored
-  // time stamp for the information
+  /**
+   * Write info into log file
+   * @param jobID ID of the job
+   * @param info Info to be recorded
+   * @param withTime Controls whether to record time stamp before the info, default is true
+   */
   protected def jobLogInfo(jobID: Int, info: String, withTime: Boolean = true) {
     var writeInfo = info
     if (withTime) {
@@ -96,9 +111,21 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     jobIDToPrintWriter.get(jobID).foreach(_.println(writeInfo))
   }
 
-  protected def stageLogInfo(stageID: Int, info: String, withTime: Boolean = true) =
+  /**
+   * Write info into log file
+   * @param stageID ID of the stage
+   * @param info Info to be recorded
+   * @param withTime Controls whether to record time stamp before the info, default is true
+   */
+  protected def stageLogInfo(stageID: Int, info: String, withTime: Boolean = true) {
     stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime))
+  }
 
+  /**
+   * Build stage dependency for a job
+   * @param jobID ID of the job
+   * @param stage Root stage of the job
+   */
   protected def buildJobDep(jobID: Int, stage: Stage) {
     if (stage.jobId == jobID) {
       jobIDToStages.get(jobID) match {
@@ -112,6 +139,10 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }
 
+  /**
+   * Record stage dependency and RDD dependency for a stage
+   * @param jobID Job ID of the stage
+   */
   protected def recordStageDep(jobID: Int) {
     def getRddsInStage(rdd: RDD[_]): ListBuffer[RDD[_]] = {
       var rddList = new ListBuffer[RDD[_]]
@@ -138,8 +169,12 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }
 
-  // Generate indents and convert to String
-  protected def indentString(indent: Int) = {
+  /**
+   * Generate indents and convert to String
+   * @param indent Number of indents
+   * @return string of indents
+   */
+  protected def indentString(indent: Int): String = {
     val sb = new StringBuilder()
     for (i <- 1 to indent) {
       sb.append(" ")
@@ -147,16 +182,34 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     sb.toString()
   }
 
-  protected def getRddName(rdd: RDD[_]) = {
-    var rddName = rdd.getClass.getName
+  /**
+   * Get RDD's name
+   * @param rdd Input RDD
+   * @return String of RDD's name
+   */
+  protected def getRddName(rdd: RDD[_]): String = {
+    var rddName = rdd.getClass.getSimpleName
     if (rdd.name != null) {
       rddName = rdd.name
     }
     rddName
   }
 
+  /**
+   * Record RDD dependency graph in a stage
+   * @param jobID Job ID of the stage
+   * @param rdd Root RDD of the stage
+   * @param indent Indent number before info
+   */
   protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
-    val rddInfo = "RDD_ID=" + rdd.id + "(" + getRddName(rdd) + "," + rdd.generator + ")"
+    val rddInfo =
+      if (rdd.getStorageLevel != StorageLevel.NONE) {
+        "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " CACHED" + " " +
+        rdd.origin + " " + rdd.generator
+      } else {
+        "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " NONE" + " " +
+        rdd.origin + " " + rdd.generator
+      }
     jobLogInfo(jobID, indentString(indent) + rddInfo, false)
     rdd.dependencies.foreach {
       case shufDep: ShuffleDependency[_, _] =>
@@ -166,7 +219,13 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }
 
-  protected def recordStageDepGraph(jobID: Int, stage: Stage, indent: Int = 0) {
+  /**
+   * Record stage dependency graph of a job
+   * @param jobID Job ID of the stage
+   * @param stage Root stage of the job
+   * @param indent Indent number before info, default is 0
+   */
+  protected def recordStageDepGraph(jobID: Int, stage: Stage, idSet: HashSet[Int], indent: Int = 0) {
     val stageInfo = if (stage.isShuffleMap) {
       "STAGE_ID=" + stage.id + " MAP_STAGE SHUFFLE_ID=" + stage.shuffleDep.get.shuffleId
     } else {
@@ -174,14 +233,23 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
     if (stage.jobId == jobID) {
       jobLogInfo(jobID, indentString(indent) + stageInfo, false)
-      recordRddInStageGraph(jobID, stage.rdd, indent)
-      stage.parents.foreach(recordStageDepGraph(jobID, _, indent + 2))
+      if (!idSet.contains(stage.id)) {
+        idSet += stage.id
+        recordRddInStageGraph(jobID, stage.rdd, indent)
+        stage.parents.foreach(recordStageDepGraph(jobID, _, idSet, indent + 2))
+      }
     } else {
       jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.jobId, false)
     }
   }
 
-  // Record task metrics into job log files
+  /**
+   * Record task metrics into job log files, including execution info and shuffle metrics
+   * @param stageID Stage ID of the task
+   * @param status Status info of the task
+   * @param taskInfo Task description info
+   * @param taskMetrics Task running metrics
+   */
   protected def recordTaskMetrics(stageID: Int, status: String,
       taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
     val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID +
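
The new idSet parameter in recordStageDepGraph is a visited-set guard for walking the stage DAG: a stage reachable from several children still gets its STAGE_ID line at each reference, but its RDD graph and parents are expanded only on the first visit, which is what removes the duplicated dependency output. A self-contained sketch of the same pattern (hypothetical Node type, not Spark code):

    import scala.collection.mutable.HashSet

    object VisitedSetDemo {
      case class Node(id: Int, parents: Seq[Node])

      // Print every reference to a node, but expand its subtree only once.
      def record(node: Node, visited: HashSet[Int], indent: Int = 0) {
        println(" " * indent + "NODE_ID=" + node.id)
        if (!visited.contains(node.id)) {
          visited += node.id
          node.parents.foreach(record(_, visited, indent + 2))
        }
      }

      def main(args: Array[String]) {
        val shared = Node(1, Nil)  // ancestor shared by two paths
        val root = Node(3, Seq(Node(2, Seq(shared)), shared))
        record(root, new HashSet[Int])  // 'shared' is expanded exactly once
      }
    }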
@@ -206,18 +274,30 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     stageLogInfo(stageID, status + info + executorRunTime + readMetrics + writeMetrics)
   }
 
+  /**
+   * When stage is submitted, record stage submit info
+   * @param stageSubmitted Stage submitted event
+   */
   override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
     stageLogInfo(stageSubmitted.stage.stageId,"STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format(
       stageSubmitted.stage.stageId, stageSubmitted.stage.numTasks))
   }
 
+  /**
+   * When stage is completed, record stage completion status
+   * @param stageCompleted Stage completed event
+   */
   override def onStageCompleted(stageCompleted: StageCompleted) {
     stageLogInfo(stageCompleted.stage.stageId, "STAGE_ID=%d STATUS=COMPLETED".format(
       stageCompleted.stage.stageId))
   }
 
   override def onTaskStart(taskStart: SparkListenerTaskStart) { }
 
+  /**
+   * When task ends, record task completion status and metrics
+   * @param taskEnd Task end event
+   */
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
     val task = taskEnd.task
     val taskInfo = taskEnd.taskInfo
@@ -246,6 +326,10 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }
 
+  /**
+   * When job ends, record job completion status and close log file
+   * @param jobEnd Job end event
+   */
   override def onJobEnd(jobEnd: SparkListenerJobEnd) {
     val job = jobEnd.job
     var info = "JOB_ID=" + job.jobId
@@ -259,14 +343,23 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     jobLogInfo(job.jobId, info.substring(0, info.length - 1).toUpperCase)
     closeLogWriter(job.jobId)
   }
 
+  /**
+   * Record job properties into job log file
+   * @param jobID ID of the job
+   * @param properties Properties of the job
+   */
   protected def recordJobProperties(jobID: Int, properties: Properties) {
     if(properties != null) {
       val description = properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION, "")
       jobLogInfo(jobID, description, false)
     }
   }
 
+  /**
+   * When job starts, record job property and stage graph
+   * @param jobStart Job start event
+   */
   override def onJobStart(jobStart: SparkListenerJobStart) {
     val job = jobStart.job
     val properties = jobStart.properties
@@ -274,7 +367,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     recordJobProperties(job.jobId, properties)
     buildJobDep(job.jobId, job.finalStage)
     recordStageDep(job.jobId)
-    recordStageDepGraph(job.jobId, job.finalStage)
+    recordStageDepGraph(job.jobId, job.finalStage, new HashSet[Int])
     jobLogInfo(job.jobId, "JOB_ID=" + job.jobId + " STATUS=STARTED")
   }
 }
@@ -65,7 +65,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
     val rootStageInfo = new StageInfo(rootStage)
 
     joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStageInfo, null))
-    joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getName)
+    joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getSimpleName)
     parentRdd.setName("MyRDD")
     joblogger.getRddNameTest(parentRdd) should be ("MyRDD")
     joblogger.createLogWriterTest(jobID)