diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template
index aeb76c9b2f6eab8a2196953f900472ff33308547..4c008a13607c25cad5e83cc1164eeef25ce4a1fb 100644
--- a/conf/metrics.properties.template
+++ b/conf/metrics.properties.template
@@ -118,6 +118,14 @@
 #   prefix    EMPTY STRING  Prefix to prepend to every metric's name
 #   protocol  tcp           Protocol ("tcp" or "udp") to use
 
+# org.apache.spark.metrics.sink.StatsdSink
+#   Name:     Default:      Description:
+#   host      127.0.0.1     Hostname or IP of StatsD server
+#   port      8125          Port of StatsD server
+#   period    10            Poll period
+#   unit      seconds       Units of poll period
+#   prefix    EMPTY STRING  Prefix to prepend to metric name
+
 ## Examples
 # Enable JmxSink for all instances by class name
 #*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
@@ -125,6 +133,10 @@
 # Enable ConsoleSink for all instances by class name
 #*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
 
+# Enable StatsdSink for all instances by class name
+#*.sink.statsd.class=org.apache.spark.metrics.sink.StatsdSink
+#*.sink.statsd.prefix=spark
+
 # Polling period for the ConsoleSink
 #*.sink.console.period=10
 # Unit of the polling period for the ConsoleSink
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala
new file mode 100644
index 0000000000000000000000000000000000000000..ba75aa1c65cc6290aa9c35bbbcdf0595b23626a9
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import java.io.IOException
+import java.net.{DatagramPacket, DatagramSocket, InetSocketAddress}
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.SortedMap
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.util.{Failure, Success, Try}
+
+import com.codahale.metrics._
+import org.apache.hadoop.net.NetUtils
+
+import org.apache.spark.internal.Logging
+
+/**
+ * @see <a href="https://github.com/etsy/statsd/blob/master/docs/metric_types.md">
+ *        StatsD metric types</a>
+ */
+private[spark] object StatsdMetricType {
+  val COUNTER = "c"
+  val GAUGE = "g"
+  val TIMER = "ms"
+  val SET = "s"
+}
+
+private[spark] class StatsdReporter(
+    registry: MetricRegistry,
+    host: String = "127.0.0.1",
+    port: Int = 8125,
+    prefix: String = "",
+    filter: MetricFilter = MetricFilter.ALL,
+    rateUnit: TimeUnit = TimeUnit.SECONDS,
+    durationUnit: TimeUnit = TimeUnit.MILLISECONDS)
+  extends ScheduledReporter(registry, "statsd-reporter", filter, rateUnit, durationUnit)
+  with Logging {
+
+  import StatsdMetricType._
+
+  private val address = new InetSocketAddress(host, port)
+  private val whitespace = "[\\s]+".r
+
+  override def report(
+      gauges: SortedMap[String, Gauge[_]],
+      counters: SortedMap[String, Counter],
+      histograms: SortedMap[String, Histogram],
+      meters: SortedMap[String, Meter],
+      timers: SortedMap[String, Timer]): Unit =
+    Try(new DatagramSocket) match {
+      case Failure(ioe: IOException) => logWarning("StatsD datagram socket construction failed",
+        NetUtils.wrapException(host, port, NetUtils.getHostname(), 0, ioe))
+      case Failure(e) => logWarning("StatsD datagram socket construction failed", e)
+      case Success(s) =>
+        implicit val socket = s
+        val localAddress = Try(socket.getLocalAddress).map(_.getHostAddress).getOrElse(null)
+        val localPort = socket.getLocalPort
+        Try {
+          gauges.entrySet.asScala.foreach(e => reportGauge(e.getKey, e.getValue))
+          counters.entrySet.asScala.foreach(e => reportCounter(e.getKey, e.getValue))
+          histograms.entrySet.asScala.foreach(e => reportHistogram(e.getKey, e.getValue))
+          meters.entrySet.asScala.foreach(e => reportMetered(e.getKey, e.getValue))
+          timers.entrySet.asScala.foreach(e => reportTimer(e.getKey, e.getValue))
+        } recover {
+          case ioe: IOException =>
+            logDebug("Unable to send packets to StatsD", NetUtils.wrapException(
+              address.getHostString, address.getPort, localAddress, localPort, ioe))
+          case e: Throwable => logDebug(s"Unable to send packets to StatsD at '$host:$port'", e)
+        }
+        Try(socket.close()) recover {
+          case ioe: IOException =>
+            logDebug("Error when closing socket to StatsD", NetUtils.wrapException(
+              address.getHostString, address.getPort, localAddress, localPort, ioe))
+          case e: Throwable => logDebug("Error when closing socket to StatsD", e)
+        }
+    }
+
+  private def reportGauge(name: String, gauge: Gauge[_])(implicit socket: DatagramSocket): Unit =
+    formatAny(gauge.getValue).foreach(v => send(fullName(name), v, GAUGE))
+
+  private def reportCounter(name: String, counter: Counter)(implicit socket: DatagramSocket): Unit =
+    send(fullName(name), format(counter.getCount), COUNTER)
+
+  private def reportHistogram(name: String, histogram: Histogram)
+      (implicit socket: DatagramSocket): Unit = {
+    val snapshot = histogram.getSnapshot
+    send(fullName(name, "count"), format(histogram.getCount), GAUGE)
+    send(fullName(name, "max"), format(snapshot.getMax), TIMER)
+    send(fullName(name, "mean"), format(snapshot.getMean), TIMER)
+    send(fullName(name, "min"), format(snapshot.getMin), TIMER)
+    send(fullName(name, "stddev"), format(snapshot.getStdDev), TIMER)
+    send(fullName(name, "p50"), format(snapshot.getMedian), TIMER)
+    send(fullName(name, "p75"), format(snapshot.get75thPercentile), TIMER)
+    send(fullName(name, "p95"), format(snapshot.get95thPercentile), TIMER)
+    send(fullName(name, "p98"), format(snapshot.get98thPercentile), TIMER)
+    send(fullName(name, "p99"), format(snapshot.get99thPercentile), TIMER)
+    send(fullName(name, "p999"), format(snapshot.get999thPercentile), TIMER)
+  }
+
+  private def reportMetered(name: String, meter: Metered)(implicit socket: DatagramSocket): Unit = {
+    send(fullName(name, "count"), format(meter.getCount), GAUGE)
+    send(fullName(name, "m1_rate"), format(convertRate(meter.getOneMinuteRate)), TIMER)
+    send(fullName(name, "m5_rate"), format(convertRate(meter.getFiveMinuteRate)), TIMER)
+    send(fullName(name, "m15_rate"), format(convertRate(meter.getFifteenMinuteRate)), TIMER)
+    send(fullName(name, "mean_rate"), format(convertRate(meter.getMeanRate)), TIMER)
+  }
+
+  private def reportTimer(name: String, timer: Timer)(implicit socket: DatagramSocket): Unit = {
+    val snapshot = timer.getSnapshot
+    send(fullName(name, "max"), format(convertDuration(snapshot.getMax)), TIMER)
+    send(fullName(name, "mean"), format(convertDuration(snapshot.getMean)), TIMER)
+    send(fullName(name, "min"), format(convertDuration(snapshot.getMin)), TIMER)
+    send(fullName(name, "stddev"), format(convertDuration(snapshot.getStdDev)), TIMER)
+    send(fullName(name, "p50"), format(convertDuration(snapshot.getMedian)), TIMER)
+    send(fullName(name, "p75"), format(convertDuration(snapshot.get75thPercentile)), TIMER)
+    send(fullName(name, "p95"), format(convertDuration(snapshot.get95thPercentile)), TIMER)
+    send(fullName(name, "p98"), format(convertDuration(snapshot.get98thPercentile)), TIMER)
+    send(fullName(name, "p99"), format(convertDuration(snapshot.get99thPercentile)), TIMER)
+    send(fullName(name, "p999"), format(convertDuration(snapshot.get999thPercentile)), TIMER)
+
+    reportMetered(name, timer)
+  }
+
+  private def send(name: String, value: String, metricType: String)
+      (implicit socket: DatagramSocket): Unit = {
+    val bytes = sanitize(s"$name:$value|$metricType").getBytes(UTF_8)
+    val packet = new DatagramPacket(bytes, bytes.length, address)
+    socket.send(packet)
+  }
+
+  private def fullName(names: String*): String = MetricRegistry.name(prefix, names : _*)
+
+  private def sanitize(s: String): String = whitespace.replaceAllIn(s, "-")
+
+  private def format(v: Any): String = formatAny(v).getOrElse("")
+
+  private def formatAny(v: Any): Option[String] =
+    v match {
+      case f: Float => Some("%2.2f".format(f))
+      case d: Double => Some("%2.2f".format(d))
+      case b: BigDecimal => Some("%2.2f".format(b))
+      case n: Number => Some(v.toString)
+      case _ => None
+    }
+}
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala
new file mode 100644
index 0000000000000000000000000000000000000000..859a2f6bcd456098821a6dfc4dd2f30e98aefff7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+
+import com.codahale.metrics.MetricRegistry
+
+import org.apache.spark.SecurityManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.MetricsSystem
+
+private[spark] object StatsdSink {
+  val STATSD_KEY_HOST = "host"
+  val STATSD_KEY_PORT = "port"
+  val STATSD_KEY_PERIOD = "period"
+  val STATSD_KEY_UNIT = "unit"
+  val STATSD_KEY_PREFIX = "prefix"
+
+  val STATSD_DEFAULT_HOST = "127.0.0.1"
+  val STATSD_DEFAULT_PORT = "8125"
+  val STATSD_DEFAULT_PERIOD = "10"
+  val STATSD_DEFAULT_UNIT = "SECONDS"
+  val STATSD_DEFAULT_PREFIX = ""
+}
+
+private[spark] class StatsdSink(
+    val property: Properties,
+    val registry: MetricRegistry,
+    securityMgr: SecurityManager)
+  extends Sink with Logging {
+  import StatsdSink._
+
+  val host = property.getProperty(STATSD_KEY_HOST, STATSD_DEFAULT_HOST)
+  val port = property.getProperty(STATSD_KEY_PORT, STATSD_DEFAULT_PORT).toInt
+
+  val pollPeriod = property.getProperty(STATSD_KEY_PERIOD, STATSD_DEFAULT_PERIOD).toInt
+  val pollUnit =
+    TimeUnit.valueOf(property.getProperty(STATSD_KEY_UNIT, STATSD_DEFAULT_UNIT).toUpperCase)
+
+  val prefix = property.getProperty(STATSD_KEY_PREFIX, STATSD_DEFAULT_PREFIX)
+
+  MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
+
+  val reporter = new StatsdReporter(registry, host, port, prefix)
+
+  override def start(): Unit = {
+    reporter.start(pollPeriod, pollUnit)
+    logInfo(s"StatsdSink started with prefix: '$prefix'")
+  }
+
+  override def stop(): Unit = {
+    reporter.stop()
+    logInfo("StatsdSink stopped.")
+  }
+
+  override def report(): Unit = reporter.report()
+}
diff --git a/core/src/test/scala/org/apache/spark/metrics/sink/StatsdSinkSuite.scala b/core/src/test/scala/org/apache/spark/metrics/sink/StatsdSinkSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..0e21a36071c423f94894dbebb9d0b8d305e06dec
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/metrics/sink/StatsdSinkSuite.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import java.net.{DatagramPacket, DatagramSocket}
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.Properties
+import java.util.concurrent.TimeUnit._
+
+import com.codahale.metrics._
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.metrics.sink.StatsdSink._
+
+class StatsdSinkSuite extends SparkFunSuite {
+  private val securityMgr = new SecurityManager(new SparkConf(false))
+  private val defaultProps = Map(
+    STATSD_KEY_PREFIX -> "spark",
+    STATSD_KEY_PERIOD -> "1",
+    STATSD_KEY_UNIT -> "seconds",
+    STATSD_KEY_HOST -> "127.0.0.1"
+  )
+  private val socketTimeout = 30000 // milliseconds
+  private val socketBufferSize = 8192
+
+  private def withSocketAndSink(testCode: (DatagramSocket, StatsdSink) => Any): Unit = {
+    val socket = new DatagramSocket
+    socket.setReceiveBufferSize(socketBufferSize)
+    socket.setSoTimeout(socketTimeout)
+    val props = new Properties
+    defaultProps.foreach(e => props.put(e._1, e._2))
+    props.put(STATSD_KEY_PORT, socket.getLocalPort.toString)
+    val registry = new MetricRegistry
+    val sink = new StatsdSink(props, registry, securityMgr)
+    try {
+      testCode(socket, sink)
+    } finally {
+      socket.close()
+    }
+  }
+
+  test("metrics StatsD sink with Counter") {
+    withSocketAndSink { (socket, sink) =>
+      val counter = new Counter
+      counter.inc(12)
+      sink.registry.register("counter", counter)
+      sink.report()
+
+      val p = new DatagramPacket(new Array[Byte](socketBufferSize), socketBufferSize)
+      socket.receive(p)
+
+      val result = new String(p.getData, 0, p.getLength, UTF_8)
+      assert(result === "spark.counter:12|c", "Counter metric received should match data sent")
+    }
+  }
+
+  test("metrics StatsD sink with Gauge") {
+    withSocketAndSink { (socket, sink) =>
+      val gauge = new Gauge[Double] {
+        override def getValue: Double = 1.23
+      }
+      sink.registry.register("gauge", gauge)
+      sink.report()
+
+      val p = new DatagramPacket(new Array[Byte](socketBufferSize), socketBufferSize)
+      socket.receive(p)
+
+      val result = new String(p.getData, 0, p.getLength, UTF_8)
+      assert(result === "spark.gauge:1.23|g", "Gauge metric received should match data sent")
+    }
+  }
+
+  test("metrics StatsD sink with Histogram") {
+    withSocketAndSink { (socket, sink) =>
+      val p = new DatagramPacket(new Array[Byte](socketBufferSize), socketBufferSize)
+      val histogram = new Histogram(new UniformReservoir)
+      histogram.update(10)
+      histogram.update(20)
+      histogram.update(30)
+      sink.registry.register("histogram", histogram)
+      sink.report()
+
+      val expectedResults = Set(
+        "spark.histogram.count:3|g",
+        "spark.histogram.max:30|ms",
+        "spark.histogram.mean:20.00|ms",
+        "spark.histogram.min:10|ms",
+        "spark.histogram.stddev:10.00|ms",
+        "spark.histogram.p50:20.00|ms",
+        "spark.histogram.p75:30.00|ms",
+        "spark.histogram.p95:30.00|ms",
+        "spark.histogram.p98:30.00|ms",
+        "spark.histogram.p99:30.00|ms",
+        "spark.histogram.p999:30.00|ms"
+      )
+
+      (1 to expectedResults.size).foreach { i =>
+        socket.receive(p)
+        val result = new String(p.getData, 0, p.getLength, UTF_8)
+        logInfo(s"Received histogram result $i: '$result'")
+        assert(expectedResults.contains(result),
+          "Histogram metric received should match data sent")
+      }
+    }
+  }
+
+  test("metrics StatsD sink with Timer") {
+    withSocketAndSink { (socket, sink) =>
+      val p = new DatagramPacket(new Array[Byte](socketBufferSize), socketBufferSize)
+      val timer = new Timer()
+      timer.update(1, SECONDS)
+      timer.update(2, SECONDS)
+      timer.update(3, SECONDS)
+      sink.registry.register("timer", timer)
+      sink.report()
+
+      val expectedResults = Set(
+        "spark.timer.max:3000.00|ms",
+        "spark.timer.mean:2000.00|ms",
+        "spark.timer.min:1000.00|ms",
+        "spark.timer.stddev:816.50|ms",
+        "spark.timer.p50:2000.00|ms",
+        "spark.timer.p75:3000.00|ms",
+        "spark.timer.p95:3000.00|ms",
+        "spark.timer.p98:3000.00|ms",
+        "spark.timer.p99:3000.00|ms",
+        "spark.timer.p999:3000.00|ms",
+        "spark.timer.count:3|g",
+        "spark.timer.m1_rate:0.00|ms",
+        "spark.timer.m5_rate:0.00|ms",
+        "spark.timer.m15_rate:0.00|ms"
+      )
+      // mean rate varies on each test run
+      val oneMoreResult = """spark.timer.mean_rate:\d+\.\d\d\|ms"""
+
+      (1 to (expectedResults.size + 1)).foreach { i =>
+        socket.receive(p)
+        val result = new String(p.getData, 0, p.getLength, UTF_8)
+        logInfo(s"Received timer result $i: '$result'")
+        assert(expectedResults.contains(result) || result.matches(oneMoreResult),
+          "Timer metric received should match data sent")
+      }
+    }
+  }
+}
diff --git a/docs/monitoring.md b/docs/monitoring.md
index d22cd945eaf6177c53c229c6e124316df0f0b547..51084a25983eac61c118f39f5c3b7bf47c6f8f24 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -455,6 +455,7 @@ Each instance can report to zero or more _sinks_. Sinks are contained in the
 * `MetricsServlet`: Adds a servlet within the existing Spark UI to serve metrics data as JSON data.
 * `GraphiteSink`: Sends metrics to a Graphite node.
 * `Slf4jSink`: Sends metrics to slf4j as log entries.
+* `StatsdSink`: Sends metrics to a StatsD node.
 
 Spark also supports a Ganglia sink which is not included in the default build due to licensing
 restrictions:
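
For reference, a minimal `conf/metrics.properties` snippet wiring all instances to this sink might look like the following. The host, port, period, and unit values shown are simply the defaults documented in the template hunk above, and the `spark` prefix mirrors the commented example added there:

```properties
# Route metrics from every instance (driver, executors, master, workers) to StatsD
*.sink.statsd.class=org.apache.spark.metrics.sink.StatsdSink
*.sink.statsd.host=127.0.0.1
*.sink.statsd.port=8125
*.sink.statsd.period=10
*.sink.statsd.unit=seconds
*.sink.statsd.prefix=spark
```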