diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index e5d1eb767e1098bec81bffef8dd395a8630de6b2..8f5ceaa5de515e182d0806c349084584c389f2b5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -91,11 +91,11 @@ case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockMan
 case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent
 
 @DeveloperApi
-case class SparkListenerExecutorAdded(executorId: String, executorInfo: ExecutorInfo)
+case class SparkListenerExecutorAdded(time: Long, executorId: String, executorInfo: ExecutorInfo)
   extends SparkListenerEvent
 
 @DeveloperApi
-case class SparkListenerExecutorRemoved(executorId: String)
+case class SparkListenerExecutorRemoved(time: Long, executorId: String, reason: String)
   extends SparkListenerEvent
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 5786d367464f4c339c248e88b932243839e62c26..103a5c053c28939b4e67b3625f2c7f600cdf2b3e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -108,7 +108,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
               logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
             }
           }
-          listenerBus.post(SparkListenerExecutorAdded(executorId, data))
+          listenerBus.post(
+            SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
           makeOffers()
         }
 
@@ -216,7 +217,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
           totalCoreCount.addAndGet(-executorInfo.totalCores)
           totalRegisteredExecutors.addAndGet(-1)
           scheduler.executorLost(executorId, SlaveLost(reason))
-          listenerBus.post(SparkListenerExecutorRemoved(executorId))
+          listenerBus.post(
+            SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason))
         case None => logError(s"Asked to remove non-existent executor $executorId")
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 79c9051e88691e316755f18d81e062786864bc6e..c3c546be6da15d7f1846f98fc975db3c2d6ae46e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -269,7 +269,7 @@ private[spark] class MesosSchedulerBackend(
 
       mesosTasks.foreach { case (slaveId, tasks) =>
         slaveIdToWorkerOffer.get(slaveId).foreach(o =>
-          listenerBus.post(SparkListenerExecutorAdded(slaveId,
+          listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId,
             new ExecutorInfo(o.host, o.cores)))
         )
         d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
@@ -327,7 +327,7 @@ private[spark] class MesosSchedulerBackend(
       synchronized {
         if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
           // We lost the executor on this slave, so remember that it's gone
-          removeExecutor(taskIdToSlaveId(tid))
+          removeExecutor(taskIdToSlaveId(tid), "Lost executor")
         }
         if (isFinished(status.getState)) {
           taskIdToSlaveId.remove(tid)
@@ -359,9 +359,9 @@ private[spark] class MesosSchedulerBackend(
   /**
    * Remove executor associated with slaveId in a thread safe manner.
    */
-  private def removeExecutor(slaveId: String) = {
+  private def removeExecutor(slaveId: String, reason: String) = {
     synchronized {
-      listenerBus.post(SparkListenerExecutorRemoved(slaveId))
+      listenerBus.post(SparkListenerExecutorRemoved(System.currentTimeMillis(), slaveId, reason))
       slaveIdsWithExecutors -= slaveId
     }
   }
@@ -369,7 +369,7 @@ private[spark] class MesosSchedulerBackend(
   private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
     inClassLoader() {
       logInfo("Mesos slave lost: " + slaveId.getValue)
-      removeExecutor(slaveId.getValue)
+      removeExecutor(slaveId.getValue, reason.toString)
       scheduler.executorLost(slaveId.getValue, reason)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index f896b5072e4fa38912bd211b1dc73023cc193938..b5f736dc41c6c893bcee940e728a1fa23c08a1ff 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -204,13 +204,16 @@ private[spark] object JsonProtocol {
 
   def executorAddedToJson(executorAdded: SparkListenerExecutorAdded): JValue = {
     ("Event" -> Utils.getFormattedClassName(executorAdded)) ~
+    ("Timestamp" -> executorAdded.time) ~
     ("Executor ID" -> executorAdded.executorId) ~
     ("Executor Info" -> executorInfoToJson(executorAdded.executorInfo))
   }
 
   def executorRemovedToJson(executorRemoved: SparkListenerExecutorRemoved): JValue = {
     ("Event" -> Utils.getFormattedClassName(executorRemoved)) ~
-    ("Executor ID" -> executorRemoved.executorId)
+    ("Timestamp" -> executorRemoved.time) ~
+    ("Executor ID" -> executorRemoved.executorId) ~
+    ("Removed Reason" -> executorRemoved.reason)
   }
 
   /** ------------------------------------------------------------------- *
@@ -554,14 +557,17 @@ private[spark] object JsonProtocol {
   }
 
   def executorAddedFromJson(json: JValue): SparkListenerExecutorAdded = {
+    val time = (json \ "Timestamp").extract[Long]
     val executorId = (json \ "Executor ID").extract[String]
     val executorInfo = executorInfoFromJson(json \ "Executor Info")
-    SparkListenerExecutorAdded(executorId, executorInfo)
+    SparkListenerExecutorAdded(time, executorId, executorInfo)
   }
 
   def executorRemovedFromJson(json: JValue): SparkListenerExecutorRemoved = {
+    val time = (json \ "Timestamp").extract[Long]
     val executorId = (json \ "Executor ID").extract[String]
-    SparkListenerExecutorRemoved(executorId)
+    val reason = (json \ "Removed Reason").extract[String]
+    SparkListenerExecutorRemoved(time, executorId, reason)
   }
 
   /** --------------------------------------------------------------------- *
diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
index 073814c127edc24ca5cf44751cfb3727f4dbcc81..f2ff98eb72daf7a8208c6cd8018a7e30fd59e444 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
@@ -43,7 +43,7 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
     conf.set("spark.mesos.executor.home" , "/mesos-home")
 
     val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
-    listenerBus.post(SparkListenerExecutorAdded("s1", new ExecutorInfo("host1", 2)))
+    listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2)))
     EasyMock.replay(listenerBus)
 
     val sc = EasyMock.createMock(classOf[SparkContext])
@@ -88,7 +88,7 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
     val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])
 
     val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
-    listenerBus.post(SparkListenerExecutorAdded("s1", new ExecutorInfo("host1", 2)))
+    listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2)))
     EasyMock.replay(listenerBus)
 
     val sc = EasyMock.createMock(classOf[SparkContext])
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 0357fc6ce27801586506b23cbaf15252ebd6a891..6577ebaa2e9a88f68a85201fe1fa0757139fafcd 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -37,6 +37,9 @@ class JsonProtocolSuite extends FunSuite {
   val jobSubmissionTime = 1421191042750L
   val jobCompletionTime = 1421191296660L
 
+  val executorAddedTime = 1421458410000L
+  val executorRemovedTime = 1421458922000L
+
   test("SparkListenerEvent") {
     val stageSubmitted =
       SparkListenerStageSubmitted(makeStageInfo(100, 200, 300, 400L, 500L), properties)
@@ -73,9 +76,9 @@ class JsonProtocolSuite extends FunSuite {
     val unpersistRdd = SparkListenerUnpersistRDD(12345)
     val applicationStart = SparkListenerApplicationStart("The winner of all", None, 42L, "Garfield")
     val applicationEnd = SparkListenerApplicationEnd(42L)
-    val executorAdded = SparkListenerExecutorAdded("exec1",
+    val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
       new ExecutorInfo("Hostee.awesome.com", 11))
-    val executorRemoved = SparkListenerExecutorRemoved("exec2")
+    val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason")
 
     testEvent(stageSubmitted, stageSubmittedJsonString)
     testEvent(stageCompleted, stageCompletedJsonString)
@@ -1453,9 +1456,10 @@ class JsonProtocolSuite extends FunSuite {
     """
 
   private val executorAddedJsonString =
-    """
+    s"""
       |{
       |  "Event": "SparkListenerExecutorAdded",
+      |  "Timestamp": ${executorAddedTime},
       |  "Executor ID": "exec1",
       |  "Executor Info": {
       |    "Host": "Hostee.awesome.com",
@@ -1465,10 +1469,12 @@ class JsonProtocolSuite extends FunSuite {
     """
 
   private val executorRemovedJsonString =
-    """
+    s"""
       |{
       |  "Event": "SparkListenerExecutorRemoved",
-      |  "Executor ID": "exec2"
+      |  "Timestamp": ${executorRemovedTime},
+      |  "Executor ID": "exec2",
+      |  "Removed Reason": "test reason"
       |}
     """
 }