diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml index 3c4f32933e3c145945c50f40fdaff655a707d247..51c06b9e5a0763ee64b6dad6febb1c87376df5e8 100644 --- a/common/network-shuffle/pom.xml +++ b/common/network-shuffle/pom.xml @@ -54,6 +54,11 @@ <artifactId>jackson-databind</artifactId> </dependency> + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + </dependency> + <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java index 22fd592a321d21ca0d75c1ee5697f54da24d583f..1cc0fb65d726f101d777f2d3905285c79b642a44 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java @@ -20,8 +20,15 @@ package org.apache.spark.network.shuffle; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Meter; +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricSet; +import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import org.slf4j.Logger; @@ -52,6 +59,7 @@ public class ExternalShuffleBlockHandler extends RpcHandler { @VisibleForTesting final ExternalShuffleBlockResolver blockManager; private final OneForOneStreamManager streamManager; + private final ShuffleMetrics metrics; public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile) throws IOException { @@ -64,6 +72,7 @@ public class ExternalShuffleBlockHandler extends RpcHandler { public ExternalShuffleBlockHandler( OneForOneStreamManager streamManager, ExternalShuffleBlockResolver blockManager) { + this.metrics = new ShuffleMetrics(); this.streamManager = streamManager; this.blockManager = blockManager; } @@ -79,32 +88,50 @@ public class ExternalShuffleBlockHandler extends RpcHandler { TransportClient client, RpcResponseCallback callback) { if (msgObj instanceof OpenBlocks) { - OpenBlocks msg = (OpenBlocks) msgObj; - checkAuth(client, msg.appId); - - List<ManagedBuffer> blocks = Lists.newArrayList(); - for (String blockId : msg.blockIds) { - blocks.add(blockManager.getBlockData(msg.appId, msg.execId, blockId)); + final Timer.Context responseDelayContext = metrics.openBlockRequestLatencyMillis.time(); + try { + OpenBlocks msg = (OpenBlocks) msgObj; + checkAuth(client, msg.appId); + + List<ManagedBuffer> blocks = Lists.newArrayList(); + long totalBlockSize = 0; + for (String blockId : msg.blockIds) { + final ManagedBuffer block = blockManager.getBlockData(msg.appId, msg.execId, blockId); + totalBlockSize += block != null ? block.size() : 0; + blocks.add(block); + } + long streamId = streamManager.registerStream(client.getClientId(), blocks.iterator()); + logger.trace("Registered streamId {} with {} buffers for client {} from host {}", + streamId, + msg.blockIds.length, + client.getClientId(), + NettyUtils.getRemoteAddress(client.getChannel())); + callback.onSuccess(new StreamHandle(streamId, msg.blockIds.length).toByteBuffer()); + metrics.blockTransferRateBytes.mark(totalBlockSize); + } finally { + responseDelayContext.stop(); } - long streamId = streamManager.registerStream(client.getClientId(), blocks.iterator()); - logger.trace("Registered streamId {} with {} buffers for client {} from host {}", - streamId, - msg.blockIds.length, - client.getClientId(), - NettyUtils.getRemoteAddress(client.getChannel())); - callback.onSuccess(new StreamHandle(streamId, msg.blockIds.length).toByteBuffer()); } else if (msgObj instanceof RegisterExecutor) { - RegisterExecutor msg = (RegisterExecutor) msgObj; - checkAuth(client, msg.appId); - blockManager.registerExecutor(msg.appId, msg.execId, msg.executorInfo); - callback.onSuccess(ByteBuffer.wrap(new byte[0])); + final Timer.Context responseDelayContext = metrics.registerExecutorRequestLatencyMillis.time(); + try { + RegisterExecutor msg = (RegisterExecutor) msgObj; + checkAuth(client, msg.appId); + blockManager.registerExecutor(msg.appId, msg.execId, msg.executorInfo); + callback.onSuccess(ByteBuffer.wrap(new byte[0])); + } finally { + responseDelayContext.stop(); + } } else { throw new UnsupportedOperationException("Unexpected message: " + msgObj); } } + public MetricSet getAllMetrics() { + return metrics; + } + @Override public StreamManager getStreamManager() { return streamManager; @@ -143,4 +170,35 @@ public class ExternalShuffleBlockHandler extends RpcHandler { } } + /** + * A simple class to wrap all shuffle service wrapper metrics + */ + private class ShuffleMetrics implements MetricSet { + private final Map<String, Metric> allMetrics; + // Time latency for open block request in ms + private final Timer openBlockRequestLatencyMillis = new Timer(); + // Time latency for executor registration latency in ms + private final Timer registerExecutorRequestLatencyMillis = new Timer(); + // Block transfer rate in byte per second + private final Meter blockTransferRateBytes = new Meter(); + + private ShuffleMetrics() { + allMetrics = new HashMap<>(); + allMetrics.put("openBlockRequestLatencyMillis", openBlockRequestLatencyMillis); + allMetrics.put("registerExecutorRequestLatencyMillis", registerExecutorRequestLatencyMillis); + allMetrics.put("blockTransferRateBytes", blockTransferRateBytes); + allMetrics.put("registeredExecutorsSize", new Gauge<Integer>() { + @Override + public Integer getValue() { + return blockManager.getRegisteredExecutorsSize(); + } + }); + } + + @Override + public Map<String, Metric> getMetrics() { + return allMetrics; + } + } + } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index 54e870a9b56a650543f8e620d9ff2e6ef4fe396a..7eefccaaedb6c1bb502f3a9d889e36c88c6a44d2 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -146,6 +146,10 @@ public class ExternalShuffleBlockResolver { this.directoryCleaner = directoryCleaner; } + public int getRegisteredExecutorsSize() { + return executors.size(); + } + /** Registers a new Executor with all the configuration we need to find its shuffle files. */ public void registerExecutor( String appId, diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java index c2e0b7447fb8b692154431395edc94b252bcda97..c036bc2e8d25630d4eef5e1c1f82e1d5537ed9a5 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java @@ -20,6 +20,8 @@ package org.apache.spark.network.shuffle; import java.nio.ByteBuffer; import java.util.Iterator; +import com.codahale.metrics.Meter; +import com.codahale.metrics.Timer; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -66,6 +68,12 @@ public class ExternalShuffleBlockHandlerSuite { verify(callback, times(1)).onSuccess(any(ByteBuffer.class)); verify(callback, never()).onFailure(any(Throwable.class)); + // Verify register executor request latency metrics + Timer registerExecutorRequestLatencyMillis = (Timer) ((ExternalShuffleBlockHandler) handler) + .getAllMetrics() + .getMetrics() + .get("registerExecutorRequestLatencyMillis"); + assertEquals(1, registerExecutorRequestLatencyMillis.getCount()); } @SuppressWarnings("unchecked") @@ -99,6 +107,19 @@ public class ExternalShuffleBlockHandlerSuite { assertEquals(block0Marker, buffers.next()); assertEquals(block1Marker, buffers.next()); assertFalse(buffers.hasNext()); + + // Verify open block request latency metrics + Timer openBlockRequestLatencyMillis = (Timer) ((ExternalShuffleBlockHandler) handler) + .getAllMetrics() + .getMetrics() + .get("openBlockRequestLatencyMillis"); + assertEquals(1, openBlockRequestLatencyMillis.getCount()); + // Verify block transfer metrics + Meter blockTransferRateBytes = (Meter) ((ExternalShuffleBlockHandler) handler) + .getAllMetrics() + .getMetrics() + .get("blockTransferRateBytes"); + assertEquals(10, blockTransferRateBytes.getCount()); } @Test diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala index adc0de1e9127c88811fe2a5dad25a066463d1ef7..37a19a495bee61462859f6912a77c099557a8acf 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.internal.Logging +import org.apache.spark.metrics.MetricsSystem import org.apache.spark.network.TransportContext import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.sasl.SaslServerBootstrap @@ -41,6 +42,8 @@ import org.apache.spark.util.{ShutdownHookManager, Utils} private[deploy] class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityManager) extends Logging { + protected val masterMetricsSystem = + MetricsSystem.createMetricsSystem("shuffleService", sparkConf, securityManager) private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false) private val port = sparkConf.getInt("spark.shuffle.service.port", 7337) @@ -54,6 +57,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana private var server: TransportServer = _ + private val shuffleServiceSource = new ExternalShuffleServiceSource(blockHandler) + /** Create a new shuffle block handler. Factored out for subclasses to override. */ protected def newShuffleBlockHandler(conf: TransportConf): ExternalShuffleBlockHandler = { new ExternalShuffleBlockHandler(conf, null) @@ -77,6 +82,9 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana Nil } server = transportContext.createServer(port, bootstraps.asJava) + + masterMetricsSystem.registerSource(shuffleServiceSource) + masterMetricsSystem.start() } /** Clean up all shuffle files associated with an application that has exited. */ diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleServiceSource.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleServiceSource.scala new file mode 100644 index 0000000000000000000000000000000000000000..e917679c838777ca39719ec1b24e9deec340cf3f --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleServiceSource.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy + +import javax.annotation.concurrent.ThreadSafe + +import com.codahale.metrics.{Gauge, MetricRegistry} + +import org.apache.spark.metrics.source.Source +import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler + +/** + * Provides metrics source for external shuffle service + */ +@ThreadSafe +private class ExternalShuffleServiceSource +(blockHandler: ExternalShuffleBlockHandler) extends Source { + override val metricRegistry = new MetricRegistry() + override val sourceName = "shuffleService" + + metricRegistry.registerAll(blockHandler.getAllMetrics) +}