Skip to content
Snippets Groups Projects
Commit 346bc17a authored by Aaron Davidson, committed by Patrick Wendell
Browse files

[SPARK-4516] Avoid allocating Netty PooledByteBufAllocators unnecessarily

It turns out we are allocating a separate allocator pool for every TransportClient (which means that the number of pools increases with the number of nodes in the cluster), when really we should just reuse a single pool for all clients.

This patch, as expected, greatly decreases off-heap memory allocation, and appears to make the amount allocated proportional only to the number of cores.

Author: Aaron Davidson <aaron@databricks.com>

Closes #3465 from aarondav/fewer-pools and squashes the following commits:

36c49da [Aaron Davidson] [SPARK-4516] Avoid allocating unnecessarily Netty PooledByteBufAllocators
parent f5f2d273
No related branches found
No related tags found
No related merge requests found
......@@ -19,7 +19,6 @@ package org.apache.spark.network.client;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
......@@ -37,7 +36,6 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.internal.PlatformDependent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -67,6 +65,7 @@ public class TransportClientFactory implements Closeable {
private final Class<? extends Channel> socketChannelClass;
private EventLoopGroup workerGroup;
private PooledByteBufAllocator pooledAllocator;
public TransportClientFactory(
TransportContext context,
......@@ -80,6 +79,8 @@ public class TransportClientFactory implements Closeable {
this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
// TODO: Make thread pool name configurable.
this.workerGroup = NettyUtils.createEventLoop(ioMode, conf.clientThreads(), "shuffle-client");
this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads());
}
/**
......@@ -115,11 +116,8 @@ public class TransportClientFactory implements Closeable {
// Disable Nagle's Algorithm since we don't want packets to wait
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs());
// Use pooled buffers to reduce temporary buffer allocation
bootstrap.option(ChannelOption.ALLOCATOR, NettyUtils.createPooledByteBufAllocator(
conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads()));
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())
.option(ChannelOption.ALLOCATOR, pooledAllocator);
final AtomicReference<TransportClient> clientRef = new AtomicReference<TransportClient>();
......
......@@ -109,9 +109,9 @@ public class NettyUtils {
/**
* Create a pooled ByteBuf allocator but disables the thread-local cache. Thread-local caches
* are disabled because the ByteBufs are allocated by the event loop thread, but released by the
* executor thread rather than the event loop thread. Those thread-local caches actually delay
* the recycling of buffers, leading to larger memory usage.
* are disabled for TransportClients because the ByteBufs are allocated by the event loop thread,
* but released by the executor thread rather than the event loop thread. Those thread-local
* caches actually delay the recycling of buffers, leading to larger memory usage.
*/
public static PooledByteBufAllocator createPooledByteBufAllocator(
boolean allowDirectBufs,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment