diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
index e1fb02157aafecd591d8dd2631c8bc88ed52eb60..3239f4c3854a91ab6ed773a1b201684bbbf6ed1b 100644
--- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
@@ -58,6 +58,7 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
 
     CompletionIterator[(K,V), Iterator[(K,V)]](itr, {
       val shuffleMetrics = new ShuffleReadMetrics
+      shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
       shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime
       shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
       shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 709271d4ebbdf3153b5505458e9313c3985291e9..f336c2ea1ea207e51315a729da4d8e472bbd2271 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -117,6 +117,14 @@ abstract class RDD[T: ClassManifest](
     this
   }
 
+  /** User-defined generator of this RDD*/
+  var generator = Utils.getCallSiteInfo.firstUserClass
+  
+  /** Reset generator*/
+  def setGenerator(_generator: String) = {
+    generator = _generator
+  }
+
   /**
    * Set this RDD's storage level to persist its values across operations after the first time
    * it is computed. This can only be used to assign a new storage level if the RDD does not
@@ -840,7 +848,7 @@ abstract class RDD[T: ClassManifest](
   private var storageLevel: StorageLevel = StorageLevel.NONE
 
   /** Record user function generating this RDD. */
-  private[spark] val origin = Utils.getSparkCallSite
+  private[spark] val origin = Utils.formatSparkCallSite
 
   private[spark] def elementClassManifest: ClassManifest[T] = classManifest[T]
 
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index bc05d08fd6ce94221026af4426ae29bacf1f0caa..70a9d7698c490dc3741db39e2dff7603b060fc7f 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -49,7 +49,6 @@ import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend
 import spark.storage.{BlockManagerUI, StorageStatus, StorageUtils, RDDInfo}
 import spark.util.{MetadataCleaner, TimeStampedHashMap}
 
-
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
  * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
@@ -630,7 +629,7 @@ class SparkContext(
       partitions: Seq[Int],
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit) {
-    val callSite = Utils.getSparkCallSite
+    val callSite = Utils.formatSparkCallSite
     logInfo("Starting job: " + callSite)
     val start = System.nanoTime
     val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler, localProperties.value)
@@ -713,7 +712,7 @@ class SparkContext(
       func: (TaskContext, Iterator[T]) => U,
       evaluator: ApproximateEvaluator[U, R],
       timeout: Long): PartialResult[R] = {
-    val callSite = Utils.getSparkCallSite
+    val callSite = Utils.formatSparkCallSite
     logInfo("Starting job: " + callSite)
     val start = System.nanoTime
     val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout, localProperties.value)
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 645c18541e20519d2ba8050b0aeed4ced67c825c..f3621c6beee1ffb20121ce9055bf13e3bd7f7db1 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -522,13 +522,14 @@ private object Utils extends Logging {
     execute(command, new File("."))
   }
 
-
+  private[spark] class CallSiteInfo(val lastSparkMethod: String, val firstUserFile: String, 
+                                    val firstUserLine: Int, val firstUserClass: String)
   /**
    * When called inside a class in the spark package, returns the name of the user code class
    * (outside the spark package) that called into Spark, as well as which Spark method they called.
    * This is used, for example, to tell users where in their code each RDD got created.
    */
-  def getSparkCallSite: String = {
+  def getCallSiteInfo: CallSiteInfo = {
     val trace = Thread.currentThread.getStackTrace().filter( el =>
       (!el.getMethodName.contains("getStackTrace")))
 
@@ -540,6 +541,7 @@ private object Utils extends Logging {
     var firstUserFile = "<unknown>"
     var firstUserLine = 0
     var finished = false
+    var firstUserClass = "<unknown>"
 
     for (el <- trace) {
       if (!finished) {
@@ -554,13 +556,19 @@ private object Utils extends Logging {
         else {
           firstUserLine = el.getLineNumber
           firstUserFile = el.getFileName
+          firstUserClass = el.getClassName
           finished = true
         }
       }
     }
-    "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine)
+    new CallSiteInfo(lastSparkMethod, firstUserFile, firstUserLine, firstUserClass)
   }
 
+  def formatSparkCallSite = {
+    val callSiteInfo = getCallSiteInfo
+    "%s at %s:%s".format(callSiteInfo.lastSparkMethod, callSiteInfo.firstUserFile,
+                         callSiteInfo.firstUserLine)
+  }
   /**
    * Try to find a free port to bind to on the local host. This should ideally never be needed,
    * except that, unfortunately, some of the networking libraries we currently rely on (e.g. Spray)
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 890938d48b667ee769097e2c6044334a96abb80c..8bebfafce4421e559d0bcdd0a7e77863bf702b58 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -104,6 +104,7 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
         val value = task.run(taskId.toInt)
         val taskFinish = System.currentTimeMillis()
         task.metrics.foreach{ m =>
+          m.hostname = Utils.localHostName
           m.executorDeserializeTime = (taskStart - startTime).toInt
           m.executorRunTime = (taskFinish - taskStart).toInt
         }
diff --git a/core/src/main/scala/spark/executor/TaskMetrics.scala b/core/src/main/scala/spark/executor/TaskMetrics.scala
index a7c56c237199b2cb98366219282d02bcc521389b..1dc13754f90e4e950965402386af707a244567bd 100644
--- a/core/src/main/scala/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/spark/executor/TaskMetrics.scala
@@ -1,6 +1,11 @@
 package spark.executor
 
 class TaskMetrics extends Serializable {
+  /**
+   * Host's name the task runs on 
+   */
+  var hostname: String = _
+
   /**
    * Time taken on the executor to deserialize this task
    */
@@ -33,10 +38,15 @@ object TaskMetrics {
 
 
 class ShuffleReadMetrics extends Serializable {
+  /**
+   * Time when shuffle finishs
+   */
+  var shuffleFinishTime: Long = _
+
   /**
    * Total number of blocks fetched in a shuffle (remote or local)
    */
-  var totalBlocksFetched : Int = _
+  var totalBlocksFetched: Int = _
 
   /**
    * Number of remote blocks fetched in a shuffle
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 7feeb9754289cfb294b195f5579089725aca6b55..f7d60be5dbfc2c5983da71c88c2598187d97f9c8 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -298,6 +298,7 @@ class DAGScheduler(
           // Compute very short actions like first() or take() with no parent stages locally.
           runLocally(job)
         } else {
+          sparkListeners.foreach(_.onJobStart(SparkListenerJobStart(job, properties)))
           idToActiveJob(runId) = job
           activeJobs += job
           resultStageToJob(finalStage) = job
@@ -311,6 +312,8 @@ class DAGScheduler(
         handleExecutorLost(execId)
 
       case completion: CompletionEvent =>
+        sparkListeners.foreach(_.onTaskEnd(SparkListenerTaskEnd(completion.task, 
+                               completion.reason, completion.taskInfo, completion.taskMetrics)))
         handleTaskCompletion(completion)
 
       case TaskSetFailed(taskSet, reason) =>
@@ -321,6 +324,7 @@ class DAGScheduler(
         for (job <- activeJobs) {
           val error = new SparkException("Job cancelled because SparkContext was shut down")
           job.listener.jobFailed(error)
+          sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error))))
         }
         return true
     }
@@ -468,6 +472,7 @@ class DAGScheduler(
       }
     }
     if (tasks.size > 0) {
+      sparkListeners.foreach(_.onStageSubmitted(SparkListenerStageSubmitted(stage, tasks.size)))
       logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
       myPending ++= tasks
       logDebug("New pending tasks: " + myPending)
@@ -522,6 +527,7 @@ class DAGScheduler(
                     activeJobs -= job
                     resultStageToJob -= stage
                     markStageAsFinished(stage)
+                    sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobSucceeded)))
                   }
                   job.listener.taskSucceeded(rt.outputId, event.result)
                 }
@@ -662,7 +668,9 @@ class DAGScheduler(
     val dependentStages = resultStageToJob.keys.filter(x => stageDependsOn(x, failedStage)).toSeq
     for (resultStage <- dependentStages) {
       val job = resultStageToJob(resultStage)
-      job.listener.jobFailed(new SparkException("Job failed: " + reason))
+      val error = new SparkException("Job failed: " + reason)
+      job.listener.jobFailed(error)
+      sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error))))
       activeJobs -= job
       resultStageToJob -= resultStage
     }
diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala
new file mode 100644
index 0000000000000000000000000000000000000000..178bfaba3d923de7f116656e65902f7d5fa3ee9e
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/JobLogger.scala
@@ -0,0 +1,306 @@
+package spark.scheduler
+
+import java.io.PrintWriter
+import java.io.File
+import java.io.FileNotFoundException
+import java.text.SimpleDateFormat
+import java.util.{Date, Properties}
+import java.util.concurrent.LinkedBlockingQueue
+import scala.collection.mutable.{Map, HashMap, ListBuffer}
+import scala.io.Source
+import spark._
+import spark.executor.TaskMetrics
+import spark.scheduler.cluster.TaskInfo
+
+// Used to record runtime information for each job, including RDD graph 
+// tasks' start/stop shuffle information and information from outside
+
+class JobLogger(val logDirName: String) extends SparkListener with Logging {
+  private val logDir =  
+    if (System.getenv("SPARK_LOG_DIR") != null)  
+      System.getenv("SPARK_LOG_DIR")
+    else 
+      "/tmp/spark"
+  private val jobIDToPrintWriter = new HashMap[Int, PrintWriter] 
+  private val stageIDToJobID = new HashMap[Int, Int]
+  private val jobIDToStages = new HashMap[Int, ListBuffer[Stage]]
+  private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents]
+  
+  createLogDir()
+  def this() = this(String.valueOf(System.currentTimeMillis()))
+  
+  def getLogDir = logDir
+  def getJobIDtoPrintWriter = jobIDToPrintWriter
+  def getStageIDToJobID = stageIDToJobID
+  def getJobIDToStages = jobIDToStages
+  def getEventQueue = eventQueue
+  
+  new Thread("JobLogger") {
+    setDaemon(true)
+    override def run() {
+      while (true) {
+        val event = eventQueue.take
+        logDebug("Got event of type " + event.getClass.getName)
+        event match {
+          case SparkListenerJobStart(job, properties) =>
+            processJobStartEvent(job, properties)
+          case SparkListenerStageSubmitted(stage, taskSize) =>
+            processStageSubmittedEvent(stage, taskSize)
+          case StageCompleted(stageInfo) =>
+            processStageCompletedEvent(stageInfo)
+          case SparkListenerJobEnd(job, result) =>
+            processJobEndEvent(job, result)
+          case SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics) =>
+            processTaskEndEvent(task, reason, taskInfo, taskMetrics)
+          case _ =>
+        }
+      }
+    }
+  }.start()
+
+  // Create a folder for log files, the folder's name is the creation time of the jobLogger
+  protected def createLogDir() {
+    val dir = new File(logDir + "/" + logDirName + "/")
+    if (dir.exists()) {
+      return
+    }
+    if (dir.mkdirs() == false) {
+      logError("create log directory error:" + logDir + "/" + logDirName + "/")
+    }
+  }
+  
+  // Create a log file for one job, the file name is the jobID
+  protected def createLogWriter(jobID: Int) {
+    try{
+      val fileWriter = new PrintWriter(logDir + "/" + logDirName + "/" + jobID)
+      jobIDToPrintWriter += (jobID -> fileWriter)
+      } catch {
+        case e: FileNotFoundException => e.printStackTrace()
+      }
+  }
+  
+  // Close log file, and clean the stage relationship in stageIDToJobID 
+  protected def closeLogWriter(jobID: Int) = 
+    jobIDToPrintWriter.get(jobID).foreach { fileWriter => 
+      fileWriter.close()
+      jobIDToStages.get(jobID).foreach(_.foreach{ stage => 
+        stageIDToJobID -= stage.id
+      })
+      jobIDToPrintWriter -= jobID
+      jobIDToStages -= jobID
+    }
+  
+  // Write log information to log file, withTime parameter controls whether to recored 
+  // time stamp for the information
+  protected def jobLogInfo(jobID: Int, info: String, withTime: Boolean = true) {
+    var writeInfo = info
+    if (withTime) {
+      val date = new Date(System.currentTimeMillis())
+      writeInfo = DATE_FORMAT.format(date) + ": " +info
+    }
+    jobIDToPrintWriter.get(jobID).foreach(_.println(writeInfo))
+  }
+  
+  protected def stageLogInfo(stageID: Int, info: String, withTime: Boolean = true) = 
+    stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime))
+
+  protected def buildJobDep(jobID: Int, stage: Stage) {
+    if (stage.priority == jobID) {
+      jobIDToStages.get(jobID) match {
+        case Some(stageList) => stageList += stage
+        case None => val stageList = new  ListBuffer[Stage]
+                     stageList += stage
+                     jobIDToStages += (jobID -> stageList)
+      }
+      stageIDToJobID += (stage.id -> jobID)
+      stage.parents.foreach(buildJobDep(jobID, _))
+    }
+  }
+
+  protected def recordStageDep(jobID: Int) {
+    def getRddsInStage(rdd: RDD[_]): ListBuffer[RDD[_]] = {
+      var rddList = new ListBuffer[RDD[_]]
+      rddList += rdd
+      rdd.dependencies.foreach{ dep => dep match {
+          case shufDep: ShuffleDependency[_,_] =>
+          case _ => rddList ++= getRddsInStage(dep.rdd)
+        }
+      }
+      rddList
+    }
+    jobIDToStages.get(jobID).foreach {_.foreach { stage => 
+        var depRddDesc: String = ""
+        getRddsInStage(stage.rdd).foreach { rdd => 
+          depRddDesc += rdd.id + ","
+        }
+        var depStageDesc: String = ""
+        stage.parents.foreach { stage => 
+          depStageDesc += "(" + stage.id + "," + stage.shuffleDep.get.shuffleId + ")"
+        }
+        jobLogInfo(jobID, "STAGE_ID=" + stage.id + " RDD_DEP=(" + 
+                   depRddDesc.substring(0, depRddDesc.length - 1) + ")" + 
+                   " STAGE_DEP=" + depStageDesc, false)
+      }
+    }
+  }
+  
+  // Generate indents and convert to String
+  protected def indentString(indent: Int) = {
+    val sb = new StringBuilder()
+    for (i <- 1 to indent) {
+      sb.append(" ")
+    }
+    sb.toString()
+  }
+  
+  protected def getRddName(rdd: RDD[_]) = {
+    var rddName = rdd.getClass.getName
+    if (rdd.name != null) {
+      rddName = rdd.name 
+    }
+    rddName
+  }
+  
+  protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
+    val rddInfo = "RDD_ID=" + rdd.id + "(" + getRddName(rdd) + "," + rdd.generator + ")"
+    jobLogInfo(jobID, indentString(indent) + rddInfo, false)
+    rdd.dependencies.foreach{ dep => dep match {
+        case shufDep: ShuffleDependency[_,_] => 
+          val depInfo = "SHUFFLE_ID=" + shufDep.shuffleId
+          jobLogInfo(jobID, indentString(indent + 1) + depInfo, false)
+        case _ => recordRddInStageGraph(jobID, dep.rdd, indent + 1)
+      }
+    }
+  }
+  
+  protected def recordStageDepGraph(jobID: Int, stage: Stage, indent: Int = 0) {
+    var stageInfo: String = ""
+    if (stage.isShuffleMap) {
+      stageInfo = "STAGE_ID=" + stage.id + " MAP_STAGE SHUFFLE_ID=" + 
+                  stage.shuffleDep.get.shuffleId
+    }else{
+      stageInfo = "STAGE_ID=" + stage.id + " RESULT_STAGE"
+    }
+    if (stage.priority == jobID) {
+      jobLogInfo(jobID, indentString(indent) + stageInfo, false)
+      recordRddInStageGraph(jobID, stage.rdd, indent)
+      stage.parents.foreach(recordStageDepGraph(jobID, _, indent + 2))
+    } else
+      jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.priority, false)
+  }
+  
+  // Record task metrics into job log files
+  protected def recordTaskMetrics(stageID: Int, status: String, 
+                                taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
+    val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID + 
+               " START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime + 
+               " EXECUTOR_ID=" + taskInfo.executorId +  " HOST=" + taskMetrics.hostname
+    val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime
+    val readMetrics = 
+      taskMetrics.shuffleReadMetrics match {
+        case Some(metrics) => 
+          " SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime + 
+          " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched + 
+          " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched + 
+          " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched + 
+          " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime + 
+          " REMOTE_FETCH_TIME=" + metrics.remoteFetchTime + 
+          " REMOTE_BYTES_READ=" + metrics.remoteBytesRead
+        case None => ""
+      }
+    val writeMetrics = 
+      taskMetrics.shuffleWriteMetrics match {
+        case Some(metrics) => 
+          " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten
+        case None => ""
+      }
+    stageLogInfo(stageID, status + info + executorRunTime + readMetrics + writeMetrics)
+  }
+  
+  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
+    eventQueue.put(stageSubmitted)
+  }
+
+  protected def processStageSubmittedEvent(stage: Stage, taskSize: Int) {
+    stageLogInfo(stage.id, "STAGE_ID=" + stage.id + " STATUS=SUBMITTED" + " TASK_SIZE=" + taskSize)
+  }
+  
+  override def onStageCompleted(stageCompleted: StageCompleted) {
+    eventQueue.put(stageCompleted)
+  }
+
+  protected def processStageCompletedEvent(stageInfo: StageInfo) {
+    stageLogInfo(stageInfo.stage.id, "STAGE_ID=" + 
+                 stageInfo.stage.id + " STATUS=COMPLETED")
+    
+  }
+  
+  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+    eventQueue.put(taskEnd)
+  }
+
+  protected def processTaskEndEvent(task: Task[_], reason: TaskEndReason, 
+      taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
+    var taskStatus = ""
+    task match {
+      case resultTask: ResultTask[_, _] => taskStatus = "TASK_TYPE=RESULT_TASK"
+      case shuffleMapTask: ShuffleMapTask => taskStatus = "TASK_TYPE=SHUFFLE_MAP_TASK"
+    }
+    reason match {
+      case Success => taskStatus += " STATUS=SUCCESS"
+        recordTaskMetrics(task.stageId, taskStatus, taskInfo, taskMetrics)
+      case Resubmitted => 
+        taskStatus += " STATUS=RESUBMITTED TID=" + taskInfo.taskId + 
+                      " STAGE_ID=" + task.stageId
+        stageLogInfo(task.stageId, taskStatus)
+      case FetchFailed(bmAddress, shuffleId, mapId, reduceId) => 
+        taskStatus += " STATUS=FETCHFAILED TID=" + taskInfo.taskId + " STAGE_ID=" + 
+                      task.stageId + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" + 
+                      mapId + " REDUCE_ID=" + reduceId
+        stageLogInfo(task.stageId, taskStatus)
+      case OtherFailure(message) => 
+        taskStatus += " STATUS=FAILURE TID=" + taskInfo.taskId + 
+                      " STAGE_ID=" + task.stageId + " INFO=" + message
+        stageLogInfo(task.stageId, taskStatus)
+      case _ =>
+    }
+  }
+  
+  override def onJobEnd(jobEnd: SparkListenerJobEnd) {
+    eventQueue.put(jobEnd)
+  }
+
+  protected def processJobEndEvent(job: ActiveJob, reason: JobResult) {
+    var info = "JOB_ID=" + job.runId
+    reason match {
+      case JobSucceeded => info += " STATUS=SUCCESS"
+      case JobFailed(exception) =>
+        info += " STATUS=FAILED REASON="
+        exception.getMessage.split("\\s+").foreach(info += _ + "_")
+      case _ =>
+    }
+    jobLogInfo(job.runId, info.substring(0, info.length - 1).toUpperCase)
+    closeLogWriter(job.runId)
+  }
+
+  protected def recordJobProperties(jobID: Int, properties: Properties) {
+    if(properties != null) {
+      val annotation = properties.getProperty("spark.job.annotation", "")
+      jobLogInfo(jobID, annotation, false)
+    }
+  }
+
+  override def onJobStart(jobStart: SparkListenerJobStart) {
+    eventQueue.put(jobStart)
+  }
+ 
+  protected def processJobStartEvent(job: ActiveJob, properties: Properties) {
+    createLogWriter(job.runId)
+    recordJobProperties(job.runId, properties)
+    buildJobDep(job.runId, job.finalStage)
+    recordStageDep(job.runId)
+    recordStageDepGraph(job.runId, job.finalStage) 
+    jobLogInfo(job.runId, "JOB_ID=" + job.runId + " STATUS=STARTED")
+  }
+}
diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala
index a65140b145833c70e584936785ac8fe94d6b0bc2..bac984b5c942cc9bc13c2c8db18ae07fede6ac55 100644
--- a/core/src/main/scala/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/spark/scheduler/SparkListener.scala
@@ -1,27 +1,59 @@
 package spark.scheduler
 
+import java.util.Properties
 import spark.scheduler.cluster.TaskInfo
 import spark.util.Distribution
-import spark.{Utils, Logging}
+import spark.{Logging, SparkContext, TaskEndReason, Utils}
 import spark.executor.TaskMetrics
 
-trait SparkListener {
-  /**
-   * called when a stage is completed, with information on the completed stage
-   */
-  def onStageCompleted(stageCompleted: StageCompleted)
-}
-
 sealed trait SparkListenerEvents
 
+case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int) extends SparkListenerEvents
+
 case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
 
+case class SparkListenerTaskEnd(task: Task[_], reason: TaskEndReason, taskInfo: TaskInfo,
+     taskMetrics: TaskMetrics) extends SparkListenerEvents
+
+case class SparkListenerJobStart(job: ActiveJob, properties: Properties = null) 
+     extends SparkListenerEvents
+
+case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult) 
+     extends SparkListenerEvents
+
+trait SparkListener {
+  /**
+   * Called when a stage is completed, with information on the completed stage
+   */
+  def onStageCompleted(stageCompleted: StageCompleted) { }
+  
+  /**
+   * Called when a stage is submitted
+   */
+  def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { }
+  
+  /**
+   * Called when a task ends
+   */
+  def onTaskEnd(taskEnd: SparkListenerTaskEnd) { }
+
+  /**
+   * Called when a job starts
+   */
+  def onJobStart(jobStart: SparkListenerJobStart) { }
+  
+  /**
+   * Called when a job ends
+   */
+  def onJobEnd(jobEnd: SparkListenerJobEnd) { }
+  
+}
 
 /**
  * Simple SparkListener that logs a few summary statistics when each stage completes
  */
 class StatsReportListener extends SparkListener with Logging {
-  def onStageCompleted(stageCompleted: StageCompleted) {
+  override def onStageCompleted(stageCompleted: StageCompleted) {
     import spark.scheduler.StatsReportListener._
     implicit val sc = stageCompleted
     this.logInfo("Finished stage: " + stageCompleted.stageInfo)
diff --git a/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..4000c4d5209b6ff96410d17b90e17b366c75d0fb
--- /dev/null
+++ b/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
@@ -0,0 +1,105 @@
+package spark.scheduler
+
+import java.util.Properties
+import java.util.concurrent.LinkedBlockingQueue
+import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
+import scala.collection.mutable
+import spark._
+import spark.SparkContext._
+
+
+class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
+
+  test("inner method") {
+    sc = new SparkContext("local", "joblogger")
+    val joblogger = new JobLogger {
+      def createLogWriterTest(jobID: Int) = createLogWriter(jobID)
+      def closeLogWriterTest(jobID: Int) = closeLogWriter(jobID)
+      def getRddNameTest(rdd: RDD[_]) = getRddName(rdd)
+      def buildJobDepTest(jobID: Int, stage: Stage) = buildJobDep(jobID, stage) 
+    }
+    type MyRDD = RDD[(Int, Int)]
+    def makeRdd(
+        numPartitions: Int,
+        dependencies: List[Dependency[_]]
+      ): MyRDD = {
+      val maxPartition = numPartitions - 1
+      return new MyRDD(sc, dependencies) {
+        override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
+          throw new RuntimeException("should not be reached")
+        override def getPartitions = (0 to maxPartition).map(i => new Partition {
+          override def index = i
+        }).toArray
+      }
+    }
+    val jobID = 5
+    val parentRdd = makeRdd(4, Nil)
+    val shuffleDep = new ShuffleDependency(parentRdd, null)
+    val rootRdd = makeRdd(4, List(shuffleDep))
+    val shuffleMapStage = new Stage(1, parentRdd, Some(shuffleDep), Nil, jobID) 
+    val rootStage = new Stage(0, rootRdd, None, List(shuffleMapStage), jobID)
+    
+    joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStage, 4))
+    joblogger.getEventQueue.size should be (1)
+    joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getName)
+    parentRdd.setName("MyRDD")
+    joblogger.getRddNameTest(parentRdd) should be ("MyRDD")
+    joblogger.createLogWriterTest(jobID)
+    joblogger.getJobIDtoPrintWriter.size should be (1)
+    joblogger.buildJobDepTest(jobID, rootStage)
+    joblogger.getJobIDToStages.get(jobID).get.size should be (2)
+    joblogger.getStageIDToJobID.get(0) should be (Some(jobID))
+    joblogger.getStageIDToJobID.get(1) should be (Some(jobID))
+    joblogger.closeLogWriterTest(jobID)
+    joblogger.getStageIDToJobID.size should be (0)
+    joblogger.getJobIDToStages.size should be (0)
+    joblogger.getJobIDtoPrintWriter.size should be (0)
+  }
+  
+  test("inner variables") {
+    sc = new SparkContext("local[4]", "joblogger")
+    val joblogger = new JobLogger {
+      override protected def closeLogWriter(jobID: Int) = 
+        getJobIDtoPrintWriter.get(jobID).foreach { fileWriter => 
+          fileWriter.close()
+        }
+    }
+    sc.addSparkListener(joblogger)
+    val rdd = sc.parallelize(1 to 1e2.toInt, 4).map{ i => (i % 12, 2 * i) }
+    rdd.reduceByKey(_+_).collect()
+    
+    joblogger.getLogDir should be ("/tmp/spark")
+    joblogger.getJobIDtoPrintWriter.size should be (1)
+    joblogger.getStageIDToJobID.size should be (2)
+    joblogger.getStageIDToJobID.get(0) should be (Some(0))
+    joblogger.getStageIDToJobID.get(1) should be (Some(0))
+    joblogger.getJobIDToStages.size should be (1)
+  }
+  
+  
+  test("interface functions") {
+    sc = new SparkContext("local[4]", "joblogger")
+    val joblogger = new JobLogger {
+      var onTaskEndCount = 0
+      var onJobEndCount = 0 
+      var onJobStartCount = 0
+      var onStageCompletedCount = 0
+      var onStageSubmittedCount = 0
+      override def onTaskEnd(taskEnd: SparkListenerTaskEnd)  = onTaskEndCount += 1
+      override def onJobEnd(jobEnd: SparkListenerJobEnd) = onJobEndCount += 1
+      override def onJobStart(jobStart: SparkListenerJobStart) = onJobStartCount += 1
+      override def onStageCompleted(stageCompleted: StageCompleted) = onStageCompletedCount += 1
+      override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = onStageSubmittedCount += 1
+    }
+    sc.addSparkListener(joblogger)
+    val rdd = sc.parallelize(1 to 1e2.toInt, 4).map{ i => (i % 12, 2 * i) }
+    rdd.reduceByKey(_+_).collect()
+    
+    joblogger.onJobStartCount should be (1)
+    joblogger.onJobEndCount should be (1)
+    joblogger.onTaskEndCount should be (8)
+    joblogger.onStageSubmittedCount should be (2)
+    joblogger.onStageCompletedCount should be (2)
+  }
+}
diff --git a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala
index 42a87d8b90fe57c111d4d438c8f033207d0fedbd..48aa67c543cf347db32bd664826dab339209305a 100644
--- a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala
@@ -77,7 +77,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
 
   class SaveStageInfo extends SparkListener {
     val stageInfos = mutable.Buffer[StageInfo]()
-    def onStageCompleted(stage: StageCompleted) {
+    override def onStageCompleted(stage: StageCompleted) {
       stageInfos += stage.stageInfo
     }
   }