Skip to content

Commit

Permalink
[#1767] feat(netty): The client-side supports choosing Netty's ByteBu…
Browse files Browse the repository at this point in the history
…fAllocator (#1768)

### What changes were proposed in this pull request?

The client-side supports choosing Netty's ByteBufAllocator.

### Why are the changes needed?

For #1767.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.
  • Loading branch information
rickyma authored Jun 11, 2024
1 parent 648931c commit cc3f52b
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,20 @@ public class RssClientConf {
.defaultValue(0)
.withDescription("Number of threads used in the client thread pool.");

public static final ConfigOption<Boolean> NETWORK_CLIENT_PREFER_DIRECT_BUFS =
public static final ConfigOption<Boolean> NETTY_CLIENT_PREFER_DIRECT_BUFS =
ConfigOptions.key("rss.client.netty.client.prefer.direct.bufs")
.booleanType()
.defaultValue(true)
.withDescription(
"If true, we will prefer allocating off-heap byte buffers within Netty.");

public static final ConfigOption<Boolean> NETTY_CLIENT_POOLED_ALLOCATOR_ENABLED =
ConfigOptions.key("rss.client.netty.client.pooled.allocator.enabled")
.booleanType()
.defaultValue(true)
.withDescription(
"If true, we will use PooledByteBufAllocator to allocate byte buffers within Netty, otherwise we'll use UnpooledByteBufAllocator.");

public static final ConfigOption<Integer> NETTY_CLIENT_NUM_CONNECTIONS_PER_PEER =
ConfigOptions.key("rss.client.netty.client.connections.per.peer")
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.util.concurrent.atomic.AtomicReference;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.AbstractByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInboundHandlerAdapter;
Expand Down Expand Up @@ -73,7 +73,7 @@ private static class ClientPool {

private final Class<? extends Channel> socketChannelClass;
private EventLoopGroup workerGroup;
private PooledByteBufAllocator pooledAllocator;
private AbstractByteBufAllocator byteBufAllocator;

public TransportClientFactory(TransportContext context) {
this.context = Objects.requireNonNull(context);
Expand All @@ -85,9 +85,11 @@ public TransportClientFactory(TransportContext context) {
IOMode ioMode = conf.ioMode();
this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
this.workerGroup = NettyUtils.createEventLoop(ioMode, conf.clientThreads(), "netty-rpc-client");
this.pooledAllocator =
NettyUtils.createPooledByteBufAllocator(
conf.preferDirectBufs(), false, conf.clientThreads());
this.byteBufAllocator =
conf.isPooledAllocatorEnabled()
? NettyUtils.createPooledByteBufAllocator(
conf.preferDirectBufs(), false, conf.clientThreads())
: NettyUtils.createUnpooledByteBufAllocator(conf.preferDirectBufs());
}

public TransportClient createClient(String remoteHost, int remotePort, int partitionId)
Expand Down Expand Up @@ -179,7 +181,7 @@ private TransportClient internalCreateClient(
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectTimeoutMs())
.option(ChannelOption.ALLOCATOR, pooledAllocator);
.option(ChannelOption.ALLOCATOR, byteBufAllocator);

if (conf.receiveBuf() > 0) {
bootstrap.option(ChannelOption.SO_RCVBUF, conf.receiveBuf());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ public int numConnectionsPerPeer() {
}

public boolean preferDirectBufs() {
return rssConf.get(RssClientConf.NETWORK_CLIENT_PREFER_DIRECT_BUFS);
return rssConf.get(RssClientConf.NETTY_CLIENT_PREFER_DIRECT_BUFS);
}

public boolean isPooledAllocatorEnabled() {
return rssConf.get(RssClientConf.NETTY_CLIENT_POOLED_ALLOCATOR_ENABLED);
}

public int receiveBuf() {
Expand Down
21 changes: 11 additions & 10 deletions docs/client_guide/client_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,14 @@ spark.rss.data.replica.read 2
```

### Netty Setting
| Property Name | Default | Description |
|-------------------------------------------------------------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| <client_type>.rss.client.type | GRPC | Supports GRPC_NETTY, GRPC. The default value is GRPC. But we recommend using GRPC_NETTY to enable Netty on the client side for better stability and performance. |
| <client_type>.rss.client.netty.io.mode | NIO | Netty EventLoopGroup backend, available options: NIO, EPOLL. |
| <client_type>.rss.client.netty.client.connection.timeout.ms | 600000 | Connection active timeout. |
| <client_type>.rss.client.netty.client.threads | 0 | Number of threads used in the client thread pool. Default is 0, Netty will use the number of (available logical cores * 2) as the number of threads. |
| <client_type>.rss.client.netty.client.prefer.direct.bufs | true | If true, we will prefer allocating off-heap byte buffers within Netty. |
| <client_type>.rss.client.netty.client.connections.per.peer | 2 | Suppose there are 100 executors, spark.rss.client.netty.client.connections.per.peer = 2, then each ShuffleServer will establish a total of (100 * 2) connections with multiple clients. |
| <client_type>.rss.client.netty.client.receive.buffer | 0 | Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB. Default is 0, the operating system automatically estimates the receive buffer size based on default settings. |
| <client_type>.rss.client.netty.client.send.buffer | 0 | Send buffer size (SO_SNDBUF). |
| Property Name | Default | Description |
|----------------------------------------------------------------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| <client_type>.rss.client.type | GRPC | Supports GRPC_NETTY, GRPC. The default value is GRPC. But we recommend using GRPC_NETTY to enable Netty on the client side for better stability and performance. |
| <client_type>.rss.client.netty.io.mode | NIO | Netty EventLoopGroup backend, available options: NIO, EPOLL. |
| <client_type>.rss.client.netty.client.connection.timeout.ms | 600000 | Connection active timeout. |
| <client_type>.rss.client.netty.client.threads | 0 | Number of threads used in the client thread pool. Default is 0, Netty will use the number of (available logical cores * 2) as the number of threads. |
| <client_type>.rss.client.netty.client.prefer.direct.bufs | true | If true, we will prefer allocating off-heap byte buffers within Netty. |
| <client_type>.rss.client.netty.client.pooled.allocator.enabled | true | If true, we will use PooledByteBufAllocator to allocate byte buffers within Netty, otherwise we'll use UnpooledByteBufAllocator. |
| <client_type>.rss.client.netty.client.connections.per.peer | 2 | Suppose there are 100 executors, spark.rss.client.netty.client.connections.per.peer = 2, then each ShuffleServer will establish a total of (100 * 2) connections with multiple clients. |
| <client_type>.rss.client.netty.client.receive.buffer | 0 | Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB. Default is 0, the operating system automatically estimates the receive buffer size based on default settings. |
| <client_type>.rss.client.netty.client.send.buffer | 0 | Send buffer size (SO_SNDBUF). |

0 comments on commit cc3f52b

Please sign in to comment.