Skip to content
Snippets Groups Projects
Commit 503acd3a authored by jerryshao's avatar jerryshao
Browse files

Build metrics system framework

parent b0113290
No related branches found
No related tags found
No related merge requests found
# syntax: [instance].sink.[sink name].[options], where [instance] is "*" (all) or a component name
*.sink.console.period=10
*.sink.console.unit=second
master.sink.console.period=10
master.sink.console.unit=second
worker.sink.console.period=20
worker.sink.console.unit=second
package spark.metrics
import scala.collection.mutable
import com.codahale.metrics.{JmxReporter, MetricRegistry}
import java.util.Properties
import java.util.concurrent.TimeUnit
import spark.Logging
import spark.metrics.sink._
/**
 * Mixed into components (master, worker, ...) that expose metrics. Loads the
 * metrics configuration and manages the lifecycle of the configured sinks.
 */
trait AbstractInstrumentation extends Logging {
  initLogging()

  /** Registry the sinks report from; supplied by the concrete component. */
  def registryHandler: MetricRegistry
  /** Instance name ("master", "worker", ...) used to pick the config section. */
  def instance: String

  // Config file location is overridable via the spark.metrics.conf.file property.
  val confFile = System.getProperty("spark.metrics.conf.file", MetricsConfig.DEFAULT_CONFIG_FILE)
  val metricsConfig = new MetricsConfig(confFile)

  val sinks = new mutable.ArrayBuffer[Sink]

  /**
   * Creates and starts all sinks configured for this instance. A JMX sink is
   * always registered; the rest are instantiated reflectively from either a
   * built-in short name ("console", "csv") or the sink's "class" property.
   */
  def registerSinks() {
    val instConfig = metricsConfig.getInstance(instance)
    val sinkConfigs = MetricsConfig.subProperties(instConfig, AbstractInstrumentation.SINK_REGEX)

    // Register JMX sink as a default sink
    sinks += new JmxSink(registryHandler)

    // Register other sinks according to conf
    sinkConfigs.foreach { case (name, prop) =>
      // Built-in sinks are looked up by short name; custom ones must supply "class".
      val classPath = AbstractInstrumentation.DEFAULT_SINKS.getOrElse(name, prop.getProperty("class"))
      try {
        val sink = Class.forName(classPath)
          .getConstructor(classOf[Properties], classOf[MetricRegistry])
          .newInstance(prop, registryHandler)
        sinks += sink.asInstanceOf[Sink]
      } catch {
        // One broken sink config should not prevent the remaining sinks from starting.
        case e: Exception => logError("Sink class " + classPath + " cannot be instantiated", e)
      }
    }
    sinks.foreach(_.registerSink)
  }

  /** Stops every sink that was started by registerSinks. */
  def unregisterSinks() {
    sinks.foreach(_.unregisterSink)
  }
}
/**
 * Shared constants for the instrumentation framework: built-in sink short
 * names, the sink-property key pattern, and the unit-name lookup table.
 */
object AbstractInstrumentation {
  // Short names of the sinks shipped with Spark, mapped to their class names.
  val DEFAULT_SINKS = Map(
    ("console", "spark.metrics.sink.ConsoleSink"),
    ("csv", "spark.metrics.sink.CsvSink"))

  // Matches keys of the form "sink.<sink name>.<option>".
  val SINK_REGEX = """^sink\.(.+)\.(.+)""".r

  // Translates the unit names used in metrics.properties into TimeUnit values.
  val timeUnits = Map(
    ("millisecond", TimeUnit.MILLISECONDS),
    ("second", TimeUnit.SECONDS),
    ("minute", TimeUnit.MINUTES),
    ("hour", TimeUnit.HOURS),
    ("day", TimeUnit.DAYS))
}
\ No newline at end of file
package spark.metrics
import java.util.Properties
import java.io.FileInputStream
import scala.collection.mutable
import scala.util.matching.Regex
/**
 * Loads a metrics properties file and splits it into per-instance sections,
 * back-filling each section with the "*" (default) section's values.
 */
class MetricsConfig(val configFile: String) {
  val properties = new Properties()
  var fis: FileInputStream = _

  try {
    fis = new FileInputStream(configFile)
    properties.load(fis)
  } finally {
    // If the FileInputStream constructor itself threw, fis is still null;
    // the original unconditional close() would mask that error with an NPE.
    if (fis != null) {
      fis.close()
    }
  }

  val propertyCategories = MetricsConfig.subProperties(properties, MetricsConfig.INSTANCE_REGEX)
  if (propertyCategories.contains(MetricsConfig.DEFAULT_PREFIX)) {
    import scala.collection.JavaConversions._
    // Copy every "*" property into each instance section that does not override it.
    val defaultProperty = propertyCategories(MetricsConfig.DEFAULT_PREFIX)
    for ((inst, prop) <- propertyCategories; p <- defaultProperty
        if inst != MetricsConfig.DEFAULT_PREFIX; if prop.getProperty(p._1) == null) {
      prop.setProperty(p._1, p._2)
    }
  }

  /**
   * Returns the properties for the given instance, falling back to the "*"
   * section. NOTE(review): still throws NoSuchElementException when neither
   * the instance nor a "*" section exists — confirm callers always ship a
   * default section.
   */
  def getInstance(inst: String) = {
    propertyCategories.getOrElse(inst, propertyCategories(MetricsConfig.DEFAULT_PREFIX))
  }
}
object MetricsConfig {
  // NOTE(review): hard-coded developer path; should be derived from SPARK_HOME/conf.
  val DEFAULT_CONFIG_FILE = "/home/jerryshao/project/sotc_cloud-spark/conf/metrics.properties"
  val DEFAULT_PREFIX = "*"
  val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r

  /**
   * Groups a flat property set by the first component the regex captures,
   * e.g. "master.sink.console.period" -> section "master", key
   * "sink.console.period". Keys that do not match the regex are skipped
   * (the original `val regex(a, b) = kv._1` threw MatchError on them), and
   * the leftover debug println has been removed.
   */
  def subProperties(prop: Properties, regex: Regex) = {
    val subProperties = new mutable.HashMap[String, Properties]

    import scala.collection.JavaConversions._
    prop.foreach { kv =>
      kv._1 match {
        case regex(prefix, suffix) =>
          subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2)
        case _ => // ignore keys that do not follow the [instance].[rest] pattern
      }
    }
    subProperties
  }
}
\ No newline at end of file
package spark.metrics.sink
import java.util.Properties
import java.util.concurrent.TimeUnit
import com.codahale.metrics.{ConsoleReporter, MetricRegistry}
import spark.metrics.AbstractInstrumentation
/**
 * Sink that periodically dumps the registry's metrics to stdout via
 * Dropwizard's ConsoleReporter. Period and unit come from the sink's
 * properties, with defaults from the companion object.
 */
class ConsoleSink(val property: Properties, val registry: MetricRegistry) extends Sink {
  // Reporting interval, measured in pollUnit units.
  val pollPeriod = Option(property.getProperty(ConsoleSink.CONSOLE_KEY_PERIOD))
    .map(_.toInt)
    .getOrElse(ConsoleSink.CONSOLE_DEFAULT_PERIOD.toInt)

  // Time unit of the reporting interval.
  val pollUnit = AbstractInstrumentation.timeUnits(
    Option(property.getProperty(ConsoleSink.CONSOLE_KEY_UNIT))
      .getOrElse(ConsoleSink.CONSOLE_DEFAULT_UNIT))

  // Built lazily in registerSink; null until then.
  var reporter: ConsoleReporter = _

  override def registerSink() {
    reporter = ConsoleReporter.forRegistry(registry)
      .convertDurationsTo(TimeUnit.MILLISECONDS)
      .convertRatesTo(TimeUnit.SECONDS)
      .build()
    reporter.start(pollPeriod, pollUnit)
  }

  override def unregisterSink() {
    reporter.stop()
  }
}
/** Property keys and string-valued defaults for [[ConsoleSink]]. */
object ConsoleSink {
  // Keys read from the sink's Properties section.
  val CONSOLE_KEY_PERIOD = "period"
  val CONSOLE_KEY_UNIT = "unit"

  // Defaults applied when a key is absent (strings, mirroring Properties values).
  val CONSOLE_DEFAULT_PERIOD = "10"
  val CONSOLE_DEFAULT_UNIT = "second"
}
\ No newline at end of file
package spark.metrics.sink
import java.io.File
import java.util.{Locale, Properties}
import java.util.concurrent.TimeUnit
import com.codahale.metrics.{CsvReporter, MetricRegistry}
import spark.metrics.AbstractInstrumentation
/**
 * Sink that periodically writes the registry's metrics to CSV files in a
 * configurable directory via Dropwizard's CsvReporter.
 */
class CsvSink(val property: Properties, val registry: MetricRegistry) extends Sink {
  // How often the CSV files are written, in pollUnit units.
  val pollPeriod = Option(property.getProperty(CsvSink.CSV_KEY_PERIOD))
    .map(_.toInt)
    .getOrElse(CsvSink.CSV_DEFAULT_PERIOD.toInt)

  // Time unit of the reporting interval.
  val pollUnit = AbstractInstrumentation.timeUnits(
    Option(property.getProperty(CsvSink.CSV_KEY_UNIT))
      .getOrElse(CsvSink.CSV_DEFAULT_UNIT))

  // Directory the CSV files are written into.
  val pollDir = Option(property.getProperty(CsvSink.CSV_KEY_DIR))
    .getOrElse(CsvSink.CSV_DEFAULT_DIR)

  // Built lazily in registerSink; null until then.
  var reporter: CsvReporter = _

  override def registerSink() {
    reporter = CsvReporter.forRegistry(registry)
      .formatFor(Locale.US)
      .convertDurationsTo(TimeUnit.MILLISECONDS)
      .convertRatesTo(TimeUnit.SECONDS)
      .build(new File(pollDir))
    reporter.start(pollPeriod, pollUnit)
  }

  override def unregisterSink() {
    reporter.stop()
  }
}
/** Property keys and string-valued defaults for [[CsvSink]]. */
object CsvSink {
  // Keys read from the sink's Properties section.
  val CSV_KEY_PERIOD = "period"
  val CSV_KEY_UNIT = "unit"
  val CSV_KEY_DIR = "directory"

  // Defaults applied when a key is absent (strings, mirroring Properties values).
  val CSV_DEFAULT_PERIOD = "1"
  val CSV_DEFAULT_UNIT = "minute"
  val CSV_DEFAULT_DIR = "/tmp/"
}
package spark.metrics.sink
import com.codahale.metrics.{JmxReporter, MetricRegistry}
/** Sink that exposes the registry's metrics over JMX via JmxReporter. */
class JmxSink(registry: MetricRegistry) extends Sink {
  // Created in registerSink; null until then.
  var reporter: JmxReporter = _

  override def registerSink() {
    val jmxReporter = JmxReporter.forRegistry(registry).build()
    reporter = jmxReporter
    jmxReporter.start()
  }

  override def unregisterSink() {
    reporter.stop()
  }
}
\ No newline at end of file
package spark.metrics.sink
/**
 * Lifecycle interface for a metrics sink. Implementations in this commit
 * create and start a reporter in registerSink and stop it in unregisterSink;
 * registerSinks() on the instrumentation trait drives both calls.
 */
trait Sink {
  // Start reporting; called once after the sink is constructed.
  def registerSink: Unit
  // Stop reporting; called on shutdown.
  def unregisterSink: Unit
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment