From d328ded37077cd50fa93b3635e24903055e221df Mon Sep 17 00:00:00 2001
From: Yongzao <532741407@qq.com>
Date: Wed, 20 May 2026 10:58:39 +0800
Subject: [PATCH 1/8] RATIS-2529. Bound gRPC worker EventLoopGroup thread
count.
Add raft.grpc.server.worker.event-loop.threads and
raft.grpc.client.worker.event-loop.threads (default 0 = gRPC default,
i.e. availableProcessors * 2). When > 0, build a dedicated
Epoll/Nio EventLoopGroup of that size and wire it into both the
server NettyServerBuilders and the client NettyChannelBuilders so a
follower catch-up burst can't permanently inflate the worker thread
count.
---
.../org/apache/ratis/grpc/GrpcConfigKeys.java | 33 +++++
.../org/apache/ratis/grpc/GrpcEventLoops.java | 122 ++++++++++++++++++
.../grpc/client/GrpcClientProtocolClient.java | 11 ++
.../grpc/server/GrpcServerProtocolClient.java | 11 +-
.../ratis/grpc/server/GrpcServicesImpl.java | 66 +++++++---
.../ratis/grpc/server/GrpcStubPool.java | 15 ++-
.../apache/ratis/grpc/TestGrpcEventLoops.java | 88 +++++++++++++
.../grpc/TestGrpcWorkerEventLoopThreads.java | 57 ++++++++
8 files changed, 381 insertions(+), 22 deletions(-)
create mode 100644 ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcEventLoops.java
create mode 100644 ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcEventLoops.java
create mode 100644 ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcWorkerEventLoopThreads.java
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
index f31794ac36..c18cdb60be 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
@@ -155,6 +155,22 @@ static GrpcTlsConfig tlsConf(Parameters parameters) {
static void setTlsConf(Parameters parameters, GrpcTlsConfig conf) {
parameters.put(TLS_CONF_PARAMETER, conf, TLS_CONF_CLASS);
}
+
+ /**
+ * The number of worker threads for the gRPC client-side {@link
+ * org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup}.
+ * 0 (default) means use the gRPC default (i.e. {@code availableProcessors * 2});
+ * a positive value caps the worker event-loop thread count.
+ */
+ String WORKER_EVENT_LOOP_THREADS_KEY = PREFIX + ".worker.event-loop.threads";
+ int WORKER_EVENT_LOOP_THREADS_DEFAULT = 0;
+ static int workerEventLoopThreads(RaftProperties properties) {
+ return getInt(properties::getInt, WORKER_EVENT_LOOP_THREADS_KEY,
+ WORKER_EVENT_LOOP_THREADS_DEFAULT, getDefaultLog(), requireMin(0));
+ }
+ static void setWorkerEventLoopThreads(RaftProperties properties, int threads) {
+ setInt(properties::setInt, WORKER_EVENT_LOOP_THREADS_KEY, threads);
+ }
}
interface Server {
@@ -291,6 +307,23 @@ static int stubPoolSize(RaftProperties properties) {
static void setStubPoolSize(RaftProperties properties, int size) {
setInt(properties::setInt, STUB_POOL_SIZE_KEY, size);
}
+
+ /**
+ * The number of worker threads for the gRPC server-side {@link
+ * org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup}.
+ * The group is also reused by the server-to-server clients managed by this server.
+ * 0 (default) means use the gRPC default (i.e. {@code availableProcessors * 2});
+ * a positive value caps the worker event-loop thread count.
+ */
+ String WORKER_EVENT_LOOP_THREADS_KEY = PREFIX + ".worker.event-loop.threads";
+ int WORKER_EVENT_LOOP_THREADS_DEFAULT = 0;
+ static int workerEventLoopThreads(RaftProperties properties) {
+ return getInt(properties::getInt, WORKER_EVENT_LOOP_THREADS_KEY,
+ WORKER_EVENT_LOOP_THREADS_DEFAULT, getDefaultLog(), requireMin(0));
+ }
+ static void setWorkerEventLoopThreads(RaftProperties properties, int threads) {
+ setInt(properties::setInt, WORKER_EVENT_LOOP_THREADS_KEY, threads);
+ }
}
String MESSAGE_SIZE_MAX_KEY = PREFIX + ".message.size.max";
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcEventLoops.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcEventLoops.java
new file mode 100644
index 0000000000..9deb4b79f7
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcEventLoops.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
+import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
+import org.apache.ratis.thirdparty.io.netty.channel.Channel;
+import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
+import org.apache.ratis.thirdparty.io.netty.channel.ServerChannel;
+import org.apache.ratis.thirdparty.io.netty.channel.epoll.Epoll;
+import org.apache.ratis.thirdparty.io.netty.channel.epoll.EpollEventLoopGroup;
+import org.apache.ratis.thirdparty.io.netty.channel.epoll.EpollServerSocketChannel;
+import org.apache.ratis.thirdparty.io.netty.channel.epoll.EpollSocketChannel;
+import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.ratis.thirdparty.io.netty.util.concurrent.DefaultThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Helper for creating bounded Netty {@link EventLoopGroup} instances for gRPC,
+ * along with matching {@link Channel} / {@link ServerChannel} types.
+ *
+ *
By default, gRPC uses a shared {@link EventLoopGroup} sized to
+ * {@code Runtime.getRuntime().availableProcessors() * 2}, and Netty threads
+ * in that group never exit once started. A traffic burst (e.g. a follower
+ * catch-up after restart) can permanently inflate the active thread count.
+ * This helper lets callers construct a smaller, dedicated group and wire it
+ * into {@link NettyServerBuilder} / {@link NettyChannelBuilder} consistently
+ * with the matching channel type.
+ */
+public final class GrpcEventLoops {
+ private static final Logger LOG = LoggerFactory.getLogger(GrpcEventLoops.class);
+
+ private GrpcEventLoops() {}
+
+ public static boolean isEpollAvailable() {
+ return Epoll.isAvailable();
+ }
+
+ /** Create a new {@link EventLoopGroup} with the given number of threads. */
+ public static EventLoopGroup newEventLoopGroup(int threads, String name) {
+ if (threads <= 0) {
+ throw new IllegalArgumentException("threads must be > 0, but got " + threads);
+ }
+ final ThreadFactory threadFactory = new DefaultThreadFactory(name);
+ if (isEpollAvailable()) {
+ LOG.info("Creating EpollEventLoopGroup({}) for {}", threads, name);
+ return new EpollEventLoopGroup(threads, threadFactory);
+ }
+ LOG.info("Creating NioEventLoopGroup({}) for {} (Epoll not available)", threads, name);
+ return new NioEventLoopGroup(threads, threadFactory);
+ }
+
+ public static Class extends ServerChannel> getServerChannelType() {
+ return isEpollAvailable() ? EpollServerSocketChannel.class : NioServerSocketChannel.class;
+ }
+
+ public static Class extends Channel> getClientChannelType() {
+ return isEpollAvailable() ? EpollSocketChannel.class : NioSocketChannel.class;
+ }
+
+ /**
+ * Configure the {@link NettyServerBuilder} to use the given boss and worker groups.
+ *
+ *
gRPC requires that the boss group, worker group, and channel type are
+ * all provided together or all omitted. The caller owns the lifecycle of
+ * both groups and must shut them down.
+ */
+ public static void configure(NettyServerBuilder serverBuilder,
+ EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
+ if (workerGroup == null) {
+ return;
+ }
+ serverBuilder.channelType(getServerChannelType())
+ .bossEventLoopGroup(bossGroup)
+ .workerEventLoopGroup(workerGroup);
+ }
+
+ /** Configure the {@link NettyChannelBuilder} to use the given event-loop group. */
+ public static void configure(NettyChannelBuilder channelBuilder, EventLoopGroup group) {
+ if (group == null) {
+ return;
+ }
+ channelBuilder.channelType(getClientChannelType())
+ .eventLoopGroup(group);
+ }
+
+ /** Gracefully shut down the group, swallowing interruptions. */
+ public static void shutdownGracefully(EventLoopGroup group) {
+ if (group == null) {
+ return;
+ }
+ try {
+ group.shutdownGracefully(0, 3, TimeUnit.SECONDS).await(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("Interrupted while shutting down EventLoopGroup", e);
+ } catch (Exception e) {
+ LOG.warn("Failed to shut down EventLoopGroup", e);
+ }
+ }
+}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index 0eaec6b962..2837cb9cd7 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -21,6 +21,7 @@
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcEventLoops;
import org.apache.ratis.grpc.GrpcUtil;
import org.apache.ratis.grpc.metrics.intercept.client.MetricClientInterceptor;
import org.apache.ratis.proto.RaftProtos.GroupInfoReplyProto;
@@ -51,6 +52,7 @@
import org.apache.ratis.thirdparty.io.grpc.netty.NegotiationType;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.JavaUtils;
@@ -94,6 +96,7 @@ public class GrpcClientProtocolClient implements Closeable {
private final AtomicReference unorderedStreamObservers = new AtomicReference<>();
private final MetricClientInterceptor metricClientInterceptor;
+ private final EventLoopGroup workerEventLoopGroup;
GrpcClientProtocolClient(ClientId id, RaftPeer target, RaftProperties properties,
SslContext adminSslContext, SslContext clientSslContext) {
@@ -103,6 +106,11 @@ public class GrpcClientProtocolClient implements Closeable {
this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug);
metricClientInterceptor = new MetricClientInterceptor(getName());
+ final int workerThreads = GrpcConfigKeys.Client.workerEventLoopThreads(properties);
+ this.workerEventLoopGroup = workerThreads > 0
+ ? GrpcEventLoops.newEventLoopGroup(workerThreads, getName() + "-grpc-worker-ELG")
+ : null;
+
final String clientAddress = Optional.ofNullable(target.getClientAddress())
.filter(x -> !x.isEmpty()).orElse(target.getAddress());
final String adminAddress = Optional.ofNullable(target.getAdminAddress())
@@ -135,6 +143,8 @@ private ManagedChannel buildChannel(String address, SslContext sslContext,
channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
}
+ GrpcEventLoops.configure(channelBuilder, workerEventLoopGroup);
+
return channelBuilder.flowControlWindow(flowControlWindow.getSizeInt())
.maxInboundMessageSize(maxMessageSize.getSizeInt())
.intercept(metricClientInterceptor)
@@ -154,6 +164,7 @@ public void close() {
GrpcUtil.shutdownManagedChannel(adminChannel);
}
metricClientInterceptor.close();
+ GrpcEventLoops.shutdownGracefully(workerEventLoopGroup);
}
RaftClientReplyProto groupAdd(GroupManagementRequestProto request) throws IOException {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index d2748c7be2..e9856e2aa5 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.grpc.server;
+import org.apache.ratis.grpc.GrpcEventLoops;
import org.apache.ratis.grpc.GrpcUtil;
import org.apache.ratis.grpc.util.StreamObserverWithTimeout;
import org.apache.ratis.protocol.RaftPeerId;
@@ -31,6 +32,7 @@
import org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceBlockingStub;
import org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceStub;
import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
@@ -57,12 +59,15 @@ class GrpcServerProtocolClient implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(GrpcServerProtocolClient.class);
//visible for using in log / error messages AND to use in instrumented tests
private final RaftPeerId raftPeerId;
+ private final EventLoopGroup eventLoopGroup;
GrpcServerProtocolClient(RaftPeer target, int connections, int flowControlWindow,
- TimeDuration requestTimeout, SslContext sslContext, boolean separateHBChannel) {
+ TimeDuration requestTimeout, SslContext sslContext, boolean separateHBChannel,
+ EventLoopGroup eventLoopGroup) {
raftPeerId = target.getId();
LOG.info("Build channel for {}", target);
useSeparateHBChannel = separateHBChannel;
+ this.eventLoopGroup = eventLoopGroup;
channel = buildChannel(target, flowControlWindow, sslContext);
blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel);
asyncStub = RaftServerProtocolServiceGrpc.newStub(channel);
@@ -75,7 +80,8 @@ class GrpcServerProtocolClient implements Closeable {
}
GrpcStubPool newGrpcStubPool(String address, SslContext sslContext, int connections) {
- return new GrpcStubPool<>(connections, address, sslContext, RaftServerProtocolServiceGrpc::newStub, 16);
+ return new GrpcStubPool<>(connections, address, sslContext, RaftServerProtocolServiceGrpc::newStub, 16,
+ eventLoopGroup);
}
private ManagedChannel buildChannel(RaftPeer target, int flowControlWindow, SslContext sslContext) {
@@ -90,6 +96,7 @@ private ManagedChannel buildChannel(RaftPeer target, int flowControlWindow, SslC
channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
}
channelBuilder.disableRetry();
+ GrpcEventLoops.configure(channelBuilder, eventLoopGroup);
return channelBuilder.flowControlWindow(flowControlWindow).build();
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
index d554ca583a..436e154106 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
@@ -19,6 +19,7 @@
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcEventLoops;
import org.apache.ratis.grpc.metrics.MessageMetrics;
import org.apache.ratis.grpc.metrics.intercept.server.MetricServerInterceptor;
import org.apache.ratis.protocol.AdminAsynchronousProtocol;
@@ -36,6 +37,7 @@
import org.apache.ratis.thirdparty.io.grpc.Server;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelOption;
+import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
import org.apache.ratis.proto.RaftProtos.*;
@@ -114,6 +116,7 @@ public static final class Builder {
private SizeInBytes flowControlWindow;
private TimeDuration requestTimeoutDuration;
private boolean separateHeartbeatChannel;
+ private int workerEventLoopThreads;
private Builder() {}
@@ -132,6 +135,7 @@ public Builder setServer(RaftServer raftServer) {
this.requestTimeoutDuration = RaftServerConfigKeys.Rpc.requestTimeout(properties);
this.separateHeartbeatChannel = GrpcConfigKeys.Server.heartbeatChannel(properties);
this.serverStubPoolSize = GrpcConfigKeys.Server.stubPoolSize(properties);
+ this.workerEventLoopThreads = GrpcConfigKeys.Server.workerEventLoopThreads(properties);
final SizeInBytes appenderBufferSize = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties);
final SizeInBytes gap = SizeInBytes.ONE_MB;
@@ -151,9 +155,9 @@ public Builder setCustomizer(Customizer customizer) {
return this;
}
- private GrpcServerProtocolClient newGrpcServerProtocolClient(RaftPeer target) {
+ private GrpcServerProtocolClient newGrpcServerProtocolClient(RaftPeer target, EventLoopGroup eventLoopGroup) {
return new GrpcServerProtocolClient(target, serverStubPoolSize, flowControlWindow.getSizeInt(),
- requestTimeoutDuration, serverSslContextForClient, separateHeartbeatChannel);
+ requestTimeoutDuration, serverSslContextForClient, separateHeartbeatChannel, eventLoopGroup);
}
private ExecutorService newExecutor() {
@@ -182,19 +186,20 @@ Server buildServer(NettyServerBuilder builder, EnumSet types)
return customizer.customize(builder, types).build();
}
- private NettyServerBuilder newNettyServerBuilderForServer() {
- return newNettyServerBuilder(serverHost, serverPort, serverSslContextForServer);
+ private NettyServerBuilder newNettyServerBuilderForServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
+ return newNettyServerBuilder(serverHost, serverPort, serverSslContextForServer, bossGroup, workerGroup);
}
- private NettyServerBuilder newNettyServerBuilderForAdmin() {
- return newNettyServerBuilder(adminHost, adminPort, adminSslContext);
+ private NettyServerBuilder newNettyServerBuilderForAdmin(EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
+ return newNettyServerBuilder(adminHost, adminPort, adminSslContext, bossGroup, workerGroup);
}
- private NettyServerBuilder newNettyServerBuilderForClient() {
- return newNettyServerBuilder(clientHost, clientPort, clientSslContext);
+ private NettyServerBuilder newNettyServerBuilderForClient(EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
+ return newNettyServerBuilder(clientHost, clientPort, clientSslContext, bossGroup, workerGroup);
}
- private NettyServerBuilder newNettyServerBuilder(String hostname, int port, SslContext sslContext) {
+ private NettyServerBuilder newNettyServerBuilder(String hostname, int port, SslContext sslContext,
+ EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
final InetSocketAddress address = hostname == null || hostname.isEmpty() ?
new InetSocketAddress(port) : new InetSocketAddress(hostname, port);
final NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forAddress(address)
@@ -202,6 +207,8 @@ private NettyServerBuilder newNettyServerBuilder(String hostname, int port, SslC
.maxInboundMessageSize(messageSizeMax.getSizeInt())
.flowControlWindow(flowControlWindow.getSizeInt());
+ GrpcEventLoops.configure(nettyServerBuilder, bossGroup, workerGroup);
+
if (sslContext != null) {
LOG.info("Setting TLS for {}", address);
nettyServerBuilder.sslContext(sslContext);
@@ -217,9 +224,10 @@ private boolean separateClientServer() {
return clientPort > 0 && clientPort != serverPort;
}
- Server newServer(GrpcClientProtocolService client, ServerInterceptor interceptor) {
+ Server newServer(GrpcClientProtocolService client, ServerInterceptor interceptor,
+ EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
final EnumSet types = EnumSet.of(GrpcServices.Type.SERVER);
- final NettyServerBuilder serverBuilder = newNettyServerBuilderForServer();
+ final NettyServerBuilder serverBuilder = newNettyServerBuilderForServer(bossGroup, workerGroup);
final GrpcServerProtocolService service = newGrpcServerProtocolService();
serverBuilder.addService(ServerInterceptors.intercept(service, interceptor));
@@ -234,6 +242,24 @@ Server newServer(GrpcClientProtocolService client, ServerInterceptor interceptor
return buildServer(serverBuilder, types);
}
+ private EventLoopGroup bossEventLoopGroup;
+ private EventLoopGroup workerEventLoopGroup;
+
+ EventLoopGroup getBossEventLoopGroup() {
+ if (workerEventLoopThreads > 0 && bossEventLoopGroup == null) {
+ bossEventLoopGroup = GrpcEventLoops.newEventLoopGroup(1, server.getId() + "-grpc-boss-ELG");
+ }
+ return bossEventLoopGroup;
+ }
+
+ EventLoopGroup getWorkerEventLoopGroup() {
+ if (workerEventLoopThreads > 0 && workerEventLoopGroup == null) {
+ final String name = server.getId() + "-grpc-worker-ELG";
+ workerEventLoopGroup = GrpcEventLoops.newEventLoopGroup(workerEventLoopThreads, name);
+ }
+ return workerEventLoopGroup;
+ }
+
public GrpcServicesImpl build() {
return new GrpcServicesImpl(this);
}
@@ -274,20 +300,27 @@ public static Builder newBuilder() {
private final GrpcClientProtocolService clientProtocolService;
private final MetricServerInterceptor serverInterceptor;
+ private final EventLoopGroup bossEventLoopGroup;
+ private final EventLoopGroup workerEventLoopGroup;
private GrpcServicesImpl(Builder b) {
- super(b.server::getId, id -> new PeerProxyMap<>(id.toString(), b::newGrpcServerProtocolClient));
+ super(b.server::getId, id -> new PeerProxyMap<>(id.toString(),
+ peer -> b.newGrpcServerProtocolClient(peer, b.getWorkerEventLoopGroup())));
+
+ this.bossEventLoopGroup = b.getBossEventLoopGroup();
+ this.workerEventLoopGroup = b.getWorkerEventLoopGroup();
this.executor = b.newExecutor();
this.clientProtocolService = b.newGrpcClientProtocolService(executor);
this.serverInterceptor = b.newMetricServerInterceptor();
- final Server server = b.newServer(clientProtocolService, serverInterceptor);
+ final Server server = b.newServer(clientProtocolService, serverInterceptor,
+ bossEventLoopGroup, workerEventLoopGroup);
servers.put(GrpcServerProtocolService.class.getSimpleName(), server);
addressSupplier = newAddressSupplier(b.serverPort, server);
if (b.separateAdminServer()) {
- final NettyServerBuilder builder = b.newNettyServerBuilderForAdmin();
+ final NettyServerBuilder builder = b.newNettyServerBuilderForAdmin(bossEventLoopGroup, workerEventLoopGroup);
addAdminService(builder, b.server, serverInterceptor);
final Server adminServer = b.buildServer(builder, EnumSet.of(GrpcServices.Type.ADMIN));
servers.put(GrpcAdminProtocolService.class.getName(), adminServer);
@@ -297,7 +330,7 @@ private GrpcServicesImpl(Builder b) {
}
if (b.separateClientServer()) {
- final NettyServerBuilder builder = b.newNettyServerBuilderForClient();
+ final NettyServerBuilder builder = b.newNettyServerBuilderForClient(bossEventLoopGroup, workerEventLoopGroup);
addClientService(builder, clientProtocolService, serverInterceptor);
final Server clientServer = b.buildServer(builder, EnumSet.of(GrpcServices.Type.CLIENT));
servers.put(GrpcClientProtocolService.class.getName(), clientServer);
@@ -375,6 +408,9 @@ public void closeImpl() {
} catch (IOException e) {
LOG.warn("{}: Failed to close proxies", getId(), e);
}
+
+ GrpcEventLoops.shutdownGracefully(workerEventLoopGroup);
+ GrpcEventLoops.shutdownGracefully(bossEventLoopGroup);
}
@Override
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java
index 9667661d07..edd0413430 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java
@@ -17,12 +17,14 @@
*/
package org.apache.ratis.grpc.server;
+import org.apache.ratis.grpc.GrpcEventLoops;
import org.apache.ratis.grpc.GrpcUtil;
import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
import org.apache.ratis.thirdparty.io.grpc.netty.NegotiationType;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
import org.apache.ratis.thirdparty.io.grpc.stub.AbstractStub;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelOption;
+import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.ratis.thirdparty.io.netty.channel.WriteBufferWaterMark;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
import org.apache.ratis.util.MemoizedSupplier;
@@ -41,7 +43,7 @@
final class GrpcStubPool> {
public static final Logger LOG = LoggerFactory.getLogger(GrpcStubPool.class);
- static ManagedChannel buildManagedChannel(String address, SslContext sslContext) {
+ static ManagedChannel buildManagedChannel(String address, SslContext sslContext, EventLoopGroup eventLoopGroup) {
NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(address)
.keepAliveTime(10, TimeUnit.MINUTES)
.keepAliveWithoutCalls(false)
@@ -53,6 +55,7 @@ static ManagedChannel buildManagedChannel(String address, SslContext sslContext)
} else {
channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
}
+ GrpcEventLoops.configure(channelBuilder, eventLoopGroup);
ManagedChannel ch = channelBuilder.build();
ch.getState(true);
return ch;
@@ -63,8 +66,9 @@ static final class Stub> {
private final S stub;
private final Semaphore permits;
- Stub(String address, SslContext sslContext, Function stubFactory, int maxInflight) {
- this.ch = buildManagedChannel(address, sslContext);
+ Stub(String address, SslContext sslContext, Function stubFactory, int maxInflight,
+ EventLoopGroup eventLoopGroup) {
+ this.ch = buildManagedChannel(address, sslContext, eventLoopGroup);
this.stub = stubFactory.apply(ch);
this.permits = new Semaphore(maxInflight);
}
@@ -85,11 +89,12 @@ void shutdown() {
private final List>> pool;
GrpcStubPool(int connections, String address, SslContext sslContext, Function stubFactory,
- int maxInflightPerConn) {
+ int maxInflightPerConn, EventLoopGroup eventLoopGroup) {
Preconditions.assertTrue(connections > 1, "connections must be > 1");
final List>> tmpPool = new ArrayList<>(connections);
for (int i = 0; i < connections; i++) {
- tmpPool.add(MemoizedSupplier.valueOf(() -> new Stub<>(address, sslContext, stubFactory, maxInflightPerConn)));
+ tmpPool.add(MemoizedSupplier.valueOf(
+ () -> new Stub<>(address, sslContext, stubFactory, maxInflightPerConn, eventLoopGroup)));
}
this.pool = Collections.unmodifiableList(tmpPool);
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcEventLoops.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcEventLoops.java
new file mode 100644
index 0000000000..6ea1caceba
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcEventLoops.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
+import org.apache.ratis.thirdparty.io.netty.util.concurrent.EventExecutor;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestGrpcEventLoops extends BaseTest {
+
+ @Test
+ public void testNewEventLoopGroupWithThreadCount() {
+ final int threads = 3;
+ final EventLoopGroup group = GrpcEventLoops.newEventLoopGroup(threads, "test-elg");
+ try {
+ Assertions.assertNotNull(group);
+ int count = 0;
+ for (EventExecutor ignored : group) {
+ count++;
+ }
+ Assertions.assertEquals(threads, count);
+ } finally {
+ GrpcEventLoops.shutdownGracefully(group);
+ }
+ }
+
+ @Test
+ public void testNewEventLoopGroupRejectsNonPositiveThreads() {
+ Assertions.assertThrows(IllegalArgumentException.class,
+ () -> GrpcEventLoops.newEventLoopGroup(0, "test-elg"));
+ Assertions.assertThrows(IllegalArgumentException.class,
+ () -> GrpcEventLoops.newEventLoopGroup(-1, "test-elg"));
+ }
+
+ @Test
+ public void testChannelTypeMatchesEpollAvailability() {
+ if (GrpcEventLoops.isEpollAvailable()) {
+ Assertions.assertEquals("EpollServerSocketChannel",
+ GrpcEventLoops.getServerChannelType().getSimpleName());
+ Assertions.assertEquals("EpollSocketChannel",
+ GrpcEventLoops.getClientChannelType().getSimpleName());
+ } else {
+ Assertions.assertEquals("NioServerSocketChannel",
+ GrpcEventLoops.getServerChannelType().getSimpleName());
+ Assertions.assertEquals("NioSocketChannel",
+ GrpcEventLoops.getClientChannelType().getSimpleName());
+ }
+ }
+
+ @Test
+ public void testConfigKeyDefaults() {
+ final RaftProperties properties = new RaftProperties();
+ Assertions.assertEquals(0, GrpcConfigKeys.Server.workerEventLoopThreads(properties));
+ Assertions.assertEquals(0, GrpcConfigKeys.Client.workerEventLoopThreads(properties));
+ }
+
+ @Test
+ public void testConfigKeyRoundtrip() {
+ final RaftProperties properties = new RaftProperties();
+ GrpcConfigKeys.Server.setWorkerEventLoopThreads(properties, 4);
+ GrpcConfigKeys.Client.setWorkerEventLoopThreads(properties, 2);
+ Assertions.assertEquals(4, GrpcConfigKeys.Server.workerEventLoopThreads(properties));
+ Assertions.assertEquals(2, GrpcConfigKeys.Client.workerEventLoopThreads(properties));
+ }
+
+ @Test
+ public void testShutdownNullIsNoop() {
+ GrpcEventLoops.shutdownGracefully(null);
+ }
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcWorkerEventLoopThreads.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcWorkerEventLoopThreads.java
new file mode 100644
index 0000000000..9d0df3fd02
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcWorkerEventLoopThreads.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+/**
+ * Verify a cluster comes up and processes requests when the worker
+ * event-loop thread count is capped via
+ * {@link GrpcConfigKeys.Server#WORKER_EVENT_LOOP_THREADS_KEY} and
+ * {@link GrpcConfigKeys.Client#WORKER_EVENT_LOOP_THREADS_KEY}.
+ *
+ * Regression: RATIS-2529 — gRPC worker threads permanently inflate to
+ * {@code availableProcessors * 2} after follower restart catch-up.
+ */
+public class TestGrpcWorkerEventLoopThreads extends BaseTest {
+
+ @Test
+ public void testClusterWithCappedWorkerEventLoopThreads() throws Exception {
+ final String[] ids = {"s0", "s1", "s2"};
+ final RaftProperties properties = new RaftProperties();
+ GrpcConfigKeys.Server.setWorkerEventLoopThreads(properties, 2);
+ GrpcConfigKeys.Client.setWorkerEventLoopThreads(properties, 1);
+
+ try (MiniRaftClusterWithGrpc cluster = new MiniRaftClusterWithGrpc(ids, properties, null)) {
+ cluster.start();
+ waitForLeader(cluster);
+ try (RaftClient client = cluster.createClient()) {
+ final RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("hello"));
+ Assertions.assertTrue(reply.isSuccess());
+ }
+ }
+ }
+}
From 20ddb29af520e396790edf1d5ac6c1d2079e4c87 Mon Sep 17 00:00:00 2001
From: Yongzao <532741407@qq.com>
Date: Mon, 25 May 2026 22:54:51 +0800
Subject: [PATCH 2/8] RATIS-2529. Address gRPC event loop review comments
---
.../org/apache/ratis/util}/NettyUtils.java | 42 +++++-
.../org/apache/ratis/grpc/GrpcConfigKeys.java | 63 +++++----
.../org/apache/ratis/grpc/GrpcEventLoops.java | 122 ------------------
.../grpc/client/GrpcClientProtocolClient.java | 15 ++-
.../grpc/server/GrpcServerProtocolClient.java | 7 +-
.../ratis/grpc/server/GrpcServicesImpl.java | 87 ++++++-------
.../ratis/grpc/server/GrpcStubPool.java | 7 +-
.../org/apache/ratis/netty/NettyClient.java | 1 +
.../org/apache/ratis/netty/NettyRpcProxy.java | 1 +
.../netty/client/NettyClientStreamRpc.java | 2 +-
.../ratis/netty/server/NettyRpcService.java | 2 +-
.../netty/server/NettyServerStreamRpc.java | 2 +-
...WithRpcTypeGrpcAndDataStreamTypeNetty.java | 2 +-
.../apache/ratis/grpc/TestGrpcEventLoops.java | 62 +++++----
.../grpc/TestGrpcWorkerEventLoopThreads.java | 8 +-
.../apache/ratis/netty/TestNettyRpcProxy.java | 1 +
.../ratis/netty/TestTlsConfWithNetty.java | 1 +
.../shell/cli/sh/TestSecureRatisShell.java | 2 +-
18 files changed, 178 insertions(+), 249 deletions(-)
rename {ratis-netty/src/main/java/org/apache/ratis/netty => ratis-common/src/main/java/org/apache/ratis/util}/NettyUtils.java (85%)
delete mode 100644 ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcEventLoops.java
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/NettyUtils.java
similarity index 85%
rename from ratis-netty/src/main/java/org/apache/ratis/netty/NettyUtils.java
rename to ratis-common/src/main/java/org/apache/ratis/util/NettyUtils.java
index 37666bf189..163c3d869f 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/NettyUtils.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.netty;
+package org.apache.ratis.util;
import org.apache.ratis.security.TlsConf;
import org.apache.ratis.security.TlsConf.CertificatesConf;
@@ -36,13 +36,14 @@
import org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
-import org.apache.ratis.util.ConcurrentUtils;
-import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.thirdparty.io.netty.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.KeyManager;
import javax.net.ssl.TrustManager;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
@@ -81,6 +82,39 @@ static EventLoopGroup newEventLoopGroup(String name, int size, boolean useEpoll)
return new NioEventLoopGroup(size, ConcurrentUtils.newThreadFactory(name + "-"));
}
+ static void shutdownGracefully(EventLoopGroup... groups) {
+ shutdownGracefully(CLOSE_TIMEOUT, groups);
+ }
+
+ static void shutdownGracefully(TimeDuration awaitTime, EventLoopGroup... groups) {
+ if (groups == null || groups.length == 0) {
+ return;
+ }
+
+ final List nonNullGroups = new ArrayList<>(groups.length);
+ final List> futures = new ArrayList<>(groups.length);
+ for (EventLoopGroup group : groups) {
+ if (group != null) {
+ nonNullGroups.add(group);
+ futures.add(group.shutdownGracefully());
+ }
+ }
+
+ for (int i = 0; i < futures.size(); i++) {
+ final EventLoopGroup group = nonNullGroups.get(i);
+ try {
+ if (!futures.get(i).await(awaitTime.getDuration(), awaitTime.getUnit())) {
+ LOG.warn("Failed to shut down EventLoopGroup {} in {}", group, awaitTime);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("Interrupted while shutting down EventLoopGroup {}", group, e);
+ } catch (Exception e) {
+ LOG.warn("Failed to shut down EventLoopGroup {} in {}", group, awaitTime, e);
+ }
+ }
+ }
+
static void setTrustManager(SslContextBuilder b, TrustManagerConf trustManagerConfig) {
if (trustManagerConfig == null) {
return;
@@ -196,4 +230,4 @@ static void closeChannel(Channel channel, String name) {
LOG.warn("closeChannel {} is not yet completed in {}", name, CLOSE_TIMEOUT);
}
}
-}
\ No newline at end of file
+}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
index c18cdb60be..de26693229 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
@@ -21,6 +21,7 @@
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.server.GrpcServices;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.io.netty.util.NettyRuntime;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
@@ -51,6 +52,15 @@ static Consumer getDefaultLog() {
String PREFIX = "raft.grpc";
+ String USE_EPOLL_KEY = PREFIX + ".use-epoll";
+ boolean USE_EPOLL_DEFAULT = true;
+ static boolean useEpoll(RaftProperties properties) {
+ return getBoolean(properties::getBoolean, USE_EPOLL_KEY, USE_EPOLL_DEFAULT, getDefaultLog());
+ }
+ static void setUseEpoll(RaftProperties properties, boolean useEpoll) {
+ setBoolean(properties::setBoolean, USE_EPOLL_KEY, useEpoll);
+ }
+
interface TLS {
String PREFIX = GrpcConfigKeys.PREFIX + ".tls";
@@ -156,20 +166,14 @@ static void setTlsConf(Parameters parameters, GrpcTlsConfig conf) {
parameters.put(TLS_CONF_PARAMETER, conf, TLS_CONF_CLASS);
}
- /**
- * The number of worker threads for the gRPC client-side {@link
- * org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup}.
- * 0 (default) means use the gRPC default (i.e. {@code availableProcessors * 2});
- * a positive value caps the worker event-loop thread count.
- */
- String WORKER_EVENT_LOOP_THREADS_KEY = PREFIX + ".worker.event-loop.threads";
- int WORKER_EVENT_LOOP_THREADS_DEFAULT = 0;
- static int workerEventLoopThreads(RaftProperties properties) {
- return getInt(properties::getInt, WORKER_EVENT_LOOP_THREADS_KEY,
- WORKER_EVENT_LOOP_THREADS_DEFAULT, getDefaultLog(), requireMin(0));
+ String WORKER_GROUP_SIZE_KEY = PREFIX + ".worker-group.size";
+ int WORKER_GROUP_SIZE_DEFAULT = Math.max(1, NettyRuntime.availableProcessors() * 2);
+ static int workerGroupSize(RaftProperties properties) {
+ return getInt(properties::getInt, WORKER_GROUP_SIZE_KEY,
+ WORKER_GROUP_SIZE_DEFAULT, getDefaultLog(), requireMin(1), requireMax(65536));
}
- static void setWorkerEventLoopThreads(RaftProperties properties, int threads) {
- setInt(properties::setInt, WORKER_EVENT_LOOP_THREADS_KEY, threads);
+ static void setWorkerGroupSize(RaftProperties properties, int size) {
+ setInt(properties::setInt, WORKER_GROUP_SIZE_KEY, size);
}
}
@@ -308,21 +312,24 @@ static void setStubPoolSize(RaftProperties properties, int size) {
setInt(properties::setInt, STUB_POOL_SIZE_KEY, size);
}
- /**
- * The number of worker threads for the gRPC server-side {@link
- * org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup}.
- * The group is also reused by the server-to-server clients managed by this server.
- * 0 (default) means use the gRPC default (i.e. {@code availableProcessors * 2});
- * a positive value caps the worker event-loop thread count.
- */
- String WORKER_EVENT_LOOP_THREADS_KEY = PREFIX + ".worker.event-loop.threads";
- int WORKER_EVENT_LOOP_THREADS_DEFAULT = 0;
- static int workerEventLoopThreads(RaftProperties properties) {
- return getInt(properties::getInt, WORKER_EVENT_LOOP_THREADS_KEY,
- WORKER_EVENT_LOOP_THREADS_DEFAULT, getDefaultLog(), requireMin(0));
- }
- static void setWorkerEventLoopThreads(RaftProperties properties, int threads) {
- setInt(properties::setInt, WORKER_EVENT_LOOP_THREADS_KEY, threads);
+ String BOSS_GROUP_SIZE_KEY = PREFIX + ".boss-group.size";
+ int BOSS_GROUP_SIZE_DEFAULT = 0;
+ static int bossGroupSize(RaftProperties properties) {
+ return getInt(properties::getInt, BOSS_GROUP_SIZE_KEY,
+ BOSS_GROUP_SIZE_DEFAULT, getDefaultLog(), requireMin(0), requireMax(65536));
+ }
+ static void setBossGroupSize(RaftProperties properties, int size) {
+ setInt(properties::setInt, BOSS_GROUP_SIZE_KEY, size);
+ }
+
+ String WORKER_GROUP_SIZE_KEY = PREFIX + ".worker-group.size";
+ int WORKER_GROUP_SIZE_DEFAULT = Math.max(1, NettyRuntime.availableProcessors() * 2);
+ static int workerGroupSize(RaftProperties properties) {
+ return getInt(properties::getInt, WORKER_GROUP_SIZE_KEY,
+ WORKER_GROUP_SIZE_DEFAULT, getDefaultLog(), requireMin(1), requireMax(65536));
+ }
+ static void setWorkerGroupSize(RaftProperties properties, int size) {
+ setInt(properties::setInt, WORKER_GROUP_SIZE_KEY, size);
}
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcEventLoops.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcEventLoops.java
deleted file mode 100644
index 9deb4b79f7..0000000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcEventLoops.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc;
-
-import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
-import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
-import org.apache.ratis.thirdparty.io.netty.channel.Channel;
-import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
-import org.apache.ratis.thirdparty.io.netty.channel.ServerChannel;
-import org.apache.ratis.thirdparty.io.netty.channel.epoll.Epoll;
-import org.apache.ratis.thirdparty.io.netty.channel.epoll.EpollEventLoopGroup;
-import org.apache.ratis.thirdparty.io.netty.channel.epoll.EpollServerSocketChannel;
-import org.apache.ratis.thirdparty.io.netty.channel.epoll.EpollSocketChannel;
-import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel;
-import org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.ratis.thirdparty.io.netty.util.concurrent.DefaultThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Helper for creating bounded Netty {@link EventLoopGroup} instances for gRPC,
- * along with matching {@link Channel} / {@link ServerChannel} types.
- *
- * By default, gRPC uses a shared {@link EventLoopGroup} sized to
- * {@code Runtime.getRuntime().availableProcessors() * 2}, and Netty threads
- * in that group never exit once started. A traffic burst (e.g. a follower
- * catch-up after restart) can permanently inflate the active thread count.
- * This helper lets callers construct a smaller, dedicated group and wire it
- * into {@link NettyServerBuilder} / {@link NettyChannelBuilder} consistently
- * with the matching channel type.
- */
-public final class GrpcEventLoops {
- private static final Logger LOG = LoggerFactory.getLogger(GrpcEventLoops.class);
-
- private GrpcEventLoops() {}
-
- public static boolean isEpollAvailable() {
- return Epoll.isAvailable();
- }
-
- /** Create a new {@link EventLoopGroup} with the given number of threads. */
- public static EventLoopGroup newEventLoopGroup(int threads, String name) {
- if (threads <= 0) {
- throw new IllegalArgumentException("threads must be > 0, but got " + threads);
- }
- final ThreadFactory threadFactory = new DefaultThreadFactory(name);
- if (isEpollAvailable()) {
- LOG.info("Creating EpollEventLoopGroup({}) for {}", threads, name);
- return new EpollEventLoopGroup(threads, threadFactory);
- }
- LOG.info("Creating NioEventLoopGroup({}) for {} (Epoll not available)", threads, name);
- return new NioEventLoopGroup(threads, threadFactory);
- }
-
- public static Class extends ServerChannel> getServerChannelType() {
- return isEpollAvailable() ? EpollServerSocketChannel.class : NioServerSocketChannel.class;
- }
-
- public static Class extends Channel> getClientChannelType() {
- return isEpollAvailable() ? EpollSocketChannel.class : NioSocketChannel.class;
- }
-
- /**
- * Configure the {@link NettyServerBuilder} to use the given boss and worker groups.
- *
- *
gRPC requires that the boss group, worker group, and channel type are
- * all provided together or all omitted. The caller owns the lifecycle of
- * both groups and must shut them down.
- */
- public static void configure(NettyServerBuilder serverBuilder,
- EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
- if (workerGroup == null) {
- return;
- }
- serverBuilder.channelType(getServerChannelType())
- .bossEventLoopGroup(bossGroup)
- .workerEventLoopGroup(workerGroup);
- }
-
- /** Configure the {@link NettyChannelBuilder} to use the given event-loop group. */
- public static void configure(NettyChannelBuilder channelBuilder, EventLoopGroup group) {
- if (group == null) {
- return;
- }
- channelBuilder.channelType(getClientChannelType())
- .eventLoopGroup(group);
- }
-
- /** Gracefully shut down the group, swallowing interruptions. */
- public static void shutdownGracefully(EventLoopGroup group) {
- if (group == null) {
- return;
- }
- try {
- group.shutdownGracefully(0, 3, TimeUnit.SECONDS).await(5, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.warn("Interrupted while shutting down EventLoopGroup", e);
- } catch (Exception e) {
- LOG.warn("Failed to shut down EventLoopGroup", e);
- }
- }
-}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index 2837cb9cd7..d51e13c5b5 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -21,8 +21,8 @@
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
-import org.apache.ratis.grpc.GrpcEventLoops;
import org.apache.ratis.grpc.GrpcUtil;
+import org.apache.ratis.util.NettyUtils;
import org.apache.ratis.grpc.metrics.intercept.client.MetricClientInterceptor;
import org.apache.ratis.proto.RaftProtos.GroupInfoReplyProto;
import org.apache.ratis.proto.RaftProtos.GroupInfoRequestProto;
@@ -106,10 +106,10 @@ public class GrpcClientProtocolClient implements Closeable {
this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug);
metricClientInterceptor = new MetricClientInterceptor(getName());
- final int workerThreads = GrpcConfigKeys.Client.workerEventLoopThreads(properties);
- this.workerEventLoopGroup = workerThreads > 0
- ? GrpcEventLoops.newEventLoopGroup(workerThreads, getName() + "-grpc-worker-ELG")
- : null;
+ this.workerEventLoopGroup = NettyUtils.newEventLoopGroup(
+ getName() + "-client-workers",
+ GrpcConfigKeys.Client.workerGroupSize(properties),
+ GrpcConfigKeys.useEpoll(properties));
final String clientAddress = Optional.ofNullable(target.getClientAddress())
.filter(x -> !x.isEmpty()).orElse(target.getAddress());
@@ -143,7 +143,8 @@ private ManagedChannel buildChannel(String address, SslContext sslContext,
channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
}
- GrpcEventLoops.configure(channelBuilder, workerEventLoopGroup);
+ channelBuilder.channelType(NettyUtils.getSocketChannelClass(workerEventLoopGroup))
+ .eventLoopGroup(workerEventLoopGroup);
return channelBuilder.flowControlWindow(flowControlWindow.getSizeInt())
.maxInboundMessageSize(maxMessageSize.getSizeInt())
@@ -164,7 +165,7 @@ public void close() {
GrpcUtil.shutdownManagedChannel(adminChannel);
}
metricClientInterceptor.close();
- GrpcEventLoops.shutdownGracefully(workerEventLoopGroup);
+ NettyUtils.shutdownGracefully(workerEventLoopGroup);
}
RaftClientReplyProto groupAdd(GroupManagementRequestProto request) throws IOException {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index e9856e2aa5..6dc0d32527 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -17,8 +17,8 @@
*/
package org.apache.ratis.grpc.server;
-import org.apache.ratis.grpc.GrpcEventLoops;
import org.apache.ratis.grpc.GrpcUtil;
+import org.apache.ratis.util.NettyUtils;
import org.apache.ratis.grpc.util.StreamObserverWithTimeout;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.util.ServerStringUtils;
@@ -96,7 +96,10 @@ private ManagedChannel buildChannel(RaftPeer target, int flowControlWindow, SslC
channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
}
channelBuilder.disableRetry();
- GrpcEventLoops.configure(channelBuilder, eventLoopGroup);
+ if (eventLoopGroup != null) {
+ channelBuilder.channelType(NettyUtils.getSocketChannelClass(eventLoopGroup))
+ .eventLoopGroup(eventLoopGroup);
+ }
return channelBuilder.flowControlWindow(flowControlWindow).build();
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
index 436e154106..c604ee0726 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
@@ -19,9 +19,9 @@
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
-import org.apache.ratis.grpc.GrpcEventLoops;
import org.apache.ratis.grpc.metrics.MessageMetrics;
import org.apache.ratis.grpc.metrics.intercept.server.MetricServerInterceptor;
+import org.apache.ratis.util.NettyUtils;
import org.apache.ratis.protocol.AdminAsynchronousProtocol;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
@@ -116,7 +116,10 @@ public static final class Builder {
private SizeInBytes flowControlWindow;
private TimeDuration requestTimeoutDuration;
private boolean separateHeartbeatChannel;
- private int workerEventLoopThreads;
+
+ private EventLoopGroup serverBosses;
+ private EventLoopGroup serverWorkers;
+ private EventLoopGroup clientWorkers;
private Builder() {}
@@ -135,7 +138,6 @@ public Builder setServer(RaftServer raftServer) {
this.requestTimeoutDuration = RaftServerConfigKeys.Rpc.requestTimeout(properties);
this.separateHeartbeatChannel = GrpcConfigKeys.Server.heartbeatChannel(properties);
this.serverStubPoolSize = GrpcConfigKeys.Server.stubPoolSize(properties);
- this.workerEventLoopThreads = GrpcConfigKeys.Server.workerEventLoopThreads(properties);
final SizeInBytes appenderBufferSize = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties);
final SizeInBytes gap = SizeInBytes.ONE_MB;
@@ -168,8 +170,7 @@ private ExecutorService newExecutor() {
server.getId() + "-request-");
}
- private GrpcClientProtocolService newGrpcClientProtocolService(
- ExecutorService executor) {
+ private GrpcClientProtocolService newGrpcClientProtocolService(ExecutorService executor) {
return new GrpcClientProtocolService(server::getId, server, executor);
}
@@ -186,28 +187,28 @@ Server buildServer(NettyServerBuilder builder, EnumSet types)
return customizer.customize(builder, types).build();
}
- private NettyServerBuilder newNettyServerBuilderForServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
- return newNettyServerBuilder(serverHost, serverPort, serverSslContextForServer, bossGroup, workerGroup);
+ private NettyServerBuilder newNettyServerBuilderForServer() {
+ return newNettyServerBuilder(serverHost, serverPort, serverSslContextForServer);
}
- private NettyServerBuilder newNettyServerBuilderForAdmin(EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
- return newNettyServerBuilder(adminHost, adminPort, adminSslContext, bossGroup, workerGroup);
+ private NettyServerBuilder newNettyServerBuilderForAdmin() {
+ return newNettyServerBuilder(adminHost, adminPort, adminSslContext);
}
- private NettyServerBuilder newNettyServerBuilderForClient(EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
- return newNettyServerBuilder(clientHost, clientPort, clientSslContext, bossGroup, workerGroup);
+ private NettyServerBuilder newNettyServerBuilderForClient() {
+ return newNettyServerBuilder(clientHost, clientPort, clientSslContext);
}
- private NettyServerBuilder newNettyServerBuilder(String hostname, int port, SslContext sslContext,
- EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
+ private NettyServerBuilder newNettyServerBuilder(String hostname, int port, SslContext sslContext) {
final InetSocketAddress address = hostname == null || hostname.isEmpty() ?
new InetSocketAddress(port) : new InetSocketAddress(hostname, port);
final NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forAddress(address)
.withChildOption(ChannelOption.SO_REUSEADDR, true)
.maxInboundMessageSize(messageSizeMax.getSizeInt())
- .flowControlWindow(flowControlWindow.getSizeInt());
-
- GrpcEventLoops.configure(nettyServerBuilder, bossGroup, workerGroup);
+ .flowControlWindow(flowControlWindow.getSizeInt())
+ .channelType(NettyUtils.getServerChannelClass(serverBosses))
+ .bossEventLoopGroup(serverBosses)
+ .workerEventLoopGroup(serverWorkers);
if (sslContext != null) {
LOG.info("Setting TLS for {}", address);
@@ -224,10 +225,9 @@ private boolean separateClientServer() {
return clientPort > 0 && clientPort != serverPort;
}
- Server newServer(GrpcClientProtocolService client, ServerInterceptor interceptor,
- EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
+ Server newServer(GrpcClientProtocolService client, ServerInterceptor interceptor) {
final EnumSet types = EnumSet.of(GrpcServices.Type.SERVER);
- final NettyServerBuilder serverBuilder = newNettyServerBuilderForServer(bossGroup, workerGroup);
+ final NettyServerBuilder serverBuilder = newNettyServerBuilderForServer();
final GrpcServerProtocolService service = newGrpcServerProtocolService();
serverBuilder.addService(ServerInterceptors.intercept(service, interceptor));
@@ -242,25 +242,16 @@ Server newServer(GrpcClientProtocolService client, ServerInterceptor interceptor
return buildServer(serverBuilder, types);
}
- private EventLoopGroup bossEventLoopGroup;
- private EventLoopGroup workerEventLoopGroup;
-
- EventLoopGroup getBossEventLoopGroup() {
- if (workerEventLoopThreads > 0 && bossEventLoopGroup == null) {
- bossEventLoopGroup = GrpcEventLoops.newEventLoopGroup(1, server.getId() + "-grpc-boss-ELG");
- }
- return bossEventLoopGroup;
- }
-
- EventLoopGroup getWorkerEventLoopGroup() {
- if (workerEventLoopThreads > 0 && workerEventLoopGroup == null) {
- final String name = server.getId() + "-grpc-worker-ELG";
- workerEventLoopGroup = GrpcEventLoops.newEventLoopGroup(workerEventLoopThreads, name);
- }
- return workerEventLoopGroup;
- }
-
public GrpcServicesImpl build() {
+ final RaftProperties props = server.getProperties();
+ final String id = server.getId() + "";
+ final boolean useEpoll = GrpcConfigKeys.useEpoll(props);
+ serverBosses = NettyUtils.newEventLoopGroup(id + "-boss",
+ GrpcConfigKeys.Server.bossGroupSize(props), useEpoll);
+ serverWorkers = NettyUtils.newEventLoopGroup(id + "-server-workers",
+ GrpcConfigKeys.Server.workerGroupSize(props), useEpoll);
+ clientWorkers = NettyUtils.newEventLoopGroup(id + "-client-workers",
+ GrpcConfigKeys.Client.workerGroupSize(props), useEpoll);
return new GrpcServicesImpl(this);
}
@@ -300,27 +291,28 @@ public static Builder newBuilder() {
private final GrpcClientProtocolService clientProtocolService;
private final MetricServerInterceptor serverInterceptor;
- private final EventLoopGroup bossEventLoopGroup;
- private final EventLoopGroup workerEventLoopGroup;
+ private final EventLoopGroup serverBosses;
+ private final EventLoopGroup serverWorkers;
+ private final EventLoopGroup clientWorkers;
private GrpcServicesImpl(Builder b) {
super(b.server::getId, id -> new PeerProxyMap<>(id.toString(),
- peer -> b.newGrpcServerProtocolClient(peer, b.getWorkerEventLoopGroup())));
+ peer -> b.newGrpcServerProtocolClient(peer, b.clientWorkers)));
- this.bossEventLoopGroup = b.getBossEventLoopGroup();
- this.workerEventLoopGroup = b.getWorkerEventLoopGroup();
+ this.serverBosses = b.serverBosses;
+ this.serverWorkers = b.serverWorkers;
+ this.clientWorkers = b.clientWorkers;
this.executor = b.newExecutor();
this.clientProtocolService = b.newGrpcClientProtocolService(executor);
this.serverInterceptor = b.newMetricServerInterceptor();
- final Server server = b.newServer(clientProtocolService, serverInterceptor,
- bossEventLoopGroup, workerEventLoopGroup);
+ final Server server = b.newServer(clientProtocolService, serverInterceptor);
servers.put(GrpcServerProtocolService.class.getSimpleName(), server);
addressSupplier = newAddressSupplier(b.serverPort, server);
if (b.separateAdminServer()) {
- final NettyServerBuilder builder = b.newNettyServerBuilderForAdmin(bossEventLoopGroup, workerEventLoopGroup);
+ final NettyServerBuilder builder = b.newNettyServerBuilderForAdmin();
addAdminService(builder, b.server, serverInterceptor);
final Server adminServer = b.buildServer(builder, EnumSet.of(GrpcServices.Type.ADMIN));
servers.put(GrpcAdminProtocolService.class.getName(), adminServer);
@@ -330,7 +322,7 @@ private GrpcServicesImpl(Builder b) {
}
if (b.separateClientServer()) {
- final NettyServerBuilder builder = b.newNettyServerBuilderForClient(bossEventLoopGroup, workerEventLoopGroup);
+ final NettyServerBuilder builder = b.newNettyServerBuilderForClient();
addClientService(builder, clientProtocolService, serverInterceptor);
final Server clientServer = b.buildServer(builder, EnumSet.of(GrpcServices.Type.CLIENT));
servers.put(GrpcClientProtocolService.class.getName(), clientServer);
@@ -409,8 +401,7 @@ public void closeImpl() {
LOG.warn("{}: Failed to close proxies", getId(), e);
}
- GrpcEventLoops.shutdownGracefully(workerEventLoopGroup);
- GrpcEventLoops.shutdownGracefully(bossEventLoopGroup);
+ NettyUtils.shutdownGracefully(clientWorkers, serverWorkers, serverBosses);
}
@Override
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java
index edd0413430..84a4651f9d 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java
@@ -17,8 +17,8 @@
*/
package org.apache.ratis.grpc.server;
-import org.apache.ratis.grpc.GrpcEventLoops;
import org.apache.ratis.grpc.GrpcUtil;
+import org.apache.ratis.util.NettyUtils;
import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
import org.apache.ratis.thirdparty.io.grpc.netty.NegotiationType;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
@@ -55,7 +55,10 @@ static ManagedChannel buildManagedChannel(String address, SslContext sslContext,
} else {
channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
}
- GrpcEventLoops.configure(channelBuilder, eventLoopGroup);
+ if (eventLoopGroup != null) {
+ channelBuilder.channelType(NettyUtils.getSocketChannelClass(eventLoopGroup))
+ .eventLoopGroup(eventLoopGroup);
+ }
ManagedChannel ch = channelBuilder.build();
ch.getState(true);
return ch;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyClient.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyClient.java
index 56ca6b030c..3c908f67a6 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyClient.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyClient.java
@@ -27,6 +27,7 @@
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.NetUtils;
+import org.apache.ratis.util.NettyUtils;
import java.io.Closeable;
import java.net.InetSocketAddress;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
index e72d6c6772..d2eb38859a 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
@@ -35,6 +35,7 @@
import org.apache.ratis.thirdparty.io.netty.handler.logging.LoggingHandler;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.NettyUtils;
import org.apache.ratis.util.PeerProxyMap;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ProtoUtils;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 2bfeea31e1..5b673d5189 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -28,7 +28,7 @@
import org.apache.ratis.io.WriteOption;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.netty.NettyDataStreamUtils;
-import org.apache.ratis.netty.NettyUtils;
+import org.apache.ratis.util.NettyUtils;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamRequest;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
index bc57343fb2..f7d2805e8a 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
@@ -20,7 +20,7 @@
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.netty.NettyRpcProxy;
-import org.apache.ratis.netty.NettyUtils;
+import org.apache.ratis.util.NettyUtils;
import org.apache.ratis.protocol.GroupInfoReply;
import org.apache.ratis.protocol.GroupListReply;
import org.apache.ratis.protocol.RaftClientReply;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 24303d867e..643ed15a0f 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -27,7 +27,7 @@
import org.apache.ratis.datastream.impl.DataStreamRequestByteBuf;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.netty.NettyDataStreamUtils;
-import org.apache.ratis.netty.NettyUtils;
+import org.apache.ratis.util.NettyUtils;
import org.apache.ratis.netty.metrics.NettyServerStreamRpcMetrics;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.DataStreamPacket;
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamSslWithRpcTypeGrpcAndDataStreamTypeNetty.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamSslWithRpcTypeGrpcAndDataStreamTypeNetty.java
index 247a7bcac1..156ab410d3 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamSslWithRpcTypeGrpcAndDataStreamTypeNetty.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamSslWithRpcTypeGrpcAndDataStreamTypeNetty.java
@@ -19,7 +19,7 @@
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.netty.NettyConfigKeys;
-import org.apache.ratis.netty.NettyUtils;
+import org.apache.ratis.util.NettyUtils;
import org.apache.ratis.security.SecurityTestUtils;
import org.apache.ratis.security.TlsConf;
import org.apache.ratis.util.JavaUtils;
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcEventLoops.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcEventLoops.java
index 6ea1caceba..c09a533397 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcEventLoops.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcEventLoops.java
@@ -19,7 +19,9 @@
import org.apache.ratis.BaseTest;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.util.NettyUtils;
import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
+import org.apache.ratis.thirdparty.io.netty.channel.epoll.Epoll;
import org.apache.ratis.thirdparty.io.netty.util.concurrent.EventExecutor;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -29,7 +31,7 @@ public class TestGrpcEventLoops extends BaseTest {
@Test
public void testNewEventLoopGroupWithThreadCount() {
final int threads = 3;
- final EventLoopGroup group = GrpcEventLoops.newEventLoopGroup(threads, "test-elg");
+ final EventLoopGroup group = NettyUtils.newEventLoopGroup("test-elg", threads, false);
try {
Assertions.assertNotNull(group);
int count = 0;
@@ -38,51 +40,57 @@ public void testNewEventLoopGroupWithThreadCount() {
}
Assertions.assertEquals(threads, count);
} finally {
- GrpcEventLoops.shutdownGracefully(group);
+ NettyUtils.shutdownGracefully(group);
}
}
- @Test
- public void testNewEventLoopGroupRejectsNonPositiveThreads() {
- Assertions.assertThrows(IllegalArgumentException.class,
- () -> GrpcEventLoops.newEventLoopGroup(0, "test-elg"));
- Assertions.assertThrows(IllegalArgumentException.class,
- () -> GrpcEventLoops.newEventLoopGroup(-1, "test-elg"));
- }
-
@Test
public void testChannelTypeMatchesEpollAvailability() {
- if (GrpcEventLoops.isEpollAvailable()) {
- Assertions.assertEquals("EpollServerSocketChannel",
- GrpcEventLoops.getServerChannelType().getSimpleName());
- Assertions.assertEquals("EpollSocketChannel",
- GrpcEventLoops.getClientChannelType().getSimpleName());
- } else {
- Assertions.assertEquals("NioServerSocketChannel",
- GrpcEventLoops.getServerChannelType().getSimpleName());
- Assertions.assertEquals("NioSocketChannel",
- GrpcEventLoops.getClientChannelType().getSimpleName());
+ final EventLoopGroup group = NettyUtils.newEventLoopGroup("test-epoll", 1, true);
+ try {
+ if (Epoll.isAvailable()) {
+ Assertions.assertEquals("EpollServerSocketChannel",
+ NettyUtils.getServerChannelClass(group).getSimpleName());
+ Assertions.assertEquals("EpollSocketChannel",
+ NettyUtils.getSocketChannelClass(group).getSimpleName());
+ } else {
+ Assertions.assertEquals("NioServerSocketChannel",
+ NettyUtils.getServerChannelClass(group).getSimpleName());
+ Assertions.assertEquals("NioSocketChannel",
+ NettyUtils.getSocketChannelClass(group).getSimpleName());
+ }
+ } finally {
+ NettyUtils.shutdownGracefully(group);
}
}
@Test
public void testConfigKeyDefaults() {
final RaftProperties properties = new RaftProperties();
- Assertions.assertEquals(0, GrpcConfigKeys.Server.workerEventLoopThreads(properties));
- Assertions.assertEquals(0, GrpcConfigKeys.Client.workerEventLoopThreads(properties));
+ final int expectedWorker = GrpcConfigKeys.Server.WORKER_GROUP_SIZE_DEFAULT;
+ Assertions.assertTrue(expectedWorker > 0,
+ "default worker threads should be positive, but got " + expectedWorker);
+ Assertions.assertEquals(expectedWorker, GrpcConfigKeys.Server.workerGroupSize(properties));
+ Assertions.assertEquals(expectedWorker, GrpcConfigKeys.Client.workerGroupSize(properties));
+ Assertions.assertEquals(0, GrpcConfigKeys.Server.bossGroupSize(properties));
+ Assertions.assertTrue(GrpcConfigKeys.useEpoll(properties));
}
@Test
public void testConfigKeyRoundtrip() {
final RaftProperties properties = new RaftProperties();
- GrpcConfigKeys.Server.setWorkerEventLoopThreads(properties, 4);
- GrpcConfigKeys.Client.setWorkerEventLoopThreads(properties, 2);
- Assertions.assertEquals(4, GrpcConfigKeys.Server.workerEventLoopThreads(properties));
- Assertions.assertEquals(2, GrpcConfigKeys.Client.workerEventLoopThreads(properties));
+ GrpcConfigKeys.Server.setWorkerGroupSize(properties, 4);
+ GrpcConfigKeys.Server.setBossGroupSize(properties, 1);
+ GrpcConfigKeys.Client.setWorkerGroupSize(properties, 2);
+ GrpcConfigKeys.setUseEpoll(properties, false);
+ Assertions.assertEquals(4, GrpcConfigKeys.Server.workerGroupSize(properties));
+ Assertions.assertEquals(1, GrpcConfigKeys.Server.bossGroupSize(properties));
+ Assertions.assertEquals(2, GrpcConfigKeys.Client.workerGroupSize(properties));
+ Assertions.assertFalse(GrpcConfigKeys.useEpoll(properties));
}
@Test
public void testShutdownNullIsNoop() {
- GrpcEventLoops.shutdownGracefully(null);
+ NettyUtils.shutdownGracefully((EventLoopGroup) null);
}
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcWorkerEventLoopThreads.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcWorkerEventLoopThreads.java
index 9d0df3fd02..6b85e61d1d 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcWorkerEventLoopThreads.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcWorkerEventLoopThreads.java
@@ -30,8 +30,8 @@
/**
* Verify a cluster comes up and processes requests when the worker
* event-loop thread count is capped via
- * {@link GrpcConfigKeys.Server#WORKER_EVENT_LOOP_THREADS_KEY} and
- * {@link GrpcConfigKeys.Client#WORKER_EVENT_LOOP_THREADS_KEY}.
+ * {@link GrpcConfigKeys.Server#WORKER_GROUP_SIZE_KEY} and
+ * {@link GrpcConfigKeys.Client#WORKER_GROUP_SIZE_KEY}.
*
* Regression: RATIS-2529 — gRPC worker threads permanently inflate to
* {@code availableProcessors * 2} after follower restart catch-up.
@@ -42,8 +42,8 @@ public class TestGrpcWorkerEventLoopThreads extends BaseTest {
public void testClusterWithCappedWorkerEventLoopThreads() throws Exception {
final String[] ids = {"s0", "s1", "s2"};
final RaftProperties properties = new RaftProperties();
- GrpcConfigKeys.Server.setWorkerEventLoopThreads(properties, 2);
- GrpcConfigKeys.Client.setWorkerEventLoopThreads(properties, 1);
+ GrpcConfigKeys.Server.setWorkerGroupSize(properties, 2);
+ GrpcConfigKeys.Client.setWorkerGroupSize(properties, 1);
try (MiniRaftClusterWithGrpc cluster = new MiniRaftClusterWithGrpc(ids, properties, null)) {
cluster.start();
diff --git a/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyRpcProxy.java b/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyRpcProxy.java
index 2507220a8e..6b83ab7115 100644
--- a/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyRpcProxy.java
+++ b/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyRpcProxy.java
@@ -34,6 +34,7 @@
import org.apache.ratis.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.NettyUtils;
import org.junit.jupiter.api.Test;
import java.lang.reflect.Field;
diff --git a/ratis-test/src/test/java/org/apache/ratis/netty/TestTlsConfWithNetty.java b/ratis-test/src/test/java/org/apache/ratis/netty/TestTlsConfWithNetty.java
index dbdcf1ebd8..5d73d9ea3a 100644
--- a/ratis-test/src/test/java/org/apache/ratis/netty/TestTlsConfWithNetty.java
+++ b/ratis-test/src/test/java/org/apache/ratis/netty/TestTlsConfWithNetty.java
@@ -38,6 +38,7 @@
import org.apache.ratis.thirdparty.io.netty.handler.logging.LoggingHandler;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.NettyUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
diff --git a/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/TestSecureRatisShell.java b/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/TestSecureRatisShell.java
index 21e9fe229a..7b7de79d21 100644
--- a/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/TestSecureRatisShell.java
+++ b/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/TestSecureRatisShell.java
@@ -24,7 +24,7 @@
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.grpc.MiniRaftClusterWithGrpc;
-import org.apache.ratis.netty.NettyUtils;
+import org.apache.ratis.util.NettyUtils;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.security.SecurityTestUtils;
import org.apache.ratis.util.Slf4jUtils;
From 7b5fcf68218de7919e907de62168aca7a72bbba0 Mon Sep 17 00:00:00 2001
From: Yongzao <532741407@qq.com>
Date: Fri, 29 May 2026 13:39:33 +0800
Subject: [PATCH 3/8] RATIS-2529. Fix gRPC client worker ownership
---
.../grpc/client/GrpcClientProtocolClient.java | 13 ++--
.../ratis/grpc/client/GrpcClientRpc.java | 42 ++++++++++--
.../ratis/grpc/server/GrpcServicesImpl.java | 19 ++++--
.../grpc/client/TestGrpcClientEventLoops.java | 68 +++++++++++++++++++
4 files changed, 123 insertions(+), 19 deletions(-)
create mode 100644 ratis-test/src/test/java/org/apache/ratis/grpc/client/TestGrpcClientEventLoops.java
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index d51e13c5b5..5c0f622a9b 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -99,17 +99,13 @@ public class GrpcClientProtocolClient implements Closeable {
private final EventLoopGroup workerEventLoopGroup;
GrpcClientProtocolClient(ClientId id, RaftPeer target, RaftProperties properties,
- SslContext adminSslContext, SslContext clientSslContext) {
+ SslContext adminSslContext, SslContext clientSslContext, EventLoopGroup workerEventLoopGroup) {
this.name = JavaUtils.memoize(() -> id + "->" + target.getId());
this.target = target;
final SizeInBytes flowControlWindow = GrpcConfigKeys.flowControlWindow(properties, LOG::debug);
this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug);
metricClientInterceptor = new MetricClientInterceptor(getName());
-
- this.workerEventLoopGroup = NettyUtils.newEventLoopGroup(
- getName() + "-client-workers",
- GrpcConfigKeys.Client.workerGroupSize(properties),
- GrpcConfigKeys.useEpoll(properties));
+ this.workerEventLoopGroup = Objects.requireNonNull(workerEventLoopGroup, "workerEventLoopGroup");
final String clientAddress = Optional.ofNullable(target.getClientAddress())
.filter(x -> !x.isEmpty()).orElse(target.getAddress());
@@ -165,7 +161,10 @@ public void close() {
GrpcUtil.shutdownManagedChannel(adminChannel);
}
metricClientInterceptor.close();
- NettyUtils.shutdownGracefully(workerEventLoopGroup);
+ }
+
+ EventLoopGroup getWorkerEventLoopGroup() {
+ return workerEventLoopGroup;
}
RaftClientReplyProto groupAdd(GroupManagementRequestProto request) throws IOException {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index 65175dc2a1..8e35dd9504 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -29,6 +29,7 @@
import org.apache.ratis.thirdparty.io.grpc.Status;
import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.ratis.proto.RaftProtos.GroupInfoRequestProto;
import org.apache.ratis.proto.RaftProtos.GroupListRequestProto;
import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto;
@@ -41,6 +42,7 @@
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.NettyUtils;
import org.apache.ratis.util.PeerProxyMap;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
@@ -60,15 +62,36 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy(clientId.toString(),
- p -> new GrpcClientProtocolClient(clientId, p, properties, adminSslContext, clientSslContext)));
- this.clientId = clientId;
- this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug).getSizeInt();
- this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout(properties);
- this.watchRequestTimeoutDuration = RaftClientConfigKeys.Rpc.watchRequestTimeout(properties);
+ p -> new GrpcClientProtocolClient(clientId, p, properties, adminSslContext, clientSslContext, clientWorkers)));
+ this.clientWorkers = clientWorkers;
+
+ boolean initialized = false;
+ try {
+ this.clientId = clientId;
+ this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug).getSizeInt();
+ this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout(properties);
+ this.watchRequestTimeoutDuration = RaftClientConfigKeys.Rpc.watchRequestTimeout(properties);
+ initialized = true;
+ } finally {
+ if (!initialized) {
+ NettyUtils.shutdownGracefully(clientWorkers);
+ }
+ }
+ }
+
+ private static EventLoopGroup newClientWorkers(ClientId clientId, RaftProperties properties) {
+ return NettyUtils.newEventLoopGroup(clientId + "-client-workers",
+ GrpcConfigKeys.Client.workerGroupSize(properties), GrpcConfigKeys.useEpoll(properties));
}
@Override
@@ -213,6 +236,15 @@ private RaftClientRequestProto toRaftClientRequestProto(RaftClientRequest reques
return proto;
}
+ @Override
+ public void close() {
+ try {
+ super.close();
+ } finally {
+ NettyUtils.shutdownGracefully(clientWorkers);
+ }
+ }
+
@Override
public boolean shouldReconnect(Throwable e) {
final Throwable cause = e.getCause();
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
index c604ee0726..5326cd02e6 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
@@ -246,13 +246,18 @@ public GrpcServicesImpl build() {
final RaftProperties props = server.getProperties();
final String id = server.getId() + "";
final boolean useEpoll = GrpcConfigKeys.useEpoll(props);
- serverBosses = NettyUtils.newEventLoopGroup(id + "-boss",
- GrpcConfigKeys.Server.bossGroupSize(props), useEpoll);
- serverWorkers = NettyUtils.newEventLoopGroup(id + "-server-workers",
- GrpcConfigKeys.Server.workerGroupSize(props), useEpoll);
- clientWorkers = NettyUtils.newEventLoopGroup(id + "-client-workers",
- GrpcConfigKeys.Client.workerGroupSize(props), useEpoll);
- return new GrpcServicesImpl(this);
+ try {
+ serverBosses = NettyUtils.newEventLoopGroup(id + "-boss",
+ GrpcConfigKeys.Server.bossGroupSize(props), useEpoll);
+ serverWorkers = NettyUtils.newEventLoopGroup(id + "-server-workers",
+ GrpcConfigKeys.Server.workerGroupSize(props), useEpoll);
+ clientWorkers = NettyUtils.newEventLoopGroup(id + "-client-workers",
+ GrpcConfigKeys.Client.workerGroupSize(props), useEpoll);
+ return new GrpcServicesImpl(this);
+ } catch (RuntimeException | Error e) {
+ NettyUtils.shutdownGracefully(clientWorkers, serverWorkers, serverBosses);
+ throw e;
+ }
}
public Builder setAdminSslContext(SslContext adminSslContext) {
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/client/TestGrpcClientEventLoops.java b/ratis-test/src/test/java/org/apache/ratis/grpc/client/TestGrpcClientEventLoops.java
new file mode 100644
index 0000000000..17ed2ba0c5
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/client/TestGrpcClientEventLoops.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.client;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
+import org.apache.ratis.thirdparty.io.netty.util.concurrent.EventExecutor;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+public class TestGrpcClientEventLoops extends BaseTest {
+ @Test
+ public void testClientWorkerEventLoopGroupSharedByPeerProxies() throws Exception {
+ final RaftProperties properties = new RaftProperties();
+ GrpcConfigKeys.Client.setWorkerGroupSize(properties, 1);
+ GrpcConfigKeys.setUseEpoll(properties, false);
+
+ final RaftPeer p0 = newPeer("s0", 15000);
+ final RaftPeer p1 = newPeer("s1", 15001);
+ EventLoopGroup clientWorkers = null;
+ try (GrpcClientRpc rpc = new GrpcClientRpc(ClientId.randomId(), properties, null, null)) {
+ rpc.addRaftPeers(Arrays.asList(p0, p1));
+ final GrpcClientProtocolClient c0 = rpc.getProxies().getProxy(p0.getId());
+ final GrpcClientProtocolClient c1 = rpc.getProxies().getProxy(p1.getId());
+
+ clientWorkers = c0.getWorkerEventLoopGroup();
+ Assertions.assertSame(clientWorkers, c1.getWorkerEventLoopGroup());
+ Assertions.assertEquals(1, countEventExecutors(clientWorkers));
+ Assertions.assertFalse(clientWorkers.isShuttingDown());
+ }
+
+ Assertions.assertNotNull(clientWorkers);
+ Assertions.assertTrue(clientWorkers.isShuttingDown() || clientWorkers.isShutdown() || clientWorkers.isTerminated());
+ }
+
+ private static RaftPeer newPeer(String id, int port) {
+ return RaftPeer.newBuilder().setId(id).setAddress("127.0.0.1:" + port).build();
+ }
+
+ private static int countEventExecutors(EventLoopGroup group) {
+ int count = 0;
+ for (EventExecutor ignored : group) {
+ count++;
+ }
+ return count;
+ }
+}
From 84c1a576729361d261535ba66c026fd7da71d20d Mon Sep 17 00:00:00 2001
From: Yongzao <532741407@qq.com>
Date: Wed, 27 May 2026 17:44:22 +0800
Subject: [PATCH 4/8] RATIS-2529. Address gRPC EventLoopGroup review comments
---
.../org/apache/ratis/grpc/GrpcFactory.java | 2 +-
.../grpc/client/GrpcClientProtocolClient.java | 18 +++++---
.../ratis/grpc/client/GrpcClientRpc.java | 44 ++++++++-----------
.../grpc/server/GrpcServerProtocolClient.java | 9 ++--
.../ratis/grpc/server/GrpcServicesImpl.java | 9 +++-
.../ratis/grpc/server/GrpcStubPool.java | 8 ++--
.../grpc/client/TestGrpcClientEventLoops.java | 6 +--
7 files changed, 49 insertions(+), 47 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
index 1053cab80e..f3ca3f5faf 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
@@ -146,6 +146,6 @@ public GrpcClientRpc newRaftClientRpc(ClientId clientId, RaftProperties properti
checkPooledByteBufAllocatorUseCacheForAllThreads(LOG::debug);
final SslContexts forClient = forClientSupplier.get();
- return new GrpcClientRpc(clientId, properties, forClient.adminSslContext, forClient.clientSslContext);
+ return GrpcClientRpc.create(clientId, properties, forClient.adminSslContext, forClient.clientSslContext);
}
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index 5c0f622a9b..7c9545b559 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -56,6 +56,8 @@
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutExecutor;
@@ -96,16 +98,16 @@ public class GrpcClientProtocolClient implements Closeable {
private final AtomicReference unorderedStreamObservers = new AtomicReference<>();
private final MetricClientInterceptor metricClientInterceptor;
- private final EventLoopGroup workerEventLoopGroup;
+ private final MemoizedSupplier clientWorkers;
GrpcClientProtocolClient(ClientId id, RaftPeer target, RaftProperties properties,
- SslContext adminSslContext, SslContext clientSslContext, EventLoopGroup workerEventLoopGroup) {
+ SslContext adminSslContext, SslContext clientSslContext, MemoizedSupplier clientWorkers) {
this.name = JavaUtils.memoize(() -> id + "->" + target.getId());
this.target = target;
final SizeInBytes flowControlWindow = GrpcConfigKeys.flowControlWindow(properties, LOG::debug);
this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug);
metricClientInterceptor = new MetricClientInterceptor(getName());
- this.workerEventLoopGroup = Objects.requireNonNull(workerEventLoopGroup, "workerEventLoopGroup");
+ this.clientWorkers = Objects.requireNonNull(clientWorkers, "clientWorkers");
final String clientAddress = Optional.ofNullable(target.getClientAddress())
.filter(x -> !x.isEmpty()).orElse(target.getAddress());
@@ -139,8 +141,9 @@ private ManagedChannel buildChannel(String address, SslContext sslContext,
channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
}
- channelBuilder.channelType(NettyUtils.getSocketChannelClass(workerEventLoopGroup))
- .eventLoopGroup(workerEventLoopGroup);
+ final EventLoopGroup eventLoopGroup = clientWorkers.get();
+ channelBuilder.channelType(NettyUtils.getSocketChannelClass(eventLoopGroup))
+ .eventLoopGroup(eventLoopGroup);
return channelBuilder.flowControlWindow(flowControlWindow.getSizeInt())
.maxInboundMessageSize(maxMessageSize.getSizeInt())
@@ -163,8 +166,9 @@ public void close() {
metricClientInterceptor.close();
}
- EventLoopGroup getWorkerEventLoopGroup() {
- return workerEventLoopGroup;
+ EventLoopGroup getClientWorkersForTesting() {
+ Preconditions.assertTrue(clientWorkers.isInitialized());
+ return clientWorkers.get();
}
RaftClientReplyProto groupAdd(GroupManagementRequestProto request) throws IOException {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index 8e35dd9504..b5e8a4835e 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -42,6 +42,7 @@
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.NettyUtils;
import org.apache.ratis.util.PeerProxyMap;
import org.apache.ratis.util.TimeDuration;
@@ -58,40 +59,31 @@
public class GrpcClientRpc extends RaftClientRpcWithProxy {
public static final Logger LOG = LoggerFactory.getLogger(GrpcClientRpc.class);
+ public static GrpcClientRpc create(ClientId clientId, RaftProperties properties,
+ SslContext adminSslContext, SslContext clientSslContext) {
+ final MemoizedSupplier eventLoopGroup = MemoizedSupplier.valueOf(() -> NettyUtils.newEventLoopGroup(
+ clientId + "-client-workers",
+ GrpcConfigKeys.Client.workerGroupSize(properties),
+ GrpcConfigKeys.useEpoll(properties)));
+ return new GrpcClientRpc(clientId, properties, adminSslContext, clientSslContext, eventLoopGroup);
+ }
+
private final ClientId clientId;
private final int maxMessageSize;
private final TimeDuration requestTimeoutDuration;
private final TimeDuration watchRequestTimeoutDuration;
- private final EventLoopGroup clientWorkers;
-
- public GrpcClientRpc(ClientId clientId, RaftProperties properties,
- SslContext adminSslContext, SslContext clientSslContext) {
- this(clientId, properties, adminSslContext, clientSslContext, newClientWorkers(clientId, properties));
- }
+ private final MemoizedSupplier clientWorkers;
private GrpcClientRpc(ClientId clientId, RaftProperties properties,
- SslContext adminSslContext, SslContext clientSslContext, EventLoopGroup clientWorkers) {
+ SslContext adminSslContext, SslContext clientSslContext, MemoizedSupplier clientWorkers) {
super(new PeerProxyMap<>(clientId.toString(),
p -> new GrpcClientProtocolClient(clientId, p, properties, adminSslContext, clientSslContext, clientWorkers)));
this.clientWorkers = clientWorkers;
- boolean initialized = false;
- try {
- this.clientId = clientId;
- this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug).getSizeInt();
- this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout(properties);
- this.watchRequestTimeoutDuration = RaftClientConfigKeys.Rpc.watchRequestTimeout(properties);
- initialized = true;
- } finally {
- if (!initialized) {
- NettyUtils.shutdownGracefully(clientWorkers);
- }
- }
- }
-
- private static EventLoopGroup newClientWorkers(ClientId clientId, RaftProperties properties) {
- return NettyUtils.newEventLoopGroup(clientId + "-client-workers",
- GrpcConfigKeys.Client.workerGroupSize(properties), GrpcConfigKeys.useEpoll(properties));
+ this.clientId = clientId;
+ this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug).getSizeInt();
+ this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout(properties);
+ this.watchRequestTimeoutDuration = RaftClientConfigKeys.Rpc.watchRequestTimeout(properties);
}
@Override
@@ -241,7 +233,9 @@ public void close() {
try {
super.close();
} finally {
- NettyUtils.shutdownGracefully(clientWorkers);
+ if (clientWorkers.isInitialized()) {
+ NettyUtils.shutdownGracefully(clientWorkers.get());
+ }
}
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index 6dc0d32527..601c0f58fb 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -39,6 +39,7 @@
import org.slf4j.LoggerFactory;
import java.io.Closeable;
+import java.util.Objects;
/**
* This is a RaftClient implementation that supports streaming data to the raft
@@ -67,7 +68,7 @@ class GrpcServerProtocolClient implements Closeable {
raftPeerId = target.getId();
LOG.info("Build channel for {}", target);
useSeparateHBChannel = separateHBChannel;
- this.eventLoopGroup = eventLoopGroup;
+ this.eventLoopGroup = Objects.requireNonNull(eventLoopGroup, "eventLoopGroup == null");
channel = buildChannel(target, flowControlWindow, sslContext);
blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel);
asyncStub = RaftServerProtocolServiceGrpc.newStub(channel);
@@ -96,10 +97,8 @@ private ManagedChannel buildChannel(RaftPeer target, int flowControlWindow, SslC
channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
}
channelBuilder.disableRetry();
- if (eventLoopGroup != null) {
- channelBuilder.channelType(NettyUtils.getSocketChannelClass(eventLoopGroup))
- .eventLoopGroup(eventLoopGroup);
- }
+ channelBuilder.channelType(NettyUtils.getSocketChannelClass(eventLoopGroup))
+ .eventLoopGroup(eventLoopGroup);
return channelBuilder.flowControlWindow(flowControlWindow).build();
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
index 5326cd02e6..04b72fd8b9 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
@@ -254,9 +254,14 @@ public GrpcServicesImpl build() {
clientWorkers = NettyUtils.newEventLoopGroup(id + "-client-workers",
GrpcConfigKeys.Client.workerGroupSize(props), useEpoll);
return new GrpcServicesImpl(this);
- } catch (RuntimeException | Error e) {
+ } catch (Throwable t) {
NettyUtils.shutdownGracefully(clientWorkers, serverWorkers, serverBosses);
- throw e;
+ if (t instanceof RuntimeException) {
+ throw (RuntimeException) t;
+ } else if (t instanceof Error) {
+ throw (Error) t;
+ }
+ throw new RuntimeException(t);
}
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java
index 84a4651f9d..5a196579f6 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java
@@ -35,6 +35,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@@ -44,6 +45,7 @@ final class GrpcStubPool> {
public static final Logger LOG = LoggerFactory.getLogger(GrpcStubPool.class);
static ManagedChannel buildManagedChannel(String address, SslContext sslContext, EventLoopGroup eventLoopGroup) {
+ Objects.requireNonNull(eventLoopGroup, "eventLoopGroup == null");
NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(address)
.keepAliveTime(10, TimeUnit.MINUTES)
.keepAliveWithoutCalls(false)
@@ -55,10 +57,8 @@ static ManagedChannel buildManagedChannel(String address, SslContext sslContext,
} else {
channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
}
- if (eventLoopGroup != null) {
- channelBuilder.channelType(NettyUtils.getSocketChannelClass(eventLoopGroup))
- .eventLoopGroup(eventLoopGroup);
- }
+ channelBuilder.channelType(NettyUtils.getSocketChannelClass(eventLoopGroup))
+ .eventLoopGroup(eventLoopGroup);
ManagedChannel ch = channelBuilder.build();
ch.getState(true);
return ch;
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/client/TestGrpcClientEventLoops.java b/ratis-test/src/test/java/org/apache/ratis/grpc/client/TestGrpcClientEventLoops.java
index 17ed2ba0c5..76c8475450 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/client/TestGrpcClientEventLoops.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/client/TestGrpcClientEventLoops.java
@@ -39,13 +39,13 @@ public void testClientWorkerEventLoopGroupSharedByPeerProxies() throws Exception
final RaftPeer p0 = newPeer("s0", 15000);
final RaftPeer p1 = newPeer("s1", 15001);
EventLoopGroup clientWorkers = null;
- try (GrpcClientRpc rpc = new GrpcClientRpc(ClientId.randomId(), properties, null, null)) {
+ try (GrpcClientRpc rpc = GrpcClientRpc.create(ClientId.randomId(), properties, null, null)) {
rpc.addRaftPeers(Arrays.asList(p0, p1));
final GrpcClientProtocolClient c0 = rpc.getProxies().getProxy(p0.getId());
final GrpcClientProtocolClient c1 = rpc.getProxies().getProxy(p1.getId());
- clientWorkers = c0.getWorkerEventLoopGroup();
- Assertions.assertSame(clientWorkers, c1.getWorkerEventLoopGroup());
+ clientWorkers = c0.getClientWorkersForTesting();
+ Assertions.assertSame(clientWorkers, c1.getClientWorkersForTesting());
Assertions.assertEquals(1, countEventExecutors(clientWorkers));
Assertions.assertFalse(clientWorkers.isShuttingDown());
}
From 84a17687c8af4b2afdcf4465d2b4fa686ded02f9 Mon Sep 17 00:00:00 2001
From: Yongzao <532741407@qq.com>
Date: Thu, 28 May 2026 14:49:20 +0800
Subject: [PATCH 5/8] RATIS-2529. Fix gRPC client RPC checkstyle
---
.../main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index b5e8a4835e..099f913873 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -56,7 +56,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-public class GrpcClientRpc extends RaftClientRpcWithProxy {
+public final class GrpcClientRpc extends RaftClientRpcWithProxy {
public static final Logger LOG = LoggerFactory.getLogger(GrpcClientRpc.class);
public static GrpcClientRpc create(ClientId clientId, RaftProperties properties,
From a5b60e1f83150a6ab47f92a916822533ccf1e829 Mon Sep 17 00:00:00 2001
From: Yongzao <532741407@qq.com>
Date: Fri, 29 May 2026 12:38:38 +0800
Subject: [PATCH 6/8] RATIS-2529. Address gRPC build cleanup review
---
.../org/apache/ratis/grpc/server/GrpcServicesImpl.java | 7 +------
1 file changed, 1 insertion(+), 6 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
index 04b72fd8b9..99ac87f06f 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
@@ -256,12 +256,7 @@ public GrpcServicesImpl build() {
return new GrpcServicesImpl(this);
} catch (Throwable t) {
NettyUtils.shutdownGracefully(clientWorkers, serverWorkers, serverBosses);
- if (t instanceof RuntimeException) {
- throw (RuntimeException) t;
- } else if (t instanceof Error) {
- throw (Error) t;
- }
- throw new RuntimeException(t);
+ throw t;
}
}
From a2a5e3bb6e7b0fa34819169d267aa1f234a19d90 Mon Sep 17 00:00:00 2001
From: Yongzao <532741407@qq.com>
Date: Fri, 29 May 2026 17:59:17 +0800
Subject: [PATCH 7/8] RATIS-2529. Preserve default gRPC event loop behavior
---
.../org/apache/ratis/grpc/GrpcConfigKeys.java | 9 +++--
.../grpc/client/GrpcClientProtocolClient.java | 11 ++++---
.../ratis/grpc/client/GrpcClientRpc.java | 10 +++---
.../grpc/server/GrpcServerProtocolClient.java | 9 ++---
.../ratis/grpc/server/GrpcServicesImpl.java | 33 +++++++++++++------
.../ratis/grpc/server/GrpcStubPool.java | 8 ++---
.../apache/ratis/grpc/TestGrpcEventLoops.java | 3 +-
7 files changed, 49 insertions(+), 34 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
index de26693229..cd7ac10f32 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
@@ -21,7 +21,6 @@
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.server.GrpcServices;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.thirdparty.io.netty.util.NettyRuntime;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
@@ -167,10 +166,10 @@ static void setTlsConf(Parameters parameters, GrpcTlsConfig conf) {
}
String WORKER_GROUP_SIZE_KEY = PREFIX + ".worker-group.size";
- int WORKER_GROUP_SIZE_DEFAULT = Math.max(1, NettyRuntime.availableProcessors() * 2);
+ int WORKER_GROUP_SIZE_DEFAULT = 0;
static int workerGroupSize(RaftProperties properties) {
return getInt(properties::getInt, WORKER_GROUP_SIZE_KEY,
- WORKER_GROUP_SIZE_DEFAULT, getDefaultLog(), requireMin(1), requireMax(65536));
+ WORKER_GROUP_SIZE_DEFAULT, getDefaultLog(), requireMin(0), requireMax(65536));
}
static void setWorkerGroupSize(RaftProperties properties, int size) {
setInt(properties::setInt, WORKER_GROUP_SIZE_KEY, size);
@@ -323,10 +322,10 @@ static void setBossGroupSize(RaftProperties properties, int size) {
}
String WORKER_GROUP_SIZE_KEY = PREFIX + ".worker-group.size";
- int WORKER_GROUP_SIZE_DEFAULT = Math.max(1, NettyRuntime.availableProcessors() * 2);
+ int WORKER_GROUP_SIZE_DEFAULT = 0;
static int workerGroupSize(RaftProperties properties) {
return getInt(properties::getInt, WORKER_GROUP_SIZE_KEY,
- WORKER_GROUP_SIZE_DEFAULT, getDefaultLog(), requireMin(1), requireMax(65536));
+ WORKER_GROUP_SIZE_DEFAULT, getDefaultLog(), requireMin(0), requireMax(65536));
}
static void setWorkerGroupSize(RaftProperties properties, int size) {
setInt(properties::setInt, WORKER_GROUP_SIZE_KEY, size);
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index 7c9545b559..e958c9e000 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -107,7 +107,7 @@ public class GrpcClientProtocolClient implements Closeable {
final SizeInBytes flowControlWindow = GrpcConfigKeys.flowControlWindow(properties, LOG::debug);
this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug);
metricClientInterceptor = new MetricClientInterceptor(getName());
- this.clientWorkers = Objects.requireNonNull(clientWorkers, "clientWorkers");
+ this.clientWorkers = clientWorkers;
final String clientAddress = Optional.ofNullable(target.getClientAddress())
.filter(x -> !x.isEmpty()).orElse(target.getAddress());
@@ -141,9 +141,11 @@ private ManagedChannel buildChannel(String address, SslContext sslContext,
channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
}
- final EventLoopGroup eventLoopGroup = clientWorkers.get();
- channelBuilder.channelType(NettyUtils.getSocketChannelClass(eventLoopGroup))
- .eventLoopGroup(eventLoopGroup);
+ if (clientWorkers != null) {
+ final EventLoopGroup eventLoopGroup = clientWorkers.get();
+ channelBuilder.channelType(NettyUtils.getSocketChannelClass(eventLoopGroup))
+ .eventLoopGroup(eventLoopGroup);
+ }
return channelBuilder.flowControlWindow(flowControlWindow.getSizeInt())
.maxInboundMessageSize(maxMessageSize.getSizeInt())
@@ -167,6 +169,7 @@ public void close() {
}
EventLoopGroup getClientWorkersForTesting() {
+ Preconditions.assertTrue(clientWorkers != null);
Preconditions.assertTrue(clientWorkers.isInitialized());
return clientWorkers.get();
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index 099f913873..b12b5afe45 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -61,10 +61,10 @@ public final class GrpcClientRpc extends RaftClientRpcWithProxy eventLoopGroup = MemoizedSupplier.valueOf(() -> NettyUtils.newEventLoopGroup(
- clientId + "-client-workers",
- GrpcConfigKeys.Client.workerGroupSize(properties),
- GrpcConfigKeys.useEpoll(properties)));
+ final int workerGroupSize = GrpcConfigKeys.Client.workerGroupSize(properties);
+ final MemoizedSupplier eventLoopGroup = workerGroupSize > 0 ? MemoizedSupplier.valueOf(
+ () -> NettyUtils.newEventLoopGroup(
+ clientId + "-client-workers", workerGroupSize, GrpcConfigKeys.useEpoll(properties))) : null;
return new GrpcClientRpc(clientId, properties, adminSslContext, clientSslContext, eventLoopGroup);
}
@@ -233,7 +233,7 @@ public void close() {
try {
super.close();
} finally {
- if (clientWorkers.isInitialized()) {
+ if (clientWorkers != null && clientWorkers.isInitialized()) {
NettyUtils.shutdownGracefully(clientWorkers.get());
}
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index 601c0f58fb..6dc0d32527 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -39,7 +39,6 @@
import org.slf4j.LoggerFactory;
import java.io.Closeable;
-import java.util.Objects;
/**
* This is a RaftClient implementation that supports streaming data to the raft
@@ -68,7 +67,7 @@ class GrpcServerProtocolClient implements Closeable {
raftPeerId = target.getId();
LOG.info("Build channel for {}", target);
useSeparateHBChannel = separateHBChannel;
- this.eventLoopGroup = Objects.requireNonNull(eventLoopGroup, "eventLoopGroup == null");
+ this.eventLoopGroup = eventLoopGroup;
channel = buildChannel(target, flowControlWindow, sslContext);
blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel);
asyncStub = RaftServerProtocolServiceGrpc.newStub(channel);
@@ -97,8 +96,10 @@ private ManagedChannel buildChannel(RaftPeer target, int flowControlWindow, SslC
channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
}
channelBuilder.disableRetry();
- channelBuilder.channelType(NettyUtils.getSocketChannelClass(eventLoopGroup))
- .eventLoopGroup(eventLoopGroup);
+ if (eventLoopGroup != null) {
+ channelBuilder.channelType(NettyUtils.getSocketChannelClass(eventLoopGroup))
+ .eventLoopGroup(eventLoopGroup);
+ }
return channelBuilder.flowControlWindow(flowControlWindow).build();
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
index 99ac87f06f..3b31753b86 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
@@ -205,10 +205,18 @@ private NettyServerBuilder newNettyServerBuilder(String hostname, int port, SslC
final NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forAddress(address)
.withChildOption(ChannelOption.SO_REUSEADDR, true)
.maxInboundMessageSize(messageSizeMax.getSizeInt())
- .flowControlWindow(flowControlWindow.getSizeInt())
- .channelType(NettyUtils.getServerChannelClass(serverBosses))
- .bossEventLoopGroup(serverBosses)
- .workerEventLoopGroup(serverWorkers);
+ .flowControlWindow(flowControlWindow.getSizeInt());
+
+ final EventLoopGroup channelGroup = serverBosses != null ? serverBosses : serverWorkers;
+ if (channelGroup != null) {
+ nettyServerBuilder.channelType(NettyUtils.getServerChannelClass(channelGroup));
+ }
+ if (serverBosses != null) {
+ nettyServerBuilder.bossEventLoopGroup(serverBosses);
+ }
+ if (serverWorkers != null) {
+ nettyServerBuilder.workerEventLoopGroup(serverWorkers);
+ }
if (sslContext != null) {
LOG.info("Setting TLS for {}", address);
@@ -247,12 +255,17 @@ public GrpcServicesImpl build() {
final String id = server.getId() + "";
final boolean useEpoll = GrpcConfigKeys.useEpoll(props);
try {
- serverBosses = NettyUtils.newEventLoopGroup(id + "-boss",
- GrpcConfigKeys.Server.bossGroupSize(props), useEpoll);
- serverWorkers = NettyUtils.newEventLoopGroup(id + "-server-workers",
- GrpcConfigKeys.Server.workerGroupSize(props), useEpoll);
- clientWorkers = NettyUtils.newEventLoopGroup(id + "-client-workers",
- GrpcConfigKeys.Client.workerGroupSize(props), useEpoll);
+ final int bossGroupSize = GrpcConfigKeys.Server.bossGroupSize(props);
+ final int serverWorkerGroupSize = GrpcConfigKeys.Server.workerGroupSize(props);
+ if (bossGroupSize > 0 || serverWorkerGroupSize > 0) {
+ serverBosses = NettyUtils.newEventLoopGroup(id + "-boss", bossGroupSize, useEpoll);
+ serverWorkers = NettyUtils.newEventLoopGroup(id + "-server-workers", serverWorkerGroupSize, useEpoll);
+ }
+
+ final int clientWorkerGroupSize = GrpcConfigKeys.Client.workerGroupSize(props);
+ if (clientWorkerGroupSize > 0) {
+ clientWorkers = NettyUtils.newEventLoopGroup(id + "-client-workers", clientWorkerGroupSize, useEpoll);
+ }
return new GrpcServicesImpl(this);
} catch (Throwable t) {
NettyUtils.shutdownGracefully(clientWorkers, serverWorkers, serverBosses);
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java
index 5a196579f6..84a4651f9d 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java
@@ -35,7 +35,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Objects;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@@ -45,7 +44,6 @@ final class GrpcStubPool> {
public static final Logger LOG = LoggerFactory.getLogger(GrpcStubPool.class);
static ManagedChannel buildManagedChannel(String address, SslContext sslContext, EventLoopGroup eventLoopGroup) {
- Objects.requireNonNull(eventLoopGroup, "eventLoopGroup == null");
NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(address)
.keepAliveTime(10, TimeUnit.MINUTES)
.keepAliveWithoutCalls(false)
@@ -57,8 +55,10 @@ static ManagedChannel buildManagedChannel(String address, SslContext sslContext,
} else {
channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
}
- channelBuilder.channelType(NettyUtils.getSocketChannelClass(eventLoopGroup))
- .eventLoopGroup(eventLoopGroup);
+ if (eventLoopGroup != null) {
+ channelBuilder.channelType(NettyUtils.getSocketChannelClass(eventLoopGroup))
+ .eventLoopGroup(eventLoopGroup);
+ }
ManagedChannel ch = channelBuilder.build();
ch.getState(true);
return ch;
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcEventLoops.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcEventLoops.java
index c09a533397..fdee06ebec 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcEventLoops.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcEventLoops.java
@@ -68,10 +68,9 @@ public void testChannelTypeMatchesEpollAvailability() {
public void testConfigKeyDefaults() {
final RaftProperties properties = new RaftProperties();
final int expectedWorker = GrpcConfigKeys.Server.WORKER_GROUP_SIZE_DEFAULT;
- Assertions.assertTrue(expectedWorker > 0,
- "default worker threads should be positive, but got " + expectedWorker);
Assertions.assertEquals(expectedWorker, GrpcConfigKeys.Server.workerGroupSize(properties));
Assertions.assertEquals(expectedWorker, GrpcConfigKeys.Client.workerGroupSize(properties));
+ Assertions.assertEquals(0, expectedWorker);
Assertions.assertEquals(0, GrpcConfigKeys.Server.bossGroupSize(properties));
Assertions.assertTrue(GrpcConfigKeys.useEpoll(properties));
}
From 823b4872e391900bdb723599ca5d3583fd6f42c7 Mon Sep 17 00:00:00 2001
From: Yongzao <532741407@qq.com>
Date: Sun, 31 May 2026 13:42:50 +0800
Subject: [PATCH 8/8] RATIS-2529. Stabilize gRPC server metrics test
---
.../java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index b5247cf63d..d48ad39f30 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -356,16 +356,17 @@ void testRequestMetrics(MiniRaftClusterWithGrpc cluster) throws Exception {
// Block stateMachine flush data, so that 2nd request will not be
// completed, and so it will not be removed from pending request map.
List clients = new ArrayList<>();
+ final List> replies = new ArrayList<>();
try {
RaftClient client = cluster.createClient(cluster.getLeader().getId(), RetryPolicies.noRetry());
clients.add(client);
- client.async().send(new SimpleMessage("2nd Message"));
+ replies.add(client.async().send(new SimpleMessage("2nd Message")));
for (int i = 0; i < 10; i++) {
client = cluster.createClient(cluster.getLeader().getId(), RetryPolicies.noRetry());
clients.add(client);
- client.async().send(new SimpleMessage("message " + i));
+ replies.add(client.async().send(new SimpleMessage("message " + i)));
}
// Because we have passed 11 requests, and the element queue size is 10.
@@ -373,6 +374,7 @@ void testRequestMetrics(MiniRaftClusterWithGrpc cluster) throws Exception {
.getNumRequestQueueLimitHits().getCount() == 1, 300, 5000);
stateMachine.unblockFlushStateMachineData();
+ RaftTestUtil.waitFor(() -> replies.stream().allMatch(CompletableFuture::isDone), 300, 5000);
// Send a message with 1025kb , our byte size limit is 1024kb (1mb) , so it should fail
// and byte size counter limit will be hit.