From 68df47aca55e99406b7b67ef3d4b1008abf1b8b6 Mon Sep 17 00:00:00 2001
From: Yangyang Liu <yangyangliu@fb.com>
Date: Tue, 12 Jul 2016 10:13:58 -0700
Subject: [PATCH] [SPARK-16405] Add metrics and source for external shuffle service

## What changes were proposed in this pull request?

Since the external shuffle service is essential for Spark, better monitoring of the shuffle service is necessary. To that end, we added various metrics to the shuffle service and exposed them through ExternalShuffleServiceSource for the metrics system.

Metrics added in the shuffle service:
* registeredExecutorsSize
* openBlockRequestLatencyMillis
* registerExecutorRequestLatencyMillis
* blockTransferRateBytes

JIRA Issue: https://issues.apache.org/jira/browse/SPARK-16405

## How was this patch tested?

Some test cases are added to verify that the metrics are reported to the metrics system as expected. These unit tests are in `ExternalShuffleBlockHandlerSuite`.

Author: Yangyang Liu <yangyangliu@fb.com>

Closes #14080 from lovexi/yangyang-metrics.
---
 common/network-shuffle/pom.xml                 |  5 +
 .../shuffle/ExternalShuffleBlockHandler.java   | 92 +++++++++++++++----
 .../shuffle/ExternalShuffleBlockResolver.java  |  4 +
 .../ExternalShuffleBlockHandlerSuite.java      | 21 +++++
 .../spark/deploy/ExternalShuffleService.scala  |  8 ++
 .../deploy/ExternalShuffleServiceSource.scala  | 37 ++++++++
 6 files changed, 150 insertions(+), 17 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/deploy/ExternalShuffleServiceSource.scala

diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index 3c4f32933e..51c06b9e5a 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -54,6 +54,11 @@
       <artifactId>jackson-databind</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>io.dropwizard.metrics</groupId>
+      <artifactId>metrics-core</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index 22fd592a32..1cc0fb65d7 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -20,8 +20,15 @@ package org.apache.spark.network.shuffle;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
+import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import org.slf4j.Logger;
@@ -52,6 +59,7 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
   @VisibleForTesting
   final ExternalShuffleBlockResolver blockManager;
   private final OneForOneStreamManager streamManager;
+  private final ShuffleMetrics metrics;
 
   public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile)
     throws IOException {
@@ -64,6 +72,7 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
   public ExternalShuffleBlockHandler(
       OneForOneStreamManager streamManager,
       ExternalShuffleBlockResolver blockManager) {
+    this.metrics = new ShuffleMetrics();
     this.streamManager = streamManager;
     this.blockManager = blockManager;
   }
 
@@ -79,32 +88,50 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
       TransportClient client,
       RpcResponseCallback callback) {
     if (msgObj instanceof OpenBlocks) {
-      OpenBlocks msg = (OpenBlocks) msgObj;
-      checkAuth(client, msg.appId);
-
-      List<ManagedBuffer> blocks = Lists.newArrayList();
-      for (String blockId : msg.blockIds) {
-        blocks.add(blockManager.getBlockData(msg.appId, msg.execId, blockId));
+      final Timer.Context responseDelayContext = metrics.openBlockRequestLatencyMillis.time();
+      try {
+        OpenBlocks msg = (OpenBlocks) msgObj;
+        checkAuth(client, msg.appId);
+
+        List<ManagedBuffer> blocks = Lists.newArrayList();
+        long totalBlockSize = 0;
+        for (String blockId : msg.blockIds) {
+          final ManagedBuffer block = blockManager.getBlockData(msg.appId, msg.execId, blockId);
+          totalBlockSize += block != null ? block.size() : 0;
+          blocks.add(block);
+        }
+        long streamId = streamManager.registerStream(client.getClientId(), blocks.iterator());
+        logger.trace("Registered streamId {} with {} buffers for client {} from host {}",
+          streamId,
+          msg.blockIds.length,
+          client.getClientId(),
+          NettyUtils.getRemoteAddress(client.getChannel()));
+        callback.onSuccess(new StreamHandle(streamId, msg.blockIds.length).toByteBuffer());
+        metrics.blockTransferRateBytes.mark(totalBlockSize);
+      } finally {
+        responseDelayContext.stop();
       }
-      long streamId = streamManager.registerStream(client.getClientId(), blocks.iterator());
-      logger.trace("Registered streamId {} with {} buffers for client {} from host {}",
-        streamId,
-        msg.blockIds.length,
-        client.getClientId(),
-        NettyUtils.getRemoteAddress(client.getChannel()));
-      callback.onSuccess(new StreamHandle(streamId, msg.blockIds.length).toByteBuffer());
 
     } else if (msgObj instanceof RegisterExecutor) {
-      RegisterExecutor msg = (RegisterExecutor) msgObj;
-      checkAuth(client, msg.appId);
-      blockManager.registerExecutor(msg.appId, msg.execId, msg.executorInfo);
-      callback.onSuccess(ByteBuffer.wrap(new byte[0]));
+      final Timer.Context responseDelayContext = metrics.registerExecutorRequestLatencyMillis.time();
+      try {
+        RegisterExecutor msg = (RegisterExecutor) msgObj;
+        checkAuth(client, msg.appId);
+        blockManager.registerExecutor(msg.appId, msg.execId, msg.executorInfo);
+        callback.onSuccess(ByteBuffer.wrap(new byte[0]));
+      } finally {
+        responseDelayContext.stop();
+      }
 
     } else {
       throw new UnsupportedOperationException("Unexpected message: " + msgObj);
     }
   }
 
+  public MetricSet getAllMetrics() {
+    return metrics;
+  }
+
   @Override
   public StreamManager getStreamManager() {
     return streamManager;
@@ -143,4 +170,35 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
     }
   }
 
+  /**
+   * A simple class to wrap all shuffle service metrics.
+   */
+  private class ShuffleMetrics implements MetricSet {
+    private final Map<String, Metric> allMetrics;
+    // Time latency for open block request in ms
+    private final Timer openBlockRequestLatencyMillis = new Timer();
+    // Time latency for executor registration in ms
+    private final Timer registerExecutorRequestLatencyMillis = new Timer();
+    // Block transfer rate in bytes per second
+    private final Meter blockTransferRateBytes = new Meter();
+
+    private ShuffleMetrics() {
+      allMetrics = new HashMap<>();
+      allMetrics.put("openBlockRequestLatencyMillis", openBlockRequestLatencyMillis);
+      allMetrics.put("registerExecutorRequestLatencyMillis", registerExecutorRequestLatencyMillis);
+      allMetrics.put("blockTransferRateBytes", blockTransferRateBytes);
+      allMetrics.put("registeredExecutorsSize", new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          return blockManager.getRegisteredExecutorsSize();
+        }
+      });
+    }
+
+    @Override
+    public Map<String, Metric> getMetrics() {
+      return allMetrics;
+    }
+  }
+
 }
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index 54e870a9b5..7eefccaaed 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -146,6 +146,10 @@ public class ExternalShuffleBlockResolver {
     this.directoryCleaner = directoryCleaner;
   }
 
+  public int getRegisteredExecutorsSize() {
+    return executors.size();
+  }
+
   /** Registers a new Executor with all the configuration we need to find its shuffle files. */
   public void registerExecutor(
       String appId,
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
index c2e0b7447f..c036bc2e8d 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
@@ -20,6 +20,8 @@ package org.apache.spark.network.shuffle;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
 
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -66,6 +68,12 @@ public class ExternalShuffleBlockHandlerSuite {
 
     verify(callback, times(1)).onSuccess(any(ByteBuffer.class));
     verify(callback, never()).onFailure(any(Throwable.class));
+    // Verify register executor request latency metrics
+    Timer registerExecutorRequestLatencyMillis = (Timer) ((ExternalShuffleBlockHandler) handler)
+        .getAllMetrics()
+        .getMetrics()
+        .get("registerExecutorRequestLatencyMillis");
+    assertEquals(1, registerExecutorRequestLatencyMillis.getCount());
   }
 
   @SuppressWarnings("unchecked")
@@ -99,6 +107,19 @@ public class ExternalShuffleBlockHandlerSuite {
     assertEquals(block0Marker, buffers.next());
     assertEquals(block1Marker, buffers.next());
     assertFalse(buffers.hasNext());
+
+    // Verify open block request latency metrics
+    Timer openBlockRequestLatencyMillis = (Timer) ((ExternalShuffleBlockHandler) handler)
+        .getAllMetrics()
+        .getMetrics()
+        .get("openBlockRequestLatencyMillis");
+    assertEquals(1, openBlockRequestLatencyMillis.getCount());
+    // Verify block transfer metrics
+    Meter blockTransferRateBytes = (Meter) ((ExternalShuffleBlockHandler) handler)
+        .getAllMetrics()
+        .getMetrics()
+        .get("blockTransferRateBytes");
+    assertEquals(10, blockTransferRateBytes.getCount());
   }
 
   @Test
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index adc0de1e91..37a19a495b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.network.TransportContext
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.sasl.SaslServerBootstrap
@@ -41,6 +42,8 @@ import org.apache.spark.util.{ShutdownHookManager, Utils}
 private[deploy]
 class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityManager)
   extends Logging {
+  protected val masterMetricsSystem =
+    MetricsSystem.createMetricsSystem("shuffleService", sparkConf, securityManager)
 
   private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false)
   private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
@@ -54,6 +57,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
 
   private var server: TransportServer = _
 
+  private val shuffleServiceSource = new ExternalShuffleServiceSource(blockHandler)
+
   /** Create a new shuffle block handler. Factored out for subclasses to override. */
   protected def newShuffleBlockHandler(conf: TransportConf): ExternalShuffleBlockHandler = {
     new ExternalShuffleBlockHandler(conf, null)
@@ -77,6 +82,9 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
         Nil
       }
     server = transportContext.createServer(port, bootstraps.asJava)
+
+    masterMetricsSystem.registerSource(shuffleServiceSource)
+    masterMetricsSystem.start()
   }
 
   /** Clean up all shuffle files associated with an application that has exited. */
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleServiceSource.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleServiceSource.scala
new file mode 100644
index 0000000000..e917679c83
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleServiceSource.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import javax.annotation.concurrent.ThreadSafe
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
+
+/**
+ * Provides metrics source for external shuffle service
+ */
+@ThreadSafe
+private class ExternalShuffleServiceSource
+(blockHandler: ExternalShuffleBlockHandler) extends Source {
+  override val metricRegistry = new MetricRegistry()
+  override val sourceName = "shuffleService"
+
+  metricRegistry.registerAll(blockHandler.getAllMetrics)
+}
-- 
GitLab
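
As a rough usage illustration (not part of the patch itself): the `MetricSet` returned by `ExternalShuffleBlockHandler#getAllMetrics()` is plain Dropwizard, so any `MetricRegistry` can consume it, which is what `ExternalShuffleServiceSource` does through Spark's `MetricsSystem`. The sketch below assumes a handler instance is already available and uses a `ConsoleReporter` purely for demonstration; the class name `ShuffleMetricsSketch` is hypothetical.

```java
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.MetricSet;

// Hypothetical helper, for illustration only: registers the shuffle service
// metrics with a standalone Dropwizard registry and dumps them to stdout.
public class ShuffleMetricsSketch {
  public static void report(MetricSet shuffleMetrics) {
    MetricRegistry registry = new MetricRegistry();
    // One registerAll call picks up openBlockRequestLatencyMillis,
    // registerExecutorRequestLatencyMillis, blockTransferRateBytes and the
    // registeredExecutorsSize gauge.
    registry.registerAll(shuffleMetrics);

    ConsoleReporter reporter = ConsoleReporter.forRegistry(registry)
        .convertRatesTo(TimeUnit.SECONDS)
        .convertDurationsTo(TimeUnit.MILLISECONDS)
        .build();
    // One-off dump; a real deployment would rely on the sinks configured in
    // Spark's metrics.properties instead of a console reporter.
    reporter.report();
  }
}
```

A caller would pass the handler's metric set once the external shuffle service is running, e.g. `ShuffleMetricsSketch.report(handler.getAllMetrics())`.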