diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index e44f5e316802ad9aed2e2a654ba0d18417a1597c..cc0b2d429536ba56084cc162b4765a32f637829b 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -78,6 +78,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers()) Master.metricsSystem.registerSource(masterInstrumentation) + Master.metricsSystem.start() } override def postStop() { @@ -321,22 +322,22 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act removeWorker(worker) } } + + override def postStop() { + Master.metricsSystem.stop() + } } private[spark] object Master { private val systemName = "sparkMaster" private val actorName = "Master" private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r - private val metricsSystem = MetricsSystem.createMetricsSystem("master") def main(argStrings: Array[String]) { val args = new MasterArguments(argStrings) val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort) - - metricsSystem.start() actorSystem.awaitTermination() - metricsSystem.stop() } /** Returns an `akka://...` URL for the Master actor given a sparkUrl `spark://host:ip`. */ diff --git a/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala b/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala index 46c90b94d22a6074191388ce26ca79e64851366b..61a561c9557502d52cd611af28f60beecfd8d1f0 100644 --- a/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala +++ b/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala @@ -1,19 +1,12 @@ package spark.deploy.master -import java.util.{Map, HashMap => JHashMap} - -import com.codahale.metrics.{Gauge, Metric} - -import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry} +import com.codahale.metrics.{Gauge,MetricRegistry} import spark.metrics.source.Source -import spark.Logging private[spark] class MasterInstrumentation(val master: Master) extends Source { - val className = classOf[Master].getName() - val instrumentationName = "master" val metricRegistry = new MetricRegistry() - val sourceName = instrumentationName + val sourceName = "master" metricRegistry.register(MetricRegistry.name("workers","number"), new Gauge[Int] { diff --git a/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala b/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala index 5ce29cf04c8cc8643cf5fc3c8ad2ef575a079757..94c20a98c184fb1372568f957620f540c04b5c83 100644 --- a/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala +++ b/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala @@ -1,44 +1,38 @@ package spark.deploy.worker -import com.codahale.metrics.{Gauge, Metric} +import com.codahale.metrics.{Gauge, MetricRegistry} -import java.util.{Map, HashMap => JHashMap} - -import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry} import spark.metrics.source.Source private[spark] class WorkerInstrumentation(val worker: Worker) extends Source { - val className = classOf[Worker].getName() - val sourceName = "worker" - val metricRegistry = new MetricRegistry() - metricRegistry.register(MetricRegistry.name(classOf[Worker], "executor", "number"), + metricRegistry.register(MetricRegistry.name("executor", "number"), new Gauge[Int] { override def getValue: Int = worker.executors.size }) // Gauge for cores used of this worker - metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_used", "number"), + metricRegistry.register(MetricRegistry.name("core_used", "number"), new Gauge[Int] { override def getValue: Int = worker.coresUsed }) // Gauge for memory used of this worker - metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_used", "MBytes"), + metricRegistry.register(MetricRegistry.name("mem_used", "MBytes"), new Gauge[Int] { override def getValue: Int = worker.memoryUsed }) // Gauge for cores free of this worker - metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_free", "number"), + metricRegistry.register(MetricRegistry.name("core_free", "number"), new Gauge[Int] { override def getValue: Int = worker.coresFree }) // Gauge for memory used of this worker - metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_free", "MBytes"), + metricRegistry.register(MetricRegistry.name("mem_free", "MBytes"), new Gauge[Int] { override def getValue: Int = worker.memoryFree }) diff --git a/core/src/main/scala/spark/metrics/MetricsSystem.scala b/core/src/main/scala/spark/metrics/MetricsSystem.scala index a23ccd2692755f34ebcda3f407f4beec622ca00a..5bfdc00eaff320154fca6d666cb4b47b1403671d 100644 --- a/core/src/main/scala/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/spark/metrics/MetricsSystem.scala @@ -5,7 +5,6 @@ import scala.collection.mutable import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry} import java.util.Properties -//import java.util._ import java.util.concurrent.TimeUnit import spark.Logging @@ -20,7 +19,7 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin val sinks = new mutable.ArrayBuffer[Sink] val sources = new mutable.ArrayBuffer[Source] - var registry = new MetricRegistry() + val registry = new MetricRegistry() registerSources() registerSinks() @@ -35,7 +34,7 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin def registerSource(source: Source) { sources += source - registry.register(source.sourceName,source.metricRegistry) + registry.register(source.sourceName, source.metricRegistry) } def registerSources() { diff --git a/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala index 9cd17556fa12cadc02020d746aa68a1317d4280b..e2e4197d1dcad0aab2cf7286f1e2ee056329ba21 100644 --- a/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala +++ b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala @@ -23,7 +23,6 @@ class ConsoleSink(val property: Properties, val registry: MetricRegistry) extend .convertRatesTo(TimeUnit.SECONDS) .build() - override def start() { reporter.start(pollPeriod, pollUnit) } diff --git a/core/src/main/scala/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/spark/metrics/sink/CsvSink.scala index 62e51be0dc32b234ef69d10400358c49f75af317..c2d645331c211992a0d5fa23fff542e4147aaba8 100644 --- a/core/src/main/scala/spark/metrics/sink/CsvSink.scala +++ b/core/src/main/scala/spark/metrics/sink/CsvSink.scala @@ -30,7 +30,6 @@ class CsvSink(val property: Properties, val registry: MetricRegistry) extends Si .convertRatesTo(TimeUnit.SECONDS) .build(new File(pollDir)) - override def start() { reporter.start(pollPeriod, pollUnit) } diff --git a/core/src/main/scala/spark/metrics/sink/Sink.scala b/core/src/main/scala/spark/metrics/sink/Sink.scala index 9fef894fde1cf4c59447677c310773ef77bc4e4c..3ffdcbdaba41459f7b46ac8c49e48646a6130cfd 100644 --- a/core/src/main/scala/spark/metrics/sink/Sink.scala +++ b/core/src/main/scala/spark/metrics/sink/Sink.scala @@ -2,6 +2,5 @@ package spark.metrics.sink trait Sink { def start: Unit - def stop: Unit } \ No newline at end of file