From b4e382c210b4987da78421f5de11199e4d74f0e7 Mon Sep 17 00:00:00 2001 From: Patrick Wendell <pwendell@gmail.com> Date: Sun, 8 Sep 2013 16:06:49 -0700 Subject: [PATCH] Adding sc name in metrics source --- core/src/main/scala/org/apache/spark/SparkContext.scala | 4 ++-- .../src/main/scala/org/apache/spark/executor/Executor.scala | 2 +- .../scala/org/apache/spark/executor/ExecutorSource.scala | 5 +++-- .../org/apache/spark/scheduler/DAGSchedulerSource.scala | 6 ++++-- .../scala/org/apache/spark/storage/BlockManagerSource.scala | 6 ++++-- 5 files changed, 14 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 89318712a5..4f711a5ea6 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -282,8 +282,8 @@ class SparkContext( // Post init taskScheduler.postStartHook() - val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler) - val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager) + val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this) + val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this) def initDriverMetrics() { SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index d365804994..ceae3b8289 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -98,7 +98,7 @@ private[spark] class Executor( } ) - val executorSource = new ExecutorSource(this) + val executorSource = new ExecutorSource(this, executorId) // Initialize Spark environment (using system properties read above) val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false) diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala index bf8fb4fd21..18c9dc1c0a 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala @@ -27,7 +27,7 @@ import scala.collection.JavaConversions._ import org.apache.spark.metrics.source.Source -class ExecutorSource(val executor: Executor) extends Source { +class ExecutorSource(val executor: Executor, executorId: String) extends Source { private def fileStats(scheme: String) : Option[FileSystem.Statistics] = FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOption @@ -39,7 +39,8 @@ class ExecutorSource(val executor: Executor) extends Source { } val metricRegistry = new MetricRegistry() - val sourceName = "executor" + // TODO: It would be nice to pass the application name here + val sourceName = "executor.%s".format(executorId) // Gauge for executor thread pool's actively executing task counts metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] { diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala index 22e3723ac8..446d490cc9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala @@ -20,10 +20,12 @@ package org.apache.spark.scheduler import com.codahale.metrics.{Gauge,MetricRegistry} import org.apache.spark.metrics.source.Source +import org.apache.spark.SparkContext -private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends Source { +private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext) + extends Source { val metricRegistry = new MetricRegistry() - val sourceName = "DAGScheduler" + val sourceName = "%s.DAGScheduler".format(sc.appName) metricRegistry.register(MetricRegistry.name("stage", "failedStages", "number"), new Gauge[Int] { override def getValue: Int = dagScheduler.failed.size diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala index 3d709cfde4..acc3951088 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala @@ -20,11 +20,13 @@ package org.apache.spark.storage import com.codahale.metrics.{Gauge,MetricRegistry} import org.apache.spark.metrics.source.Source +import org.apache.spark.SparkContext -private[spark] class BlockManagerSource(val blockManager: BlockManager) extends Source { +private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext) + extends Source { val metricRegistry = new MetricRegistry() - val sourceName = "BlockManager" + val sourceName = "%s.BlockManager".format(sc.appName) metricRegistry.register(MetricRegistry.name("memory", "maxMem", "MBytes"), new Gauge[Long] { override def getValue: Long = { -- GitLab