Commit 68df47ac authored by Yangyang Liu, committed by Reynold Xin

[SPARK-16405] Add metrics and source for external shuffle service

## What changes were proposed in this pull request?

Since the external shuffle service is essential to Spark, better monitoring of it is necessary. To that end, this patch adds several metrics to the shuffle service and exposes them through ExternalShuffleServiceSource so they can be reported by the metrics system.
Metrics added to the shuffle service:
* registeredExecutorsSize
* openBlockRequestLatencyMillis
* registerExecutorRequestLatencyMillis
* blockTransferRateBytes

JIRA Issue: https://issues.apache.org/jira/browse/SPARK-16405
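
As a rough sketch of how these metrics can be consumed, mirroring what ExternalShuffleServiceSource does in this patch: the handler's `getAllMetrics()` returns a Dropwizard `MetricSet` that can be registered with any `MetricRegistry`. The `ShuffleMetricsExample` class and the console reporter below are purely illustrative and are not part of the change itself.

```java
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;

import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;

public class ShuffleMetricsExample {
  public static void report(ExternalShuffleBlockHandler handler) {
    // getAllMetrics() returns a MetricSet holding the four metrics listed above;
    // registerAll() copies them into a registry.
    MetricRegistry registry = new MetricRegistry();
    registry.registerAll(handler.getAllMetrics());

    // Any Dropwizard reporter can then publish them; a console reporter is used
    // here only for illustration.
    ConsoleReporter reporter = ConsoleReporter.forRegistry(registry)
      .convertRatesTo(TimeUnit.SECONDS)
      .convertDurationsTo(TimeUnit.MILLISECONDS)
      .build();
    reporter.start(10, TimeUnit.SECONDS);

    // Individual metrics can also be looked up by name.
    Timer openLatency = (Timer) handler.getAllMetrics().getMetrics()
      .get("openBlockRequestLatencyMillis");
    System.out.println("open block requests handled: " + openLatency.getCount());
  }
}
```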

## How was this patch tested?

Unit tests were added to verify that the metrics are updated as expected in the metrics system; see `ExternalShuffleBlockHandlerSuite`.

Author: Yangyang Liu <yangyangliu@fb.com>

Closes #14080 from lovexi/yangyang-metrics.
parent d513c99c
......@@ -54,6 +54,11 @@
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
......
......@@ -20,8 +20,15 @@ package org.apache.spark.network.shuffle;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricSet;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
......@@ -52,6 +59,7 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
@VisibleForTesting
final ExternalShuffleBlockResolver blockManager;
private final OneForOneStreamManager streamManager;
private final ShuffleMetrics metrics;
public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile)
throws IOException {
......@@ -64,6 +72,7 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
public ExternalShuffleBlockHandler(
OneForOneStreamManager streamManager,
ExternalShuffleBlockResolver blockManager) {
this.metrics = new ShuffleMetrics();
this.streamManager = streamManager;
this.blockManager = blockManager;
}
......@@ -79,32 +88,50 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
TransportClient client,
RpcResponseCallback callback) {
if (msgObj instanceof OpenBlocks) {
OpenBlocks msg = (OpenBlocks) msgObj;
checkAuth(client, msg.appId);
List<ManagedBuffer> blocks = Lists.newArrayList();
for (String blockId : msg.blockIds) {
blocks.add(blockManager.getBlockData(msg.appId, msg.execId, blockId));
final Timer.Context responseDelayContext = metrics.openBlockRequestLatencyMillis.time();
try {
OpenBlocks msg = (OpenBlocks) msgObj;
checkAuth(client, msg.appId);
List<ManagedBuffer> blocks = Lists.newArrayList();
long totalBlockSize = 0;
for (String blockId : msg.blockIds) {
final ManagedBuffer block = blockManager.getBlockData(msg.appId, msg.execId, blockId);
totalBlockSize += block != null ? block.size() : 0;
blocks.add(block);
}
long streamId = streamManager.registerStream(client.getClientId(), blocks.iterator());
logger.trace("Registered streamId {} with {} buffers for client {} from host {}",
streamId,
msg.blockIds.length,
client.getClientId(),
NettyUtils.getRemoteAddress(client.getChannel()));
callback.onSuccess(new StreamHandle(streamId, msg.blockIds.length).toByteBuffer());
metrics.blockTransferRateBytes.mark(totalBlockSize);
} finally {
responseDelayContext.stop();
}
long streamId = streamManager.registerStream(client.getClientId(), blocks.iterator());
logger.trace("Registered streamId {} with {} buffers for client {} from host {}",
streamId,
msg.blockIds.length,
client.getClientId(),
NettyUtils.getRemoteAddress(client.getChannel()));
callback.onSuccess(new StreamHandle(streamId, msg.blockIds.length).toByteBuffer());
} else if (msgObj instanceof RegisterExecutor) {
RegisterExecutor msg = (RegisterExecutor) msgObj;
checkAuth(client, msg.appId);
blockManager.registerExecutor(msg.appId, msg.execId, msg.executorInfo);
callback.onSuccess(ByteBuffer.wrap(new byte[0]));
final Timer.Context responseDelayContext = metrics.registerExecutorRequestLatencyMillis.time();
try {
RegisterExecutor msg = (RegisterExecutor) msgObj;
checkAuth(client, msg.appId);
blockManager.registerExecutor(msg.appId, msg.execId, msg.executorInfo);
callback.onSuccess(ByteBuffer.wrap(new byte[0]));
} finally {
responseDelayContext.stop();
}
} else {
throw new UnsupportedOperationException("Unexpected message: " + msgObj);
}
}
public MetricSet getAllMetrics() {
return metrics;
}
@Override
public StreamManager getStreamManager() {
return streamManager;
......@@ -143,4 +170,35 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
}
}
/**
* A simple class that wraps all the metrics exposed by the shuffle service
*/
private class ShuffleMetrics implements MetricSet {
private final Map<String, Metric> allMetrics;
// Latency of open-block requests, in ms
private final Timer openBlockRequestLatencyMillis = new Timer();
// Latency of executor-registration requests, in ms
private final Timer registerExecutorRequestLatencyMillis = new Timer();
// Block transfer rate, in bytes per second
private final Meter blockTransferRateBytes = new Meter();
private ShuffleMetrics() {
allMetrics = new HashMap<>();
allMetrics.put("openBlockRequestLatencyMillis", openBlockRequestLatencyMillis);
allMetrics.put("registerExecutorRequestLatencyMillis", registerExecutorRequestLatencyMillis);
allMetrics.put("blockTransferRateBytes", blockTransferRateBytes);
allMetrics.put("registeredExecutorsSize", new Gauge<Integer>() {
@Override
public Integer getValue() {
return blockManager.getRegisteredExecutorsSize();
}
});
}
@Override
public Map<String, Metric> getMetrics() {
return allMetrics;
}
}
}
......@@ -146,6 +146,10 @@ public class ExternalShuffleBlockResolver {
this.directoryCleaner = directoryCleaner;
}
public int getRegisteredExecutorsSize() {
return executors.size();
}
/** Registers a new Executor with all the configuration we need to find its shuffle files. */
public void registerExecutor(
String appId,
......
......@@ -20,6 +20,8 @@ package org.apache.spark.network.shuffle;
import java.nio.ByteBuffer;
import java.util.Iterator;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
......@@ -66,6 +68,12 @@ public class ExternalShuffleBlockHandlerSuite {
verify(callback, times(1)).onSuccess(any(ByteBuffer.class));
verify(callback, never()).onFailure(any(Throwable.class));
// Verify register executor request latency metrics
Timer registerExecutorRequestLatencyMillis = (Timer) ((ExternalShuffleBlockHandler) handler)
.getAllMetrics()
.getMetrics()
.get("registerExecutorRequestLatencyMillis");
assertEquals(1, registerExecutorRequestLatencyMillis.getCount());
}
@SuppressWarnings("unchecked")
......@@ -99,6 +107,19 @@ public class ExternalShuffleBlockHandlerSuite {
assertEquals(block0Marker, buffers.next());
assertEquals(block1Marker, buffers.next());
assertFalse(buffers.hasNext());
// Verify open block request latency metrics
Timer openBlockRequestLatencyMillis = (Timer) ((ExternalShuffleBlockHandler) handler)
.getAllMetrics()
.getMetrics()
.get("openBlockRequestLatencyMillis");
assertEquals(1, openBlockRequestLatencyMillis.getCount());
// Verify block transfer metrics
Meter blockTransferRateBytes = (Meter) ((ExternalShuffleBlockHandler) handler)
.getAllMetrics()
.getMetrics()
.get("blockTransferRateBytes");
assertEquals(10, blockTransferRateBytes.getCount());
}
@Test
......
......@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.TransportContext
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.sasl.SaslServerBootstrap
......@@ -41,6 +42,8 @@ import org.apache.spark.util.{ShutdownHookManager, Utils}
private[deploy]
class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityManager)
extends Logging {
protected val masterMetricsSystem =
MetricsSystem.createMetricsSystem("shuffleService", sparkConf, securityManager)
private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false)
private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
......@@ -54,6 +57,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
private var server: TransportServer = _
private val shuffleServiceSource = new ExternalShuffleServiceSource(blockHandler)
/** Create a new shuffle block handler. Factored out for subclasses to override. */
protected def newShuffleBlockHandler(conf: TransportConf): ExternalShuffleBlockHandler = {
new ExternalShuffleBlockHandler(conf, null)
......@@ -77,6 +82,9 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
Nil
}
server = transportContext.createServer(port, bootstraps.asJava)
masterMetricsSystem.registerSource(shuffleServiceSource)
masterMetricsSystem.start()
}
/** Clean up all shuffle files associated with an application that has exited. */
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy
import javax.annotation.concurrent.ThreadSafe
import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.metrics.source.Source
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
/**
* Provides metrics source for external shuffle service
*/
@ThreadSafe
private class ExternalShuffleServiceSource
(blockHandler: ExternalShuffleBlockHandler) extends Source {
override val metricRegistry = new MetricRegistry()
override val sourceName = "shuffleService"
metricRegistry.registerAll(blockHandler.getAllMetrics)
}