Skip to content
Snippets Groups Projects
Commit eaeb0f76 authored by Reynold Xin's avatar Reynold Xin
Browse files

Minor cleanup of metrics.Source

- Added override.
- Marked some variables as private.

Author: Reynold Xin <rxin@apache.org>

Closes #1943 from rxin/metricsSource and squashes the following commits:

fbfa943 [Reynold Xin] Minor cleanup of metrics.Source. - Added override. - Marked some variables as private.
parent 267fdffe
No related branches found
No related tags found
No related merge requests found
......@@ -22,8 +22,8 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.metrics.source.Source
class ApplicationSource(val application: ApplicationInfo) extends Source {
val metricRegistry = new MetricRegistry()
val sourceName = "%s.%s.%s".format("application", application.desc.name,
override val metricRegistry = new MetricRegistry()
override val sourceName = "%s.%s.%s".format("application", application.desc.name,
System.currentTimeMillis())
metricRegistry.register(MetricRegistry.name("status"), new Gauge[String] {
......
......@@ -22,8 +22,8 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.metrics.source.Source
private[spark] class MasterSource(val master: Master) extends Source {
val metricRegistry = new MetricRegistry()
val sourceName = "master"
override val metricRegistry = new MetricRegistry()
override val sourceName = "master"
// Gauge for worker numbers in cluster
metricRegistry.register(MetricRegistry.name("workers"), new Gauge[Int] {
......
......@@ -22,8 +22,8 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.metrics.source.Source
private[spark] class WorkerSource(val worker: Worker) extends Source {
val sourceName = "worker"
val metricRegistry = new MetricRegistry()
override val sourceName = "worker"
override val metricRegistry = new MetricRegistry()
metricRegistry.register(MetricRegistry.name("executors"), new Gauge[Int] {
override def getValue: Int = worker.executors.size
......
......@@ -35,9 +35,10 @@ private[spark] class ExecutorSource(val executor: Executor, executorId: String)
})
}
val metricRegistry = new MetricRegistry()
override val metricRegistry = new MetricRegistry()
// TODO: It would be nice to pass the application name here
val sourceName = "executor.%s".format(executorId)
override val sourceName = "executor.%s".format(executorId)
// Gauge for executor thread pool's actively executing task counts
metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
......
......@@ -21,12 +21,9 @@ import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.jvm.{GarbageCollectorMetricSet, MemoryUsageGaugeSet}
private[spark] class JvmSource extends Source {
val sourceName = "jvm"
val metricRegistry = new MetricRegistry()
override val sourceName = "jvm"
override val metricRegistry = new MetricRegistry()
val gcMetricSet = new GarbageCollectorMetricSet
val memGaugeSet = new MemoryUsageGaugeSet
metricRegistry.registerAll(gcMetricSet)
metricRegistry.registerAll(memGaugeSet)
metricRegistry.registerAll(new GarbageCollectorMetricSet)
metricRegistry.registerAll(new MemoryUsageGaugeSet)
}
......@@ -24,8 +24,8 @@ import org.apache.spark.metrics.source.Source
private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext)
extends Source {
val metricRegistry = new MetricRegistry()
val sourceName = "%s.DAGScheduler".format(sc.appName)
override val metricRegistry = new MetricRegistry()
override val sourceName = "%s.DAGScheduler".format(sc.appName)
metricRegistry.register(MetricRegistry.name("stage", "failedStages"), new Gauge[Int] {
override def getValue: Int = dagScheduler.failedStages.size
......
......@@ -24,8 +24,8 @@ import org.apache.spark.metrics.source.Source
private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext)
extends Source {
val metricRegistry = new MetricRegistry()
val sourceName = "%s.BlockManager".format(sc.appName)
override val metricRegistry = new MetricRegistry()
override val sourceName = "%s.BlockManager".format(sc.appName)
metricRegistry.register(MetricRegistry.name("memory", "maxMem_MB"), new Gauge[Long] {
override def getValue: Long = {
......
......@@ -23,10 +23,10 @@ import org.apache.spark.metrics.source.Source
import org.apache.spark.streaming.ui.StreamingJobProgressListener
private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
val metricRegistry = new MetricRegistry
val sourceName = "%s.StreamingMetrics".format(ssc.sparkContext.appName)
override val metricRegistry = new MetricRegistry
override val sourceName = "%s.StreamingMetrics".format(ssc.sparkContext.appName)
val streamingListener = ssc.uiTab.listener
private val streamingListener = ssc.uiTab.listener
private def registerGauge[T](name: String, f: StreamingJobProgressListener => T,
defaultValue: T) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment