From 1daff54b2ed92d0bcee7030d7d3ab5c274f80d2f Mon Sep 17 00:00:00 2001
From: jerryshao <saisai.shao@intel.com>
Date: Tue, 2 Jul 2013 11:28:32 +0800
Subject: [PATCH] Change Executor MetricsSystem initialization code to SparkEnv

---
 core/src/main/scala/spark/SparkContext.scala        |  6 ++----
 core/src/main/scala/spark/SparkEnv.scala            |  9 +++++++++
 core/src/main/scala/spark/executor/Executor.scala   |  6 +++++-
 .../spark/executor/ExecutorInstrumentation.scala    | 10 +++++-----
 .../scala/spark/executor/MesosExecutorBackend.scala | 10 +---------
 .../spark/executor/StandaloneExecutorBackend.scala  | 12 ------------
 6 files changed, 22 insertions(+), 31 deletions(-)
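After this change there is exactly one MetricsSystem per process: SparkEnv
creates and starts it (instance name "driver" or "executor") and stops it in
SparkEnv.stop(). Components no longer manage their own system; they fetch it
from SparkEnv and register a Source against it. Note that the new constructor
field assumes spark.metrics.MetricsSystem is in scope in SparkEnv.scala. The
sketch below illustrates the registration pattern this enables; it is not part
of the patch, and WorkerQueueSource, its queue parameter, and the metric names
are hypothetical:

    import java.util.concurrent.BlockingQueue

    import com.codahale.metrics.{Gauge, MetricRegistry}

    import spark.metrics.source.Source

    // A custom Source: metricRegistry and sourceName satisfy the Source
    // trait the same way ExecutorInstrumentation does below.
    class WorkerQueueSource(queue: BlockingQueue[_]) extends Source {
      val metricRegistry = new MetricRegistry()
      val sourceName = "workerQueue"

      // Gauge read each time a sink polls the registry.
      metricRegistry.register(MetricRegistry.name("queue", "pending", "number"),
        new Gauge[Int] {
          override def getValue: Int = queue.size()
        })
    }

    // Once SparkEnv is up (driver or executor), registration is one call:
    //   SparkEnv.get.metricsSystem.registerSource(new WorkerQueueSource(pending))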
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 1255d0c72e..f1d9d5e442 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -275,12 +275,10 @@ class SparkContext(
   val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
   val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)
-  val metricsSystem = MetricsSystem.createMetricsSystem("driver")
 
   def initDriverMetrics() = {
-    metricsSystem.registerSource(dagSchedulerSource)
-    metricsSystem.registerSource(blockManagerSource)
-    metricsSystem.start()
+    SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
+    SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
   }
 
   initDriverMetrics()
 
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 125dc55bd8..7b3dc69348 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -54,6 +54,7 @@ class SparkEnv (
     val connectionManager: ConnectionManager,
     val httpFileServer: HttpFileServer,
     val sparkFilesDir: String,
+    val metricsSystem: MetricsSystem,
     // To be set only as part of initialization of SparkContext.
     // (executorId, defaultHostPort) => executorHostPort
     // If executorId is NOT found, return defaultHostPort
@@ -69,6 +70,7 @@ class SparkEnv (
     broadcastManager.stop()
     blockManager.stop()
     blockManager.master.stop()
+    metricsSystem.stop()
     actorSystem.shutdown()
     // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
     // down, but let's call it anyway in case it gets fixed in a later release
@@ -185,6 +187,12 @@ object SparkEnv extends Logging {
     httpFileServer.initialize()
     System.setProperty("spark.fileserver.uri", httpFileServer.serverUri)
 
+    val metricsSystem = if (isDriver) {
+      MetricsSystem.createMetricsSystem("driver")
+    } else {
+      MetricsSystem.createMetricsSystem("executor")
+    }
+    metricsSystem.start()
     // Set the sparkFiles directory, used when downloading dependencies.  In local mode,
     // this is a temporary directory; in distributed mode, this is the executor's current working
     // directory.
@@ -215,6 +223,7 @@ object SparkEnv extends Logging {
       connectionManager,
       httpFileServer,
       sparkFilesDir,
+      metricsSystem,
       None)
   }
 }
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 2e81151882..7179ed84a8 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -86,10 +86,14 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
       }
     }
   )
+
+  val executorInstrumentation = new ExecutorInstrumentation(this)
 
   // Initialize Spark environment (using system properties read above)
   val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
-  SparkEnv.set(env)
+  SparkEnv.set(env)
+  env.metricsSystem.registerSource(executorInstrumentation)
+
   private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size")
 
   // Start worker thread pool
diff --git a/core/src/main/scala/spark/executor/ExecutorInstrumentation.scala b/core/src/main/scala/spark/executor/ExecutorInstrumentation.scala
index 80aadb66b0..ebbcbee742 100644
--- a/core/src/main/scala/spark/executor/ExecutorInstrumentation.scala
+++ b/core/src/main/scala/spark/executor/ExecutorInstrumentation.scala
@@ -4,32 +4,32 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
 
 import spark.metrics.source.Source
 
-class ExecutorInstrumentation(val executor: Option[Executor]) extends Source{
+class ExecutorInstrumentation(val executor: Executor) extends Source {
   val metricRegistry = new MetricRegistry()
   val sourceName = "executor"
 
   // Gauge for executor thread pool's actively executing task counts
   metricRegistry.register(MetricRegistry.name("threadpool", "active_task", "number"),
     new Gauge[Int] {
-      override def getValue: Int = executor.map(_.threadPool.getActiveCount()).getOrElse(0)
+      override def getValue: Int = executor.threadPool.getActiveCount()
   })
 
   // Gauge for executor thread pool's approximate total number of tasks that have been completed
   metricRegistry.register(MetricRegistry.name("threadpool", "complete_task", "count"),
     new Gauge[Long] {
-      override def getValue: Long = executor.map(_.threadPool.getCompletedTaskCount()).getOrElse(0)
+      override def getValue: Long = executor.threadPool.getCompletedTaskCount()
   })
 
   // Gauge for executor thread pool's current number of threads
   metricRegistry.register(MetricRegistry.name("threadpool", "current_pool", "size"),
     new Gauge[Int] {
-      override def getValue: Int = executor.map(_.threadPool.getPoolSize()).getOrElse(0)
+      override def getValue: Int = executor.threadPool.getPoolSize()
   })
 
   // Gauge for executor thread pool's largest number of threads that have ever simultaneously been in the pool
   metricRegistry.register(MetricRegistry.name("threadpool", "max_pool", "size"),
     new Gauge[Int] {
-      override def getValue: Int = executor.map(_.threadPool.getMaximumPoolSize()).getOrElse(0)
+      override def getValue: Int = executor.threadPool.getMaximumPoolSize()
   })
 
 }
\ No newline at end of file
diff --git a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
index 8b6ab0c391..4961c42fad 100644
--- a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
@@ -24,7 +24,6 @@ import spark.TaskState.TaskState
 import com.google.protobuf.ByteString
 import spark.{Utils, Logging}
 import spark.TaskState
-import spark.metrics.MetricsSystem
 
 private[spark] class MesosExecutorBackend
   extends MesosExecutor
@@ -33,9 +32,6 @@ private[spark] class MesosExecutorBackend
 
   var executor: Executor = null
   var driver: ExecutorDriver = null
-
-  val executorInstrumentation = new ExecutorInstrumentation(Option(executor))
-  MesosExecutorBackend.metricsSystem.start()
 
   override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
     val mesosTaskId = TaskID.newBuilder().setValue(taskId.toString).build()
@@ -83,17 +79,13 @@ private[spark] class MesosExecutorBackend
 
   override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]) {}
 
-  override def shutdown(d: ExecutorDriver) {
-    MesosExecutorBackend.metricsSystem.stop()
-  }
+  override def shutdown(d: ExecutorDriver) {}
 }
 
 /**
  * Entry point for Mesos executor.
  */
 private[spark] object MesosExecutorBackend {
-  private val metricsSystem = MetricsSystem.createMetricsSystem("executor")
-
   def main(args: Array[String]) {
     MesosNativeLibrary.load()
     // Create a new Executor and start it running
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index 6ef74cd2ff..f4003da732 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -24,7 +24,6 @@ import spark.util.AkkaUtils
 import akka.actor.{ActorRef, Actor, Props, Terminated}
 import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
 import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue}
-import spark.metrics.MetricsSystem
 import spark.scheduler.cluster._
 import spark.scheduler.cluster.RegisteredExecutor
 import spark.scheduler.cluster.LaunchTask
@@ -46,8 +45,6 @@ private[spark] class StandaloneExecutorBackend(
 
   var executor: Executor = null
   var driver: ActorRef = null
-
-  val executorInstrumentation = new ExecutorInstrumentation(Option(executor))
 
   override def preStart() {
     logInfo("Connecting to driver: " + driverUrl)
@@ -55,9 +52,6 @@ private[spark] class StandaloneExecutorBackend(
     driver ! RegisterExecutor(executorId, hostPort, cores)
     context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
     context.watch(driver) // Doesn't work with remote actors, but useful for testing
-
-    StandaloneExecutorBackend.metricsSystem.registerSource(executorInstrumentation)
-    StandaloneExecutorBackend.metricsSystem.start()
   }
 
   override def receive = {
@@ -87,15 +81,9 @@ private[spark] class StandaloneExecutorBackend(
   override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
     driver ! StatusUpdate(executorId, taskId, state, data)
   }
-
-  override def postStop() {
-    StandaloneExecutorBackend.metricsSystem.stop()
-  }
 }
 
 private[spark] object StandaloneExecutorBackend {
-  private val metricsSystem = MetricsSystem.createMetricsSystem("executor")
-
   def run(driverUrl: String, executorId: String, hostname: String, cores: Int) {
     SparkHadoopUtil.runAsUser(run0, Tuple4[Any, Any, Any, Any] (driverUrl, executorId, hostname, cores))
   }
--
GitLab
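A subtlety worth noting: SparkEnv calls metricsSystem.start() during
createFromSystemProperties, while SparkContext and Executor register their
sources only afterwards. That ordering is safe as long as the configured sinks
poll the live MetricRegistry on each tick instead of snapshotting it at start,
which is how Codahale scheduled reporters behave. The standalone sketch below
demonstrates the behavior with a plain ConsoleReporter; it is illustrative
only, not code from this patch, and all names in it are invented:

    import java.util.concurrent.TimeUnit

    import com.codahale.metrics.{ConsoleReporter, Gauge, MetricRegistry}

    object LateRegistrationDemo {
      def main(args: Array[String]) {
        val registry = new MetricRegistry()

        // Start the reporter first, mirroring SparkEnv starting the
        // MetricsSystem before any source has been registered.
        val reporter = ConsoleReporter.forRegistry(registry).build()
        reporter.start(1, TimeUnit.SECONDS)

        // Register a gauge afterwards, as Executor does with
        // ExecutorInstrumentation; the reporter picks it up on its next poll.
        registry.register(MetricRegistry.name("demo", "pending", "number"),
          new Gauge[Int] {
            override def getValue: Int = 42
          })

        Thread.sleep(3000) // let the reporter fire a few times
        reporter.stop()
      }
    }

Run with metrics-core 3.x on the classpath: the gauge registered after
start() appears in the console output on the next tick.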