From d328ded37077cd50fa93b3635e24903055e221df Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Wed, 20 May 2026 10:58:39 +0800 Subject: [PATCH 1/8] RATIS-2529. Bound gRPC worker EventLoopGroup thread count. Add raft.grpc.server.worker.event-loop.threads and raft.grpc.client.worker.event-loop.threads (default 0 = gRPC default, i.e. availableProcessors * 2). When > 0, build a dedicated Epoll/Nio EventLoopGroup of that size and wire it into both the server NettyServerBuilders and the client NettyChannelBuilders so a follower catch-up burst can't permanently inflate the worker thread count. --- .../org/apache/ratis/grpc/GrpcConfigKeys.java | 33 +++++ .../org/apache/ratis/grpc/GrpcEventLoops.java | 122 ++++++++++++++++++ .../grpc/client/GrpcClientProtocolClient.java | 11 ++ .../grpc/server/GrpcServerProtocolClient.java | 11 +- .../ratis/grpc/server/GrpcServicesImpl.java | 66 +++++++--- .../ratis/grpc/server/GrpcStubPool.java | 15 ++- .../apache/ratis/grpc/TestGrpcEventLoops.java | 88 +++++++++++++ .../grpc/TestGrpcWorkerEventLoopThreads.java | 57 ++++++++ 8 files changed, 381 insertions(+), 22 deletions(-) create mode 100644 ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcEventLoops.java create mode 100644 ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcEventLoops.java create mode 100644 ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcWorkerEventLoopThreads.java diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java index f31794ac36..c18cdb60be 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java @@ -155,6 +155,22 @@ static GrpcTlsConfig tlsConf(Parameters parameters) { static void setTlsConf(Parameters parameters, GrpcTlsConfig conf) { parameters.put(TLS_CONF_PARAMETER, conf, TLS_CONF_CLASS); } + + /** + * The number of worker threads for the gRPC 
client-side {@link + * org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup}. + * 0 (default) means use the gRPC default (i.e. {@code availableProcessors * 2}); + * a positive value caps the worker event-loop thread count. + */ + String WORKER_EVENT_LOOP_THREADS_KEY = PREFIX + ".worker.event-loop.threads"; + int WORKER_EVENT_LOOP_THREADS_DEFAULT = 0; + static int workerEventLoopThreads(RaftProperties properties) { + return getInt(properties::getInt, WORKER_EVENT_LOOP_THREADS_KEY, + WORKER_EVENT_LOOP_THREADS_DEFAULT, getDefaultLog(), requireMin(0)); + } + static void setWorkerEventLoopThreads(RaftProperties properties, int threads) { + setInt(properties::setInt, WORKER_EVENT_LOOP_THREADS_KEY, threads); + } } interface Server { @@ -291,6 +307,23 @@ static int stubPoolSize(RaftProperties properties) { static void setStubPoolSize(RaftProperties properties, int size) { setInt(properties::setInt, STUB_POOL_SIZE_KEY, size); } + + /** + * The number of worker threads for the gRPC server-side {@link + * org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup}. + * The group is also reused by the server-to-server clients managed by this server. + * 0 (default) means use the gRPC default (i.e. {@code availableProcessors * 2}); + * a positive value caps the worker event-loop thread count. 
+ */ + String WORKER_EVENT_LOOP_THREADS_KEY = PREFIX + ".worker.event-loop.threads"; + int WORKER_EVENT_LOOP_THREADS_DEFAULT = 0; + static int workerEventLoopThreads(RaftProperties properties) { + return getInt(properties::getInt, WORKER_EVENT_LOOP_THREADS_KEY, + WORKER_EVENT_LOOP_THREADS_DEFAULT, getDefaultLog(), requireMin(0)); + } + static void setWorkerEventLoopThreads(RaftProperties properties, int threads) { + setInt(properties::setInt, WORKER_EVENT_LOOP_THREADS_KEY, threads); + } } String MESSAGE_SIZE_MAX_KEY = PREFIX + ".message.size.max"; diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcEventLoops.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcEventLoops.java new file mode 100644 index 0000000000..9deb4b79f7 --- /dev/null +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcEventLoops.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.grpc; + +import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder; +import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder; +import org.apache.ratis.thirdparty.io.netty.channel.Channel; +import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup; +import org.apache.ratis.thirdparty.io.netty.channel.ServerChannel; +import org.apache.ratis.thirdparty.io.netty.channel.epoll.Epoll; +import org.apache.ratis.thirdparty.io.netty.channel.epoll.EpollEventLoopGroup; +import org.apache.ratis.thirdparty.io.netty.channel.epoll.EpollServerSocketChannel; +import org.apache.ratis.thirdparty.io.netty.channel.epoll.EpollSocketChannel; +import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel; +import org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; +import org.apache.ratis.thirdparty.io.netty.util.concurrent.DefaultThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +/** + * Helper for creating bounded Netty {@link EventLoopGroup} instances for gRPC, + * along with matching {@link Channel} / {@link ServerChannel} types. + * + *
<p>
By default, gRPC uses a shared {@link EventLoopGroup} sized to + * {@code Runtime.getRuntime().availableProcessors() * 2}, and Netty threads + * in that group never exit once started. A traffic burst (e.g. a follower + * catch-up after restart) can permanently inflate the active thread count. + * This helper lets callers construct a smaller, dedicated group and wire it + * into {@link NettyServerBuilder} / {@link NettyChannelBuilder} consistently + * with the matching channel type. + */ +public final class GrpcEventLoops { + private static final Logger LOG = LoggerFactory.getLogger(GrpcEventLoops.class); + + private GrpcEventLoops() {} + + public static boolean isEpollAvailable() { + return Epoll.isAvailable(); + } + + /** Create a new {@link EventLoopGroup} with the given number of threads. */ + public static EventLoopGroup newEventLoopGroup(int threads, String name) { + if (threads <= 0) { + throw new IllegalArgumentException("threads must be > 0, but got " + threads); + } + final ThreadFactory threadFactory = new DefaultThreadFactory(name); + if (isEpollAvailable()) { + LOG.info("Creating EpollEventLoopGroup({}) for {}", threads, name); + return new EpollEventLoopGroup(threads, threadFactory); + } + LOG.info("Creating NioEventLoopGroup({}) for {} (Epoll not available)", threads, name); + return new NioEventLoopGroup(threads, threadFactory); + } + + public static Class getServerChannelType() { + return isEpollAvailable() ? EpollServerSocketChannel.class : NioServerSocketChannel.class; + } + + public static Class getClientChannelType() { + return isEpollAvailable() ? EpollSocketChannel.class : NioSocketChannel.class; + } + + /** + * Configure the {@link NettyServerBuilder} to use the given boss and worker groups. + * + *
<p>
gRPC requires that the boss group, worker group, and channel type are + * all provided together or all omitted. The caller owns the lifecycle of + * both groups and must shut them down. + */ + public static void configure(NettyServerBuilder serverBuilder, + EventLoopGroup bossGroup, EventLoopGroup workerGroup) { + if (workerGroup == null) { + return; + } + serverBuilder.channelType(getServerChannelType()) + .bossEventLoopGroup(bossGroup) + .workerEventLoopGroup(workerGroup); + } + + /** Configure the {@link NettyChannelBuilder} to use the given event-loop group. */ + public static void configure(NettyChannelBuilder channelBuilder, EventLoopGroup group) { + if (group == null) { + return; + } + channelBuilder.channelType(getClientChannelType()) + .eventLoopGroup(group); + } + + /** Gracefully shut down the group, swallowing interruptions. */ + public static void shutdownGracefully(EventLoopGroup group) { + if (group == null) { + return; + } + try { + group.shutdownGracefully(0, 3, TimeUnit.SECONDS).await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("Interrupted while shutting down EventLoopGroup", e); + } catch (Exception e) { + LOG.warn("Failed to shut down EventLoopGroup", e); + } + } +} diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java index 0eaec6b962..2837cb9cd7 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java @@ -21,6 +21,7 @@ import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.GrpcConfigKeys; +import org.apache.ratis.grpc.GrpcEventLoops; import org.apache.ratis.grpc.GrpcUtil; import org.apache.ratis.grpc.metrics.intercept.client.MetricClientInterceptor; 
import org.apache.ratis.proto.RaftProtos.GroupInfoReplyProto; @@ -51,6 +52,7 @@ import org.apache.ratis.thirdparty.io.grpc.netty.NegotiationType; import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder; import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; +import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext; import org.apache.ratis.util.CollectionUtils; import org.apache.ratis.util.JavaUtils; @@ -94,6 +96,7 @@ public class GrpcClientProtocolClient implements Closeable { private final AtomicReference unorderedStreamObservers = new AtomicReference<>(); private final MetricClientInterceptor metricClientInterceptor; + private final EventLoopGroup workerEventLoopGroup; GrpcClientProtocolClient(ClientId id, RaftPeer target, RaftProperties properties, SslContext adminSslContext, SslContext clientSslContext) { @@ -103,6 +106,11 @@ public class GrpcClientProtocolClient implements Closeable { this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug); metricClientInterceptor = new MetricClientInterceptor(getName()); + final int workerThreads = GrpcConfigKeys.Client.workerEventLoopThreads(properties); + this.workerEventLoopGroup = workerThreads > 0 + ? 
GrpcEventLoops.newEventLoopGroup(workerThreads, getName() + "-grpc-worker-ELG") + : null; + final String clientAddress = Optional.ofNullable(target.getClientAddress()) .filter(x -> !x.isEmpty()).orElse(target.getAddress()); final String adminAddress = Optional.ofNullable(target.getAdminAddress()) @@ -135,6 +143,8 @@ private ManagedChannel buildChannel(String address, SslContext sslContext, channelBuilder.negotiationType(NegotiationType.PLAINTEXT); } + GrpcEventLoops.configure(channelBuilder, workerEventLoopGroup); + return channelBuilder.flowControlWindow(flowControlWindow.getSizeInt()) .maxInboundMessageSize(maxMessageSize.getSizeInt()) .intercept(metricClientInterceptor) @@ -154,6 +164,7 @@ public void close() { GrpcUtil.shutdownManagedChannel(adminChannel); } metricClientInterceptor.close(); + GrpcEventLoops.shutdownGracefully(workerEventLoopGroup); } RaftClientReplyProto groupAdd(GroupManagementRequestProto request) throws IOException { diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java index d2748c7be2..e9856e2aa5 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.grpc.server; +import org.apache.ratis.grpc.GrpcEventLoops; import org.apache.ratis.grpc.GrpcUtil; import org.apache.ratis.grpc.util.StreamObserverWithTimeout; import org.apache.ratis.protocol.RaftPeerId; @@ -31,6 +32,7 @@ import org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceBlockingStub; import org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceStub; import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup; import 
org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext; import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; @@ -57,12 +59,15 @@ class GrpcServerProtocolClient implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(GrpcServerProtocolClient.class); //visible for using in log / error messages AND to use in instrumented tests private final RaftPeerId raftPeerId; + private final EventLoopGroup eventLoopGroup; GrpcServerProtocolClient(RaftPeer target, int connections, int flowControlWindow, - TimeDuration requestTimeout, SslContext sslContext, boolean separateHBChannel) { + TimeDuration requestTimeout, SslContext sslContext, boolean separateHBChannel, + EventLoopGroup eventLoopGroup) { raftPeerId = target.getId(); LOG.info("Build channel for {}", target); useSeparateHBChannel = separateHBChannel; + this.eventLoopGroup = eventLoopGroup; channel = buildChannel(target, flowControlWindow, sslContext); blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel); asyncStub = RaftServerProtocolServiceGrpc.newStub(channel); @@ -75,7 +80,8 @@ class GrpcServerProtocolClient implements Closeable { } GrpcStubPool newGrpcStubPool(String address, SslContext sslContext, int connections) { - return new GrpcStubPool<>(connections, address, sslContext, RaftServerProtocolServiceGrpc::newStub, 16); + return new GrpcStubPool<>(connections, address, sslContext, RaftServerProtocolServiceGrpc::newStub, 16, + eventLoopGroup); } private ManagedChannel buildChannel(RaftPeer target, int flowControlWindow, SslContext sslContext) { @@ -90,6 +96,7 @@ private ManagedChannel buildChannel(RaftPeer target, int flowControlWindow, SslC channelBuilder.negotiationType(NegotiationType.PLAINTEXT); } channelBuilder.disableRetry(); + GrpcEventLoops.configure(channelBuilder, eventLoopGroup); return channelBuilder.flowControlWindow(flowControlWindow).build(); } diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java index d554ca583a..436e154106 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java @@ -19,6 +19,7 @@ import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.GrpcConfigKeys; +import org.apache.ratis.grpc.GrpcEventLoops; import org.apache.ratis.grpc.metrics.MessageMetrics; import org.apache.ratis.grpc.metrics.intercept.server.MetricServerInterceptor; import org.apache.ratis.protocol.AdminAsynchronousProtocol; @@ -36,6 +37,7 @@ import org.apache.ratis.thirdparty.io.grpc.Server; import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; import org.apache.ratis.thirdparty.io.netty.channel.ChannelOption; +import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext; import org.apache.ratis.proto.RaftProtos.*; @@ -114,6 +116,7 @@ public static final class Builder { private SizeInBytes flowControlWindow; private TimeDuration requestTimeoutDuration; private boolean separateHeartbeatChannel; + private int workerEventLoopThreads; private Builder() {} @@ -132,6 +135,7 @@ public Builder setServer(RaftServer raftServer) { this.requestTimeoutDuration = RaftServerConfigKeys.Rpc.requestTimeout(properties); this.separateHeartbeatChannel = GrpcConfigKeys.Server.heartbeatChannel(properties); this.serverStubPoolSize = GrpcConfigKeys.Server.stubPoolSize(properties); + this.workerEventLoopThreads = GrpcConfigKeys.Server.workerEventLoopThreads(properties); final SizeInBytes appenderBufferSize = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties); final SizeInBytes gap = SizeInBytes.ONE_MB; @@ -151,9 +155,9 @@ public Builder setCustomizer(Customizer customizer) { return this; } - private GrpcServerProtocolClient newGrpcServerProtocolClient(RaftPeer target) { + private GrpcServerProtocolClient 
newGrpcServerProtocolClient(RaftPeer target, EventLoopGroup eventLoopGroup) { return new GrpcServerProtocolClient(target, serverStubPoolSize, flowControlWindow.getSizeInt(), - requestTimeoutDuration, serverSslContextForClient, separateHeartbeatChannel); + requestTimeoutDuration, serverSslContextForClient, separateHeartbeatChannel, eventLoopGroup); } private ExecutorService newExecutor() { @@ -182,19 +186,20 @@ Server buildServer(NettyServerBuilder builder, EnumSet types) return customizer.customize(builder, types).build(); } - private NettyServerBuilder newNettyServerBuilderForServer() { - return newNettyServerBuilder(serverHost, serverPort, serverSslContextForServer); + private NettyServerBuilder newNettyServerBuilderForServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup) { + return newNettyServerBuilder(serverHost, serverPort, serverSslContextForServer, bossGroup, workerGroup); } - private NettyServerBuilder newNettyServerBuilderForAdmin() { - return newNettyServerBuilder(adminHost, adminPort, adminSslContext); + private NettyServerBuilder newNettyServerBuilderForAdmin(EventLoopGroup bossGroup, EventLoopGroup workerGroup) { + return newNettyServerBuilder(adminHost, adminPort, adminSslContext, bossGroup, workerGroup); } - private NettyServerBuilder newNettyServerBuilderForClient() { - return newNettyServerBuilder(clientHost, clientPort, clientSslContext); + private NettyServerBuilder newNettyServerBuilderForClient(EventLoopGroup bossGroup, EventLoopGroup workerGroup) { + return newNettyServerBuilder(clientHost, clientPort, clientSslContext, bossGroup, workerGroup); } - private NettyServerBuilder newNettyServerBuilder(String hostname, int port, SslContext sslContext) { + private NettyServerBuilder newNettyServerBuilder(String hostname, int port, SslContext sslContext, + EventLoopGroup bossGroup, EventLoopGroup workerGroup) { final InetSocketAddress address = hostname == null || hostname.isEmpty() ? 
new InetSocketAddress(port) : new InetSocketAddress(hostname, port); final NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forAddress(address) @@ -202,6 +207,8 @@ private NettyServerBuilder newNettyServerBuilder(String hostname, int port, SslC .maxInboundMessageSize(messageSizeMax.getSizeInt()) .flowControlWindow(flowControlWindow.getSizeInt()); + GrpcEventLoops.configure(nettyServerBuilder, bossGroup, workerGroup); + if (sslContext != null) { LOG.info("Setting TLS for {}", address); nettyServerBuilder.sslContext(sslContext); @@ -217,9 +224,10 @@ private boolean separateClientServer() { return clientPort > 0 && clientPort != serverPort; } - Server newServer(GrpcClientProtocolService client, ServerInterceptor interceptor) { + Server newServer(GrpcClientProtocolService client, ServerInterceptor interceptor, + EventLoopGroup bossGroup, EventLoopGroup workerGroup) { final EnumSet types = EnumSet.of(GrpcServices.Type.SERVER); - final NettyServerBuilder serverBuilder = newNettyServerBuilderForServer(); + final NettyServerBuilder serverBuilder = newNettyServerBuilderForServer(bossGroup, workerGroup); final GrpcServerProtocolService service = newGrpcServerProtocolService(); serverBuilder.addService(ServerInterceptors.intercept(service, interceptor)); @@ -234,6 +242,24 @@ Server newServer(GrpcClientProtocolService client, ServerInterceptor interceptor return buildServer(serverBuilder, types); } + private EventLoopGroup bossEventLoopGroup; + private EventLoopGroup workerEventLoopGroup; + + EventLoopGroup getBossEventLoopGroup() { + if (workerEventLoopThreads > 0 && bossEventLoopGroup == null) { + bossEventLoopGroup = GrpcEventLoops.newEventLoopGroup(1, server.getId() + "-grpc-boss-ELG"); + } + return bossEventLoopGroup; + } + + EventLoopGroup getWorkerEventLoopGroup() { + if (workerEventLoopThreads > 0 && workerEventLoopGroup == null) { + final String name = server.getId() + "-grpc-worker-ELG"; + workerEventLoopGroup = 
GrpcEventLoops.newEventLoopGroup(workerEventLoopThreads, name); + } + return workerEventLoopGroup; + } + public GrpcServicesImpl build() { return new GrpcServicesImpl(this); } @@ -274,20 +300,27 @@ public static Builder newBuilder() { private final GrpcClientProtocolService clientProtocolService; private final MetricServerInterceptor serverInterceptor; + private final EventLoopGroup bossEventLoopGroup; + private final EventLoopGroup workerEventLoopGroup; private GrpcServicesImpl(Builder b) { - super(b.server::getId, id -> new PeerProxyMap<>(id.toString(), b::newGrpcServerProtocolClient)); + super(b.server::getId, id -> new PeerProxyMap<>(id.toString(), + peer -> b.newGrpcServerProtocolClient(peer, b.getWorkerEventLoopGroup()))); + + this.bossEventLoopGroup = b.getBossEventLoopGroup(); + this.workerEventLoopGroup = b.getWorkerEventLoopGroup(); this.executor = b.newExecutor(); this.clientProtocolService = b.newGrpcClientProtocolService(executor); this.serverInterceptor = b.newMetricServerInterceptor(); - final Server server = b.newServer(clientProtocolService, serverInterceptor); + final Server server = b.newServer(clientProtocolService, serverInterceptor, + bossEventLoopGroup, workerEventLoopGroup); servers.put(GrpcServerProtocolService.class.getSimpleName(), server); addressSupplier = newAddressSupplier(b.serverPort, server); if (b.separateAdminServer()) { - final NettyServerBuilder builder = b.newNettyServerBuilderForAdmin(); + final NettyServerBuilder builder = b.newNettyServerBuilderForAdmin(bossEventLoopGroup, workerEventLoopGroup); addAdminService(builder, b.server, serverInterceptor); final Server adminServer = b.buildServer(builder, EnumSet.of(GrpcServices.Type.ADMIN)); servers.put(GrpcAdminProtocolService.class.getName(), adminServer); @@ -297,7 +330,7 @@ private GrpcServicesImpl(Builder b) { } if (b.separateClientServer()) { - final NettyServerBuilder builder = b.newNettyServerBuilderForClient(); + final NettyServerBuilder builder = 
b.newNettyServerBuilderForClient(bossEventLoopGroup, workerEventLoopGroup); addClientService(builder, clientProtocolService, serverInterceptor); final Server clientServer = b.buildServer(builder, EnumSet.of(GrpcServices.Type.CLIENT)); servers.put(GrpcClientProtocolService.class.getName(), clientServer); @@ -375,6 +408,9 @@ public void closeImpl() { } catch (IOException e) { LOG.warn("{}: Failed to close proxies", getId(), e); } + + GrpcEventLoops.shutdownGracefully(workerEventLoopGroup); + GrpcEventLoops.shutdownGracefully(bossEventLoopGroup); } @Override diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java index 9667661d07..edd0413430 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java @@ -17,12 +17,14 @@ */ package org.apache.ratis.grpc.server; +import org.apache.ratis.grpc.GrpcEventLoops; import org.apache.ratis.grpc.GrpcUtil; import org.apache.ratis.thirdparty.io.grpc.ManagedChannel; import org.apache.ratis.thirdparty.io.grpc.netty.NegotiationType; import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder; import org.apache.ratis.thirdparty.io.grpc.stub.AbstractStub; import org.apache.ratis.thirdparty.io.netty.channel.ChannelOption; +import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.ratis.thirdparty.io.netty.channel.WriteBufferWaterMark; import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext; import org.apache.ratis.util.MemoizedSupplier; @@ -41,7 +43,7 @@ final class GrpcStubPool> { public static final Logger LOG = LoggerFactory.getLogger(GrpcStubPool.class); - static ManagedChannel buildManagedChannel(String address, SslContext sslContext) { + static ManagedChannel buildManagedChannel(String address, SslContext sslContext, EventLoopGroup eventLoopGroup) { NettyChannelBuilder 
channelBuilder = NettyChannelBuilder.forTarget(address) .keepAliveTime(10, TimeUnit.MINUTES) .keepAliveWithoutCalls(false) @@ -53,6 +55,7 @@ static ManagedChannel buildManagedChannel(String address, SslContext sslContext) } else { channelBuilder.negotiationType(NegotiationType.PLAINTEXT); } + GrpcEventLoops.configure(channelBuilder, eventLoopGroup); ManagedChannel ch = channelBuilder.build(); ch.getState(true); return ch; @@ -63,8 +66,9 @@ static final class Stub> { private final S stub; private final Semaphore permits; - Stub(String address, SslContext sslContext, Function stubFactory, int maxInflight) { - this.ch = buildManagedChannel(address, sslContext); + Stub(String address, SslContext sslContext, Function stubFactory, int maxInflight, + EventLoopGroup eventLoopGroup) { + this.ch = buildManagedChannel(address, sslContext, eventLoopGroup); this.stub = stubFactory.apply(ch); this.permits = new Semaphore(maxInflight); } @@ -85,11 +89,12 @@ void shutdown() { private final List>> pool; GrpcStubPool(int connections, String address, SslContext sslContext, Function stubFactory, - int maxInflightPerConn) { + int maxInflightPerConn, EventLoopGroup eventLoopGroup) { Preconditions.assertTrue(connections > 1, "connections must be > 1"); final List>> tmpPool = new ArrayList<>(connections); for (int i = 0; i < connections; i++) { - tmpPool.add(MemoizedSupplier.valueOf(() -> new Stub<>(address, sslContext, stubFactory, maxInflightPerConn))); + tmpPool.add(MemoizedSupplier.valueOf( + () -> new Stub<>(address, sslContext, stubFactory, maxInflightPerConn, eventLoopGroup))); } this.pool = Collections.unmodifiableList(tmpPool); } diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcEventLoops.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcEventLoops.java new file mode 100644 index 0000000000..6ea1caceba --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcEventLoops.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +import org.apache.ratis.BaseTest; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup; +import org.apache.ratis.thirdparty.io.netty.util.concurrent.EventExecutor; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestGrpcEventLoops extends BaseTest { + + @Test + public void testNewEventLoopGroupWithThreadCount() { + final int threads = 3; + final EventLoopGroup group = GrpcEventLoops.newEventLoopGroup(threads, "test-elg"); + try { + Assertions.assertNotNull(group); + int count = 0; + for (EventExecutor ignored : group) { + count++; + } + Assertions.assertEquals(threads, count); + } finally { + GrpcEventLoops.shutdownGracefully(group); + } + } + + @Test + public void testNewEventLoopGroupRejectsNonPositiveThreads() { + Assertions.assertThrows(IllegalArgumentException.class, + () -> GrpcEventLoops.newEventLoopGroup(0, "test-elg")); + Assertions.assertThrows(IllegalArgumentException.class, + () -> GrpcEventLoops.newEventLoopGroup(-1, "test-elg")); + } + + @Test + public void testChannelTypeMatchesEpollAvailability() { + if (GrpcEventLoops.isEpollAvailable()) { + 
Assertions.assertEquals("EpollServerSocketChannel", + GrpcEventLoops.getServerChannelType().getSimpleName()); + Assertions.assertEquals("EpollSocketChannel", + GrpcEventLoops.getClientChannelType().getSimpleName()); + } else { + Assertions.assertEquals("NioServerSocketChannel", + GrpcEventLoops.getServerChannelType().getSimpleName()); + Assertions.assertEquals("NioSocketChannel", + GrpcEventLoops.getClientChannelType().getSimpleName()); + } + } + + @Test + public void testConfigKeyDefaults() { + final RaftProperties properties = new RaftProperties(); + Assertions.assertEquals(0, GrpcConfigKeys.Server.workerEventLoopThreads(properties)); + Assertions.assertEquals(0, GrpcConfigKeys.Client.workerEventLoopThreads(properties)); + } + + @Test + public void testConfigKeyRoundtrip() { + final RaftProperties properties = new RaftProperties(); + GrpcConfigKeys.Server.setWorkerEventLoopThreads(properties, 4); + GrpcConfigKeys.Client.setWorkerEventLoopThreads(properties, 2); + Assertions.assertEquals(4, GrpcConfigKeys.Server.workerEventLoopThreads(properties)); + Assertions.assertEquals(2, GrpcConfigKeys.Client.workerEventLoopThreads(properties)); + } + + @Test + public void testShutdownNullIsNoop() { + GrpcEventLoops.shutdownGracefully(null); + } +} diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcWorkerEventLoopThreads.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcWorkerEventLoopThreads.java new file mode 100644 index 0000000000..9d0df3fd02 --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcWorkerEventLoopThreads.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +import org.apache.ratis.BaseTest; +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftClientReply; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import static org.apache.ratis.RaftTestUtil.waitForLeader; + +/** + * Verify a cluster comes up and processes requests when the worker + * event-loop thread count is capped via + * {@link GrpcConfigKeys.Server#WORKER_EVENT_LOOP_THREADS_KEY} and + * {@link GrpcConfigKeys.Client#WORKER_EVENT_LOOP_THREADS_KEY}. + * + *

<p>Regression: RATIS-2529 — gRPC worker threads permanently inflate to + * {@code availableProcessors * 2} after follower restart catch-up. + */ +public class TestGrpcWorkerEventLoopThreads extends BaseTest { + + @Test + public void testClusterWithCappedWorkerEventLoopThreads() throws Exception { + final String[] ids = {"s0", "s1", "s2"}; + final RaftProperties properties = new RaftProperties(); + GrpcConfigKeys.Server.setWorkerEventLoopThreads(properties, 2); + GrpcConfigKeys.Client.setWorkerEventLoopThreads(properties, 1); + + try (MiniRaftClusterWithGrpc cluster = new MiniRaftClusterWithGrpc(ids, properties, null)) { + cluster.start(); + waitForLeader(cluster); + try (RaftClient client = cluster.createClient()) { + final RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("hello")); + Assertions.assertTrue(reply.isSuccess()); + } + } + } +} From 20ddb29af520e396790edf1d5ac6c1d2079e4c87 Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Mon, 25 May 2026 22:54:51 +0800 Subject: [PATCH 2/8] RATIS-2529. 
Address gRPC event loop review comments --- .../org/apache/ratis/util}/NettyUtils.java | 42 +++++- .../org/apache/ratis/grpc/GrpcConfigKeys.java | 63 +++++---- .../org/apache/ratis/grpc/GrpcEventLoops.java | 122 ------------------ .../grpc/client/GrpcClientProtocolClient.java | 15 ++- .../grpc/server/GrpcServerProtocolClient.java | 7 +- .../ratis/grpc/server/GrpcServicesImpl.java | 87 ++++++------- .../ratis/grpc/server/GrpcStubPool.java | 7 +- .../org/apache/ratis/netty/NettyClient.java | 1 + .../org/apache/ratis/netty/NettyRpcProxy.java | 1 + .../netty/client/NettyClientStreamRpc.java | 2 +- .../ratis/netty/server/NettyRpcService.java | 2 +- .../netty/server/NettyServerStreamRpc.java | 2 +- ...WithRpcTypeGrpcAndDataStreamTypeNetty.java | 2 +- .../apache/ratis/grpc/TestGrpcEventLoops.java | 62 +++++---- .../grpc/TestGrpcWorkerEventLoopThreads.java | 8 +- .../apache/ratis/netty/TestNettyRpcProxy.java | 1 + .../ratis/netty/TestTlsConfWithNetty.java | 1 + .../shell/cli/sh/TestSecureRatisShell.java | 2 +- 18 files changed, 178 insertions(+), 249 deletions(-) rename {ratis-netty/src/main/java/org/apache/ratis/netty => ratis-common/src/main/java/org/apache/ratis/util}/NettyUtils.java (85%) delete mode 100644 ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcEventLoops.java diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/NettyUtils.java similarity index 85% rename from ratis-netty/src/main/java/org/apache/ratis/netty/NettyUtils.java rename to ratis-common/src/main/java/org/apache/ratis/util/NettyUtils.java index 37666bf189..163c3d869f 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/NettyUtils.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.ratis.netty; +package org.apache.ratis.util; import org.apache.ratis.security.TlsConf; import org.apache.ratis.security.TlsConf.CertificatesConf; @@ -36,13 +36,14 @@ import org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext; import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder; -import org.apache.ratis.util.ConcurrentUtils; -import org.apache.ratis.util.TimeDuration; +import org.apache.ratis.thirdparty.io.netty.util.concurrent.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.net.ssl.KeyManager; import javax.net.ssl.TrustManager; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@ -81,6 +82,39 @@ static EventLoopGroup newEventLoopGroup(String name, int size, boolean useEpoll) return new NioEventLoopGroup(size, ConcurrentUtils.newThreadFactory(name + "-")); } + static void shutdownGracefully(EventLoopGroup... groups) { + shutdownGracefully(CLOSE_TIMEOUT, groups); + } + + static void shutdownGracefully(TimeDuration awaitTime, EventLoopGroup... 
groups) { + if (groups == null || groups.length == 0) { + return; + } + + final List nonNullGroups = new ArrayList<>(groups.length); + final List> futures = new ArrayList<>(groups.length); + for (EventLoopGroup group : groups) { + if (group != null) { + nonNullGroups.add(group); + futures.add(group.shutdownGracefully()); + } + } + + for (int i = 0; i < futures.size(); i++) { + final EventLoopGroup group = nonNullGroups.get(i); + try { + if (!futures.get(i).await(awaitTime.getDuration(), awaitTime.getUnit())) { + LOG.warn("Failed to shut down EventLoopGroup {} in {}", group, awaitTime); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("Interrupted while shutting down EventLoopGroup {}", group, e); + } catch (Exception e) { + LOG.warn("Failed to shut down EventLoopGroup {} in {}", group, awaitTime, e); + } + } + } + static void setTrustManager(SslContextBuilder b, TrustManagerConf trustManagerConfig) { if (trustManagerConfig == null) { return; @@ -196,4 +230,4 @@ static void closeChannel(Channel channel, String name) { LOG.warn("closeChannel {} is not yet completed in {}", name, CLOSE_TIMEOUT); } } -} \ No newline at end of file +} diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java index c18cdb60be..de26693229 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java @@ -21,6 +21,7 @@ import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.server.GrpcServices; import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.thirdparty.io.netty.util.NettyRuntime; import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; @@ -51,6 +52,15 @@ static Consumer getDefaultLog() { String PREFIX = "raft.grpc"; + String USE_EPOLL_KEY = PREFIX + ".use-epoll"; + boolean 
USE_EPOLL_DEFAULT = true; + static boolean useEpoll(RaftProperties properties) { + return getBoolean(properties::getBoolean, USE_EPOLL_KEY, USE_EPOLL_DEFAULT, getDefaultLog()); + } + static void setUseEpoll(RaftProperties properties, boolean useEpoll) { + setBoolean(properties::setBoolean, USE_EPOLL_KEY, useEpoll); + } + interface TLS { String PREFIX = GrpcConfigKeys.PREFIX + ".tls"; @@ -156,20 +166,14 @@ static void setTlsConf(Parameters parameters, GrpcTlsConfig conf) { parameters.put(TLS_CONF_PARAMETER, conf, TLS_CONF_CLASS); } - /** - * The number of worker threads for the gRPC client-side {@link - * org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup}. - * 0 (default) means use the gRPC default (i.e. {@code availableProcessors * 2}); - * a positive value caps the worker event-loop thread count. - */ - String WORKER_EVENT_LOOP_THREADS_KEY = PREFIX + ".worker.event-loop.threads"; - int WORKER_EVENT_LOOP_THREADS_DEFAULT = 0; - static int workerEventLoopThreads(RaftProperties properties) { - return getInt(properties::getInt, WORKER_EVENT_LOOP_THREADS_KEY, - WORKER_EVENT_LOOP_THREADS_DEFAULT, getDefaultLog(), requireMin(0)); + String WORKER_GROUP_SIZE_KEY = PREFIX + ".worker-group.size"; + int WORKER_GROUP_SIZE_DEFAULT = Math.max(1, NettyRuntime.availableProcessors() * 2); + static int workerGroupSize(RaftProperties properties) { + return getInt(properties::getInt, WORKER_GROUP_SIZE_KEY, + WORKER_GROUP_SIZE_DEFAULT, getDefaultLog(), requireMin(1), requireMax(65536)); } - static void setWorkerEventLoopThreads(RaftProperties properties, int threads) { - setInt(properties::setInt, WORKER_EVENT_LOOP_THREADS_KEY, threads); + static void setWorkerGroupSize(RaftProperties properties, int size) { + setInt(properties::setInt, WORKER_GROUP_SIZE_KEY, size); } } @@ -308,21 +312,24 @@ static void setStubPoolSize(RaftProperties properties, int size) { setInt(properties::setInt, STUB_POOL_SIZE_KEY, size); } - /** - * The number of worker threads for the gRPC server-side 
{@link - * org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup}. - * The group is also reused by the server-to-server clients managed by this server. - * 0 (default) means use the gRPC default (i.e. {@code availableProcessors * 2}); - * a positive value caps the worker event-loop thread count. - */ - String WORKER_EVENT_LOOP_THREADS_KEY = PREFIX + ".worker.event-loop.threads"; - int WORKER_EVENT_LOOP_THREADS_DEFAULT = 0; - static int workerEventLoopThreads(RaftProperties properties) { - return getInt(properties::getInt, WORKER_EVENT_LOOP_THREADS_KEY, - WORKER_EVENT_LOOP_THREADS_DEFAULT, getDefaultLog(), requireMin(0)); - } - static void setWorkerEventLoopThreads(RaftProperties properties, int threads) { - setInt(properties::setInt, WORKER_EVENT_LOOP_THREADS_KEY, threads); + String BOSS_GROUP_SIZE_KEY = PREFIX + ".boss-group.size"; + int BOSS_GROUP_SIZE_DEFAULT = 0; + static int bossGroupSize(RaftProperties properties) { + return getInt(properties::getInt, BOSS_GROUP_SIZE_KEY, + BOSS_GROUP_SIZE_DEFAULT, getDefaultLog(), requireMin(0), requireMax(65536)); + } + static void setBossGroupSize(RaftProperties properties, int size) { + setInt(properties::setInt, BOSS_GROUP_SIZE_KEY, size); + } + + String WORKER_GROUP_SIZE_KEY = PREFIX + ".worker-group.size"; + int WORKER_GROUP_SIZE_DEFAULT = Math.max(1, NettyRuntime.availableProcessors() * 2); + static int workerGroupSize(RaftProperties properties) { + return getInt(properties::getInt, WORKER_GROUP_SIZE_KEY, + WORKER_GROUP_SIZE_DEFAULT, getDefaultLog(), requireMin(1), requireMax(65536)); + } + static void setWorkerGroupSize(RaftProperties properties, int size) { + setInt(properties::setInt, WORKER_GROUP_SIZE_KEY, size); } } diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcEventLoops.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcEventLoops.java deleted file mode 100644 index 9deb4b79f7..0000000000 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcEventLoops.java +++ /dev/null @@ 
-1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.grpc; - -import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder; -import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder; -import org.apache.ratis.thirdparty.io.netty.channel.Channel; -import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup; -import org.apache.ratis.thirdparty.io.netty.channel.ServerChannel; -import org.apache.ratis.thirdparty.io.netty.channel.epoll.Epoll; -import org.apache.ratis.thirdparty.io.netty.channel.epoll.EpollEventLoopGroup; -import org.apache.ratis.thirdparty.io.netty.channel.epoll.EpollServerSocketChannel; -import org.apache.ratis.thirdparty.io.netty.channel.epoll.EpollSocketChannel; -import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup; -import org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel; -import org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; -import org.apache.ratis.thirdparty.io.netty.util.concurrent.DefaultThreadFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - -/** - * 
Helper for creating bounded Netty {@link EventLoopGroup} instances for gRPC, - * along with matching {@link Channel} / {@link ServerChannel} types. - * - *

<p>By default, gRPC uses a shared {@link EventLoopGroup} sized to - * {@code Runtime.getRuntime().availableProcessors() * 2}, and Netty threads - * in that group never exit once started. A traffic burst (e.g. a follower - * catch-up after restart) can permanently inflate the active thread count. - * This helper lets callers construct a smaller, dedicated group and wire it - * into {@link NettyServerBuilder} / {@link NettyChannelBuilder} consistently - * with the matching channel type. - */ -public final class GrpcEventLoops { - private static final Logger LOG = LoggerFactory.getLogger(GrpcEventLoops.class); - - private GrpcEventLoops() {} - - public static boolean isEpollAvailable() { - return Epoll.isAvailable(); - } - - /** Create a new {@link EventLoopGroup} with the given number of threads. */ - public static EventLoopGroup newEventLoopGroup(int threads, String name) { - if (threads <= 0) { - throw new IllegalArgumentException("threads must be > 0, but got " + threads); - } - final ThreadFactory threadFactory = new DefaultThreadFactory(name); - if (isEpollAvailable()) { - LOG.info("Creating EpollEventLoopGroup({}) for {}", threads, name); - return new EpollEventLoopGroup(threads, threadFactory); - } - LOG.info("Creating NioEventLoopGroup({}) for {} (Epoll not available)", threads, name); - return new NioEventLoopGroup(threads, threadFactory); - } - - public static Class getServerChannelType() { - return isEpollAvailable() ? EpollServerSocketChannel.class : NioServerSocketChannel.class; - } - - public static Class getClientChannelType() { - return isEpollAvailable() ? EpollSocketChannel.class : NioSocketChannel.class; - } - - /** - * Configure the {@link NettyServerBuilder} to use the given boss and worker groups. - * - *

<p>gRPC requires that the boss group, worker group, and channel type are - * all provided together or all omitted. The caller owns the lifecycle of - * both groups and must shut them down. - */ - public static void configure(NettyServerBuilder serverBuilder, - EventLoopGroup bossGroup, EventLoopGroup workerGroup) { - if (workerGroup == null) { - return; - } - serverBuilder.channelType(getServerChannelType()) - .bossEventLoopGroup(bossGroup) - .workerEventLoopGroup(workerGroup); - } - - /** Configure the {@link NettyChannelBuilder} to use the given event-loop group. */ - public static void configure(NettyChannelBuilder channelBuilder, EventLoopGroup group) { - if (group == null) { - return; - } - channelBuilder.channelType(getClientChannelType()) - .eventLoopGroup(group); - } - - /** Gracefully shut down the group, swallowing interruptions. */ - public static void shutdownGracefully(EventLoopGroup group) { - if (group == null) { - return; - } - try { - group.shutdownGracefully(0, 3, TimeUnit.SECONDS).await(5, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.warn("Interrupted while shutting down EventLoopGroup", e); - } catch (Exception e) { - LOG.warn("Failed to shut down EventLoopGroup", e); - } - } -} diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java index 2837cb9cd7..d51e13c5b5 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java @@ -21,8 +21,8 @@ import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.GrpcConfigKeys; -import org.apache.ratis.grpc.GrpcEventLoops; import org.apache.ratis.grpc.GrpcUtil; +import org.apache.ratis.util.NettyUtils; import 
org.apache.ratis.grpc.metrics.intercept.client.MetricClientInterceptor; import org.apache.ratis.proto.RaftProtos.GroupInfoReplyProto; import org.apache.ratis.proto.RaftProtos.GroupInfoRequestProto; @@ -106,10 +106,10 @@ public class GrpcClientProtocolClient implements Closeable { this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug); metricClientInterceptor = new MetricClientInterceptor(getName()); - final int workerThreads = GrpcConfigKeys.Client.workerEventLoopThreads(properties); - this.workerEventLoopGroup = workerThreads > 0 - ? GrpcEventLoops.newEventLoopGroup(workerThreads, getName() + "-grpc-worker-ELG") - : null; + this.workerEventLoopGroup = NettyUtils.newEventLoopGroup( + getName() + "-client-workers", + GrpcConfigKeys.Client.workerGroupSize(properties), + GrpcConfigKeys.useEpoll(properties)); final String clientAddress = Optional.ofNullable(target.getClientAddress()) .filter(x -> !x.isEmpty()).orElse(target.getAddress()); @@ -143,7 +143,8 @@ private ManagedChannel buildChannel(String address, SslContext sslContext, channelBuilder.negotiationType(NegotiationType.PLAINTEXT); } - GrpcEventLoops.configure(channelBuilder, workerEventLoopGroup); + channelBuilder.channelType(NettyUtils.getSocketChannelClass(workerEventLoopGroup)) + .eventLoopGroup(workerEventLoopGroup); return channelBuilder.flowControlWindow(flowControlWindow.getSizeInt()) .maxInboundMessageSize(maxMessageSize.getSizeInt()) @@ -164,7 +165,7 @@ public void close() { GrpcUtil.shutdownManagedChannel(adminChannel); } metricClientInterceptor.close(); - GrpcEventLoops.shutdownGracefully(workerEventLoopGroup); + NettyUtils.shutdownGracefully(workerEventLoopGroup); } RaftClientReplyProto groupAdd(GroupManagementRequestProto request) throws IOException { diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java index e9856e2aa5..6dc0d32527 100644 --- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java @@ -17,8 +17,8 @@ */ package org.apache.ratis.grpc.server; -import org.apache.ratis.grpc.GrpcEventLoops; import org.apache.ratis.grpc.GrpcUtil; +import org.apache.ratis.util.NettyUtils; import org.apache.ratis.grpc.util.StreamObserverWithTimeout; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.util.ServerStringUtils; @@ -96,7 +96,10 @@ private ManagedChannel buildChannel(RaftPeer target, int flowControlWindow, SslC channelBuilder.negotiationType(NegotiationType.PLAINTEXT); } channelBuilder.disableRetry(); - GrpcEventLoops.configure(channelBuilder, eventLoopGroup); + if (eventLoopGroup != null) { + channelBuilder.channelType(NettyUtils.getSocketChannelClass(eventLoopGroup)) + .eventLoopGroup(eventLoopGroup); + } return channelBuilder.flowControlWindow(flowControlWindow).build(); } diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java index 436e154106..c604ee0726 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java @@ -19,9 +19,9 @@ import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.GrpcConfigKeys; -import org.apache.ratis.grpc.GrpcEventLoops; import org.apache.ratis.grpc.metrics.MessageMetrics; import org.apache.ratis.grpc.metrics.intercept.server.MetricServerInterceptor; +import org.apache.ratis.util.NettyUtils; import org.apache.ratis.protocol.AdminAsynchronousProtocol; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; @@ -116,7 +116,10 @@ public static final class Builder { private SizeInBytes flowControlWindow; private TimeDuration requestTimeoutDuration; private boolean 
separateHeartbeatChannel; - private int workerEventLoopThreads; + + private EventLoopGroup serverBosses; + private EventLoopGroup serverWorkers; + private EventLoopGroup clientWorkers; private Builder() {} @@ -135,7 +138,6 @@ public Builder setServer(RaftServer raftServer) { this.requestTimeoutDuration = RaftServerConfigKeys.Rpc.requestTimeout(properties); this.separateHeartbeatChannel = GrpcConfigKeys.Server.heartbeatChannel(properties); this.serverStubPoolSize = GrpcConfigKeys.Server.stubPoolSize(properties); - this.workerEventLoopThreads = GrpcConfigKeys.Server.workerEventLoopThreads(properties); final SizeInBytes appenderBufferSize = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties); final SizeInBytes gap = SizeInBytes.ONE_MB; @@ -168,8 +170,7 @@ private ExecutorService newExecutor() { server.getId() + "-request-"); } - private GrpcClientProtocolService newGrpcClientProtocolService( - ExecutorService executor) { + private GrpcClientProtocolService newGrpcClientProtocolService(ExecutorService executor) { return new GrpcClientProtocolService(server::getId, server, executor); } @@ -186,28 +187,28 @@ Server buildServer(NettyServerBuilder builder, EnumSet types) return customizer.customize(builder, types).build(); } - private NettyServerBuilder newNettyServerBuilderForServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup) { - return newNettyServerBuilder(serverHost, serverPort, serverSslContextForServer, bossGroup, workerGroup); + private NettyServerBuilder newNettyServerBuilderForServer() { + return newNettyServerBuilder(serverHost, serverPort, serverSslContextForServer); } - private NettyServerBuilder newNettyServerBuilderForAdmin(EventLoopGroup bossGroup, EventLoopGroup workerGroup) { - return newNettyServerBuilder(adminHost, adminPort, adminSslContext, bossGroup, workerGroup); + private NettyServerBuilder newNettyServerBuilderForAdmin() { + return newNettyServerBuilder(adminHost, adminPort, adminSslContext); } - private NettyServerBuilder 
newNettyServerBuilderForClient(EventLoopGroup bossGroup, EventLoopGroup workerGroup) { - return newNettyServerBuilder(clientHost, clientPort, clientSslContext, bossGroup, workerGroup); + private NettyServerBuilder newNettyServerBuilderForClient() { + return newNettyServerBuilder(clientHost, clientPort, clientSslContext); } - private NettyServerBuilder newNettyServerBuilder(String hostname, int port, SslContext sslContext, - EventLoopGroup bossGroup, EventLoopGroup workerGroup) { + private NettyServerBuilder newNettyServerBuilder(String hostname, int port, SslContext sslContext) { final InetSocketAddress address = hostname == null || hostname.isEmpty() ? new InetSocketAddress(port) : new InetSocketAddress(hostname, port); final NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forAddress(address) .withChildOption(ChannelOption.SO_REUSEADDR, true) .maxInboundMessageSize(messageSizeMax.getSizeInt()) - .flowControlWindow(flowControlWindow.getSizeInt()); - - GrpcEventLoops.configure(nettyServerBuilder, bossGroup, workerGroup); + .flowControlWindow(flowControlWindow.getSizeInt()) + .channelType(NettyUtils.getServerChannelClass(serverBosses)) + .bossEventLoopGroup(serverBosses) + .workerEventLoopGroup(serverWorkers); if (sslContext != null) { LOG.info("Setting TLS for {}", address); @@ -224,10 +225,9 @@ private boolean separateClientServer() { return clientPort > 0 && clientPort != serverPort; } - Server newServer(GrpcClientProtocolService client, ServerInterceptor interceptor, - EventLoopGroup bossGroup, EventLoopGroup workerGroup) { + Server newServer(GrpcClientProtocolService client, ServerInterceptor interceptor) { final EnumSet types = EnumSet.of(GrpcServices.Type.SERVER); - final NettyServerBuilder serverBuilder = newNettyServerBuilderForServer(bossGroup, workerGroup); + final NettyServerBuilder serverBuilder = newNettyServerBuilderForServer(); final GrpcServerProtocolService service = newGrpcServerProtocolService(); 
serverBuilder.addService(ServerInterceptors.intercept(service, interceptor)); @@ -242,25 +242,16 @@ Server newServer(GrpcClientProtocolService client, ServerInterceptor interceptor return buildServer(serverBuilder, types); } - private EventLoopGroup bossEventLoopGroup; - private EventLoopGroup workerEventLoopGroup; - - EventLoopGroup getBossEventLoopGroup() { - if (workerEventLoopThreads > 0 && bossEventLoopGroup == null) { - bossEventLoopGroup = GrpcEventLoops.newEventLoopGroup(1, server.getId() + "-grpc-boss-ELG"); - } - return bossEventLoopGroup; - } - - EventLoopGroup getWorkerEventLoopGroup() { - if (workerEventLoopThreads > 0 && workerEventLoopGroup == null) { - final String name = server.getId() + "-grpc-worker-ELG"; - workerEventLoopGroup = GrpcEventLoops.newEventLoopGroup(workerEventLoopThreads, name); - } - return workerEventLoopGroup; - } - public GrpcServicesImpl build() { + final RaftProperties props = server.getProperties(); + final String id = server.getId() + ""; + final boolean useEpoll = GrpcConfigKeys.useEpoll(props); + serverBosses = NettyUtils.newEventLoopGroup(id + "-boss", + GrpcConfigKeys.Server.bossGroupSize(props), useEpoll); + serverWorkers = NettyUtils.newEventLoopGroup(id + "-server-workers", + GrpcConfigKeys.Server.workerGroupSize(props), useEpoll); + clientWorkers = NettyUtils.newEventLoopGroup(id + "-client-workers", + GrpcConfigKeys.Client.workerGroupSize(props), useEpoll); return new GrpcServicesImpl(this); } @@ -300,27 +291,28 @@ public static Builder newBuilder() { private final GrpcClientProtocolService clientProtocolService; private final MetricServerInterceptor serverInterceptor; - private final EventLoopGroup bossEventLoopGroup; - private final EventLoopGroup workerEventLoopGroup; + private final EventLoopGroup serverBosses; + private final EventLoopGroup serverWorkers; + private final EventLoopGroup clientWorkers; private GrpcServicesImpl(Builder b) { super(b.server::getId, id -> new PeerProxyMap<>(id.toString(), - peer -> 
b.newGrpcServerProtocolClient(peer, b.getWorkerEventLoopGroup()))); + peer -> b.newGrpcServerProtocolClient(peer, b.clientWorkers))); - this.bossEventLoopGroup = b.getBossEventLoopGroup(); - this.workerEventLoopGroup = b.getWorkerEventLoopGroup(); + this.serverBosses = b.serverBosses; + this.serverWorkers = b.serverWorkers; + this.clientWorkers = b.clientWorkers; this.executor = b.newExecutor(); this.clientProtocolService = b.newGrpcClientProtocolService(executor); this.serverInterceptor = b.newMetricServerInterceptor(); - final Server server = b.newServer(clientProtocolService, serverInterceptor, - bossEventLoopGroup, workerEventLoopGroup); + final Server server = b.newServer(clientProtocolService, serverInterceptor); servers.put(GrpcServerProtocolService.class.getSimpleName(), server); addressSupplier = newAddressSupplier(b.serverPort, server); if (b.separateAdminServer()) { - final NettyServerBuilder builder = b.newNettyServerBuilderForAdmin(bossEventLoopGroup, workerEventLoopGroup); + final NettyServerBuilder builder = b.newNettyServerBuilderForAdmin(); addAdminService(builder, b.server, serverInterceptor); final Server adminServer = b.buildServer(builder, EnumSet.of(GrpcServices.Type.ADMIN)); servers.put(GrpcAdminProtocolService.class.getName(), adminServer); @@ -330,7 +322,7 @@ private GrpcServicesImpl(Builder b) { } if (b.separateClientServer()) { - final NettyServerBuilder builder = b.newNettyServerBuilderForClient(bossEventLoopGroup, workerEventLoopGroup); + final NettyServerBuilder builder = b.newNettyServerBuilderForClient(); addClientService(builder, clientProtocolService, serverInterceptor); final Server clientServer = b.buildServer(builder, EnumSet.of(GrpcServices.Type.CLIENT)); servers.put(GrpcClientProtocolService.class.getName(), clientServer); @@ -409,8 +401,7 @@ public void closeImpl() { LOG.warn("{}: Failed to close proxies", getId(), e); } - GrpcEventLoops.shutdownGracefully(workerEventLoopGroup); - 
GrpcEventLoops.shutdownGracefully(bossEventLoopGroup); + NettyUtils.shutdownGracefully(clientWorkers, serverWorkers, serverBosses); } @Override diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java index edd0413430..84a4651f9d 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java @@ -17,8 +17,8 @@ */ package org.apache.ratis.grpc.server; -import org.apache.ratis.grpc.GrpcEventLoops; import org.apache.ratis.grpc.GrpcUtil; +import org.apache.ratis.util.NettyUtils; import org.apache.ratis.thirdparty.io.grpc.ManagedChannel; import org.apache.ratis.thirdparty.io.grpc.netty.NegotiationType; import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder; @@ -55,7 +55,10 @@ static ManagedChannel buildManagedChannel(String address, SslContext sslContext, } else { channelBuilder.negotiationType(NegotiationType.PLAINTEXT); } - GrpcEventLoops.configure(channelBuilder, eventLoopGroup); + if (eventLoopGroup != null) { + channelBuilder.channelType(NettyUtils.getSocketChannelClass(eventLoopGroup)) + .eventLoopGroup(eventLoopGroup); + } ManagedChannel ch = channelBuilder.build(); ch.getState(true); return ch; diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyClient.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyClient.java index 56ca6b030c..3c908f67a6 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyClient.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyClient.java @@ -27,6 +27,7 @@ import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LifeCycle; import org.apache.ratis.util.NetUtils; +import org.apache.ratis.util.NettyUtils; import java.io.Closeable; import java.net.InetSocketAddress; diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java 
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java index e72d6c6772..d2eb38859a 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java @@ -35,6 +35,7 @@ import org.apache.ratis.thirdparty.io.netty.handler.logging.LoggingHandler; import org.apache.ratis.util.IOUtils; import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.NettyUtils; import org.apache.ratis.util.PeerProxyMap; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.ProtoUtils; diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java index 2bfeea31e1..5b673d5189 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java @@ -28,7 +28,7 @@ import org.apache.ratis.io.WriteOption; import org.apache.ratis.netty.NettyConfigKeys; import org.apache.ratis.netty.NettyDataStreamUtils; -import org.apache.ratis.netty.NettyUtils; +import org.apache.ratis.util.NettyUtils; import org.apache.ratis.protocol.ClientInvocationId; import org.apache.ratis.protocol.DataStreamReply; import org.apache.ratis.protocol.DataStreamRequest; diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java index bc57343fb2..f7d2805e8a 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java @@ -20,7 +20,7 @@ import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.netty.NettyConfigKeys; import org.apache.ratis.netty.NettyRpcProxy; -import org.apache.ratis.netty.NettyUtils; +import 
org.apache.ratis.util.NettyUtils; import org.apache.ratis.protocol.GroupInfoReply; import org.apache.ratis.protocol.GroupListReply; import org.apache.ratis.protocol.RaftClientReply; diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java index 24303d867e..643ed15a0f 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java @@ -27,7 +27,7 @@ import org.apache.ratis.datastream.impl.DataStreamRequestByteBuf; import org.apache.ratis.netty.NettyConfigKeys; import org.apache.ratis.netty.NettyDataStreamUtils; -import org.apache.ratis.netty.NettyUtils; +import org.apache.ratis.util.NettyUtils; import org.apache.ratis.netty.metrics.NettyServerStreamRpcMetrics; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.DataStreamPacket; diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamSslWithRpcTypeGrpcAndDataStreamTypeNetty.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamSslWithRpcTypeGrpcAndDataStreamTypeNetty.java index 247a7bcac1..156ab410d3 100644 --- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamSslWithRpcTypeGrpcAndDataStreamTypeNetty.java +++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamSslWithRpcTypeGrpcAndDataStreamTypeNetty.java @@ -19,7 +19,7 @@ import org.apache.ratis.conf.Parameters; import org.apache.ratis.netty.NettyConfigKeys; -import org.apache.ratis.netty.NettyUtils; +import org.apache.ratis.util.NettyUtils; import org.apache.ratis.security.SecurityTestUtils; import org.apache.ratis.security.TlsConf; import org.apache.ratis.util.JavaUtils; diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcEventLoops.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcEventLoops.java 
index 6ea1caceba..c09a533397 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcEventLoops.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcEventLoops.java @@ -19,7 +19,9 @@ import org.apache.ratis.BaseTest; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.util.NettyUtils; import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup; +import org.apache.ratis.thirdparty.io.netty.channel.epoll.Epoll; import org.apache.ratis.thirdparty.io.netty.util.concurrent.EventExecutor; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -29,7 +31,7 @@ public class TestGrpcEventLoops extends BaseTest { @Test public void testNewEventLoopGroupWithThreadCount() { final int threads = 3; - final EventLoopGroup group = GrpcEventLoops.newEventLoopGroup(threads, "test-elg"); + final EventLoopGroup group = NettyUtils.newEventLoopGroup("test-elg", threads, false); try { Assertions.assertNotNull(group); int count = 0; @@ -38,51 +40,57 @@ public void testNewEventLoopGroupWithThreadCount() { } Assertions.assertEquals(threads, count); } finally { - GrpcEventLoops.shutdownGracefully(group); + NettyUtils.shutdownGracefully(group); } } - @Test - public void testNewEventLoopGroupRejectsNonPositiveThreads() { - Assertions.assertThrows(IllegalArgumentException.class, - () -> GrpcEventLoops.newEventLoopGroup(0, "test-elg")); - Assertions.assertThrows(IllegalArgumentException.class, - () -> GrpcEventLoops.newEventLoopGroup(-1, "test-elg")); - } - @Test public void testChannelTypeMatchesEpollAvailability() { - if (GrpcEventLoops.isEpollAvailable()) { - Assertions.assertEquals("EpollServerSocketChannel", - GrpcEventLoops.getServerChannelType().getSimpleName()); - Assertions.assertEquals("EpollSocketChannel", - GrpcEventLoops.getClientChannelType().getSimpleName()); - } else { - Assertions.assertEquals("NioServerSocketChannel", - GrpcEventLoops.getServerChannelType().getSimpleName()); - 
Assertions.assertEquals("NioSocketChannel", - GrpcEventLoops.getClientChannelType().getSimpleName()); + final EventLoopGroup group = NettyUtils.newEventLoopGroup("test-epoll", 1, true); + try { + if (Epoll.isAvailable()) { + Assertions.assertEquals("EpollServerSocketChannel", + NettyUtils.getServerChannelClass(group).getSimpleName()); + Assertions.assertEquals("EpollSocketChannel", + NettyUtils.getSocketChannelClass(group).getSimpleName()); + } else { + Assertions.assertEquals("NioServerSocketChannel", + NettyUtils.getServerChannelClass(group).getSimpleName()); + Assertions.assertEquals("NioSocketChannel", + NettyUtils.getSocketChannelClass(group).getSimpleName()); + } + } finally { + NettyUtils.shutdownGracefully(group); } } @Test public void testConfigKeyDefaults() { final RaftProperties properties = new RaftProperties(); - Assertions.assertEquals(0, GrpcConfigKeys.Server.workerEventLoopThreads(properties)); - Assertions.assertEquals(0, GrpcConfigKeys.Client.workerEventLoopThreads(properties)); + final int expectedWorker = GrpcConfigKeys.Server.WORKER_GROUP_SIZE_DEFAULT; + Assertions.assertTrue(expectedWorker > 0, + "default worker threads should be positive, but got " + expectedWorker); + Assertions.assertEquals(expectedWorker, GrpcConfigKeys.Server.workerGroupSize(properties)); + Assertions.assertEquals(expectedWorker, GrpcConfigKeys.Client.workerGroupSize(properties)); + Assertions.assertEquals(0, GrpcConfigKeys.Server.bossGroupSize(properties)); + Assertions.assertTrue(GrpcConfigKeys.useEpoll(properties)); } @Test public void testConfigKeyRoundtrip() { final RaftProperties properties = new RaftProperties(); - GrpcConfigKeys.Server.setWorkerEventLoopThreads(properties, 4); - GrpcConfigKeys.Client.setWorkerEventLoopThreads(properties, 2); - Assertions.assertEquals(4, GrpcConfigKeys.Server.workerEventLoopThreads(properties)); - Assertions.assertEquals(2, GrpcConfigKeys.Client.workerEventLoopThreads(properties)); + 
GrpcConfigKeys.Server.setWorkerGroupSize(properties, 4); + GrpcConfigKeys.Server.setBossGroupSize(properties, 1); + GrpcConfigKeys.Client.setWorkerGroupSize(properties, 2); + GrpcConfigKeys.setUseEpoll(properties, false); + Assertions.assertEquals(4, GrpcConfigKeys.Server.workerGroupSize(properties)); + Assertions.assertEquals(1, GrpcConfigKeys.Server.bossGroupSize(properties)); + Assertions.assertEquals(2, GrpcConfigKeys.Client.workerGroupSize(properties)); + Assertions.assertFalse(GrpcConfigKeys.useEpoll(properties)); } @Test public void testShutdownNullIsNoop() { - GrpcEventLoops.shutdownGracefully(null); + NettyUtils.shutdownGracefully((EventLoopGroup) null); } } diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcWorkerEventLoopThreads.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcWorkerEventLoopThreads.java index 9d0df3fd02..6b85e61d1d 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcWorkerEventLoopThreads.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcWorkerEventLoopThreads.java @@ -30,8 +30,8 @@ /** * Verify a cluster comes up and processes requests when the worker * event-loop thread count is capped via - * {@link GrpcConfigKeys.Server#WORKER_EVENT_LOOP_THREADS_KEY} and - * {@link GrpcConfigKeys.Client#WORKER_EVENT_LOOP_THREADS_KEY}. + * {@link GrpcConfigKeys.Server#WORKER_GROUP_SIZE_KEY} and + * {@link GrpcConfigKeys.Client#WORKER_GROUP_SIZE_KEY}. * *

Regression: RATIS-2529 — gRPC worker threads permanently inflate to * {@code availableProcessors * 2} after follower restart catch-up. @@ -42,8 +42,8 @@ public class TestGrpcWorkerEventLoopThreads extends BaseTest { public void testClusterWithCappedWorkerEventLoopThreads() throws Exception { final String[] ids = {"s0", "s1", "s2"}; final RaftProperties properties = new RaftProperties(); - GrpcConfigKeys.Server.setWorkerEventLoopThreads(properties, 2); - GrpcConfigKeys.Client.setWorkerEventLoopThreads(properties, 1); + GrpcConfigKeys.Server.setWorkerGroupSize(properties, 2); + GrpcConfigKeys.Client.setWorkerGroupSize(properties, 1); try (MiniRaftClusterWithGrpc cluster = new MiniRaftClusterWithGrpc(ids, properties, null)) { cluster.start(); diff --git a/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyRpcProxy.java b/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyRpcProxy.java index 2507220a8e..6b83ab7115 100644 --- a/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyRpcProxy.java +++ b/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyRpcProxy.java @@ -34,6 +34,7 @@ import org.apache.ratis.thirdparty.io.netty.channel.SimpleChannelInboundHandler; import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel; import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.NettyUtils; import org.junit.jupiter.api.Test; import java.lang.reflect.Field; diff --git a/ratis-test/src/test/java/org/apache/ratis/netty/TestTlsConfWithNetty.java b/ratis-test/src/test/java/org/apache/ratis/netty/TestTlsConfWithNetty.java index dbdcf1ebd8..5d73d9ea3a 100644 --- a/ratis-test/src/test/java/org/apache/ratis/netty/TestTlsConfWithNetty.java +++ b/ratis-test/src/test/java/org/apache/ratis/netty/TestTlsConfWithNetty.java @@ -38,6 +38,7 @@ import org.apache.ratis.thirdparty.io.netty.handler.logging.LoggingHandler; import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext; import org.apache.ratis.util.JavaUtils; +import 
org.apache.ratis.util.NettyUtils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.slf4j.Logger; diff --git a/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/TestSecureRatisShell.java b/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/TestSecureRatisShell.java index 21e9fe229a..7b7de79d21 100644 --- a/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/TestSecureRatisShell.java +++ b/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/TestSecureRatisShell.java @@ -24,7 +24,7 @@ import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.grpc.GrpcTlsConfig; import org.apache.ratis.grpc.MiniRaftClusterWithGrpc; -import org.apache.ratis.netty.NettyUtils; +import org.apache.ratis.util.NettyUtils; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.security.SecurityTestUtils; import org.apache.ratis.util.Slf4jUtils; From 7b5fcf68218de7919e907de62168aca7a72bbba0 Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Fri, 29 May 2026 13:39:33 +0800 Subject: [PATCH 3/8] RATIS-2529. 
Fix gRPC client worker ownership --- .../grpc/client/GrpcClientProtocolClient.java | 13 ++-- .../ratis/grpc/client/GrpcClientRpc.java | 42 ++++++++++-- .../ratis/grpc/server/GrpcServicesImpl.java | 19 ++++-- .../grpc/client/TestGrpcClientEventLoops.java | 68 +++++++++++++++++++ 4 files changed, 123 insertions(+), 19 deletions(-) create mode 100644 ratis-test/src/test/java/org/apache/ratis/grpc/client/TestGrpcClientEventLoops.java diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java index d51e13c5b5..5c0f622a9b 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java @@ -99,17 +99,13 @@ public class GrpcClientProtocolClient implements Closeable { private final EventLoopGroup workerEventLoopGroup; GrpcClientProtocolClient(ClientId id, RaftPeer target, RaftProperties properties, - SslContext adminSslContext, SslContext clientSslContext) { + SslContext adminSslContext, SslContext clientSslContext, EventLoopGroup workerEventLoopGroup) { this.name = JavaUtils.memoize(() -> id + "->" + target.getId()); this.target = target; final SizeInBytes flowControlWindow = GrpcConfigKeys.flowControlWindow(properties, LOG::debug); this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug); metricClientInterceptor = new MetricClientInterceptor(getName()); - - this.workerEventLoopGroup = NettyUtils.newEventLoopGroup( - getName() + "-client-workers", - GrpcConfigKeys.Client.workerGroupSize(properties), - GrpcConfigKeys.useEpoll(properties)); + this.workerEventLoopGroup = Objects.requireNonNull(workerEventLoopGroup, "workerEventLoopGroup"); final String clientAddress = Optional.ofNullable(target.getClientAddress()) .filter(x -> !x.isEmpty()).orElse(target.getAddress()); @@ -165,7 +161,10 @@ public void close() { 
GrpcUtil.shutdownManagedChannel(adminChannel); } metricClientInterceptor.close(); - NettyUtils.shutdownGracefully(workerEventLoopGroup); + } + + EventLoopGroup getWorkerEventLoopGroup() { + return workerEventLoopGroup; } RaftClientReplyProto groupAdd(GroupManagementRequestProto request) throws IOException { diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java index 65175dc2a1..8e35dd9504 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java @@ -29,6 +29,7 @@ import org.apache.ratis.thirdparty.io.grpc.Status; import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException; import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; +import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.ratis.proto.RaftProtos.GroupInfoRequestProto; import org.apache.ratis.proto.RaftProtos.GroupListRequestProto; import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto; @@ -41,6 +42,7 @@ import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext; import org.apache.ratis.util.IOUtils; import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.NettyUtils; import org.apache.ratis.util.PeerProxyMap; import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; @@ -60,15 +62,36 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy(clientId.toString(), - p -> new GrpcClientProtocolClient(clientId, p, properties, adminSslContext, clientSslContext))); - this.clientId = clientId; - this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug).getSizeInt(); - this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout(properties); - this.watchRequestTimeoutDuration = RaftClientConfigKeys.Rpc.watchRequestTimeout(properties); + p -> new GrpcClientProtocolClient(clientId, p, 
properties, adminSslContext, clientSslContext, clientWorkers))); + this.clientWorkers = clientWorkers; + + boolean initialized = false; + try { + this.clientId = clientId; + this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug).getSizeInt(); + this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout(properties); + this.watchRequestTimeoutDuration = RaftClientConfigKeys.Rpc.watchRequestTimeout(properties); + initialized = true; + } finally { + if (!initialized) { + NettyUtils.shutdownGracefully(clientWorkers); + } + } + } + + private static EventLoopGroup newClientWorkers(ClientId clientId, RaftProperties properties) { + return NettyUtils.newEventLoopGroup(clientId + "-client-workers", + GrpcConfigKeys.Client.workerGroupSize(properties), GrpcConfigKeys.useEpoll(properties)); } @Override @@ -213,6 +236,15 @@ private RaftClientRequestProto toRaftClientRequestProto(RaftClientRequest reques return proto; } + @Override + public void close() { + try { + super.close(); + } finally { + NettyUtils.shutdownGracefully(clientWorkers); + } + } + @Override public boolean shouldReconnect(Throwable e) { final Throwable cause = e.getCause(); diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java index c604ee0726..5326cd02e6 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java @@ -246,13 +246,18 @@ public GrpcServicesImpl build() { final RaftProperties props = server.getProperties(); final String id = server.getId() + ""; final boolean useEpoll = GrpcConfigKeys.useEpoll(props); - serverBosses = NettyUtils.newEventLoopGroup(id + "-boss", - GrpcConfigKeys.Server.bossGroupSize(props), useEpoll); - serverWorkers = NettyUtils.newEventLoopGroup(id + "-server-workers", - GrpcConfigKeys.Server.workerGroupSize(props), useEpoll); - 
clientWorkers = NettyUtils.newEventLoopGroup(id + "-client-workers", - GrpcConfigKeys.Client.workerGroupSize(props), useEpoll); - return new GrpcServicesImpl(this); + try { + serverBosses = NettyUtils.newEventLoopGroup(id + "-boss", + GrpcConfigKeys.Server.bossGroupSize(props), useEpoll); + serverWorkers = NettyUtils.newEventLoopGroup(id + "-server-workers", + GrpcConfigKeys.Server.workerGroupSize(props), useEpoll); + clientWorkers = NettyUtils.newEventLoopGroup(id + "-client-workers", + GrpcConfigKeys.Client.workerGroupSize(props), useEpoll); + return new GrpcServicesImpl(this); + } catch (RuntimeException | Error e) { + NettyUtils.shutdownGracefully(clientWorkers, serverWorkers, serverBosses); + throw e; + } } public Builder setAdminSslContext(SslContext adminSslContext) { diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/client/TestGrpcClientEventLoops.java b/ratis-test/src/test/java/org/apache/ratis/grpc/client/TestGrpcClientEventLoops.java new file mode 100644 index 0000000000..17ed2ba0c5 --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/client/TestGrpcClientEventLoops.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.grpc.client; + +import org.apache.ratis.BaseTest; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.grpc.GrpcConfigKeys; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup; +import org.apache.ratis.thirdparty.io.netty.util.concurrent.EventExecutor; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +public class TestGrpcClientEventLoops extends BaseTest { + @Test + public void testClientWorkerEventLoopGroupSharedByPeerProxies() throws Exception { + final RaftProperties properties = new RaftProperties(); + GrpcConfigKeys.Client.setWorkerGroupSize(properties, 1); + GrpcConfigKeys.setUseEpoll(properties, false); + + final RaftPeer p0 = newPeer("s0", 15000); + final RaftPeer p1 = newPeer("s1", 15001); + EventLoopGroup clientWorkers = null; + try (GrpcClientRpc rpc = new GrpcClientRpc(ClientId.randomId(), properties, null, null)) { + rpc.addRaftPeers(Arrays.asList(p0, p1)); + final GrpcClientProtocolClient c0 = rpc.getProxies().getProxy(p0.getId()); + final GrpcClientProtocolClient c1 = rpc.getProxies().getProxy(p1.getId()); + + clientWorkers = c0.getWorkerEventLoopGroup(); + Assertions.assertSame(clientWorkers, c1.getWorkerEventLoopGroup()); + Assertions.assertEquals(1, countEventExecutors(clientWorkers)); + Assertions.assertFalse(clientWorkers.isShuttingDown()); + } + + Assertions.assertNotNull(clientWorkers); + Assertions.assertTrue(clientWorkers.isShuttingDown() || clientWorkers.isShutdown() || clientWorkers.isTerminated()); + } + + private static RaftPeer newPeer(String id, int port) { + return RaftPeer.newBuilder().setId(id).setAddress("127.0.0.1:" + port).build(); + } + + private static int countEventExecutors(EventLoopGroup group) { + int count = 0; + for (EventExecutor ignored : group) { + count++; + } + return count; + } +} From 
84c1a576729361d261535ba66c026fd7da71d20d Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Wed, 27 May 2026 17:44:22 +0800 Subject: [PATCH 4/8] RATIS-2529. Address gRPC EventLoopGroup review comments --- .../org/apache/ratis/grpc/GrpcFactory.java | 2 +- .../grpc/client/GrpcClientProtocolClient.java | 18 +++++--- .../ratis/grpc/client/GrpcClientRpc.java | 44 ++++++++----------- .../grpc/server/GrpcServerProtocolClient.java | 9 ++-- .../ratis/grpc/server/GrpcServicesImpl.java | 9 +++- .../ratis/grpc/server/GrpcStubPool.java | 8 ++-- .../grpc/client/TestGrpcClientEventLoops.java | 6 +-- 7 files changed, 49 insertions(+), 47 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java index 1053cab80e..f3ca3f5faf 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java @@ -146,6 +146,6 @@ public GrpcClientRpc newRaftClientRpc(ClientId clientId, RaftProperties properti checkPooledByteBufAllocatorUseCacheForAllThreads(LOG::debug); final SslContexts forClient = forClientSupplier.get(); - return new GrpcClientRpc(clientId, properties, forClient.adminSslContext, forClient.clientSslContext); + return GrpcClientRpc.create(clientId, properties, forClient.adminSslContext, forClient.clientSslContext); } } diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java index 5c0f622a9b..7c9545b559 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java @@ -56,6 +56,8 @@ import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext; import org.apache.ratis.util.CollectionUtils; import org.apache.ratis.util.JavaUtils; +import 
org.apache.ratis.util.MemoizedSupplier; +import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.TimeoutExecutor; @@ -96,16 +98,16 @@ public class GrpcClientProtocolClient implements Closeable { private final AtomicReference unorderedStreamObservers = new AtomicReference<>(); private final MetricClientInterceptor metricClientInterceptor; - private final EventLoopGroup workerEventLoopGroup; + private final MemoizedSupplier clientWorkers; GrpcClientProtocolClient(ClientId id, RaftPeer target, RaftProperties properties, - SslContext adminSslContext, SslContext clientSslContext, EventLoopGroup workerEventLoopGroup) { + SslContext adminSslContext, SslContext clientSslContext, MemoizedSupplier clientWorkers) { this.name = JavaUtils.memoize(() -> id + "->" + target.getId()); this.target = target; final SizeInBytes flowControlWindow = GrpcConfigKeys.flowControlWindow(properties, LOG::debug); this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug); metricClientInterceptor = new MetricClientInterceptor(getName()); - this.workerEventLoopGroup = Objects.requireNonNull(workerEventLoopGroup, "workerEventLoopGroup"); + this.clientWorkers = Objects.requireNonNull(clientWorkers, "clientWorkers"); final String clientAddress = Optional.ofNullable(target.getClientAddress()) .filter(x -> !x.isEmpty()).orElse(target.getAddress()); @@ -139,8 +141,9 @@ private ManagedChannel buildChannel(String address, SslContext sslContext, channelBuilder.negotiationType(NegotiationType.PLAINTEXT); } - channelBuilder.channelType(NettyUtils.getSocketChannelClass(workerEventLoopGroup)) - .eventLoopGroup(workerEventLoopGroup); + final EventLoopGroup eventLoopGroup = clientWorkers.get(); + channelBuilder.channelType(NettyUtils.getSocketChannelClass(eventLoopGroup)) + .eventLoopGroup(eventLoopGroup); return channelBuilder.flowControlWindow(flowControlWindow.getSizeInt()) 
.maxInboundMessageSize(maxMessageSize.getSizeInt()) @@ -163,8 +166,9 @@ public void close() { metricClientInterceptor.close(); } - EventLoopGroup getWorkerEventLoopGroup() { - return workerEventLoopGroup; + EventLoopGroup getClientWorkersForTesting() { + Preconditions.assertTrue(clientWorkers.isInitialized()); + return clientWorkers.get(); } RaftClientReplyProto groupAdd(GroupManagementRequestProto request) throws IOException { diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java index 8e35dd9504..b5e8a4835e 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java @@ -42,6 +42,7 @@ import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext; import org.apache.ratis.util.IOUtils; import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.MemoizedSupplier; import org.apache.ratis.util.NettyUtils; import org.apache.ratis.util.PeerProxyMap; import org.apache.ratis.util.TimeDuration; @@ -58,40 +59,31 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy { public static final Logger LOG = LoggerFactory.getLogger(GrpcClientRpc.class); + public static GrpcClientRpc create(ClientId clientId, RaftProperties properties, + SslContext adminSslContext, SslContext clientSslContext) { + final MemoizedSupplier eventLoopGroup = MemoizedSupplier.valueOf(() -> NettyUtils.newEventLoopGroup( + clientId + "-client-workers", + GrpcConfigKeys.Client.workerGroupSize(properties), + GrpcConfigKeys.useEpoll(properties))); + return new GrpcClientRpc(clientId, properties, adminSslContext, clientSslContext, eventLoopGroup); + } + private final ClientId clientId; private final int maxMessageSize; private final TimeDuration requestTimeoutDuration; private final TimeDuration watchRequestTimeoutDuration; - private final EventLoopGroup clientWorkers; - - public 
GrpcClientRpc(ClientId clientId, RaftProperties properties, - SslContext adminSslContext, SslContext clientSslContext) { - this(clientId, properties, adminSslContext, clientSslContext, newClientWorkers(clientId, properties)); - } + private final MemoizedSupplier clientWorkers; private GrpcClientRpc(ClientId clientId, RaftProperties properties, - SslContext adminSslContext, SslContext clientSslContext, EventLoopGroup clientWorkers) { + SslContext adminSslContext, SslContext clientSslContext, MemoizedSupplier clientWorkers) { super(new PeerProxyMap<>(clientId.toString(), p -> new GrpcClientProtocolClient(clientId, p, properties, adminSslContext, clientSslContext, clientWorkers))); this.clientWorkers = clientWorkers; - boolean initialized = false; - try { - this.clientId = clientId; - this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug).getSizeInt(); - this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout(properties); - this.watchRequestTimeoutDuration = RaftClientConfigKeys.Rpc.watchRequestTimeout(properties); - initialized = true; - } finally { - if (!initialized) { - NettyUtils.shutdownGracefully(clientWorkers); - } - } - } - - private static EventLoopGroup newClientWorkers(ClientId clientId, RaftProperties properties) { - return NettyUtils.newEventLoopGroup(clientId + "-client-workers", - GrpcConfigKeys.Client.workerGroupSize(properties), GrpcConfigKeys.useEpoll(properties)); + this.clientId = clientId; + this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug).getSizeInt(); + this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout(properties); + this.watchRequestTimeoutDuration = RaftClientConfigKeys.Rpc.watchRequestTimeout(properties); } @Override @@ -241,7 +233,9 @@ public void close() { try { super.close(); } finally { - NettyUtils.shutdownGracefully(clientWorkers); + if (clientWorkers.isInitialized()) { + NettyUtils.shutdownGracefully(clientWorkers.get()); + } } } diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java index 6dc0d32527..601c0f58fb 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java @@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory; import java.io.Closeable; +import java.util.Objects; /** * This is a RaftClient implementation that supports streaming data to the raft @@ -67,7 +68,7 @@ class GrpcServerProtocolClient implements Closeable { raftPeerId = target.getId(); LOG.info("Build channel for {}", target); useSeparateHBChannel = separateHBChannel; - this.eventLoopGroup = eventLoopGroup; + this.eventLoopGroup = Objects.requireNonNull(eventLoopGroup, "eventLoopGroup == null"); channel = buildChannel(target, flowControlWindow, sslContext); blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel); asyncStub = RaftServerProtocolServiceGrpc.newStub(channel); @@ -96,10 +97,8 @@ private ManagedChannel buildChannel(RaftPeer target, int flowControlWindow, SslC channelBuilder.negotiationType(NegotiationType.PLAINTEXT); } channelBuilder.disableRetry(); - if (eventLoopGroup != null) { - channelBuilder.channelType(NettyUtils.getSocketChannelClass(eventLoopGroup)) - .eventLoopGroup(eventLoopGroup); - } + channelBuilder.channelType(NettyUtils.getSocketChannelClass(eventLoopGroup)) + .eventLoopGroup(eventLoopGroup); return channelBuilder.flowControlWindow(flowControlWindow).build(); } diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java index 5326cd02e6..04b72fd8b9 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java @@ -254,9 +254,14 @@ 
public GrpcServicesImpl build() { clientWorkers = NettyUtils.newEventLoopGroup(id + "-client-workers", GrpcConfigKeys.Client.workerGroupSize(props), useEpoll); return new GrpcServicesImpl(this); - } catch (RuntimeException | Error e) { + } catch (Throwable t) { NettyUtils.shutdownGracefully(clientWorkers, serverWorkers, serverBosses); - throw e; + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else if (t instanceof Error) { + throw (Error) t; + } + throw new RuntimeException(t); } } diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java index 84a4651f9d..5a196579f6 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java @@ -35,6 +35,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -44,6 +45,7 @@ final class GrpcStubPool> { public static final Logger LOG = LoggerFactory.getLogger(GrpcStubPool.class); static ManagedChannel buildManagedChannel(String address, SslContext sslContext, EventLoopGroup eventLoopGroup) { + Objects.requireNonNull(eventLoopGroup, "eventLoopGroup == null"); NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(address) .keepAliveTime(10, TimeUnit.MINUTES) .keepAliveWithoutCalls(false) @@ -55,10 +57,8 @@ static ManagedChannel buildManagedChannel(String address, SslContext sslContext, } else { channelBuilder.negotiationType(NegotiationType.PLAINTEXT); } - if (eventLoopGroup != null) { - channelBuilder.channelType(NettyUtils.getSocketChannelClass(eventLoopGroup)) - .eventLoopGroup(eventLoopGroup); - } + channelBuilder.channelType(NettyUtils.getSocketChannelClass(eventLoopGroup)) + .eventLoopGroup(eventLoopGroup); 
ManagedChannel ch = channelBuilder.build(); ch.getState(true); return ch; diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/client/TestGrpcClientEventLoops.java b/ratis-test/src/test/java/org/apache/ratis/grpc/client/TestGrpcClientEventLoops.java index 17ed2ba0c5..76c8475450 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/client/TestGrpcClientEventLoops.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/client/TestGrpcClientEventLoops.java @@ -39,13 +39,13 @@ public void testClientWorkerEventLoopGroupSharedByPeerProxies() throws Exception final RaftPeer p0 = newPeer("s0", 15000); final RaftPeer p1 = newPeer("s1", 15001); EventLoopGroup clientWorkers = null; - try (GrpcClientRpc rpc = new GrpcClientRpc(ClientId.randomId(), properties, null, null)) { + try (GrpcClientRpc rpc = GrpcClientRpc.create(ClientId.randomId(), properties, null, null)) { rpc.addRaftPeers(Arrays.asList(p0, p1)); final GrpcClientProtocolClient c0 = rpc.getProxies().getProxy(p0.getId()); final GrpcClientProtocolClient c1 = rpc.getProxies().getProxy(p1.getId()); - clientWorkers = c0.getWorkerEventLoopGroup(); - Assertions.assertSame(clientWorkers, c1.getWorkerEventLoopGroup()); + clientWorkers = c0.getClientWorkersForTesting(); + Assertions.assertSame(clientWorkers, c1.getClientWorkersForTesting()); Assertions.assertEquals(1, countEventExecutors(clientWorkers)); Assertions.assertFalse(clientWorkers.isShuttingDown()); } From 84a17687c8af4b2afdcf4465d2b4fa686ded02f9 Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Thu, 28 May 2026 14:49:20 +0800 Subject: [PATCH 5/8] RATIS-2529. 
Fix gRPC client RPC checkstyle --- .../main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java index b5e8a4835e..099f913873 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java @@ -56,7 +56,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -public class GrpcClientRpc extends RaftClientRpcWithProxy { +public final class GrpcClientRpc extends RaftClientRpcWithProxy { public static final Logger LOG = LoggerFactory.getLogger(GrpcClientRpc.class); public static GrpcClientRpc create(ClientId clientId, RaftProperties properties, From a5b60e1f83150a6ab47f92a916822533ccf1e829 Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Fri, 29 May 2026 12:38:38 +0800 Subject: [PATCH 6/8] RATIS-2529. 
Address gRPC build cleanup review --- .../org/apache/ratis/grpc/server/GrpcServicesImpl.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java index 04b72fd8b9..99ac87f06f 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java @@ -256,12 +256,7 @@ public GrpcServicesImpl build() { return new GrpcServicesImpl(this); } catch (Throwable t) { NettyUtils.shutdownGracefully(clientWorkers, serverWorkers, serverBosses); - if (t instanceof RuntimeException) { - throw (RuntimeException) t; - } else if (t instanceof Error) { - throw (Error) t; - } - throw new RuntimeException(t); + throw t; } } From a2a5e3bb6e7b0fa34819169d267aa1f234a19d90 Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Fri, 29 May 2026 17:59:17 +0800 Subject: [PATCH 7/8] RATIS-2529. 
Preserve default gRPC event loop behavior --- .../org/apache/ratis/grpc/GrpcConfigKeys.java | 9 +++-- .../grpc/client/GrpcClientProtocolClient.java | 11 ++++--- .../ratis/grpc/client/GrpcClientRpc.java | 10 +++--- .../grpc/server/GrpcServerProtocolClient.java | 9 ++--- .../ratis/grpc/server/GrpcServicesImpl.java | 33 +++++++++++++------ .../ratis/grpc/server/GrpcStubPool.java | 8 ++--- .../apache/ratis/grpc/TestGrpcEventLoops.java | 3 +- 7 files changed, 49 insertions(+), 34 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java index de26693229..cd7ac10f32 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java @@ -21,7 +21,6 @@ import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.server.GrpcServices; import org.apache.ratis.server.RaftServerConfigKeys; -import org.apache.ratis.thirdparty.io.netty.util.NettyRuntime; import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; @@ -167,10 +166,10 @@ static void setTlsConf(Parameters parameters, GrpcTlsConfig conf) { } String WORKER_GROUP_SIZE_KEY = PREFIX + ".worker-group.size"; - int WORKER_GROUP_SIZE_DEFAULT = Math.max(1, NettyRuntime.availableProcessors() * 2); + int WORKER_GROUP_SIZE_DEFAULT = 0; static int workerGroupSize(RaftProperties properties) { return getInt(properties::getInt, WORKER_GROUP_SIZE_KEY, - WORKER_GROUP_SIZE_DEFAULT, getDefaultLog(), requireMin(1), requireMax(65536)); + WORKER_GROUP_SIZE_DEFAULT, getDefaultLog(), requireMin(0), requireMax(65536)); } static void setWorkerGroupSize(RaftProperties properties, int size) { setInt(properties::setInt, WORKER_GROUP_SIZE_KEY, size); @@ -323,10 +322,10 @@ static void setBossGroupSize(RaftProperties properties, int size) { } String WORKER_GROUP_SIZE_KEY = PREFIX + ".worker-group.size"; 
- int WORKER_GROUP_SIZE_DEFAULT = Math.max(1, NettyRuntime.availableProcessors() * 2); + int WORKER_GROUP_SIZE_DEFAULT = 0; static int workerGroupSize(RaftProperties properties) { return getInt(properties::getInt, WORKER_GROUP_SIZE_KEY, - WORKER_GROUP_SIZE_DEFAULT, getDefaultLog(), requireMin(1), requireMax(65536)); + WORKER_GROUP_SIZE_DEFAULT, getDefaultLog(), requireMin(0), requireMax(65536)); } static void setWorkerGroupSize(RaftProperties properties, int size) { setInt(properties::setInt, WORKER_GROUP_SIZE_KEY, size); diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java index 7c9545b559..e958c9e000 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java @@ -107,7 +107,7 @@ public class GrpcClientProtocolClient implements Closeable { final SizeInBytes flowControlWindow = GrpcConfigKeys.flowControlWindow(properties, LOG::debug); this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug); metricClientInterceptor = new MetricClientInterceptor(getName()); - this.clientWorkers = Objects.requireNonNull(clientWorkers, "clientWorkers"); + this.clientWorkers = clientWorkers; final String clientAddress = Optional.ofNullable(target.getClientAddress()) .filter(x -> !x.isEmpty()).orElse(target.getAddress()); @@ -141,9 +141,11 @@ private ManagedChannel buildChannel(String address, SslContext sslContext, channelBuilder.negotiationType(NegotiationType.PLAINTEXT); } - final EventLoopGroup eventLoopGroup = clientWorkers.get(); - channelBuilder.channelType(NettyUtils.getSocketChannelClass(eventLoopGroup)) - .eventLoopGroup(eventLoopGroup); + if (clientWorkers != null) { + final EventLoopGroup eventLoopGroup = clientWorkers.get(); + channelBuilder.channelType(NettyUtils.getSocketChannelClass(eventLoopGroup)) 
+ .eventLoopGroup(eventLoopGroup); + } return channelBuilder.flowControlWindow(flowControlWindow.getSizeInt()) .maxInboundMessageSize(maxMessageSize.getSizeInt()) @@ -167,6 +169,7 @@ public void close() { } EventLoopGroup getClientWorkersForTesting() { + Preconditions.assertTrue(clientWorkers != null); Preconditions.assertTrue(clientWorkers.isInitialized()); return clientWorkers.get(); } diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java index 099f913873..b12b5afe45 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java @@ -61,10 +61,10 @@ public final class GrpcClientRpc extends RaftClientRpcWithProxy eventLoopGroup = MemoizedSupplier.valueOf(() -> NettyUtils.newEventLoopGroup( - clientId + "-client-workers", - GrpcConfigKeys.Client.workerGroupSize(properties), - GrpcConfigKeys.useEpoll(properties))); + final int workerGroupSize = GrpcConfigKeys.Client.workerGroupSize(properties); + final MemoizedSupplier eventLoopGroup = workerGroupSize > 0 ? 
MemoizedSupplier.valueOf( + () -> NettyUtils.newEventLoopGroup( + clientId + "-client-workers", workerGroupSize, GrpcConfigKeys.useEpoll(properties))) : null; return new GrpcClientRpc(clientId, properties, adminSslContext, clientSslContext, eventLoopGroup); } @@ -233,7 +233,7 @@ public void close() { try { super.close(); } finally { - if (clientWorkers.isInitialized()) { + if (clientWorkers != null && clientWorkers.isInitialized()) { NettyUtils.shutdownGracefully(clientWorkers.get()); } } diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java index 601c0f58fb..6dc0d32527 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java @@ -39,7 +39,6 @@ import org.slf4j.LoggerFactory; import java.io.Closeable; -import java.util.Objects; /** * This is a RaftClient implementation that supports streaming data to the raft @@ -68,7 +67,7 @@ class GrpcServerProtocolClient implements Closeable { raftPeerId = target.getId(); LOG.info("Build channel for {}", target); useSeparateHBChannel = separateHBChannel; - this.eventLoopGroup = Objects.requireNonNull(eventLoopGroup, "eventLoopGroup == null"); + this.eventLoopGroup = eventLoopGroup; channel = buildChannel(target, flowControlWindow, sslContext); blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel); asyncStub = RaftServerProtocolServiceGrpc.newStub(channel); @@ -97,8 +96,10 @@ private ManagedChannel buildChannel(RaftPeer target, int flowControlWindow, SslC channelBuilder.negotiationType(NegotiationType.PLAINTEXT); } channelBuilder.disableRetry(); - channelBuilder.channelType(NettyUtils.getSocketChannelClass(eventLoopGroup)) - .eventLoopGroup(eventLoopGroup); + if (eventLoopGroup != null) { + 
channelBuilder.channelType(NettyUtils.getSocketChannelClass(eventLoopGroup)) + .eventLoopGroup(eventLoopGroup); + } return channelBuilder.flowControlWindow(flowControlWindow).build(); } diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java index 99ac87f06f..3b31753b86 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java @@ -205,10 +205,18 @@ private NettyServerBuilder newNettyServerBuilder(String hostname, int port, SslC final NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forAddress(address) .withChildOption(ChannelOption.SO_REUSEADDR, true) .maxInboundMessageSize(messageSizeMax.getSizeInt()) - .flowControlWindow(flowControlWindow.getSizeInt()) - .channelType(NettyUtils.getServerChannelClass(serverBosses)) - .bossEventLoopGroup(serverBosses) - .workerEventLoopGroup(serverWorkers); + .flowControlWindow(flowControlWindow.getSizeInt()); + + final EventLoopGroup channelGroup = serverBosses != null ? 
serverBosses : serverWorkers; + if (channelGroup != null) { + nettyServerBuilder.channelType(NettyUtils.getServerChannelClass(channelGroup)); + } + if (serverBosses != null) { + nettyServerBuilder.bossEventLoopGroup(serverBosses); + } + if (serverWorkers != null) { + nettyServerBuilder.workerEventLoopGroup(serverWorkers); + } if (sslContext != null) { LOG.info("Setting TLS for {}", address); @@ -247,12 +255,17 @@ public GrpcServicesImpl build() { final String id = server.getId() + ""; final boolean useEpoll = GrpcConfigKeys.useEpoll(props); try { - serverBosses = NettyUtils.newEventLoopGroup(id + "-boss", - GrpcConfigKeys.Server.bossGroupSize(props), useEpoll); - serverWorkers = NettyUtils.newEventLoopGroup(id + "-server-workers", - GrpcConfigKeys.Server.workerGroupSize(props), useEpoll); - clientWorkers = NettyUtils.newEventLoopGroup(id + "-client-workers", - GrpcConfigKeys.Client.workerGroupSize(props), useEpoll); + final int bossGroupSize = GrpcConfigKeys.Server.bossGroupSize(props); + final int serverWorkerGroupSize = GrpcConfigKeys.Server.workerGroupSize(props); + if (bossGroupSize > 0 || serverWorkerGroupSize > 0) { + serverBosses = NettyUtils.newEventLoopGroup(id + "-boss", bossGroupSize, useEpoll); + serverWorkers = NettyUtils.newEventLoopGroup(id + "-server-workers", serverWorkerGroupSize, useEpoll); + } + + final int clientWorkerGroupSize = GrpcConfigKeys.Client.workerGroupSize(props); + if (clientWorkerGroupSize > 0) { + clientWorkers = NettyUtils.newEventLoopGroup(id + "-client-workers", clientWorkerGroupSize, useEpoll); + } return new GrpcServicesImpl(this); } catch (Throwable t) { NettyUtils.shutdownGracefully(clientWorkers, serverWorkers, serverBosses); diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java index 5a196579f6..84a4651f9d 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java +++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java @@ -35,7 +35,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Objects; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -45,7 +44,6 @@ final class GrpcStubPool> { public static final Logger LOG = LoggerFactory.getLogger(GrpcStubPool.class); static ManagedChannel buildManagedChannel(String address, SslContext sslContext, EventLoopGroup eventLoopGroup) { - Objects.requireNonNull(eventLoopGroup, "eventLoopGroup == null"); NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(address) .keepAliveTime(10, TimeUnit.MINUTES) .keepAliveWithoutCalls(false) @@ -57,8 +55,10 @@ static ManagedChannel buildManagedChannel(String address, SslContext sslContext, } else { channelBuilder.negotiationType(NegotiationType.PLAINTEXT); } - channelBuilder.channelType(NettyUtils.getSocketChannelClass(eventLoopGroup)) - .eventLoopGroup(eventLoopGroup); + if (eventLoopGroup != null) { + channelBuilder.channelType(NettyUtils.getSocketChannelClass(eventLoopGroup)) + .eventLoopGroup(eventLoopGroup); + } ManagedChannel ch = channelBuilder.build(); ch.getState(true); return ch; diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcEventLoops.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcEventLoops.java index c09a533397..fdee06ebec 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcEventLoops.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcEventLoops.java @@ -68,10 +68,9 @@ public void testChannelTypeMatchesEpollAvailability() { public void testConfigKeyDefaults() { final RaftProperties properties = new RaftProperties(); final int expectedWorker = GrpcConfigKeys.Server.WORKER_GROUP_SIZE_DEFAULT; - Assertions.assertTrue(expectedWorker > 0, - "default worker threads should be positive, but got " + expectedWorker); 
Assertions.assertEquals(expectedWorker, GrpcConfigKeys.Server.workerGroupSize(properties)); Assertions.assertEquals(expectedWorker, GrpcConfigKeys.Client.workerGroupSize(properties)); + Assertions.assertEquals(0, expectedWorker); Assertions.assertEquals(0, GrpcConfigKeys.Server.bossGroupSize(properties)); Assertions.assertTrue(GrpcConfigKeys.useEpoll(properties)); } From 823b4872e391900bdb723599ca5d3583fd6f42c7 Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Sun, 31 May 2026 13:42:50 +0800 Subject: [PATCH 8/8] RATIS-2529. Stabilize gRPC server metrics test --- .../java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java index b5247cf63d..d48ad39f30 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java @@ -356,16 +356,17 @@ void testRequestMetrics(MiniRaftClusterWithGrpc cluster) throws Exception { // Block stateMachine flush data, so that 2nd request will not be // completed, and so it will not be removed from pending request map. List clients = new ArrayList<>(); + final List> replies = new ArrayList<>(); try { RaftClient client = cluster.createClient(cluster.getLeader().getId(), RetryPolicies.noRetry()); clients.add(client); - client.async().send(new SimpleMessage("2nd Message")); + replies.add(client.async().send(new SimpleMessage("2nd Message"))); for (int i = 0; i < 10; i++) { client = cluster.createClient(cluster.getLeader().getId(), RetryPolicies.noRetry()); clients.add(client); - client.async().send(new SimpleMessage("message " + i)); + replies.add(client.async().send(new SimpleMessage("message " + i))); } // Because we have passed 11 requests, and the element queue size is 10. 
@@ -373,6 +374,7 @@ void testRequestMetrics(MiniRaftClusterWithGrpc cluster) throws Exception { .getNumRequestQueueLimitHits().getCount() == 1, 300, 5000); stateMachine.unblockFlushStateMachineData(); + RaftTestUtil.waitFor(() -> replies.stream().allMatch(CompletableFuture::isDone), 300, 5000); // Send a message with 1025kb , our byte size limit is 1024kb (1mb) , so it should fail // and byte size counter limit will be hit.