diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
index e1fb02157aafecd591d8dd2631c8bc88ed52eb60..3239f4c3854a91ab6ed773a1b201684bbbf6ed1b 100644
--- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
@@ -58,6 +58,7 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
 
     CompletionIterator[(K,V), Iterator[(K,V)]](itr, {
       val shuffleMetrics = new ShuffleReadMetrics
+      shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
       shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime
       shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
       shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index e6c0438d76da6498f103d02d7cb00ec1717803fc..8c0b7ca417add945bae2b1be1c336a88627a9ce8 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -114,6 +114,14 @@ abstract class RDD[T: ClassManifest](
     this
   }
 
+  /** User-defined generator of this RDD (defaults to the user class that created it). */
+  var generator = Utils.getCallSiteInfo._4
+
+  /** Reset the generator of this RDD. */
+  def setGenerator(_generator: String) = {
+    generator = _generator
+  }
+
   /**
    * Set this RDD's storage level to persist its values across operations after the first time
    * it is computed. This can only be used to assign a new storage level if the RDD does not
@@ -788,7 +796,7 @@ abstract class RDD[T: ClassManifest](
   private var storageLevel: StorageLevel = StorageLevel.NONE
 
   /** Record user function generating this RDD. */
-  private[spark] val origin = Utils.getSparkCallSite
+  private[spark] val origin = Utils.formatSparkCallSite
 
   private[spark] def elementClassManifest: ClassManifest[T] = classManifest[T]
 
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index bc05d08fd6ce94221026af4426ae29bacf1f0caa..b67a2066c8dd7c158d6819307a77e8500fbcdbf9 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -48,7 +48,7 @@ import spark.scheduler.local.LocalScheduler
 import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import spark.storage.{BlockManagerUI, StorageStatus, StorageUtils, RDDInfo}
 import spark.util.{MetadataCleaner, TimeStampedHashMap}
-
+import spark.scheduler.JobLogger
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -510,7 +510,7 @@ class SparkContext(
   def addSparkListener(listener: SparkListener) {
     dagScheduler.sparkListeners += listener
   }
-
+  addSparkListener(new JobLogger)
   /**
    * Return a map from the slave to the max memory available for caching and the remaining
    * memory available for caching.
@@ -630,7 +630,7 @@ class SparkContext(
       partitions: Seq[Int],
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit) {
-    val callSite = Utils.getSparkCallSite
+    val callSite = Utils.formatSparkCallSite
     logInfo("Starting job: " + callSite)
     val start = System.nanoTime
     val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler, localProperties.value)
@@ -713,7 +713,7 @@ class SparkContext(
       func: (TaskContext, Iterator[T]) => U,
       evaluator: ApproximateEvaluator[U, R],
       timeout: Long): PartialResult[R] = {
-    val callSite = Utils.getSparkCallSite
+    val callSite = Utils.formatSparkCallSite
     logInfo("Starting job: " + callSite)
     val start = System.nanoTime
     val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout, localProperties.value)
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index ec15326014e8d1e8e7ab058c4c2c1118bfd26c79..1630b2b4b01e7f9c7d292ac26058dfbfafc1f2a6 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -528,7 +528,7 @@ private object Utils extends Logging {
    * (outside the spark package) that called into Spark, as well as which Spark method they called.
    * This is used, for example, to tell users where in their code each RDD got created.
    */
-  def getSparkCallSite: String = {
+  def getCallSiteInfo = {
     val trace = Thread.currentThread.getStackTrace().filter( el =>
       (!el.getMethodName.contains("getStackTrace")))
 
@@ -540,6 +540,7 @@ private object Utils extends Logging {
     var firstUserFile = "<unknown>"
     var firstUserLine = 0
     var finished = false
+    var firstUserClass = "<unknown>"
 
     for (el <- trace) {
       if (!finished) {
@@ -554,13 +555,18 @@ private object Utils extends Logging {
         else {
           firstUserLine = el.getLineNumber
           firstUserFile = el.getFileName
+          firstUserClass = el.getClassName
           finished = true
         }
       }
     }
-    "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine)
+    (lastSparkMethod, firstUserFile, firstUserLine, firstUserClass)
   }
 
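+  /**
+   * Return the current call site, as computed by getCallSiteInfo, formatted as a string,
+   * e.g. "reduceByKey at MyApp.scala:42" (the file name and line number are illustrative).
+   */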
+  def formatSparkCallSite = {
+    val callSiteInfo = getCallSiteInfo
+    "%s at %s:%s".format(callSiteInfo._1, callSiteInfo._2, callSiteInfo._3)
+  }
+
   /**
    * Try to find a free port to bind to on the local host. This should ideally never be needed,
    * except that, unfortunately, some of the networking libraries we currently rely on (e.g. Spray)
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 890938d48b667ee769097e2c6044334a96abb80c..8bebfafce4421e559d0bcdd0a7e77863bf702b58 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -104,6 +104,7 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
         val value = task.run(taskId.toInt)
         val taskFinish = System.currentTimeMillis()
         task.metrics.foreach{ m =>
+          m.hostname = Utils.localHostName
           m.executorDeserializeTime = (taskStart - startTime).toInt
           m.executorRunTime = (taskFinish - taskStart).toInt
         }
diff --git a/core/src/main/scala/spark/executor/TaskMetrics.scala b/core/src/main/scala/spark/executor/TaskMetrics.scala
index a7c56c237199b2cb98366219282d02bcc521389b..26e80293657c4231b32fab1067cb6ee73ec1c2b3 100644
--- a/core/src/main/scala/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/spark/executor/TaskMetrics.scala
@@ -1,6 +1,11 @@
 package spark.executor
 
 class TaskMetrics extends Serializable {
+  /**
+   * Hostname of the machine the task is running on
+   */
+  var hostname: String = _
+
   /**
    * Time taken on the executor to deserialize this task
    */
@@ -33,10 +38,15 @@ object TaskMetrics {
 
 
 class ShuffleReadMetrics extends Serializable {
+  /**
+   * Time when the shuffle read finished
+   */
+  var shuffleFinishTime: Long = _
+
   /**
    * Total number of blocks fetched in a shuffle (remote or local)
    */
-  var totalBlocksFetched : Int = _
+  var totalBlocksFetched: Int = _
 
   /**
    * Number of remote blocks fetched in a shuffle
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 7feeb9754289cfb294b195f5579089725aca6b55..43dd7d6534cf46599f68adf54dd7c4e083d2289c 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -289,6 +289,7 @@ class DAGScheduler(
         val finalStage = newStage(finalRDD, None, runId)
         val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener, properties)
         clearCacheLocs()
+        sparkListeners.foreach(_.onJobStart(job, properties))
         logInfo("Got job " + job.runId + " (" + callSite + ") with " + partitions.length +
                 " output partitions (allowLocal=" + allowLocal + ")")
         logInfo("Final stage: " + finalStage + " (" + finalStage.origin + ")")
@@ -311,6 +312,7 @@ class DAGScheduler(
         handleExecutorLost(execId)
 
       case completion: CompletionEvent =>
+        sparkListeners.foreach(_.onTaskEnd(completion))
         handleTaskCompletion(completion)
 
       case TaskSetFailed(taskSet, reason) =>
@@ -321,6 +323,8 @@ class DAGScheduler(
         for (job <- activeJobs) {
           val error = new SparkException("Job cancelled because SparkContext was shut down")
           job.listener.jobFailed(error)
+          val jobCancelEvent = new SparkListenerJobCancelled("SPARKCONTEXT_SHUTDOWN")
+          sparkListeners.foreach(_.onJobEnd(job, jobCancelEvent))
         }
         return true
     }
@@ -468,6 +472,7 @@ class DAGScheduler(
       }
     }
     if (tasks.size > 0) {
+      sparkListeners.foreach(_.onStageSubmitted(stage, "TASKS_SIZE=" + tasks.size))
       logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
       myPending ++= tasks
       logDebug("New pending tasks: " + myPending)
@@ -522,6 +527,7 @@ class DAGScheduler(
                     activeJobs -= job
                     resultStageToJob -= stage
                     markStageAsFinished(stage)
+                    sparkListeners.foreach(_.onJobEnd(job, SparkListenerJobSuccess))
                   }
                   job.listener.taskSucceeded(rt.outputId, event.result)
                 }
@@ -665,6 +671,8 @@ class DAGScheduler(
       job.listener.jobFailed(new SparkException("Job failed: " + reason))
       activeJobs -= job
       resultStageToJob -= resultStage
+      val jobFailedEvent = new SparkListenerJobFailed(failedStage)
+      sparkListeners.foreach(_.onJobEnd(job, jobFailedEvent))
     }
     if (dependentStages.isEmpty) {
       logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done")
diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala
new file mode 100644
index 0000000000000000000000000000000000000000..f87acfd0b61c03b0e1944aa78c4a79c8731a4154
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/JobLogger.scala
@@ -0,0 +1,317 @@
+package spark.scheduler
+
+import java.io.PrintWriter
+import java.io.File
+import java.io.FileNotFoundException
+import java.text.SimpleDateFormat
+import java.util.{Date, Properties}
+import java.util.concurrent.LinkedBlockingQueue
+import scala.collection.mutable.{Map, HashMap, ListBuffer}
+import scala.io.Source
+import spark._
+import spark.executor.TaskMetrics
+import spark.scheduler.cluster.TaskInfo
+
+// A SparkListener that records runtime information for each job in a per-job log file,
+// including the RDD graph, task start/finish events, shuffle metrics, and user-supplied annotations.
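+//
+// Usage sketch (illustrative; in this patch SparkContext also registers a JobLogger automatically):
+//
+//   val sc = new SparkContext("local[4]", "example")
+//   sc.addSparkListener(new JobLogger("example-run"))
+//   sc.parallelize(1 to 100, 4).map(i => (i % 12, 2 * i)).reduceByKey(_ + _).collect()
+//
+// Each job gets its own log file, named by its job ID, under SPARK_LOG_DIR (default /tmp/spark)
+// in a folder named after logDirName, with lines such as "JOB_ID=0 STATUS=STARTED" followed by
+// per-stage and per-task records.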
+
+sealed trait JobLoggerEvent
+case class JobLoggerOnJobStart(job: ActiveJob, properties: Properties) extends JobLoggerEvent
+case class JobLoggerOnStageSubmitted(stage: Stage, info: String) extends JobLoggerEvent
+case class JobLoggerOnStageCompleted(stageCompleted: StageCompleted) extends JobLoggerEvent
+case class JobLoggerOnJobEnd(job: ActiveJob, event: SparkListenerEvents) extends JobLoggerEvent
+case class JobLoggerOnTaskEnd(event: CompletionEvent) extends JobLoggerEvent
+
+class JobLogger(val logDirName: String) extends SparkListener with Logging {
+  private val logDir =  
+    if (System.getenv("SPARK_LOG_DIR") != null)  
+      System.getenv("SPARK_LOG_DIR")
+    else 
+      "/tmp/spark"
+  private val jobIDToPrintWriter = new HashMap[Int, PrintWriter] 
+  private val stageIDToJobID = new HashMap[Int, Int]
+  private val jobIDToStages = new HashMap[Int, ListBuffer[Stage]]
+  private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+  private val eventQueue = new LinkedBlockingQueue[JobLoggerEvent]
+  
+  createLogDir()
+  def this() = this(String.valueOf(System.currentTimeMillis()))
+  
+  def getLogDir = logDir
+  def getJobIDtoPrintWriter = jobIDToPrintWriter
+  def getStageIDToJobID = stageIDToJobID
+  def getJobIDToStages = jobIDToStages
+  def getEventQueue = eventQueue
+  
+  new Thread("JobLogger") {
+    setDaemon(true)
+    override def run() {
+      while (true) {
+        val event = eventQueue.take
+        if (event != null) {
+          logDebug("Got event of type " + event.getClass.getName)
+          event match {
+            case JobLoggerOnJobStart(job, info) =>
+              processJobStartEvent(job, info)
+            case JobLoggerOnStageSubmitted(stage, info) =>
+              processStageSubmittedEvent(stage, info)
+            case JobLoggerOnStageCompleted(stageCompleted) =>
+              processStageCompletedEvent(stageCompleted)
+            case JobLoggerOnJobEnd(job, event) =>
+              processJobEndEvent(job, event)
+            case JobLoggerOnTaskEnd(event) =>
+              processTaskEndEvent(event)
+            case _ =>
+          }
+        }
+      }
+    }
+  }.start()
+
+  // Create a folder for log files; the folder's name is the creation time of the JobLogger
+  protected def createLogDir() {
+    val dir = new File(logDir + "/" + logDirName + "/")
+    if (dir.exists()) {
+      return
+    }
+    if (!dir.mkdirs()) {
+      logError("Error creating log directory: " + logDir + "/" + logDirName + "/")
+    }
+  }
+  
+  // Create a log file for one job; the file name is the job ID
+  protected def createLogWriter(jobID: Int) {
+    try {
+      val fileWriter = new PrintWriter(logDir + "/" + logDirName + "/" + jobID)
+      jobIDToPrintWriter += (jobID -> fileWriter)
+    } catch {
+      case e: FileNotFoundException => e.printStackTrace()
+    }
+  }
+  
+  // Close the log file for one job and remove its stage-to-job mappings from stageIDToJobID
+  protected def closeLogWriter(jobID: Int) = 
+    jobIDToPrintWriter.get(jobID).foreach { fileWriter => 
+      fileWriter.close()
+      jobIDToStages.get(jobID).foreach(_.foreach{ stage => 
+        stageIDToJobID -= stage.id
+      })
+      jobIDToPrintWriter -= jobID
+      jobIDToStages -= jobID
+    }
+  
+  // Write log information to the log file; the withTime parameter controls whether to record
+  // a timestamp with the information
+  protected def jobLogInfo(jobID: Int, info: String, withTime: Boolean = true) {
+    var writeInfo = info
+    if (withTime) {
+      val date = new Date(System.currentTimeMillis())
+      writeInfo = DATE_FORMAT.format(date) + ": " + info
+    }
+    jobIDToPrintWriter.get(jobID).foreach(_.println(writeInfo))
+  }
+  
+  protected def stageLogInfo(stageID: Int, info: String, withTime: Boolean = true) = 
+    stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime))
+
+  protected def buildJobDep(jobID: Int, stage: Stage) {
+    if (stage.priority == jobID) {
+      jobIDToStages.get(jobID) match {
+        case Some(stageList) => stageList += stage
+        case None =>
+          val stageList = new ListBuffer[Stage]
+          stageList += stage
+          jobIDToStages += (jobID -> stageList)
+      }
+      stageIDToJobID += (stage.id -> jobID)
+      stage.parents.foreach(buildJobDep(jobID, _))
+    }
+  }
+
+  protected def recordStageDep(jobID: Int) {
+    def getRddsInStage(rdd: RDD[_]): ListBuffer[RDD[_]] = {
+      var rddList = new ListBuffer[RDD[_]]
+      rddList += rdd
+      rdd.dependencies.foreach{ dep => dep match {
+          case shufDep: ShuffleDependency[_,_] =>
+          case _ => rddList ++= getRddsInStage(dep.rdd)
+        }
+      }
+      rddList
+    }
+    jobIDToStages.get(jobID).foreach {_.foreach { stage => 
+        var depRddDesc: String = ""
+        getRddsInStage(stage.rdd).foreach { rdd => 
+          depRddDesc += rdd.id + ","
+        }
+        var depStageDesc: String = ""
+        stage.parents.foreach { stage => 
+          depStageDesc += "(" + stage.id + "," + stage.shuffleDep.get.shuffleId + ")"
+        }
+        jobLogInfo(jobID, "STAGE_ID=" + stage.id + " RDD_DEP=(" + 
+                   depRddDesc.substring(0, depRddDesc.length - 1) + ")" + 
+                   " STAGE_DEP=" + depStageDesc, false)
+      }
+    }
+  }
+  
+  // generate indents and convert to String
+  protected def indentString(indent: Int) = {
+    val sb = new StringBuilder()
+    for (i <- 1 to indent) {
+      sb.append(" ")
+    }
+    sb.toString()
+  }
+  
+  protected def getRddName(rdd: RDD[_]) = {
+    var rddName = rdd.getClass.getName
+    if (rdd.name != null) {
+      rddName = rdd.name 
+    }
+    rddName
+  }
+  
+  protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
+    val rddInfo = "RDD_ID=" + rdd.id + "(" + getRddName(rdd) + "," + rdd.generator + ")"
+    jobLogInfo(jobID, indentString(indent) + rddInfo, false)
+    rdd.dependencies.foreach{ dep => dep match {
+        case shufDep: ShuffleDependency[_,_] => 
+          val depInfo = "SHUFFLE_ID=" + shufDep.shuffleId
+          jobLogInfo(jobID, indentString(indent + 1) + depInfo, false)
+        case _ => recordRddInStageGraph(jobID, dep.rdd, indent + 1)
+      }
+    }
+  }
+  
+  protected def recordStageDepGraph(jobID: Int, stage: Stage, indent: Int = 0) {
+    var stageInfo: String = ""
+    if (stage.isShuffleMap) {
+      stageInfo = "STAGE_ID=" + stage.id + " MAP_STAGE SHUFFLE_ID=" + 
+                  stage.shuffleDep.get.shuffleId
+    } else {
+      stageInfo = "STAGE_ID=" + stage.id + " RESULT_STAGE"
+    }
+    if (stage.priority == jobID) {
+      jobLogInfo(jobID, indentString(indent) + stageInfo, false)
+      recordRddInStageGraph(jobID, stage.rdd, indent)
+      stage.parents.foreach(recordStageDepGraph(jobID, _, indent + 2))
+    } else
+      jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.priority, false)
+  }
+  
+  // Record task metrics into the job log file
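+  // For a successful result task this produces a line like (values illustrative):
+  //   TASK_TYPE=RESULT_TASK STATUS=SUCCESS TID=7 STAGE_ID=1 START_TIME=... FINISH_TIME=...
+  //   EXECUTOR_ID=0 HOST=worker1 EXECUTOR_RUN_TIME=42 SHUFFLE_FINISH_TIME=... BLOCK_FETCHED_TOTAL=...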
+  protected def recordTaskMetrics(stageID: Int, status: String, 
+                                taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
+    val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID + 
+               " START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime + 
+               " EXECUTOR_ID=" + taskInfo.executorId +  " HOST=" + taskMetrics.hostname
+    val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime
+    val readMetrics = 
+      taskMetrics.shuffleReadMetrics match {
+        case Some(metrics) => 
+          " SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime + 
+          " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched + 
+          " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched + 
+          " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched + 
+          " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime + 
+          " REMOTE_FETCH_TIME=" + metrics.remoteFetchTime + 
+          " REMOTE_BYTES_READ=" + metrics.remoteBytesRead
+        case None => ""
+      }
+    val writeMetrics = 
+      taskMetrics.shuffleWriteMetrics match {
+        case Some(metrics) => 
+          " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten
+        case None => ""
+      }
+    stageLogInfo(stageID, status + info + executorRunTime + readMetrics + writeMetrics)
+  }
+  
+  override def onStageSubmitted(stage: Stage, info: String = "") {
+    eventQueue.put(JobLoggerOnStageSubmitted(stage, info))
+  }
+
+  protected def processStageSubmittedEvent(stage: Stage, info: String) {
+    stageLogInfo(stage.id, "STAGE_ID=" + stage.id + " STATUS=SUBMITTED " + info)
+  }
+  
+  override def onStageCompleted(stageCompleted: StageCompleted) {
+    eventQueue.put(JobLoggerOnStageCompleted(stageCompleted))
+  }
+
+  protected def processStageCompletedEvent(stageCompleted: StageCompleted) {
+    stageLogInfo(stageCompleted.stageInfo.stage.id, "STAGE_ID=" + 
+                 stageCompleted.stageInfo.stage.id + " STATUS=COMPLETED")
+  }
+  
+  override def onTaskEnd(event: CompletionEvent) {
+    eventQueue.put(JobLoggerOnTaskEnd(event))
+  }
+
+  protected def processTaskEndEvent(event: CompletionEvent) {
+    var taskStatus = ""
+    event.task match {
+      case resultTask: ResultTask[_, _] => taskStatus = "TASK_TYPE=RESULT_TASK"
+      case shuffleMapTask: ShuffleMapTask => taskStatus = "TASK_TYPE=SHUFFLE_MAP_TASK"
+    }
+    event.reason match {
+      case Success =>
+        taskStatus += " STATUS=SUCCESS"
+        recordTaskMetrics(event.task.stageId, taskStatus, event.taskInfo, event.taskMetrics)
+      case Resubmitted => 
+        taskStatus += " STATUS=RESUBMITTED TID=" + event.taskInfo.taskId + 
+                      " STAGE_ID=" + event.task.stageId
+        stageLogInfo(event.task.stageId, taskStatus)
+      case FetchFailed(bmAddress, shuffleId, mapId, reduceId) => 
+        taskStatus += " STATUS=FETCHFAILED TID=" + event.taskInfo.taskId + " STAGE_ID=" + 
+                      event.task.stageId + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" + 
+                      mapId + " REDUCE_ID=" + reduceId
+        stageLogInfo(event.task.stageId, taskStatus)
+      case OtherFailure(message) => 
+        taskStatus += " STATUS=FAILURE TID=" + event.taskInfo.taskId + 
+                      " STAGE_ID=" + event.task.stageId + " INFO=" + message
+        stageLogInfo(event.task.stageId, taskStatus)
+      case _ =>
+    }
+  }
+  
+  override def onJobEnd(job: ActiveJob, event: SparkListenerEvents) {
+    eventQueue.put(JobLoggerOnJobEnd(job, event))
+  }
+
+  protected def processJobEndEvent(job: ActiveJob, event: SparkListenerEvents) {
+    var info = "JOB_ID=" + job.runId + " STATUS="
+    var validEvent = true
+    event match {
+      case SparkListenerJobSuccess => info += "SUCCESS"
+      case SparkListenerJobFailed(failedStage) => 
+        info += "FAILED REASON=STAGE_FAILED FAILED_STAGE_ID=" + failedStage.id
+      case SparkListenerJobCancelled(reason) => info += "CANCELLED REASON=" + reason
+      case _ => validEvent = false
+    }
+    if (validEvent) {
+      jobLogInfo(job.runId, info)
+      closeLogWriter(job.runId)
+    }
+  }
+  
+  protected def recordJobProperties(jobID: Int, properties: Properties) {
+    if (properties != null) {
+      val annotation = properties.getProperty("spark.job.annotation", "")
+      jobLogInfo(jobID, annotation, false)
+    }
+  }
+
+  override def onJobStart(job: ActiveJob, properties: Properties = null) {
+    eventQueue.put(JobLoggerOnJobStart(job, properties))
+  }
+ 
+  protected def processJobStartEvent(job: ActiveJob, properties: Properties) {
+    createLogWriter(job.runId)
+    recordJobProperties(job.runId, properties)
+    buildJobDep(job.runId, job.finalStage)
+    recordStageDep(job.runId)
+    recordStageDepGraph(job.runId, job.finalStage) 
+    jobLogInfo(job.runId, "JOB_ID=" + job.runId + " STATUS=STARTED")
+  }
+}
diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala
index a65140b145833c70e584936785ac8fe94d6b0bc2..9cf7f3ffc0ea0f2ac6d107d4a7b3f646c4a42262 100644
--- a/core/src/main/scala/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/spark/scheduler/SparkListener.scala
@@ -1,27 +1,54 @@
 package spark.scheduler
 
+import java.util.Properties
 import spark.scheduler.cluster.TaskInfo
 import spark.util.Distribution
-import spark.{Utils, Logging}
+import spark.{Utils, Logging, SparkContext, TaskEndReason}
 import spark.executor.TaskMetrics
 
 trait SparkListener {
   /**
    * called when a stage is completed, with information on the completed stage
    */
-  def onStageCompleted(stageCompleted: StageCompleted)
+  def onStageCompleted(stageCompleted: StageCompleted) { }
+  
+  /**
+   * called when a stage is submitted
+   */
+  def onStageSubmitted(stage: Stage, info: String = "") { }
+
+  /**
+   * called when a task ends
+   */
+  def onTaskEnd(event: CompletionEvent) { }
+
+  /**
+   * called when a job starts
+   */
+  def onJobStart(job: ActiveJob, properties: Properties = null) { }
+  
+  /**
+   * called when a job ends
+   */
+  def onJobEnd(job: ActiveJob, event: SparkListenerEvents) { }
+  
 }
 
 sealed trait SparkListenerEvents
 
 case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
 
+case object SparkListenerJobSuccess extends SparkListenerEvents
+
+case class SparkListenerJobFailed(failedStage: Stage) extends SparkListenerEvents
+
+case class SparkListenerJobCancelled(reason: String) extends SparkListenerEvents
 
 /**
  * Simple SparkListener that logs a few summary statistics when each stage completes
  */
 class StatsReportListener extends SparkListener with Logging {
-  def onStageCompleted(stageCompleted: StageCompleted) {
+  override def onStageCompleted(stageCompleted: StageCompleted) {
     import spark.scheduler.StatsReportListener._
     implicit val sc = stageCompleted
     this.logInfo("Finished stage: " + stageCompleted.stageInfo)
diff --git a/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..34fd8b995e141410e03cca5179066916c861eedb
--- /dev/null
+++ b/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
@@ -0,0 +1,105 @@
+package spark.scheduler
+
+import java.util.Properties
+import java.util.concurrent.LinkedBlockingQueue
+import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
+import scala.collection.mutable
+import spark._
+import spark.SparkContext._
+
+
+class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
+
+  test("inner method") {
+    sc = new SparkContext("local", "joblogger")
+    val joblogger = new JobLogger {
+      def createLogWriterTest(jobID: Int) = createLogWriter(jobID)
+      def closeLogWriterTest(jobID: Int) = closeLogWriter(jobID)
+      def getRddNameTest(rdd: RDD[_]) = getRddName(rdd)
+      def buildJobDepTest(jobID: Int, stage: Stage) = buildJobDep(jobID, stage) 
+    }
+    type MyRDD = RDD[(Int, Int)]
+    def makeRdd(
+        numPartitions: Int,
+        dependencies: List[Dependency[_]]
+      ): MyRDD = {
+      val maxPartition = numPartitions - 1
+      return new MyRDD(sc, dependencies) {
+        override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
+          throw new RuntimeException("should not be reached")
+        override def getPartitions = (0 to maxPartition).map(i => new Partition {
+          override def index = i
+        }).toArray
+      }
+    }
+    val jobID = 5
+    val parentRdd = makeRdd(4, Nil)
+    val shuffleDep = new ShuffleDependency(parentRdd, null)
+    val rootRdd = makeRdd(4, List(shuffleDep))
+    val shuffleMapStage = new Stage(1, parentRdd, Some(shuffleDep), Nil, jobID) 
+    val rootStage = new Stage(0, rootRdd, None, List(shuffleMapStage), jobID)
+    
+    joblogger.onStageSubmitted(rootStage)
+    joblogger.getEventQueue.size should be (1)
+    joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getName)
+    parentRdd.setName("MyRDD")
+    joblogger.getRddNameTest(parentRdd) should be ("MyRDD")
+    joblogger.createLogWriterTest(jobID)
+    joblogger.getJobIDtoPrintWriter.size should be (1)
+    joblogger.buildJobDepTest(jobID, rootStage)
+    joblogger.getJobIDToStages.get(jobID).get.size should be (2)
+    joblogger.getStageIDToJobID.get(0) should be (Some(jobID))
+    joblogger.getStageIDToJobID.get(1) should be (Some(jobID))
+    joblogger.closeLogWriterTest(jobID)
+    joblogger.getStageIDToJobID.size should be (0)
+    joblogger.getJobIDToStages.size should be (0)
+    joblogger.getJobIDtoPrintWriter.size should be (0)
+  }
+  
+  test("inner variables") {
+    sc = new SparkContext("local[4]", "joblogger")
+    val joblogger = new JobLogger {
+      override protected def closeLogWriter(jobID: Int) = 
+        getJobIDtoPrintWriter.get(jobID).foreach { fileWriter => 
+          fileWriter.close()
+        }
+    }
+    sc.addSparkListener(joblogger)
+    val rdd = sc.parallelize(1 to 1e2.toInt, 4).map{ i => (i % 12, 2 * i) }
+    rdd.reduceByKey(_+_).collect()
+    
+    joblogger.getLogDir should be ("/tmp/spark")
+    joblogger.getJobIDtoPrintWriter.size should be (1)
+    joblogger.getStageIDToJobID.size should be (2)
+    joblogger.getStageIDToJobID.get(0) should be (Some(0))
+    joblogger.getStageIDToJobID.get(1) should be (Some(0))
+    joblogger.getJobIDToStages.size should be (1)
+  }
+  
+  
+  test("interface functions") {
+    sc = new SparkContext("local[4]", "joblogger")
+    val joblogger = new JobLogger {
+      var onTaskEndCount = 0
+      var onJobEndCount = 0 
+      var onJobStartCount = 0
+      var onStageCompletedCount = 0
+      var onStageSubmittedCount = 0
+      override def onTaskEnd(event: CompletionEvent)  = onTaskEndCount += 1
+      override def onJobEnd(job: ActiveJob, event: SparkListenerEvents) = onJobEndCount += 1
+      override def onJobStart(job: ActiveJob, properties: Properties) = onJobStartCount += 1
+      override def onStageCompleted(stageCompleted: StageCompleted) = onStageCompletedCount += 1
+      override def onStageSubmitted(stage: Stage, info: String = "") = onStageSubmittedCount += 1
+    }
+    sc.addSparkListener(joblogger)
+    val rdd = sc.parallelize(1 to 1e2.toInt, 4).map{ i => (i % 12, 2 * i) }
+    rdd.reduceByKey(_+_).collect()
+    
+    joblogger.onJobStartCount should be (1)
+    joblogger.onJobEndCount should be (1)
+    joblogger.onTaskEndCount should be (8)
+    joblogger.onStageSubmittedCount should be (2)
+    joblogger.onStageCompletedCount should be (2)
+  }
+}
diff --git a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala
index 42a87d8b90fe57c111d4d438c8f033207d0fedbd..48aa67c543cf347db32bd664826dab339209305a 100644
--- a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala
@@ -77,7 +77,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
 
   class SaveStageInfo extends SparkListener {
     val stageInfos = mutable.Buffer[StageInfo]()
-    def onStageCompleted(stage: StageCompleted) {
+    override def onStageCompleted(stage: StageCompleted) {
       stageInfos += stage.stageInfo
     }
   }