From 80bd715a3e2c39449ed5e4d4e7058d75281ef3cb Mon Sep 17 00:00:00 2001
From: Ryan Williams <ryan.blake.williams@gmail.com>
Date: Sat, 31 Jan 2015 23:41:05 -0800
Subject: [PATCH] [SPARK-5422] Add support for sending Graphite metrics via UDP

Depends on [SPARK-5413](https://issues.apache.org/jira/browse/SPARK-5413) / #4209, included here, will rebase once the latter's merged.

Author: Ryan Williams <ryan.blake.williams@gmail.com>

Closes #4218 from ryan-williams/udp and squashes the following commits:

ebae393 [Ryan Williams] Add support for sending Graphite metrics via UDP
cb58262 [Ryan Williams] bump metrics dependency to v3.1.0
---
 conf/metrics.properties.template                     |  1 +
 core/pom.xml                                         |  8 ++++----
 .../org/apache/spark/metrics/sink/GraphiteSink.scala |  9 +++++++--
 pom.xml                                              | 12 ++++++------
 4 files changed, 18 insertions(+), 12 deletions(-)

diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template
index 96b6844f0a..464c14457e 100644
--- a/conf/metrics.properties.template
+++ b/conf/metrics.properties.template
@@ -87,6 +87,7 @@
 #   period    10            Poll period
 #   unit      seconds       Units of poll period
 #   prefix    EMPTY STRING  Prefix to prepend to metric name
+#   protocol  tcp           Protocol ("tcp" or "udp") to use
 
 ## Examples
 # Enable JmxSink for all instances by class name
diff --git a/core/pom.xml b/core/pom.xml
index 31e919a1c8..6fce10a0ae 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -198,19 +198,19 @@
       <artifactId>stream</artifactId>
     </dependency>
     <dependency>
-      <groupId>com.codahale.metrics</groupId>
+      <groupId>io.dropwizard.metrics</groupId>
       <artifactId>metrics-core</artifactId>
     </dependency>
     <dependency>
-      <groupId>com.codahale.metrics</groupId>
+      <groupId>io.dropwizard.metrics</groupId>
       <artifactId>metrics-jvm</artifactId>
     </dependency>
     <dependency>
-      <groupId>com.codahale.metrics</groupId>
+      <groupId>io.dropwizard.metrics</groupId>
       <artifactId>metrics-json</artifactId>
     </dependency>
     <dependency>
-      <groupId>com.codahale.metrics</groupId>
+      <groupId>io.dropwizard.metrics</groupId>
       <artifactId>metrics-graphite</artifactId>
     </dependency>
     <dependency>
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
index d7b5f5c40e..2d25ebd661 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
@@ -22,7 +22,7 @@ import java.util.Properties
 import java.util.concurrent.TimeUnit
 
 import com.codahale.metrics.MetricRegistry
-import com.codahale.metrics.graphite.{Graphite, GraphiteReporter}
+import com.codahale.metrics.graphite.{GraphiteUDP, Graphite, GraphiteReporter}
 
 import org.apache.spark.SecurityManager
 import org.apache.spark.metrics.MetricsSystem
@@ -38,6 +38,7 @@ private[spark] class GraphiteSink(val property: Properties, val registry: Metric
   val GRAPHITE_KEY_PERIOD = "period"
   val GRAPHITE_KEY_UNIT = "unit"
   val GRAPHITE_KEY_PREFIX = "prefix"
+  val GRAPHITE_KEY_PROTOCOL = "protocol"
 
   def propertyToOption(prop: String): Option[String] = Option(property.getProperty(prop))
 
@@ -66,7 +67,11 @@ private[spark] class GraphiteSink(val property: Properties, val registry: Metric
 
   MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
 
-  val graphite: Graphite = new Graphite(new InetSocketAddress(host, port))
+  val graphite = propertyToOption(GRAPHITE_KEY_PROTOCOL).map(_.toLowerCase) match {
+    case Some("udp") => new GraphiteUDP(new InetSocketAddress(host, port))
+    case Some("tcp") | None => new Graphite(new InetSocketAddress(host, port))
+    case Some(p) => throw new Exception(s"Invalid Graphite protocol: $p")
+  }
 
   val reporter: GraphiteReporter = GraphiteReporter.forRegistry(registry)
       .convertDurationsTo(TimeUnit.MILLISECONDS)
diff --git a/pom.xml b/pom.xml
index 4adfdf3eb8..b855f2371b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -136,7 +136,7 @@
     <jblas.version>1.2.3</jblas.version>
     <jetty.version>8.1.14.v20131031</jetty.version>
     <chill.version>0.5.0</chill.version>
-    <codahale.metrics.version>3.0.0</codahale.metrics.version>
+    <codahale.metrics.version>3.1.0</codahale.metrics.version>
     <avro.version>1.7.6</avro.version>
     <avro.mapred.classifier></avro.mapred.classifier>
     <jets3t.version>0.7.1</jets3t.version>
@@ -521,27 +521,27 @@
         <version>${derby.version}</version>
       </dependency>
       <dependency>
-        <groupId>com.codahale.metrics</groupId>
+        <groupId>io.dropwizard.metrics</groupId>
         <artifactId>metrics-core</artifactId>
         <version>${codahale.metrics.version}</version>
       </dependency>
       <dependency>
-        <groupId>com.codahale.metrics</groupId>
+        <groupId>io.dropwizard.metrics</groupId>
         <artifactId>metrics-jvm</artifactId>
         <version>${codahale.metrics.version}</version>
       </dependency>
       <dependency>
-        <groupId>com.codahale.metrics</groupId>
+        <groupId>io.dropwizard.metrics</groupId>
         <artifactId>metrics-json</artifactId>
         <version>${codahale.metrics.version}</version>
       </dependency>
       <dependency>
-        <groupId>com.codahale.metrics</groupId>
+        <groupId>io.dropwizard.metrics</groupId>
         <artifactId>metrics-ganglia</artifactId>
         <version>${codahale.metrics.version}</version>
       </dependency>
       <dependency>
-        <groupId>com.codahale.metrics</groupId>
+        <groupId>io.dropwizard.metrics</groupId>
         <artifactId>metrics-graphite</artifactId>
         <version>${codahale.metrics.version}</version>
       </dependency>
-- 
GitLab