diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 89318712a5777eee2046f7ec1246b471148c666e..4f711a5ea652f059dfb25d939fb5ae201d90f7fc 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -282,8 +282,8 @@ class SparkContext( // Post init taskScheduler.postStartHook() - val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler) - val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager) + val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this) + val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this) def initDriverMetrics() { SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index d3658049945c2d252914aecbae314bbc7d840f66..ceae3b8289b5efa31ba4a6bf36caf6d672b328fd 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -98,7 +98,7 @@ private[spark] class Executor( } ) - val executorSource = new ExecutorSource(this) + val executorSource = new ExecutorSource(this, executorId) // Initialize Spark environment (using system properties read above) val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false) diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala index bf8fb4fd21f6b5133c10bdde1032d7e9c27561a6..18c9dc1c0a9bbb5e974c49fa14c6c1cbae18f097 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala @@ -27,7 +27,7 @@ import scala.collection.JavaConversions._ import org.apache.spark.metrics.source.Source -class ExecutorSource(val executor: Executor) extends Source { +class ExecutorSource(val executor: Executor, executorId: String) extends Source { private def fileStats(scheme: String) : Option[FileSystem.Statistics] = FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOption @@ -39,7 +39,8 @@ class ExecutorSource(val executor: Executor) extends Source { } val metricRegistry = new MetricRegistry() - val sourceName = "executor" + // TODO: It would be nice to pass the application name here + val sourceName = "executor.%s".format(executorId) // Gauge for executor thread pool's actively executing task counts metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] { diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala index 22e3723ac8e4916d04882b74dd4c5ae8f002aefb..446d490cc9dde851046d50d61279d5129dd9b818 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala @@ -20,10 +20,12 @@ package org.apache.spark.scheduler import com.codahale.metrics.{Gauge,MetricRegistry} import org.apache.spark.metrics.source.Source +import org.apache.spark.SparkContext -private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends Source { +private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext) + extends Source { val metricRegistry = new MetricRegistry() - val sourceName = "DAGScheduler" + val sourceName = "%s.DAGScheduler".format(sc.appName) metricRegistry.register(MetricRegistry.name("stage", "failedStages", "number"), new Gauge[Int] { override def getValue: Int = dagScheduler.failed.size diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala index 3d709cfde49b36af35a6c1c51190566f635b71fe..acc3951088a8de54c9c28dbf0fd2b71287970342 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala @@ -20,11 +20,13 @@ package org.apache.spark.storage import com.codahale.metrics.{Gauge,MetricRegistry} import org.apache.spark.metrics.source.Source +import org.apache.spark.SparkContext -private[spark] class BlockManagerSource(val blockManager: BlockManager) extends Source { +private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext) + extends Source { val metricRegistry = new MetricRegistry() - val sourceName = "BlockManager" + val sourceName = "%s.BlockManager".format(sc.appName) metricRegistry.register(MetricRegistry.name("memory", "maxMem", "MBytes"), new Gauge[Long] { override def getValue: Long = {