Skip to content
Snippets Groups Projects
Commit 7fb574bf authored by jerryshao's avatar jerryshao
Browse files

Code clean and remarshal

parent 4d6dd67f
No related branches found
No related tags found
No related merge requests found
...@@ -78,6 +78,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act ...@@ -78,6 +78,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers()) context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
Master.metricsSystem.registerSource(masterInstrumentation) Master.metricsSystem.registerSource(masterInstrumentation)
Master.metricsSystem.start()
} }
override def postStop() { override def postStop() {
...@@ -321,22 +322,22 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act ...@@ -321,22 +322,22 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
removeWorker(worker) removeWorker(worker)
} }
} }
override def postStop() {
Master.metricsSystem.stop()
}
} }
private[spark] object Master { private[spark] object Master {
private val systemName = "sparkMaster" private val systemName = "sparkMaster"
private val actorName = "Master" private val actorName = "Master"
private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
private val metricsSystem = MetricsSystem.createMetricsSystem("master") private val metricsSystem = MetricsSystem.createMetricsSystem("master")
def main(argStrings: Array[String]) { def main(argStrings: Array[String]) {
val args = new MasterArguments(argStrings) val args = new MasterArguments(argStrings)
val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort) val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort)
metricsSystem.start()
actorSystem.awaitTermination() actorSystem.awaitTermination()
metricsSystem.stop()
} }
/** Returns an `akka://...` URL for the Master actor given a sparkUrl `spark://host:ip`. */ /** Returns an `akka://...` URL for the Master actor given a sparkUrl `spark://host:ip`. */
......
package spark.deploy.master package spark.deploy.master
import java.util.{Map, HashMap => JHashMap} import com.codahale.metrics.{Gauge,MetricRegistry}
import com.codahale.metrics.{Gauge, Metric}
import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry}
import spark.metrics.source.Source import spark.metrics.source.Source
import spark.Logging
private[spark] class MasterInstrumentation(val master: Master) extends Source { private[spark] class MasterInstrumentation(val master: Master) extends Source {
val className = classOf[Master].getName()
val instrumentationName = "master"
val metricRegistry = new MetricRegistry() val metricRegistry = new MetricRegistry()
val sourceName = instrumentationName val sourceName = "master"
metricRegistry.register(MetricRegistry.name("workers","number"), metricRegistry.register(MetricRegistry.name("workers","number"),
new Gauge[Int] { new Gauge[Int] {
......
package spark.deploy.worker package spark.deploy.worker
import com.codahale.metrics.{Gauge, Metric} import com.codahale.metrics.{Gauge, MetricRegistry}
import java.util.{Map, HashMap => JHashMap}
import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry}
import spark.metrics.source.Source import spark.metrics.source.Source
private[spark] class WorkerInstrumentation(val worker: Worker) extends Source { private[spark] class WorkerInstrumentation(val worker: Worker) extends Source {
val className = classOf[Worker].getName()
val sourceName = "worker" val sourceName = "worker"
val metricRegistry = new MetricRegistry() val metricRegistry = new MetricRegistry()
metricRegistry.register(MetricRegistry.name(classOf[Worker], "executor", "number"), metricRegistry.register(MetricRegistry.name("executor", "number"),
new Gauge[Int] { new Gauge[Int] {
override def getValue: Int = worker.executors.size override def getValue: Int = worker.executors.size
}) })
// Gauge for cores used of this worker // Gauge for cores used of this worker
metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_used", "number"), metricRegistry.register(MetricRegistry.name("core_used", "number"),
new Gauge[Int] { new Gauge[Int] {
override def getValue: Int = worker.coresUsed override def getValue: Int = worker.coresUsed
}) })
// Gauge for memory used of this worker // Gauge for memory used of this worker
metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_used", "MBytes"), metricRegistry.register(MetricRegistry.name("mem_used", "MBytes"),
new Gauge[Int] { new Gauge[Int] {
override def getValue: Int = worker.memoryUsed override def getValue: Int = worker.memoryUsed
}) })
// Gauge for cores free of this worker // Gauge for cores free of this worker
metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_free", "number"), metricRegistry.register(MetricRegistry.name("core_free", "number"),
new Gauge[Int] { new Gauge[Int] {
override def getValue: Int = worker.coresFree override def getValue: Int = worker.coresFree
}) })
// Gauge for memory used of this worker // Gauge for memory used of this worker
metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_free", "MBytes"), metricRegistry.register(MetricRegistry.name("mem_free", "MBytes"),
new Gauge[Int] { new Gauge[Int] {
override def getValue: Int = worker.memoryFree override def getValue: Int = worker.memoryFree
}) })
......
...@@ -5,7 +5,6 @@ import scala.collection.mutable ...@@ -5,7 +5,6 @@ import scala.collection.mutable
import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry} import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry}
import java.util.Properties import java.util.Properties
//import java.util._
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import spark.Logging import spark.Logging
...@@ -20,7 +19,7 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin ...@@ -20,7 +19,7 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
val sinks = new mutable.ArrayBuffer[Sink] val sinks = new mutable.ArrayBuffer[Sink]
val sources = new mutable.ArrayBuffer[Source] val sources = new mutable.ArrayBuffer[Source]
var registry = new MetricRegistry() val registry = new MetricRegistry()
registerSources() registerSources()
registerSinks() registerSinks()
...@@ -35,7 +34,7 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin ...@@ -35,7 +34,7 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
def registerSource(source: Source) { def registerSource(source: Source) {
sources += source sources += source
registry.register(source.sourceName,source.metricRegistry) registry.register(source.sourceName, source.metricRegistry)
} }
def registerSources() { def registerSources() {
......
...@@ -23,7 +23,6 @@ class ConsoleSink(val property: Properties, val registry: MetricRegistry) extend ...@@ -23,7 +23,6 @@ class ConsoleSink(val property: Properties, val registry: MetricRegistry) extend
.convertRatesTo(TimeUnit.SECONDS) .convertRatesTo(TimeUnit.SECONDS)
.build() .build()
override def start() { override def start() {
reporter.start(pollPeriod, pollUnit) reporter.start(pollPeriod, pollUnit)
} }
......
...@@ -30,7 +30,6 @@ class CsvSink(val property: Properties, val registry: MetricRegistry) extends Si ...@@ -30,7 +30,6 @@ class CsvSink(val property: Properties, val registry: MetricRegistry) extends Si
.convertRatesTo(TimeUnit.SECONDS) .convertRatesTo(TimeUnit.SECONDS)
.build(new File(pollDir)) .build(new File(pollDir))
override def start() { override def start() {
reporter.start(pollPeriod, pollUnit) reporter.start(pollPeriod, pollUnit)
} }
......
...@@ -2,6 +2,5 @@ package spark.metrics.sink ...@@ -2,6 +2,5 @@ package spark.metrics.sink
trait Sink { trait Sink {
def start: Unit def start: Unit
def stop: Unit def stop: Unit
} }
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment