Commit 5ce5dc9f authored by jerryshao

Add default properties to handle the case where no configuration file is present

parent 871bc168
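In short: MetricsConfig previously opened the configuration file unconditionally and would fail when the file was absent; it now seeds built-in defaults first and only overlays the file if it exists. A minimal sketch of the resulting behavior (hypothetical snippet, assuming the MetricsConfig class from the diff below is on the classpath):

    // With no metrics properties file on disk, MetricsConfig now starts from
    // built-in defaults instead of failing with FileNotFoundException.
    val conf = new spark.metrics.MetricsConfig("unsupported")
    conf.properties.getProperty("*.sink.jmx.enabled")   // "default"
    conf.properties.getProperty("*.source.jvm.class")   // "spark.metrics.source.JvmSource"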
--- a/core/src/main/scala/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/spark/metrics/MetricsConfig.scala
 package spark.metrics
 
 import java.util.Properties
-import java.io.FileInputStream
+import java.io.{File, FileInputStream}
 
 import scala.collection.mutable
 import scala.util.matching.Regex
 
 private [spark] class MetricsConfig(val configFile: String) {
   val properties = new Properties()
-  var fis: FileInputStream = _
 
-  try {
-    fis = new FileInputStream(configFile)
-    properties.load(fis)
-  } finally {
-    fis.close()
+  // Add default properties in case there's no properties file
+  MetricsConfig.setDefaultProperties(properties)
+
+  val confFile = new File(configFile)
+  if (confFile.exists()) {
+    var fis: FileInputStream = null
+    try {
+      fis = new FileInputStream(configFile)
+      properties.load(fis)
+    } finally {
+      fis.close()
+    }
   }
 
   val propertyCategories = MetricsConfig.subProperties(properties, MetricsConfig.INSTANCE_REGEX)
@@ -35,11 +40,15 @@ private [spark] class MetricsConfig(val configFile: String) {
   }
 }
 
-object MetricsConfig {
-  val DEFAULT_CONFIG_FILE = "conf/metrics.properties"
+private[spark] object MetricsConfig {
   val DEFAULT_PREFIX = "*"
   val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
 
+  def setDefaultProperties(prop: Properties) {
+    prop.setProperty("*.sink.jmx.enabled", "default")
+    prop.setProperty("*.source.jvm.class", "spark.metrics.source.JvmSource")
+  }
+
   def subProperties(prop: Properties, regex: Regex) = {
     val subProperties = new mutable.HashMap[String, Properties]
@@ -48,7 +57,6 @@ object MetricsConfig {
       if (regex.findPrefixOf(kv._1) != None) {
         val regex(prefix, suffix) = kv._1
         subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2)
-        println(">>>>>subProperties added " + prefix + " " + suffix + " " + kv._2)
       }
     }
...
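For context, subProperties splits the flat properties namespace into one Properties object per instance prefix ("*", "master", and so on). A self-contained sketch of that grouping logic (the demo object and sample keys are illustrative only, not part of this commit):

    import java.util.Properties
    import scala.collection.JavaConverters._
    import scala.collection.mutable
    import scala.util.matching.Regex

    object SubPropertiesDemo {
      val INSTANCE_REGEX: Regex = "^(\\*|[a-zA-Z]+)\\.(.+)".r

      // Group keys of the form "<prefix>.<suffix>" into per-prefix Properties,
      // mirroring MetricsConfig.subProperties; non-matching keys are skipped.
      def subProperties(prop: Properties, regex: Regex): mutable.HashMap[String, Properties] = {
        val result = new mutable.HashMap[String, Properties]
        prop.asScala.foreach { case (key, value) =>
          if (regex.findPrefixOf(key).isDefined) {
            val regex(prefix, suffix) = key
            result.getOrElseUpdate(prefix, new Properties).setProperty(suffix, value)
          }
        }
        result
      }

      def main(args: Array[String]) {
        val props = new Properties
        props.setProperty("*.sink.jmx.enabled", "default")
        props.setProperty("master.source.jvm.class", "spark.metrics.source.JvmSource")
        // Prints two groups: "*" -> {sink.jmx.enabled=default},
        // "master" -> {source.jvm.class=spark.metrics.source.JvmSource}
        subProperties(props, INSTANCE_REGEX).foreach(println)
      }
    }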
--- a/core/src/main/scala/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/spark/metrics/MetricsSystem.scala
@@ -14,7 +14,7 @@ import spark.metrics.source._
 private[spark] class MetricsSystem private (val instance: String) extends Logging {
   initLogging()
 
-  val confFile = System.getProperty("spark.metrics.conf.file", MetricsConfig.DEFAULT_CONFIG_FILE)
+  val confFile = System.getProperty("spark.metrics.conf.file", "unsupported")
   val metricsConfig = new MetricsConfig(confFile)
 
   val sinks = new mutable.ArrayBuffer[Sink]
@@ -58,9 +58,6 @@ private[spark] class MetricsSystem private (val instance: String) extends Logging
     val instConfig = metricsConfig.getInstance(instance)
     val sinkConfigs = MetricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
 
-    // Register JMX sink as a default sink
-    sinks += new JmxSink(registry)
-
     // Register other sinks according to conf
     sinkConfigs.foreach { kv =>
       val classPath = if (MetricsSystem.DEFAULT_SINKS.contains(kv._1)) {
@@ -81,9 +78,7 @@
 }
 
 private[spark] object MetricsSystem {
-  val DEFAULT_SINKS = Map(
-    "console" -> "spark.metrics.sink.ConsoleSink",
-    "csv" -> "spark.metrics.sink.CsvSink")
+  val DEFAULT_SINKS = Map("jmx" -> "spark.metrics.sink.JmxSink")
 
   val SINK_REGEX = "^sink\\.(.+)\\.(.+)".r
   val SOURCE_REGEX = "^source\\.(.+)\\.(.+)".r
...
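The JMX sink is no longer hard-wired into sink registration; it is now just the one entry in DEFAULT_SINKS, enabled through the default properties above. A hedged sketch of the short-name-to-class lookup implied by this hunk (SinkResolutionDemo and resolveSinkClass are hypothetical; the "class" property key is inferred from the DEFAULT_SINKS fallback branch, which is truncated here):

    object SinkResolutionDemo {
      val DEFAULT_SINKS = Map("jmx" -> "spark.metrics.sink.JmxSink")

      // A known short name ("jmx") resolves to the bundled sink class; any other
      // sink name is assumed to carry an explicit class in its own properties.
      def resolveSinkClass(sinkName: String, sinkProps: java.util.Properties): String =
        DEFAULT_SINKS.getOrElse(sinkName, sinkProps.getProperty("class"))

      def main(args: Array[String]) {
        println(resolveSinkClass("jmx", new java.util.Properties))  // spark.metrics.sink.JmxSink
        val custom = new java.util.Properties
        custom.setProperty("class", "org.example.MyHypotheticalSink")
        println(resolveSinkClass("mySink", custom))                 // org.example.MyHypotheticalSink
      }
    }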
--- a/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
+++ b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
 package spark.metrics.sink
 
-import com.codahale.metrics.{ConsoleReporter, MetricRegistry}
-
 import java.util.Properties
 import java.util.concurrent.TimeUnit
 
+import com.codahale.metrics.{ConsoleReporter, MetricRegistry}
+
 import spark.metrics.MetricsSystem
 
 class ConsoleSink(val property: Properties, val registry: MetricRegistry) extends Sink {
@@ -18,7 +18,7 @@ class ConsoleSink(val property: Properties, val registry: MetricRegistry) extends Sink {
     case None => MetricsSystem.timeUnits(ConsoleSink.CONSOLE_DEFAULT_UNIT)
   }
 
-  var reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry)
+  val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry)
       .convertDurationsTo(TimeUnit.MILLISECONDS)
       .convertRatesTo(TimeUnit.SECONDS)
       .build()
...
--- a/core/src/main/scala/spark/metrics/sink/CsvSink.scala
+++ b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
 package spark.metrics.sink
 
-import com.codahale.metrics.{CsvReporter, MetricRegistry}
-
 import java.io.File
 import java.util.{Locale, Properties}
 import java.util.concurrent.TimeUnit
 
+import com.codahale.metrics.{CsvReporter, MetricRegistry}
+
 import spark.metrics.MetricsSystem
 
 class CsvSink(val property: Properties, val registry: MetricRegistry) extends Sink {
@@ -24,7 +24,7 @@ class CsvSink(val property: Properties, val registry: MetricRegistry) extends Sink {
     case None => CsvSink.CSV_DEFAULT_DIR
   }
 
-  var reporter: CsvReporter = CsvReporter.forRegistry(registry)
+  val reporter: CsvReporter = CsvReporter.forRegistry(registry)
       .formatFor(Locale.US)
       .convertDurationsTo(TimeUnit.MILLISECONDS)
       .convertRatesTo(TimeUnit.SECONDS)
...
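For illustration, the period/unit and directory options visible in the ConsoleSink and CsvSink hunks would be driven by keys in the same flat properties namespace as the defaults above. A hypothetical configuration, written as the property calls used elsewhere in this commit (key names inferred from the visible defaults, not confirmed by this diff):

    val props = new java.util.Properties
    // Hypothetical example keys, following the "<instance>.sink.<name>.<option>" pattern:
    props.setProperty("*.sink.console.period", "10")       // poll period for ConsoleSink
    props.setProperty("*.sink.console.unit", "seconds")    // time unit for the period
    props.setProperty("*.sink.csv.directory", "/tmp/csv")  // output directory for CsvSink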
--- a/core/src/main/scala/spark/metrics/sink/JmxSink.scala
+++ b/core/src/main/scala/spark/metrics/sink/JmxSink.scala
 package spark.metrics.sink
 
+import java.util.Properties
+
 import com.codahale.metrics.{JmxReporter, MetricRegistry}
 
-class JmxSink(registry: MetricRegistry) extends Sink {
-  var reporter: JmxReporter = JmxReporter.forRegistry(registry).build()
+class JmxSink(val property: Properties, val registry: MetricRegistry) extends Sink {
+  val reporter: JmxReporter = JmxReporter.forRegistry(registry).build()
 
   override def start() {
     reporter.start()
...
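Giving JmxSink the same (Properties, MetricRegistry) constructor as the other sinks lets any sink be instantiated uniformly from the class path strings in DEFAULT_SINKS. A sketch of that reflective construction (hypothetical demo object; the actual instantiation code lies outside the visible hunks):

    import java.util.Properties
    import com.codahale.metrics.MetricRegistry

    object SinkInstantiationDemo {
      // Instantiate a sink by class name via the shared
      // (Properties, MetricRegistry) constructor shape this commit establishes.
      def instantiateSink(classPath: String, props: Properties, registry: MetricRegistry): Any =
        Class.forName(classPath)
          .getConstructor(classOf[Properties], classOf[MetricRegistry])
          .newInstance(props, registry)
    }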