diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 1255d0c72e6c26578748bad974ef141395306495..f1d9d5e4429514655c8e917e37fb298551639d99 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -275,12 +275,10 @@ class SparkContext(
 
   val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
   val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)
-  val metricsSystem = MetricsSystem.createMetricsSystem("driver")
 
   def initDriverMetrics() = {
-     metricsSystem.registerSource(dagSchedulerSource)
-     metricsSystem.registerSource(blockManagerSource)
-     metricsSystem.start()
+    SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
+    SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
   }
 
   initDriverMetrics()
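
With this hunk the driver no longer owns a private MetricsSystem; it registers its sources against the one created by SparkEnv. As a rough illustration of the pattern, an extra driver-side source could be wired up the same way. Everything below except the `Source` shape and the `registerSource` call is hypothetical (class name, metric names), and it is assumed to live under the `spark` package so the `Source` trait is visible:

```scala
import com.codahale.metrics.{Gauge, MetricRegistry}

import spark.SparkEnv
import spark.metrics.source.Source

// Hypothetical example source; mirrors the shape of DAGSchedulerSource and
// BlockManagerSource (a MetricRegistry plus a sourceName).
class DriverUptimeSource(startTimeMs: Long) extends Source {
  val metricRegistry = new MetricRegistry()
  val sourceName = "driver.uptime"

  metricRegistry.register(MetricRegistry.name("uptime", "ms"),
    new Gauge[Long] {
      override def getValue: Long = System.currentTimeMillis() - startTimeMs
    })
}

// Registered the same way initDriverMetrics() does above:
// SparkEnv.get.metricsSystem.registerSource(new DriverUptimeSource(System.currentTimeMillis()))
```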
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 125dc55bd8fe3a3c46a187c19a2899fcad7d42c8..7b3dc693486bb23c32be6daf499c137310cc2aeb 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -54,6 +54,7 @@ class SparkEnv (
     val connectionManager: ConnectionManager,
     val httpFileServer: HttpFileServer,
     val sparkFilesDir: String,
+    val metricsSystem: MetricsSystem,
     // To be set only as part of initialization of SparkContext.
     // (executorId, defaultHostPort) => executorHostPort
     // If executorId is NOT found, return defaultHostPort
@@ -69,6 +70,7 @@ class SparkEnv (
     broadcastManager.stop()
     blockManager.stop()
     blockManager.master.stop()
+    metricsSystem.stop()
     actorSystem.shutdown()
     // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
     // down, but let's call it anyway in case it gets fixed in a later release
@@ -185,6 +187,12 @@ object SparkEnv extends Logging {
     httpFileServer.initialize()
     System.setProperty("spark.fileserver.uri", httpFileServer.serverUri)
 
+    val metricsSystem = if (isDriver) {
+      MetricsSystem.createMetricsSystem("driver")
+    } else {
+      MetricsSystem.createMetricsSystem("executor")
+    }
+    metricsSystem.start()
 
     // Set the sparkFiles directory, used when downloading dependencies.  In local mode,
     // this is a temporary directory; in distributed mode, this is the executor's current working
@@ -215,6 +223,7 @@ object SparkEnv extends Logging {
       connectionManager,
       httpFileServer,
       sparkFilesDir,
+      metricsSystem,
       None)
   }
 }
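
The net effect in SparkEnv is that the metrics system's lifecycle now tracks the env itself: created and started per instance (driver or executor) in createFromSystemProperties, and stopped in stop(). A minimal sketch of that call order, shown script-style and using only the MetricsSystem calls visible in this diff (the anonymous heap gauge is purely illustrative):

```scala
import com.codahale.metrics.{Gauge, MetricRegistry}

import spark.metrics.MetricsSystem
import spark.metrics.source.Source

// Sketch of the lifecycle SparkEnv now owns; instance name and gauge are illustrative.
val metricsSystem = MetricsSystem.createMetricsSystem("driver")
metricsSystem.start()

// Sources may still be registered after start(), which is what
// SparkContext.initDriverMetrics() relies on.
metricsSystem.registerSource(new Source {
  val metricRegistry = new MetricRegistry()
  val sourceName = "jvm.heap"
  metricRegistry.register(MetricRegistry.name("used", "bytes"), new Gauge[Long] {
    override def getValue: Long =
      Runtime.getRuntime.totalMemory() - Runtime.getRuntime.freeMemory()
  })
})

// ... application runs ...

metricsSystem.stop()   // mirrored by the new call in SparkEnv.stop()
```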
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 2e81151882837e92b4e97131b6c9e5d6ab5661e1..7179ed84a8c6ffa5eddd76b8e7c4139379dd9138 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -86,10 +86,14 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
       }
     }
   )
+
+  val executorInstrumentation = new ExecutorInstrumentation(this)
 
   // Initialize Spark environment (using system properties read above)
   val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
-  SparkEnv.set(env)
+  SparkEnv.set(env)
+  env.metricsSystem.registerSource(executorInstrumentation)
+
   private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size")
 
   // Start worker thread pool
diff --git a/core/src/main/scala/spark/executor/ExecutorInstrumentation.scala b/core/src/main/scala/spark/executor/ExecutorInstrumentation.scala
index 80aadb66b0e5fcbd32ec73462ceec33f817820b8..ebbcbee7423095009485cf269639fdfacb4afb26 100644
--- a/core/src/main/scala/spark/executor/ExecutorInstrumentation.scala
+++ b/core/src/main/scala/spark/executor/ExecutorInstrumentation.scala
@@ -4,32 +4,32 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
 
 import spark.metrics.source.Source
 
-class ExecutorInstrumentation(val executor: Option[Executor]) extends Source{
+class ExecutorInstrumentation(val executor: Executor) extends Source {
   val metricRegistry = new MetricRegistry()
   val sourceName = "executor"
   
   // Gauge for executor thread pool's actively executing task counts
   metricRegistry.register(MetricRegistry.name("threadpool", "active_task", "number"), 
     new Gauge[Int] {
-      override def getValue: Int = executor.map(_.threadPool.getActiveCount()).getOrElse(0)
+      override def getValue: Int = executor.threadPool.getActiveCount()
   })
   
   // Gauge for executor thread pool's approximate total number of tasks that have been completed
   metricRegistry.register(MetricRegistry.name("threadpool", "complete_task", "count"),
     new Gauge[Long] {
-      override def getValue: Long = executor.map(_.threadPool.getCompletedTaskCount()).getOrElse(0)
+      override def getValue: Long = executor.threadPool.getCompletedTaskCount()
   })
   
   // Gauge for executor thread pool's current number of threads
   metricRegistry.register(MetricRegistry.name("threadpool", "current_pool", "size"), 
     new Gauge[Int] {
-      override def getValue: Int = executor.map(_.threadPool.getPoolSize()).getOrElse(0)
+      override def getValue: Int = executor.threadPool.getPoolSize()
   })
   
   // Gauge for executor thread pool's largest number of threads that have ever simultaneously been in the pool
   metricRegistry.register(MetricRegistry.name("threadpool", "max_pool", "size"), 
     new Gauge[Int] {
-      override def getValue: Int = executor.map(_.threadPool.getMaximumPoolSize()).getOrElse(0)
+      override def getValue: Int = executor.threadPool.getMaximumPoolSize()
     })
 
 }
\ No newline at end of file
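
The four gauges in this file are thin wrappers over standard java.util.concurrent.ThreadPoolExecutor counters on the Executor's worker pool. A standalone sketch (not part of the patch; pool parameters are illustrative) reading the same four values from a throwaway pool:

```scala
import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit}

object ThreadPoolGaugeSketch {
  def main(args: Array[String]) {
    // A throwaway pool, roughly the shape the Executor uses
    // (cached threads over a SynchronousQueue).
    val pool = new ThreadPoolExecutor(
      0, 128, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable])

    pool.execute(new Runnable { def run() { Thread.sleep(100) } })

    // The same reads the gauges above perform:
    println("threadpool.active_task.number  = " + pool.getActiveCount())
    println("threadpool.complete_task.count = " + pool.getCompletedTaskCount())
    println("threadpool.current_pool.size   = " + pool.getPoolSize())
    println("threadpool.max_pool.size       = " + pool.getMaximumPoolSize())

    pool.shutdown()
  }
}
```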
diff --git a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
index 8b6ab0c3919255696ef95210ad33fd45ed319300..4961c42faddbcd3dd69a59cb4f509e99ba43ef0b 100644
--- a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
@@ -24,7 +24,6 @@ import spark.TaskState.TaskState
 import com.google.protobuf.ByteString
 import spark.{Utils, Logging}
 import spark.TaskState
-import spark.metrics.MetricsSystem
 
 private[spark] class MesosExecutorBackend
   extends MesosExecutor
@@ -33,9 +32,6 @@ private[spark] class MesosExecutorBackend
 
   var executor: Executor = null
   var driver: ExecutorDriver = null
-  
-  val executorInstrumentation = new ExecutorInstrumentation(Option(executor))
-  MesosExecutorBackend.metricsSystem.start()
 
   override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
     val mesosTaskId = TaskID.newBuilder().setValue(taskId.toString).build()
@@ -83,17 +79,13 @@ private[spark] class MesosExecutorBackend
 
   override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]) {}
 
-  override def shutdown(d: ExecutorDriver) {
-    MesosExecutorBackend.metricsSystem.stop()
-  }
+  override def shutdown(d: ExecutorDriver) {}
 }
 
 /**
  * Entry point for Mesos executor.
  */
 private[spark] object MesosExecutorBackend {
-  private val metricsSystem = MetricsSystem.createMetricsSystem("executor")
-  
   def main(args: Array[String]) {
     MesosNativeLibrary.load()
     // Create a new Executor and start it running
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index 6ef74cd2ff2764a8b3fad24971f529d558a5ea4b..f4003da73298d07123572b12c74bea8696c44d3b 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -24,7 +24,6 @@ import spark.util.AkkaUtils
 import akka.actor.{ActorRef, Actor, Props, Terminated}
 import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
 import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue}
-import spark.metrics.MetricsSystem
 import spark.scheduler.cluster._
 import spark.scheduler.cluster.RegisteredExecutor
 import spark.scheduler.cluster.LaunchTask
@@ -46,8 +45,6 @@ private[spark] class StandaloneExecutorBackend(
 
   var executor: Executor = null
   var driver: ActorRef = null
-  
-  val executorInstrumentation = new ExecutorInstrumentation(Option(executor))
 
   override def preStart() {
     logInfo("Connecting to driver: " + driverUrl)
@@ -55,9 +52,6 @@ private[spark] class StandaloneExecutorBackend(
     driver ! RegisterExecutor(executorId, hostPort, cores)
     context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
     context.watch(driver) // Doesn't work with remote actors, but useful for testing
-    
-    StandaloneExecutorBackend.metricsSystem.registerSource(executorInstrumentation)
-    StandaloneExecutorBackend.metricsSystem.start()
   }
 
   override def receive = {
@@ -87,15 +81,9 @@ private[spark] class StandaloneExecutorBackend(
   override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
     driver ! StatusUpdate(executorId, taskId, state, data)
   }
-  
-  override def postStop() {
-    StandaloneExecutorBackend.metricsSystem.stop()
-  }
 }
 
 private[spark] object StandaloneExecutorBackend {
-  private val metricsSystem = MetricsSystem.createMetricsSystem("executor")
-  
   def run(driverUrl: String, executorId: String, hostname: String, cores: Int) {
     SparkHadoopUtil.runAsUser(run0, Tuple4[Any, Any, Any, Any] (driverUrl, executorId, hostname, cores))
   }