diff --git a/core/src/main/java/org/apache/spark/JavaSparkListener.java b/core/src/main/java/org/apache/spark/JavaSparkListener.java
new file mode 100644
index 0000000000000000000000000000000000000000..646496f3135076bfcf1de3312acf54ff67ee94b9
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/JavaSparkListener.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import org.apache.spark.scheduler.SparkListener;
+import org.apache.spark.scheduler.SparkListenerApplicationEnd;
+import org.apache.spark.scheduler.SparkListenerApplicationStart;
+import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
+import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
+import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
+import org.apache.spark.scheduler.SparkListenerExecutorAdded;
+import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
+import org.apache.spark.scheduler.SparkListenerExecutorRemoved;
+import org.apache.spark.scheduler.SparkListenerJobEnd;
+import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.spark.scheduler.SparkListenerStageCompleted;
+import org.apache.spark.scheduler.SparkListenerStageSubmitted;
+import org.apache.spark.scheduler.SparkListenerTaskEnd;
+import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
+import org.apache.spark.scheduler.SparkListenerTaskStart;
+import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
+
+/**
+ * Java clients should extend this class instead of implementing
+ * SparkListener directly. This is to prevent java clients
+ * from breaking when new events are added to the SparkListener
+ * trait.
+ *
+ * This is a concrete class instead of abstract to enforce
+ * new events get added to both the SparkListener and this adapter
+ * in lockstep.
+ */
+public class JavaSparkListener implements SparkListener {
+
+  @Override
+  public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { }
+
+  @Override
+  public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { }
+
+  @Override
+  public void onTaskStart(SparkListenerTaskStart taskStart) { }
+
+  @Override
+  public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) { }
+
+  @Override
+  public void onTaskEnd(SparkListenerTaskEnd taskEnd) { }
+
+  @Override
+  public void onJobStart(SparkListenerJobStart jobStart) { }
+
+  @Override
+  public void onJobEnd(SparkListenerJobEnd jobEnd) { }
+
+  @Override
+  public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) { }
+
+  @Override
+  public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) { }
+
+  @Override
+  public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) { }
+
+  @Override
+  public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) { }
+
+  @Override
+  public void onApplicationStart(SparkListenerApplicationStart applicationStart) { }
+
+  @Override
+  public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { }
+
+  @Override
+  public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) { }
+
+  @Override
+  public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) { }
+
+  @Override
+  public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index ad7d81747c3777f474f17ab31ce7ec2f97abfad8..ede0a9dbefb8d41079ee6a234ec3706ef89061cc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -38,8 +38,8 @@ private[spark] class ApplicationInfo(
   extends Serializable {
 
   @transient var state: ApplicationState.Value = _
-  @transient var executors: mutable.HashMap[Int, ExecutorInfo] = _
-  @transient var removedExecutors: ArrayBuffer[ExecutorInfo] = _
+  @transient var executors: mutable.HashMap[Int, ExecutorDesc] = _
+  @transient var removedExecutors: ArrayBuffer[ExecutorDesc] = _
   @transient var coresGranted: Int = _
   @transient var endTime: Long = _
   @transient var appSource: ApplicationSource = _
@@ -55,12 +55,12 @@ private[spark] class ApplicationInfo(
 
   private def init() {
     state = ApplicationState.WAITING
-    executors = new mutable.HashMap[Int, ExecutorInfo]
+    executors = new mutable.HashMap[Int, ExecutorDesc]
     coresGranted = 0
     endTime = -1L
     appSource = new ApplicationSource(this)
     nextExecutorId = 0
-    removedExecutors = new ArrayBuffer[ExecutorInfo]
+    removedExecutors = new ArrayBuffer[ExecutorDesc]
   }
 
   private def newExecutorId(useID: Option[Int] = None): Int = {
@@ -75,14 +75,14 @@ private[spark] class ApplicationInfo(
     }
   }
 
-  def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): ExecutorInfo = {
-    val exec = new ExecutorInfo(newExecutorId(useID), this, worker, cores, desc.memoryPerSlave)
+  def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): ExecutorDesc = {
+    val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, desc.memoryPerSlave)
     executors(exec.id) = exec
     coresGranted += cores
     exec
   }
 
-  def removeExecutor(exec: ExecutorInfo) {
+  def removeExecutor(exec: ExecutorDesc) {
     if (executors.contains(exec.id)) {
       removedExecutors += executors(exec.id)
       executors -= exec.id
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorDesc.scala
similarity index 95%
rename from core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala
rename to core/src/main/scala/org/apache/spark/deploy/master/ExecutorDesc.scala
index d417070c510164c47e136fb24384ed725cd1a6a0..5d620dfcabad5edec26e51b200bd902420c77b6d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorDesc.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.master
 
 import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
 
-private[spark] class ExecutorInfo(
+private[spark] class ExecutorDesc(
     val id: Int,
     val application: ApplicationInfo,
     val worker: WorkerInfo,
@@ -37,7 +37,7 @@ private[spark] class ExecutorInfo(
 
   override def equals(other: Any): Boolean = {
     other match {
-      case info: ExecutorInfo =>
+      case info: ExecutorDesc =>
         fullId == info.fullId &&
         worker.id == info.worker.id &&
         cores == info.cores &&
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 4b631ec6390715c066cdefed5253536159e33bab..d92d99310a5838431a4cf55969eea2ff06aa7fd7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -581,7 +581,7 @@ private[spark] class Master(
     }
   }
 
-  def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
+  def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
     logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
     worker.addExecutor(exec)
     worker.actor ! LaunchExecutor(masterUrl,
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index 473ddc23ff0f39fd9d84fe0d4840b55ad651bfc0..e94aae93e4495af9ad390b007eae690be11372c0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -38,7 +38,7 @@ private[spark] class WorkerInfo(
   Utils.checkHost(host, "Expected hostname")
   assert (port > 0)
 
-  @transient var executors: mutable.HashMap[String, ExecutorInfo] = _ // executorId => info
+  @transient var executors: mutable.HashMap[String, ExecutorDesc] = _ // executorId => info
   @transient var drivers: mutable.HashMap[String, DriverInfo] = _ // driverId => info
   @transient var state: WorkerState.Value = _
   @transient var coresUsed: Int = _
@@ -70,13 +70,13 @@ private[spark] class WorkerInfo(
     host + ":" + port
   }
 
-  def addExecutor(exec: ExecutorInfo) {
+  def addExecutor(exec: ExecutorDesc) {
     executors(exec.fullId) = exec
     coresUsed += exec.cores
     memoryUsed += exec.memory
   }
 
-  def removeExecutor(exec: ExecutorInfo) {
+  def removeExecutor(exec: ExecutorDesc) {
     if (executors.contains(exec.fullId)) {
       executors -= exec.fullId
       coresUsed -= exec.cores
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 4588c130ef439c9a3dc1caa43f66ed40519818c1..3aae2b95d73969d0178e5cb18a4694165da7e038 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -27,7 +27,7 @@ import org.json4s.JValue
 
 import org.apache.spark.deploy.{ExecutorState, JsonProtocol}
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
-import org.apache.spark.deploy.master.ExecutorInfo
+import org.apache.spark.deploy.master.ExecutorDesc
 import org.apache.spark.ui.{UIUtils, WebUIPage}
 import org.apache.spark.util.Utils
 
@@ -109,7 +109,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app
     UIUtils.basicSparkPage(content, "Application: " + app.desc.name)
   }
 
-  private def executorRow(executor: ExecutorInfo): Seq[Node] = {
+  private def executorRow(executor: ExecutorDesc): Seq[Node] = {
     <tr>
       <td>{executor.id}</td>
       <td>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 27bf4f15990762b0b45e9f8c52c4e0c12408d60b..30075c172bdb1d35b955c29700a4484520c42e33 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -168,6 +168,10 @@ private[spark] class EventLoggingListener(
     logEvent(event, flushLogger = true)
   override def onApplicationEnd(event: SparkListenerApplicationEnd) =
     logEvent(event, flushLogger = true)
+  override def onExecutorAdded(event: SparkListenerExecutorAdded) =
+    logEvent(event, flushLogger = true)
+  override def onExecutorRemoved(event: SparkListenerExecutorRemoved) =
+    logEvent(event, flushLogger = true)
 
   // No-op because logging every update would be overkill
   override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate) { }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index b62b0c13126935f4d9b947b1a001210ee4ae59e6..4840d8bd2d2f06f60902383cf7f5645192ed1ea7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable
 import org.apache.spark.{Logging, TaskEndReason}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.{Distribution, Utils}
 
@@ -84,6 +85,14 @@ case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockMan
 @DeveloperApi
 case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent
 
+@DeveloperApi
+case class SparkListenerExecutorAdded(executorId: String, executorInfo: ExecutorInfo)
+  extends SparkListenerEvent
+
+@DeveloperApi
+case class SparkListenerExecutorRemoved(executorId: String)
+  extends SparkListenerEvent
+
 /**
  * Periodic updates from executors.
  * @param execId executor id
@@ -109,7 +118,8 @@ private[spark] case object SparkListenerShutdown extends SparkListenerEvent
 /**
  * :: DeveloperApi ::
  * Interface for listening to events from the Spark scheduler. Note that this is an internal
- * interface which might change in different Spark releases.
+ * interface which might change in different Spark releases. Java clients should extend
+ * {@link JavaSparkListener}
  */
 @DeveloperApi
 trait SparkListener {
@@ -183,6 +193,16 @@ trait SparkListener {
    * Called when the driver receives task metrics from an executor in a heartbeat.
    */
   def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate) { }
+
+  /**
+   * Called when the driver registers a new executor.
+   */
+  def onExecutorAdded(executorAdded: SparkListenerExecutorAdded) { }
+
+  /**
+   * Called when the driver removes an executor.
+   */
+  def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved) { }
 }
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index e79ffd7a3587d6129da5b8867ef2a6c90f6ad385..e700c6af542f4a0f47c8508846fe515e8d7d1642 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -70,6 +70,10 @@ private[spark] trait SparkListenerBus extends Logging {
         foreachListener(_.onApplicationEnd(applicationEnd))
       case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
         foreachListener(_.onExecutorMetricsUpdate(metricsUpdate))
+      case executorAdded: SparkListenerExecutorAdded =>
+        foreachListener(_.onExecutorAdded(executorAdded))
+      case executorRemoved: SparkListenerExecutorRemoved =>
+        foreachListener(_.onExecutorRemoved(executorRemoved))
       case SparkListenerShutdown =>
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index fe9914b50bc54343a95b3316d87003a46f99f1d4..5786d367464f4c339c248e88b932243839e62c26 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -28,7 +28,7 @@ import akka.pattern.ask
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 
 import org.apache.spark.{ExecutorAllocationClient, Logging, SparkEnv, SparkException, TaskState}
-import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
+import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.{ActorLogReceive, SerializableBuffer, AkkaUtils, Utils}
 
@@ -66,6 +66,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
   // Number of executors requested from the cluster manager that have not registered yet
   private var numPendingExecutors = 0
 
+  private val listenerBus = scheduler.sc.listenerBus
+
   // Executors we have requested the cluster manager to kill that have not died yet
   private val executorsPendingToRemove = new HashSet[String]
 
@@ -106,6 +108,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
               logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
             }
           }
+          listenerBus.post(SparkListenerExecutorAdded(executorId, data))
           makeOffers()
         }
 
@@ -213,6 +216,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
           totalCoreCount.addAndGet(-executorInfo.totalCores)
           totalRegisteredExecutors.addAndGet(-1)
           scheduler.executorLost(executorId, SlaveLost(reason))
+          listenerBus.post(SparkListenerExecutorRemoved(executorId))
         case None => logError(s"Asked to remove non-existent executor $executorId")
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
index b71bd5783d6df2ea4c8a825261daedb7765770bb..eb52ddfb1eab120fdf9fdd174d1f76700e86e297 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
@@ -31,7 +31,7 @@ import akka.actor.{Address, ActorRef}
 private[cluster] class ExecutorData(
    val executorActor: ActorRef,
    val executorAddress: Address,
-   val executorHost: String ,
+   override val executorHost: String,
    var freeCores: Int,
-   val totalCores: Int
-)
+   override val totalCores: Int
+) extends ExecutorInfo(executorHost, totalCores)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
new file mode 100644
index 0000000000000000000000000000000000000000..b4738e64c93911516c1f3e7f57c5f29b1d5ad3b6
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * Stores information about an executor to pass from the scheduler to SparkListeners.
+ */
+@DeveloperApi
+class ExecutorInfo(
+   val executorHost: String,
+   val totalCores: Int
+) {
+
+  def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo]
+
+  override def equals(other: Any): Boolean = other match {
+    case that: ExecutorInfo =>
+      (that canEqual this) &&
+        executorHost == that.executorHost &&
+        totalCores == that.totalCores
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    val state = Seq(executorHost, totalCores)
+    state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 75d8ddf375e275632c3bdbc2ab8e40d9547cf7b7..d252fe8595fb8a507023b74a13356149c04b7dd7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -27,9 +27,11 @@ import scala.collection.mutable.{HashMap, HashSet}
 import org.apache.mesos.protobuf.ByteString
 import org.apache.mesos.{Scheduler => MScheduler}
 import org.apache.mesos._
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState,
+  ExecutorInfo => MesosExecutorInfo, _}
 
 import org.apache.spark.{Logging, SparkContext, SparkException, TaskState}
+import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.scheduler._
 import org.apache.spark.util.Utils
 
@@ -62,6 +64,9 @@ private[spark] class MesosSchedulerBackend(
 
   var classLoader: ClassLoader = null
 
+  // The listener bus to publish executor added/removed events.
+  val listenerBus = sc.listenerBus
+
   @volatile var appId: String = _
 
   override def start() {
@@ -87,7 +92,7 @@ private[spark] class MesosSchedulerBackend(
     }
   }
 
-  def createExecutorInfo(execId: String): ExecutorInfo = {
+  def createExecutorInfo(execId: String): MesosExecutorInfo = {
     val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home")
       .orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility
       .getOrElse {
@@ -141,7 +146,7 @@ private[spark] class MesosSchedulerBackend(
         Value.Scalar.newBuilder()
           .setValue(MemoryUtils.calculateTotalMemory(sc)).build())
       .build()
-    ExecutorInfo.newBuilder()
+    MesosExecutorInfo.newBuilder()
       .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
       .setCommand(command)
       .setData(ByteString.copyFrom(createExecArg()))
@@ -237,6 +242,7 @@ private[spark] class MesosSchedulerBackend(
       }
 
       val slaveIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap
+      val slaveIdToWorkerOffer = workerOffers.map(o => o.executorId -> o).toMap
 
       val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]]
 
@@ -260,6 +266,10 @@ private[spark] class MesosSchedulerBackend(
       val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
 
       mesosTasks.foreach { case (slaveId, tasks) =>
+        slaveIdToWorkerOffer.get(slaveId).foreach(o =>
+          listenerBus.post(SparkListenerExecutorAdded(slaveId,
+            new ExecutorInfo(o.host, o.cores)))
+        )
         d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
       }
 
@@ -315,7 +325,7 @@ private[spark] class MesosSchedulerBackend(
       synchronized {
         if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
           // We lost the executor on this slave, so remember that it's gone
-          slaveIdsWithExecutors -= taskIdToSlaveId(tid)
+          removeExecutor(taskIdToSlaveId(tid))
         }
         if (isFinished(status.getState)) {
           taskIdToSlaveId.remove(tid)
@@ -344,12 +354,20 @@ private[spark] class MesosSchedulerBackend(
 
   override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
 
+  /**
+   * Remove executor associated with slaveId in a thread safe manner.
+   */
+  private def removeExecutor(slaveId: String) = {
+    synchronized {
+      listenerBus.post(SparkListenerExecutorRemoved(slaveId))
+      slaveIdsWithExecutors -= slaveId
+    }
+  }
+
   private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
     inClassLoader() {
       logInfo("Mesos slave lost: " + slaveId.getValue)
-      synchronized {
-        slaveIdsWithExecutors -= slaveId.getValue
-      }
+      removeExecutor(slaveId.getValue)
       scheduler.executorLost(slaveId.getValue, reason)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index d94e8252650d221dd628bd80057a53a670142b43..a025011006156e6b8a82027756384581e389418f 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -19,6 +19,8 @@ package org.apache.spark.util
 
 import java.util.{Properties, UUID}
 
+import org.apache.spark.scheduler.cluster.ExecutorInfo
+
 import scala.collection.JavaConverters._
 import scala.collection.Map
 
@@ -83,7 +85,10 @@ private[spark] object JsonProtocol {
         applicationStartToJson(applicationStart)
       case applicationEnd: SparkListenerApplicationEnd =>
         applicationEndToJson(applicationEnd)
-
+      case executorAdded: SparkListenerExecutorAdded =>
+        executorAddedToJson(executorAdded)
+      case executorRemoved: SparkListenerExecutorRemoved =>
+        executorRemovedToJson(executorRemoved)
       // These aren't used, but keeps compiler happy
       case SparkListenerShutdown => JNothing
       case SparkListenerExecutorMetricsUpdate(_, _) => JNothing
@@ -194,6 +199,16 @@ private[spark] object JsonProtocol {
     ("Timestamp" -> applicationEnd.time)
   }
 
+  def executorAddedToJson(executorAdded: SparkListenerExecutorAdded): JValue = {
+    ("Event" -> Utils.getFormattedClassName(executorAdded)) ~
+    ("Executor ID" -> executorAdded.executorId) ~
+    ("Executor Info" -> executorInfoToJson(executorAdded.executorInfo))
+  }
+
+  def executorRemovedToJson(executorRemoved: SparkListenerExecutorRemoved): JValue = {
+    ("Event" -> Utils.getFormattedClassName(executorRemoved)) ~
+    ("Executor ID" -> executorRemoved.executorId)
+  }
 
   /** ------------------------------------------------------------------- *
    * JSON serialization methods for classes SparkListenerEvents depend on |
@@ -362,6 +377,10 @@ private[spark] object JsonProtocol {
     ("Disk Size" -> blockStatus.diskSize)
   }
 
+  def executorInfoToJson(executorInfo: ExecutorInfo): JValue = {
+    ("Host" -> executorInfo.executorHost) ~
+    ("Total Cores" -> executorInfo.totalCores)
+  }
 
   /** ------------------------------ *
    * Util JSON serialization methods |
@@ -416,6 +435,8 @@ private[spark] object JsonProtocol {
     val unpersistRDD = Utils.getFormattedClassName(SparkListenerUnpersistRDD)
     val applicationStart = Utils.getFormattedClassName(SparkListenerApplicationStart)
     val applicationEnd = Utils.getFormattedClassName(SparkListenerApplicationEnd)
+    val executorAdded = Utils.getFormattedClassName(SparkListenerExecutorAdded)
+    val executorRemoved = Utils.getFormattedClassName(SparkListenerExecutorRemoved)
 
     (json \ "Event").extract[String] match {
       case `stageSubmitted` => stageSubmittedFromJson(json)
@@ -431,6 +452,8 @@ private[spark] object JsonProtocol {
       case `unpersistRDD` => unpersistRDDFromJson(json)
       case `applicationStart` => applicationStartFromJson(json)
       case `applicationEnd` => applicationEndFromJson(json)
+      case `executorAdded` => executorAddedFromJson(json)
+      case `executorRemoved` => executorRemovedFromJson(json)
     }
   }
 
@@ -523,6 +546,16 @@ private[spark] object JsonProtocol {
     SparkListenerApplicationEnd((json \ "Timestamp").extract[Long])
   }
 
+  def executorAddedFromJson(json: JValue): SparkListenerExecutorAdded = {
+    val executorId = (json \ "Executor ID").extract[String]
+    val executorInfo = executorInfoFromJson(json \ "Executor Info")
+    SparkListenerExecutorAdded(executorId, executorInfo)
+  }
+
+  def executorRemovedFromJson(json: JValue): SparkListenerExecutorRemoved = {
+    val executorId = (json \ "Executor ID").extract[String]
+    SparkListenerExecutorRemoved(executorId)
+  }
 
   /** --------------------------------------------------------------------- *
    * JSON deserialization methods for classes SparkListenerEvents depend on |
@@ -745,6 +778,11 @@ private[spark] object JsonProtocol {
     BlockStatus(storageLevel, memorySize, diskSize, tachyonSize)
   }
 
+  def executorInfoFromJson(json: JValue): ExecutorInfo = {
+    val executorHost = (json \ "Host").extract[String]
+    val totalCores = (json \ "Total Cores").extract[Int]
+    new ExecutorInfo(executorHost, totalCores)
+  }
 
   /** -------------------------------- *
    * Util JSON deserialization methods |
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 1de7e130039a51965c4971cb5506e18b872a69be..437d8693c0b1f2925ac481052aa366372a6f4e70 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -160,7 +160,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
    */
   private def testApplicationEventLogging(compressionCodec: Option[String] = None) {
     val conf = getLoggingConf(testDirPath, compressionCodec)
-    val sc = new SparkContext("local", "test", conf)
+    val sc = new SparkContext("local-cluster[2,2,512]", "test", conf)
     assert(sc.eventLogger.isDefined)
     val eventLogger = sc.eventLogger.get
     val expectedLogDir = testDir.toURI().toString()
@@ -184,6 +184,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
     val eventSet = mutable.Set(
       SparkListenerApplicationStart,
       SparkListenerBlockManagerAdded,
+      SparkListenerExecutorAdded,
       SparkListenerEnvironmentUpdate,
       SparkListenerJobStart,
       SparkListenerJobEnd,
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..623a687c359a21a1614054e3899d5dd335242259
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.scheduler.cluster.ExecutorInfo
+import org.apache.spark.{SparkContext, LocalSparkContext}
+
+import org.scalatest.{FunSuite, BeforeAndAfter, BeforeAndAfterAll}
+
+import scala.collection.mutable
+
+/**
+ * Unit tests for SparkListener that require a local cluster.
+ */
+class SparkListenerWithClusterSuite extends FunSuite with LocalSparkContext
+  with BeforeAndAfter with BeforeAndAfterAll {
+
+  /** Length of time to wait while draining listener events. */
+  val WAIT_TIMEOUT_MILLIS = 10000
+
+  before {
+    sc = new SparkContext("local-cluster[2,1,512]", "SparkListenerSuite")
+  }
+
+  test("SparkListener sends executor added message") {
+    val listener = new SaveExecutorInfo
+    sc.addSparkListener(listener)
+
+    val rdd1 = sc.parallelize(1 to 100, 4)
+    val rdd2 = rdd1.map(_.toString)
+    rdd2.setName("Target RDD")
+    rdd2.count()
+
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    assert(listener.addedExecutorInfo.size == 2)
+    assert(listener.addedExecutorInfo("0").totalCores == 1)
+    assert(listener.addedExecutorInfo("1").totalCores == 1)
+  }
+
+  private class SaveExecutorInfo extends SparkListener {
+    val addedExecutorInfo = mutable.Map[String, ExecutorInfo]()
+
+    override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
+      addedExecutorInfo(executor.executorId) = executor.executorInfo
+    }
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
index 48f5e40f506d93b4c3bb59ff3558ed3c62c45ee2..78a30a40bf19ad33485f63f14fc644bfbfa50319 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
@@ -18,17 +18,20 @@
 package org.apache.spark.scheduler.mesos
 
 import org.scalatest.FunSuite
-import org.apache.spark.{scheduler, SparkConf, SparkContext, LocalSparkContext}
-import org.apache.spark.scheduler.{TaskDescription, WorkerOffer, TaskSchedulerImpl}
+import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext}
+import org.apache.spark.scheduler.{SparkListenerExecutorAdded, LiveListenerBus,
+  TaskDescription, WorkerOffer, TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.scheduler.cluster.mesos.{MemoryUtils, MesosSchedulerBackend}
 import org.apache.mesos.SchedulerDriver
-import org.apache.mesos.Protos._
-import org.scalatest.mock.EasyMockSugar
+import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, _}
 import org.apache.mesos.Protos.Value.Scalar
 import org.easymock.{Capture, EasyMock}
 import java.nio.ByteBuffer
 import java.util.Collections
 import java.util
+import org.scalatest.mock.EasyMockSugar
+
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
@@ -52,11 +55,16 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
     val driver = EasyMock.createMock(classOf[SchedulerDriver])
     val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])
 
+    val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
+    listenerBus.post(SparkListenerExecutorAdded("s1", new ExecutorInfo("host1", 2)))
+    EasyMock.replay(listenerBus)
+
     val sc = EasyMock.createMock(classOf[SparkContext])
     EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes()
     EasyMock.expect(sc.getSparkHome()).andReturn(Option("/path")).anyTimes()
     EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes()
     EasyMock.expect(sc.conf).andReturn(new SparkConf).anyTimes()
+    EasyMock.expect(sc.listenerBus).andReturn(listenerBus)
     EasyMock.replay(sc)
 
     val minMem = MemoryUtils.calculateTotalMemory(sc).toInt
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 63c2559c5c5f5991c5c62daf31fe94514973ae4d..5ba94ff67d395f3fed5d924114f0bc7ab519c55e 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.util
 
 import java.util.Properties
 
+import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.shuffle.MetadataFetchFailedException
 
 import scala.collection.Map
@@ -69,6 +70,9 @@ class JsonProtocolSuite extends FunSuite {
     val unpersistRdd = SparkListenerUnpersistRDD(12345)
     val applicationStart = SparkListenerApplicationStart("The winner of all", None, 42L, "Garfield")
     val applicationEnd = SparkListenerApplicationEnd(42L)
+    val executorAdded = SparkListenerExecutorAdded("exec1",
+      new ExecutorInfo("Hostee.awesome.com", 11))
+    val executorRemoved = SparkListenerExecutorRemoved("exec2")
 
     testEvent(stageSubmitted, stageSubmittedJsonString)
     testEvent(stageCompleted, stageCompletedJsonString)
@@ -85,6 +89,8 @@ class JsonProtocolSuite extends FunSuite {
     testEvent(unpersistRdd, unpersistRDDJsonString)
     testEvent(applicationStart, applicationStartJsonString)
     testEvent(applicationEnd, applicationEndJsonString)
+    testEvent(executorAdded, executorAddedJsonString)
+    testEvent(executorRemoved, executorRemovedJsonString)
   }
 
   test("Dependent Classes") {
@@ -94,6 +100,7 @@ class JsonProtocolSuite extends FunSuite {
     testTaskMetrics(makeTaskMetrics(
       33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false, hasOutput = false))
     testBlockManagerId(BlockManagerId("Hong", "Kong", 500))
+    testExecutorInfo(new ExecutorInfo("host", 43))
 
     // StorageLevel
     testStorageLevel(StorageLevel.NONE)
@@ -303,6 +310,10 @@ class JsonProtocolSuite extends FunSuite {
     assert(blockId === newBlockId)
   }
 
+  private def testExecutorInfo(info: ExecutorInfo) {
+    val newInfo = JsonProtocol.executorInfoFromJson(JsonProtocol.executorInfoToJson(info))
+    assertEquals(info, newInfo)
+  }
 
   /** -------------------------------- *
    | Util methods for comparing events |
@@ -335,6 +346,11 @@ class JsonProtocolSuite extends FunSuite {
         assertEquals(e1.jobResult, e2.jobResult)
       case (e1: SparkListenerEnvironmentUpdate, e2: SparkListenerEnvironmentUpdate) =>
         assertEquals(e1.environmentDetails, e2.environmentDetails)
+      case (e1: SparkListenerExecutorAdded, e2: SparkListenerExecutorAdded) =>
+        assert(e1.executorId == e1.executorId)
+        assertEquals(e1.executorInfo, e2.executorInfo)
+      case (e1: SparkListenerExecutorRemoved, e2: SparkListenerExecutorRemoved) =>
+        assert(e1.executorId == e1.executorId)
       case (e1, e2) =>
         assert(e1 === e2)
       case _ => fail("Events don't match in types!")
@@ -387,6 +403,11 @@ class JsonProtocolSuite extends FunSuite {
     assert(info1.accumulables === info2.accumulables)
   }
 
+  private def assertEquals(info1: ExecutorInfo, info2: ExecutorInfo) {
+    assert(info1.executorHost == info2.executorHost)
+    assert(info1.totalCores == info2.totalCores)
+  }
+
   private def assertEquals(metrics1: TaskMetrics, metrics2: TaskMetrics) {
     assert(metrics1.hostname === metrics2.hostname)
     assert(metrics1.executorDeserializeTime === metrics2.executorDeserializeTime)
@@ -1407,4 +1428,24 @@ class JsonProtocolSuite extends FunSuite {
       |  "Timestamp": 42
       |}
     """
+
+  private val executorAddedJsonString =
+    """
+      |{
+      |  "Event": "SparkListenerExecutorAdded",
+      |  "Executor ID": "exec1",
+      |  "Executor Info": {
+      |    "Host": "Hostee.awesome.com",
+      |    "Total Cores": 11
+      |  }
+      |}
+    """
+
+  private val executorRemovedJsonString =
+    """
+      |{
+      |  "Event": "SparkListenerExecutorRemoved",
+      |  "Executor ID": "exec2"
+      |}
+    """
 }