diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template index 8a4f4e48335bd72153acd0c4629897d8fa92a03c..aeb76c9b2f6eab8a2196953f900472ff33308547 100644 --- a/conf/metrics.properties.template +++ b/conf/metrics.properties.template @@ -93,6 +93,7 @@ # period 10 Poll period # unit seconds Unit of the poll period # ttl 1 TTL of messages sent by Ganglia +# dmax 0 Lifetime in seconds of metrics (0 never expired) # mode multicast Ganglia network mode ('unicast' or 'multicast') # org.apache.spark.metrics.sink.JmxSink diff --git a/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala index 3b1880e1435134208d7b66c086d8e303b62f62bb..0cd795f638870b26e7666b82b7af1b4ec04e47ee 100644 --- a/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala +++ b/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala @@ -46,6 +46,9 @@ class GangliaSink(val property: Properties, val registry: MetricRegistry, val GANGLIA_KEY_HOST = "host" val GANGLIA_KEY_PORT = "port" + val GANGLIA_KEY_DMAX = "dmax" + val GANGLIA_DEFAULT_DMAX = 0 + def propertyToOption(prop: String): Option[String] = Option(property.getProperty(prop)) if (!propertyToOption(GANGLIA_KEY_HOST).isDefined) { @@ -59,6 +62,7 @@ class GangliaSink(val property: Properties, val registry: MetricRegistry, val host = propertyToOption(GANGLIA_KEY_HOST).get val port = propertyToOption(GANGLIA_KEY_PORT).get.toInt val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL) + val dmax = propertyToOption(GANGLIA_KEY_DMAX).map(_.toInt).getOrElse(GANGLIA_DEFAULT_DMAX) val mode: UDPAddressingMode = propertyToOption(GANGLIA_KEY_MODE) .map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(GANGLIA_DEFAULT_MODE) val pollPeriod = propertyToOption(GANGLIA_KEY_PERIOD).map(_.toInt) @@ -73,6 +77,7 @@ class GangliaSink(val property: Properties, val registry: MetricRegistry, val reporter: GangliaReporter = GangliaReporter.forRegistry(registry) .convertDurationsTo(TimeUnit.MILLISECONDS) .convertRatesTo(TimeUnit.SECONDS) + .withDMax(dmax) .build(ganglia) override def start() {