diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/NettyUtils.java similarity index 85% rename from ratis-netty/src/main/java/org/apache/ratis/netty/NettyUtils.java rename to ratis-common/src/main/java/org/apache/ratis/util/NettyUtils.java index 37666bf189..163c3d869f 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/NettyUtils.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.ratis.netty; +package org.apache.ratis.util; import org.apache.ratis.security.TlsConf; import org.apache.ratis.security.TlsConf.CertificatesConf; @@ -36,13 +36,14 @@ import org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext; import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder; -import org.apache.ratis.util.ConcurrentUtils; -import org.apache.ratis.util.TimeDuration; +import org.apache.ratis.thirdparty.io.netty.util.concurrent.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.net.ssl.KeyManager; import javax.net.ssl.TrustManager; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@ -81,6 +82,39 @@ static EventLoopGroup newEventLoopGroup(String name, int size, boolean useEpoll) return new NioEventLoopGroup(size, ConcurrentUtils.newThreadFactory(name + "-")); } + static void shutdownGracefully(EventLoopGroup... groups) { + shutdownGracefully(CLOSE_TIMEOUT, groups); + } + + static void shutdownGracefully(TimeDuration awaitTime, EventLoopGroup... groups) { + if (groups == null || groups.length == 0) { + return; + } + + final List nonNullGroups = new ArrayList<>(groups.length); + final List> futures = new ArrayList<>(groups.length); + for (EventLoopGroup group : groups) { + if (group != null) { + nonNullGroups.add(group); + futures.add(group.shutdownGracefully()); + } + } + + for (int i = 0; i < futures.size(); i++) { + final EventLoopGroup group = nonNullGroups.get(i); + try { + if (!futures.get(i).await(awaitTime.getDuration(), awaitTime.getUnit())) { + LOG.warn("Failed to shut down EventLoopGroup {} in {}", group, awaitTime); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("Interrupted while shutting down EventLoopGroup {}", group, e); + } catch (Exception e) { + LOG.warn("Failed to shut down EventLoopGroup {} in {}", group, awaitTime, e); + } + } + } + static void setTrustManager(SslContextBuilder b, TrustManagerConf trustManagerConfig) { if (trustManagerConfig == null) { return; @@ -196,4 +230,4 @@ static void closeChannel(Channel channel, String name) { LOG.warn("closeChannel {} is not yet completed in {}", name, CLOSE_TIMEOUT); } } -} \ No newline at end of file +} diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java index f31794ac36..cd7ac10f32 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java @@ -51,6 +51,15 @@ static Consumer getDefaultLog() { String PREFIX = "raft.grpc"; + String USE_EPOLL_KEY = PREFIX + ".use-epoll"; + boolean USE_EPOLL_DEFAULT = true; + static boolean useEpoll(RaftProperties properties) { + return getBoolean(properties::getBoolean, USE_EPOLL_KEY, USE_EPOLL_DEFAULT, getDefaultLog()); + } + static void setUseEpoll(RaftProperties properties, boolean useEpoll) { + setBoolean(properties::setBoolean, USE_EPOLL_KEY, useEpoll); + } + interface TLS { String PREFIX = GrpcConfigKeys.PREFIX + ".tls"; @@ -155,6 +164,16 @@ static GrpcTlsConfig tlsConf(Parameters parameters) { static void setTlsConf(Parameters parameters, GrpcTlsConfig conf) { parameters.put(TLS_CONF_PARAMETER, conf, TLS_CONF_CLASS); } + + String WORKER_GROUP_SIZE_KEY = PREFIX + ".worker-group.size"; + int WORKER_GROUP_SIZE_DEFAULT = 0; + static int workerGroupSize(RaftProperties properties) { + return getInt(properties::getInt, WORKER_GROUP_SIZE_KEY, + WORKER_GROUP_SIZE_DEFAULT, getDefaultLog(), requireMin(0), requireMax(65536)); + } + static void setWorkerGroupSize(RaftProperties properties, int size) { + setInt(properties::setInt, WORKER_GROUP_SIZE_KEY, size); + } } interface Server { @@ -291,6 +310,26 @@ static int stubPoolSize(RaftProperties properties) { static void setStubPoolSize(RaftProperties properties, int size) { setInt(properties::setInt, STUB_POOL_SIZE_KEY, size); } + + String BOSS_GROUP_SIZE_KEY = PREFIX + ".boss-group.size"; + int BOSS_GROUP_SIZE_DEFAULT = 0; + static int bossGroupSize(RaftProperties properties) { + return getInt(properties::getInt, BOSS_GROUP_SIZE_KEY, + BOSS_GROUP_SIZE_DEFAULT, getDefaultLog(), requireMin(0), requireMax(65536)); + } + static void setBossGroupSize(RaftProperties properties, int size) { + setInt(properties::setInt, BOSS_GROUP_SIZE_KEY, size); + } + + String WORKER_GROUP_SIZE_KEY = PREFIX + ".worker-group.size"; + int WORKER_GROUP_SIZE_DEFAULT = 0; + static int workerGroupSize(RaftProperties properties) { + return getInt(properties::getInt, WORKER_GROUP_SIZE_KEY, + WORKER_GROUP_SIZE_DEFAULT, getDefaultLog(), requireMin(0), requireMax(65536)); + } + static void setWorkerGroupSize(RaftProperties properties, int size) { + setInt(properties::setInt, WORKER_GROUP_SIZE_KEY, size); + } } String MESSAGE_SIZE_MAX_KEY = PREFIX + ".message.size.max"; diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java index 1053cab80e..f3ca3f5faf 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java @@ -146,6 +146,6 @@ public GrpcClientRpc newRaftClientRpc(ClientId clientId, RaftProperties properti checkPooledByteBufAllocatorUseCacheForAllThreads(LOG::debug); final SslContexts forClient = forClientSupplier.get(); - return new GrpcClientRpc(clientId, properties, forClient.adminSslContext, forClient.clientSslContext); + return GrpcClientRpc.create(clientId, properties, forClient.adminSslContext, forClient.clientSslContext); } } diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java index 0eaec6b962..e958c9e000 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java @@ -22,6 +22,7 @@ import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.grpc.GrpcUtil; +import org.apache.ratis.util.NettyUtils; import org.apache.ratis.grpc.metrics.intercept.client.MetricClientInterceptor; import org.apache.ratis.proto.RaftProtos.GroupInfoReplyProto; import org.apache.ratis.proto.RaftProtos.GroupInfoRequestProto; @@ -51,9 +52,12 @@ import org.apache.ratis.thirdparty.io.grpc.netty.NegotiationType; import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder; import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; +import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext; import org.apache.ratis.util.CollectionUtils; import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.MemoizedSupplier; +import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.TimeoutExecutor; @@ -94,14 +98,16 @@ public class GrpcClientProtocolClient implements Closeable { private final AtomicReference unorderedStreamObservers = new AtomicReference<>(); private final MetricClientInterceptor metricClientInterceptor; + private final MemoizedSupplier clientWorkers; GrpcClientProtocolClient(ClientId id, RaftPeer target, RaftProperties properties, - SslContext adminSslContext, SslContext clientSslContext) { + SslContext adminSslContext, SslContext clientSslContext, MemoizedSupplier clientWorkers) { this.name = JavaUtils.memoize(() -> id + "->" + target.getId()); this.target = target; final SizeInBytes flowControlWindow = GrpcConfigKeys.flowControlWindow(properties, LOG::debug); this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug); metricClientInterceptor = new MetricClientInterceptor(getName()); + this.clientWorkers = clientWorkers; final String clientAddress = Optional.ofNullable(target.getClientAddress()) .filter(x -> !x.isEmpty()).orElse(target.getAddress()); @@ -135,6 +141,12 @@ private ManagedChannel buildChannel(String address, SslContext sslContext, channelBuilder.negotiationType(NegotiationType.PLAINTEXT); } + if (clientWorkers != null) { + final EventLoopGroup eventLoopGroup = clientWorkers.get(); + channelBuilder.channelType(NettyUtils.getSocketChannelClass(eventLoopGroup)) + .eventLoopGroup(eventLoopGroup); + } + return channelBuilder.flowControlWindow(flowControlWindow.getSizeInt()) .maxInboundMessageSize(maxMessageSize.getSizeInt()) .intercept(metricClientInterceptor) @@ -156,6 +168,12 @@ public void close() { metricClientInterceptor.close(); } + EventLoopGroup getClientWorkersForTesting() { + Preconditions.assertTrue(clientWorkers != null); + Preconditions.assertTrue(clientWorkers.isInitialized()); + return clientWorkers.get(); + } + RaftClientReplyProto groupAdd(GroupManagementRequestProto request) throws IOException { return blockingCall(() -> adminBlockingStub .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit()) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java index 65175dc2a1..b12b5afe45 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java @@ -29,6 +29,7 @@ import org.apache.ratis.thirdparty.io.grpc.Status; import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException; import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; +import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.ratis.proto.RaftProtos.GroupInfoRequestProto; import org.apache.ratis.proto.RaftProtos.GroupListRequestProto; import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto; @@ -41,6 +42,8 @@ import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext; import org.apache.ratis.util.IOUtils; import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.MemoizedSupplier; +import org.apache.ratis.util.NettyUtils; import org.apache.ratis.util.PeerProxyMap; import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; @@ -53,18 +56,30 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -public class GrpcClientRpc extends RaftClientRpcWithProxy { +public final class GrpcClientRpc extends RaftClientRpcWithProxy { public static final Logger LOG = LoggerFactory.getLogger(GrpcClientRpc.class); + public static GrpcClientRpc create(ClientId clientId, RaftProperties properties, + SslContext adminSslContext, SslContext clientSslContext) { + final int workerGroupSize = GrpcConfigKeys.Client.workerGroupSize(properties); + final MemoizedSupplier eventLoopGroup = workerGroupSize > 0 ? MemoizedSupplier.valueOf( + () -> NettyUtils.newEventLoopGroup( + clientId + "-client-workers", workerGroupSize, GrpcConfigKeys.useEpoll(properties))) : null; + return new GrpcClientRpc(clientId, properties, adminSslContext, clientSslContext, eventLoopGroup); + } + private final ClientId clientId; private final int maxMessageSize; private final TimeDuration requestTimeoutDuration; private final TimeDuration watchRequestTimeoutDuration; + private final MemoizedSupplier clientWorkers; - public GrpcClientRpc(ClientId clientId, RaftProperties properties, - SslContext adminSslContext, SslContext clientSslContext) { + private GrpcClientRpc(ClientId clientId, RaftProperties properties, + SslContext adminSslContext, SslContext clientSslContext, MemoizedSupplier clientWorkers) { super(new PeerProxyMap<>(clientId.toString(), - p -> new GrpcClientProtocolClient(clientId, p, properties, adminSslContext, clientSslContext))); + p -> new GrpcClientProtocolClient(clientId, p, properties, adminSslContext, clientSslContext, clientWorkers))); + this.clientWorkers = clientWorkers; + this.clientId = clientId; this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug).getSizeInt(); this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout(properties); @@ -213,6 +228,17 @@ private RaftClientRequestProto toRaftClientRequestProto(RaftClientRequest reques return proto; } + @Override + public void close() { + try { + super.close(); + } finally { + if (clientWorkers != null && clientWorkers.isInitialized()) { + NettyUtils.shutdownGracefully(clientWorkers.get()); + } + } + } + @Override public boolean shouldReconnect(Throwable e) { final Throwable cause = e.getCause(); diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java index d2748c7be2..6dc0d32527 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java @@ -18,6 +18,7 @@ package org.apache.ratis.grpc.server; import org.apache.ratis.grpc.GrpcUtil; +import org.apache.ratis.util.NettyUtils; import org.apache.ratis.grpc.util.StreamObserverWithTimeout; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.util.ServerStringUtils; @@ -31,6 +32,7 @@ import org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceBlockingStub; import org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceStub; import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext; import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; @@ -57,12 +59,15 @@ class GrpcServerProtocolClient implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(GrpcServerProtocolClient.class); //visible for using in log / error messages AND to use in instrumented tests private final RaftPeerId raftPeerId; + private final EventLoopGroup eventLoopGroup; GrpcServerProtocolClient(RaftPeer target, int connections, int flowControlWindow, - TimeDuration requestTimeout, SslContext sslContext, boolean separateHBChannel) { + TimeDuration requestTimeout, SslContext sslContext, boolean separateHBChannel, + EventLoopGroup eventLoopGroup) { raftPeerId = target.getId(); LOG.info("Build channel for {}", target); useSeparateHBChannel = separateHBChannel; + this.eventLoopGroup = eventLoopGroup; channel = buildChannel(target, flowControlWindow, sslContext); blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel); asyncStub = RaftServerProtocolServiceGrpc.newStub(channel); @@ -75,7 +80,8 @@ class GrpcServerProtocolClient implements Closeable { } GrpcStubPool newGrpcStubPool(String address, SslContext sslContext, int connections) { - return new GrpcStubPool<>(connections, address, sslContext, RaftServerProtocolServiceGrpc::newStub, 16); + return new GrpcStubPool<>(connections, address, sslContext, RaftServerProtocolServiceGrpc::newStub, 16, + eventLoopGroup); } private ManagedChannel buildChannel(RaftPeer target, int flowControlWindow, SslContext sslContext) { @@ -90,6 +96,10 @@ private ManagedChannel buildChannel(RaftPeer target, int flowControlWindow, SslC channelBuilder.negotiationType(NegotiationType.PLAINTEXT); } channelBuilder.disableRetry(); + if (eventLoopGroup != null) { + channelBuilder.channelType(NettyUtils.getSocketChannelClass(eventLoopGroup)) + .eventLoopGroup(eventLoopGroup); + } return channelBuilder.flowControlWindow(flowControlWindow).build(); } diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java index d554ca583a..3b31753b86 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java @@ -21,6 +21,7 @@ import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.grpc.metrics.MessageMetrics; import org.apache.ratis.grpc.metrics.intercept.server.MetricServerInterceptor; +import org.apache.ratis.util.NettyUtils; import org.apache.ratis.protocol.AdminAsynchronousProtocol; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; @@ -36,6 +37,7 @@ import org.apache.ratis.thirdparty.io.grpc.Server; import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; import org.apache.ratis.thirdparty.io.netty.channel.ChannelOption; +import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext; import org.apache.ratis.proto.RaftProtos.*; @@ -115,6 +117,10 @@ public static final class Builder { private TimeDuration requestTimeoutDuration; private boolean separateHeartbeatChannel; + private EventLoopGroup serverBosses; + private EventLoopGroup serverWorkers; + private EventLoopGroup clientWorkers; + private Builder() {} public Builder setServer(RaftServer raftServer) { @@ -151,9 +157,9 @@ public Builder setCustomizer(Customizer customizer) { return this; } - private GrpcServerProtocolClient newGrpcServerProtocolClient(RaftPeer target) { + private GrpcServerProtocolClient newGrpcServerProtocolClient(RaftPeer target, EventLoopGroup eventLoopGroup) { return new GrpcServerProtocolClient(target, serverStubPoolSize, flowControlWindow.getSizeInt(), - requestTimeoutDuration, serverSslContextForClient, separateHeartbeatChannel); + requestTimeoutDuration, serverSslContextForClient, separateHeartbeatChannel, eventLoopGroup); } private ExecutorService newExecutor() { @@ -164,8 +170,7 @@ private ExecutorService newExecutor() { server.getId() + "-request-"); } - private GrpcClientProtocolService newGrpcClientProtocolService( - ExecutorService executor) { + private GrpcClientProtocolService newGrpcClientProtocolService(ExecutorService executor) { return new GrpcClientProtocolService(server::getId, server, executor); } @@ -202,6 +207,17 @@ private NettyServerBuilder newNettyServerBuilder(String hostname, int port, SslC .maxInboundMessageSize(messageSizeMax.getSizeInt()) .flowControlWindow(flowControlWindow.getSizeInt()); + final EventLoopGroup channelGroup = serverBosses != null ? serverBosses : serverWorkers; + if (channelGroup != null) { + nettyServerBuilder.channelType(NettyUtils.getServerChannelClass(channelGroup)); + } + if (serverBosses != null) { + nettyServerBuilder.bossEventLoopGroup(serverBosses); + } + if (serverWorkers != null) { + nettyServerBuilder.workerEventLoopGroup(serverWorkers); + } + if (sslContext != null) { LOG.info("Setting TLS for {}", address); nettyServerBuilder.sslContext(sslContext); @@ -235,7 +251,26 @@ Server newServer(GrpcClientProtocolService client, ServerInterceptor interceptor } public GrpcServicesImpl build() { - return new GrpcServicesImpl(this); + final RaftProperties props = server.getProperties(); + final String id = server.getId() + ""; + final boolean useEpoll = GrpcConfigKeys.useEpoll(props); + try { + final int bossGroupSize = GrpcConfigKeys.Server.bossGroupSize(props); + final int serverWorkerGroupSize = GrpcConfigKeys.Server.workerGroupSize(props); + if (bossGroupSize > 0 || serverWorkerGroupSize > 0) { + serverBosses = NettyUtils.newEventLoopGroup(id + "-boss", bossGroupSize, useEpoll); + serverWorkers = NettyUtils.newEventLoopGroup(id + "-server-workers", serverWorkerGroupSize, useEpoll); + } + + final int clientWorkerGroupSize = GrpcConfigKeys.Client.workerGroupSize(props); + if (clientWorkerGroupSize > 0) { + clientWorkers = NettyUtils.newEventLoopGroup(id + "-client-workers", clientWorkerGroupSize, useEpoll); + } + return new GrpcServicesImpl(this); + } catch (Throwable t) { + NettyUtils.shutdownGracefully(clientWorkers, serverWorkers, serverBosses); + throw t; + } } public Builder setAdminSslContext(SslContext adminSslContext) { @@ -274,9 +309,17 @@ public static Builder newBuilder() { private final GrpcClientProtocolService clientProtocolService; private final MetricServerInterceptor serverInterceptor; + private final EventLoopGroup serverBosses; + private final EventLoopGroup serverWorkers; + private final EventLoopGroup clientWorkers; private GrpcServicesImpl(Builder b) { - super(b.server::getId, id -> new PeerProxyMap<>(id.toString(), b::newGrpcServerProtocolClient)); + super(b.server::getId, id -> new PeerProxyMap<>(id.toString(), + peer -> b.newGrpcServerProtocolClient(peer, b.clientWorkers))); + + this.serverBosses = b.serverBosses; + this.serverWorkers = b.serverWorkers; + this.clientWorkers = b.clientWorkers; this.executor = b.newExecutor(); this.clientProtocolService = b.newGrpcClientProtocolService(executor); @@ -375,6 +418,8 @@ public void closeImpl() { } catch (IOException e) { LOG.warn("{}: Failed to close proxies", getId(), e); } + + NettyUtils.shutdownGracefully(clientWorkers, serverWorkers, serverBosses); } @Override diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java index 9667661d07..84a4651f9d 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java @@ -18,11 +18,13 @@ package org.apache.ratis.grpc.server; import org.apache.ratis.grpc.GrpcUtil; +import org.apache.ratis.util.NettyUtils; import org.apache.ratis.thirdparty.io.grpc.ManagedChannel; import org.apache.ratis.thirdparty.io.grpc.netty.NegotiationType; import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder; import org.apache.ratis.thirdparty.io.grpc.stub.AbstractStub; import org.apache.ratis.thirdparty.io.netty.channel.ChannelOption; +import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.ratis.thirdparty.io.netty.channel.WriteBufferWaterMark; import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext; import org.apache.ratis.util.MemoizedSupplier; @@ -41,7 +43,7 @@ final class GrpcStubPool> { public static final Logger LOG = LoggerFactory.getLogger(GrpcStubPool.class); - static ManagedChannel buildManagedChannel(String address, SslContext sslContext) { + static ManagedChannel buildManagedChannel(String address, SslContext sslContext, EventLoopGroup eventLoopGroup) { NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(address) .keepAliveTime(10, TimeUnit.MINUTES) .keepAliveWithoutCalls(false) @@ -53,6 +55,10 @@ static ManagedChannel buildManagedChannel(String address, SslContext sslContext) } else { channelBuilder.negotiationType(NegotiationType.PLAINTEXT); } + if (eventLoopGroup != null) { + channelBuilder.channelType(NettyUtils.getSocketChannelClass(eventLoopGroup)) + .eventLoopGroup(eventLoopGroup); + } ManagedChannel ch = channelBuilder.build(); ch.getState(true); return ch; @@ -63,8 +69,9 @@ static final class Stub> { private final S stub; private final Semaphore permits; - Stub(String address, SslContext sslContext, Function stubFactory, int maxInflight) { - this.ch = buildManagedChannel(address, sslContext); + Stub(String address, SslContext sslContext, Function stubFactory, int maxInflight, + EventLoopGroup eventLoopGroup) { + this.ch = buildManagedChannel(address, sslContext, eventLoopGroup); this.stub = stubFactory.apply(ch); this.permits = new Semaphore(maxInflight); } @@ -85,11 +92,12 @@ void shutdown() { private final List>> pool; GrpcStubPool(int connections, String address, SslContext sslContext, Function stubFactory, - int maxInflightPerConn) { + int maxInflightPerConn, EventLoopGroup eventLoopGroup) { Preconditions.assertTrue(connections > 1, "connections must be > 1"); final List>> tmpPool = new ArrayList<>(connections); for (int i = 0; i < connections; i++) { - tmpPool.add(MemoizedSupplier.valueOf(() -> new Stub<>(address, sslContext, stubFactory, maxInflightPerConn))); + tmpPool.add(MemoizedSupplier.valueOf( + () -> new Stub<>(address, sslContext, stubFactory, maxInflightPerConn, eventLoopGroup))); } this.pool = Collections.unmodifiableList(tmpPool); } diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyClient.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyClient.java index 56ca6b030c..3c908f67a6 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyClient.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyClient.java @@ -27,6 +27,7 @@ import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LifeCycle; import org.apache.ratis.util.NetUtils; +import org.apache.ratis.util.NettyUtils; import java.io.Closeable; import java.net.InetSocketAddress; diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java index e72d6c6772..d2eb38859a 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java @@ -35,6 +35,7 @@ import org.apache.ratis.thirdparty.io.netty.handler.logging.LoggingHandler; import org.apache.ratis.util.IOUtils; import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.NettyUtils; import org.apache.ratis.util.PeerProxyMap; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.ProtoUtils; diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java index 2bfeea31e1..5b673d5189 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java @@ -28,7 +28,7 @@ import org.apache.ratis.io.WriteOption; import org.apache.ratis.netty.NettyConfigKeys; import org.apache.ratis.netty.NettyDataStreamUtils; -import org.apache.ratis.netty.NettyUtils; +import org.apache.ratis.util.NettyUtils; import org.apache.ratis.protocol.ClientInvocationId; import org.apache.ratis.protocol.DataStreamReply; import org.apache.ratis.protocol.DataStreamRequest; diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java index bc57343fb2..f7d2805e8a 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java @@ -20,7 +20,7 @@ import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.netty.NettyConfigKeys; import org.apache.ratis.netty.NettyRpcProxy; -import org.apache.ratis.netty.NettyUtils; +import org.apache.ratis.util.NettyUtils; import org.apache.ratis.protocol.GroupInfoReply; import org.apache.ratis.protocol.GroupListReply; import org.apache.ratis.protocol.RaftClientReply; diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java index 24303d867e..643ed15a0f 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java @@ -27,7 +27,7 @@ import org.apache.ratis.datastream.impl.DataStreamRequestByteBuf; import org.apache.ratis.netty.NettyConfigKeys; import org.apache.ratis.netty.NettyDataStreamUtils; -import org.apache.ratis.netty.NettyUtils; +import org.apache.ratis.util.NettyUtils; import org.apache.ratis.netty.metrics.NettyServerStreamRpcMetrics; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.DataStreamPacket; diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamSslWithRpcTypeGrpcAndDataStreamTypeNetty.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamSslWithRpcTypeGrpcAndDataStreamTypeNetty.java index 247a7bcac1..156ab410d3 100644 --- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamSslWithRpcTypeGrpcAndDataStreamTypeNetty.java +++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamSslWithRpcTypeGrpcAndDataStreamTypeNetty.java @@ -19,7 +19,7 @@ import org.apache.ratis.conf.Parameters; import org.apache.ratis.netty.NettyConfigKeys; -import org.apache.ratis.netty.NettyUtils; +import org.apache.ratis.util.NettyUtils; import org.apache.ratis.security.SecurityTestUtils; import org.apache.ratis.security.TlsConf; import org.apache.ratis.util.JavaUtils; diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcEventLoops.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcEventLoops.java new file mode 100644 index 0000000000..fdee06ebec --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcEventLoops.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +import org.apache.ratis.BaseTest; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.util.NettyUtils; +import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup; +import org.apache.ratis.thirdparty.io.netty.channel.epoll.Epoll; +import org.apache.ratis.thirdparty.io.netty.util.concurrent.EventExecutor; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestGrpcEventLoops extends BaseTest { + + @Test + public void testNewEventLoopGroupWithThreadCount() { + final int threads = 3; + final EventLoopGroup group = NettyUtils.newEventLoopGroup("test-elg", threads, false); + try { + Assertions.assertNotNull(group); + int count = 0; + for (EventExecutor ignored : group) { + count++; + } + Assertions.assertEquals(threads, count); + } finally { + NettyUtils.shutdownGracefully(group); + } + } + + @Test + public void testChannelTypeMatchesEpollAvailability() { + final EventLoopGroup group = NettyUtils.newEventLoopGroup("test-epoll", 1, true); + try { + if (Epoll.isAvailable()) { + Assertions.assertEquals("EpollServerSocketChannel", + NettyUtils.getServerChannelClass(group).getSimpleName()); + Assertions.assertEquals("EpollSocketChannel", + NettyUtils.getSocketChannelClass(group).getSimpleName()); + } else { + Assertions.assertEquals("NioServerSocketChannel", + NettyUtils.getServerChannelClass(group).getSimpleName()); + Assertions.assertEquals("NioSocketChannel", + NettyUtils.getSocketChannelClass(group).getSimpleName()); + } + } finally { + NettyUtils.shutdownGracefully(group); + } + } + + @Test + public void testConfigKeyDefaults() { + final RaftProperties properties = new RaftProperties(); + final int expectedWorker = GrpcConfigKeys.Server.WORKER_GROUP_SIZE_DEFAULT; + Assertions.assertEquals(expectedWorker, GrpcConfigKeys.Server.workerGroupSize(properties)); + Assertions.assertEquals(expectedWorker, GrpcConfigKeys.Client.workerGroupSize(properties)); + Assertions.assertEquals(0, expectedWorker); + Assertions.assertEquals(0, GrpcConfigKeys.Server.bossGroupSize(properties)); + Assertions.assertTrue(GrpcConfigKeys.useEpoll(properties)); + } + + @Test + public void testConfigKeyRoundtrip() { + final RaftProperties properties = new RaftProperties(); + GrpcConfigKeys.Server.setWorkerGroupSize(properties, 4); + GrpcConfigKeys.Server.setBossGroupSize(properties, 1); + GrpcConfigKeys.Client.setWorkerGroupSize(properties, 2); + GrpcConfigKeys.setUseEpoll(properties, false); + Assertions.assertEquals(4, GrpcConfigKeys.Server.workerGroupSize(properties)); + Assertions.assertEquals(1, GrpcConfigKeys.Server.bossGroupSize(properties)); + Assertions.assertEquals(2, GrpcConfigKeys.Client.workerGroupSize(properties)); + Assertions.assertFalse(GrpcConfigKeys.useEpoll(properties)); + } + + @Test + public void testShutdownNullIsNoop() { + NettyUtils.shutdownGracefully((EventLoopGroup) null); + } +} diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcWorkerEventLoopThreads.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcWorkerEventLoopThreads.java new file mode 100644 index 0000000000..6b85e61d1d --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcWorkerEventLoopThreads.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +import org.apache.ratis.BaseTest; +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftClientReply; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import static org.apache.ratis.RaftTestUtil.waitForLeader; + +/** + * Verify a cluster comes up and processes requests when the worker + * event-loop thread count is capped via + * {@link GrpcConfigKeys.Server#WORKER_GROUP_SIZE_KEY} and + * {@link GrpcConfigKeys.Client#WORKER_GROUP_SIZE_KEY}. + * + *

Regression: RATIS-2529 — gRPC worker threads permanently inflate to + * {@code availableProcessors * 2} after follower restart catch-up. + */ +public class TestGrpcWorkerEventLoopThreads extends BaseTest { + + @Test + public void testClusterWithCappedWorkerEventLoopThreads() throws Exception { + final String[] ids = {"s0", "s1", "s2"}; + final RaftProperties properties = new RaftProperties(); + GrpcConfigKeys.Server.setWorkerGroupSize(properties, 2); + GrpcConfigKeys.Client.setWorkerGroupSize(properties, 1); + + try (MiniRaftClusterWithGrpc cluster = new MiniRaftClusterWithGrpc(ids, properties, null)) { + cluster.start(); + waitForLeader(cluster); + try (RaftClient client = cluster.createClient()) { + final RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("hello")); + Assertions.assertTrue(reply.isSuccess()); + } + } + } +} diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java index b5247cf63d..d48ad39f30 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java @@ -356,16 +356,17 @@ void testRequestMetrics(MiniRaftClusterWithGrpc cluster) throws Exception { // Block stateMachine flush data, so that 2nd request will not be // completed, and so it will not be removed from pending request map. List clients = new ArrayList<>(); + final List> replies = new ArrayList<>(); try { RaftClient client = cluster.createClient(cluster.getLeader().getId(), RetryPolicies.noRetry()); clients.add(client); - client.async().send(new SimpleMessage("2nd Message")); + replies.add(client.async().send(new SimpleMessage("2nd Message"))); for (int i = 0; i < 10; i++) { client = cluster.createClient(cluster.getLeader().getId(), RetryPolicies.noRetry()); clients.add(client); - client.async().send(new SimpleMessage("message " + i)); + replies.add(client.async().send(new SimpleMessage("message " + i))); } // Because we have passed 11 requests, and the element queue size is 10. @@ -373,6 +374,7 @@ void testRequestMetrics(MiniRaftClusterWithGrpc cluster) throws Exception { .getNumRequestQueueLimitHits().getCount() == 1, 300, 5000); stateMachine.unblockFlushStateMachineData(); + RaftTestUtil.waitFor(() -> replies.stream().allMatch(CompletableFuture::isDone), 300, 5000); // Send a message with 1025kb , our byte size limit is 1024kb (1mb) , so it should fail // and byte size counter limit will be hit. diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/client/TestGrpcClientEventLoops.java b/ratis-test/src/test/java/org/apache/ratis/grpc/client/TestGrpcClientEventLoops.java new file mode 100644 index 0000000000..76c8475450 --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/client/TestGrpcClientEventLoops.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc.client; + +import org.apache.ratis.BaseTest; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.grpc.GrpcConfigKeys; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup; +import org.apache.ratis.thirdparty.io.netty.util.concurrent.EventExecutor; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +public class TestGrpcClientEventLoops extends BaseTest { + @Test + public void testClientWorkerEventLoopGroupSharedByPeerProxies() throws Exception { + final RaftProperties properties = new RaftProperties(); + GrpcConfigKeys.Client.setWorkerGroupSize(properties, 1); + GrpcConfigKeys.setUseEpoll(properties, false); + + final RaftPeer p0 = newPeer("s0", 15000); + final RaftPeer p1 = newPeer("s1", 15001); + EventLoopGroup clientWorkers = null; + try (GrpcClientRpc rpc = GrpcClientRpc.create(ClientId.randomId(), properties, null, null)) { + rpc.addRaftPeers(Arrays.asList(p0, p1)); + final GrpcClientProtocolClient c0 = rpc.getProxies().getProxy(p0.getId()); + final GrpcClientProtocolClient c1 = rpc.getProxies().getProxy(p1.getId()); + + clientWorkers = c0.getClientWorkersForTesting(); + Assertions.assertSame(clientWorkers, c1.getClientWorkersForTesting()); + Assertions.assertEquals(1, countEventExecutors(clientWorkers)); + Assertions.assertFalse(clientWorkers.isShuttingDown()); + } + + Assertions.assertNotNull(clientWorkers); + Assertions.assertTrue(clientWorkers.isShuttingDown() || clientWorkers.isShutdown() || clientWorkers.isTerminated()); + } + + private static RaftPeer newPeer(String id, int port) { + return RaftPeer.newBuilder().setId(id).setAddress("127.0.0.1:" + port).build(); + } + + private static int countEventExecutors(EventLoopGroup group) { + int count = 0; + for (EventExecutor ignored : group) { + count++; + } + return count; + } +} diff --git a/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyRpcProxy.java b/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyRpcProxy.java index 2507220a8e..6b83ab7115 100644 --- a/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyRpcProxy.java +++ b/ratis-test/src/test/java/org/apache/ratis/netty/TestNettyRpcProxy.java @@ -34,6 +34,7 @@ import org.apache.ratis.thirdparty.io.netty.channel.SimpleChannelInboundHandler; import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel; import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.NettyUtils; import org.junit.jupiter.api.Test; import java.lang.reflect.Field; diff --git a/ratis-test/src/test/java/org/apache/ratis/netty/TestTlsConfWithNetty.java b/ratis-test/src/test/java/org/apache/ratis/netty/TestTlsConfWithNetty.java index dbdcf1ebd8..5d73d9ea3a 100644 --- a/ratis-test/src/test/java/org/apache/ratis/netty/TestTlsConfWithNetty.java +++ b/ratis-test/src/test/java/org/apache/ratis/netty/TestTlsConfWithNetty.java @@ -38,6 +38,7 @@ import org.apache.ratis.thirdparty.io.netty.handler.logging.LoggingHandler; import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext; import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.NettyUtils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.slf4j.Logger; diff --git a/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/TestSecureRatisShell.java b/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/TestSecureRatisShell.java index 21e9fe229a..7b7de79d21 100644 --- a/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/TestSecureRatisShell.java +++ b/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/TestSecureRatisShell.java @@ -24,7 +24,7 @@ import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.grpc.GrpcTlsConfig; import org.apache.ratis.grpc.MiniRaftClusterWithGrpc; -import org.apache.ratis.netty.NettyUtils; +import org.apache.ratis.util.NettyUtils; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.security.SecurityTestUtils; import org.apache.ratis.util.Slf4jUtils;