From 892540829a92ab932ed29e26ffbc58098fea92d4 Mon Sep 17 00:00:00 2001 From: bitflicker64 Date: Thu, 5 Mar 2026 13:42:24 +0530 Subject: [PATCH 1/2] fix(pd): add timeout and null-safety to getLeaderGrpcAddress() The bolt RPC call in getLeaderGrpcAddress() returns null in Docker bridge network mode, causing NPE when a follower PD node attempts to discover the leader's gRPC address. This breaks store registration and partition distribution when any node other than pd0 wins the raft leader election. Add a bounded timeout using the configured rpc-timeout, null-check the RPC response, and fall back to deriving the address from the raft endpoint IP when the RPC fails. Closes apache/hugegraph#2959 --- .../apache/hugegraph/pd/raft/RaftEngine.java | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java index e70ac92340..494ca674d7 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java @@ -26,6 +26,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -239,8 +241,20 @@ public String getLeaderGrpcAddress() throws ExecutionException, InterruptedExcep waitingForLeader(10000); } - return raftRpcClient.getGrpcAddress(raftNode.getLeaderId().getEndpoint().toString()).get() - .getGrpcAddress(); + try { + RaftRpcProcessor.GetMemberResponse response = raftRpcClient + .getGrpcAddress(raftNode.getLeaderId().getEndpoint().toString()) + .get(config.getRpcTimeout(), TimeUnit.MILLISECONDS); + if (response != null && response.getGrpcAddress() != null) { + return response.getGrpcAddress(); + } + } catch (TimeoutException | ExecutionException e) { + log.warn("Failed to get leader gRPC address via RPC, falling back to endpoint derivation", e); + } + + // Fallback: derive from raft endpoint IP + local gRPC port (best effort) + String leaderIp = raftNode.getLeaderId().getEndpoint().getIp(); + return leaderIp + ":" + config.getGrpcPort(); } /** From 1ed7379998cd2f4ae97a8e4f364f5692b7656a40 Mon Sep 17 00:00:00 2001 From: bitflicker64 Date: Thu, 12 Mar 2026 16:26:49 +0530 Subject: [PATCH 2/2] fix: resolve NPE and timeout issues in RaftEngine.getLeaderGrpcAddress() - Cache leader PeerId after waitingForLeader() and null-check to avoid NPE when leader election times out - Remove incorrect fallback that derived leader gRPC address from local node's port, causing silent misroutes in multi-node clusters - Wire config.getRpcTimeout() into RaftRpcClient's RpcOptions so Bolt transport timeout is consistent with future.get() caller timeout - Replace hardcoded 10000ms in waitingForLeader() with config.getRpcTimeout() - Remove unused RaftOptions variable and dead imports (ReplicatorGroup, ThreadId) Fixes #2959 Related to #2952, #2962 --- .../apache/hugegraph/pd/raft/RaftEngine.java | 40 ++++++++++++------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java index 494ca674d7..a98aa0be3a 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java @@ -42,7 +42,6 @@ import com.alipay.sofa.jraft.JRaftUtils; import com.alipay.sofa.jraft.Node; import com.alipay.sofa.jraft.RaftGroupService; -import com.alipay.sofa.jraft.ReplicatorGroup; import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.conf.Configuration; import com.alipay.sofa.jraft.core.Replicator; @@ -50,13 +49,11 @@ import com.alipay.sofa.jraft.entity.Task; import com.alipay.sofa.jraft.error.RaftError; import com.alipay.sofa.jraft.option.NodeOptions; -import com.alipay.sofa.jraft.option.RaftOptions; import com.alipay.sofa.jraft.option.RpcOptions; import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory; import com.alipay.sofa.jraft.rpc.RpcServer; import com.alipay.sofa.jraft.rpc.impl.BoltRpcServer; import com.alipay.sofa.jraft.util.Endpoint; -import com.alipay.sofa.jraft.util.ThreadId; import com.alipay.sofa.jraft.util.internal.ThrowUtil; import io.netty.channel.ChannelHandler; @@ -88,8 +85,12 @@ public synchronized boolean init(PDConfig.Raft config) { } this.config = config; + // Wire configured rpc timeout into RaftRpcClient so the Bolt transport + // timeout and the future.get() caller timeout in getLeaderGrpcAddress() are consistent. raftRpcClient = new RaftRpcClient(); - raftRpcClient.init(new RpcOptions()); + RpcOptions rpcOptions = new RpcOptions(); + rpcOptions.setRpcDefaultTimeout(config.getRpcTimeout()); + raftRpcClient.init(rpcOptions); String raftPath = config.getDataPath() + "/" + groupId; new File(raftPath).mkdirs(); @@ -121,8 +122,7 @@ public synchronized boolean init(PDConfig.Raft config) { nodeOptions.setRpcConnectTimeoutMs(config.getRpcTimeout()); nodeOptions.setRpcDefaultTimeout(config.getRpcTimeout()); nodeOptions.setRpcInstallSnapshotTimeout(config.getRpcTimeout()); - // Set the raft configuration - RaftOptions raftOptions = nodeOptions.getRaftOptions(); + // TODO: tune RaftOptions for PD (see hugegraph-store PartitionEngine for reference) nodeOptions.setEnableMetrics(true); @@ -230,7 +230,7 @@ public PeerId getLeader() { } /** - * Send a message to the leader to get the grpc address; + * Send a message to the leader to get the grpc address. */ public String getLeaderGrpcAddress() throws ExecutionException, InterruptedException { if (isLeader()) { @@ -238,23 +238,34 @@ public String getLeaderGrpcAddress() throws ExecutionException, InterruptedExcep } if (raftNode.getLeaderId() == null) { - waitingForLeader(10000); + waitingForLeader(config.getRpcTimeout()); + } + + // Cache leader to avoid repeated getLeaderId() calls and guard against + // waitingForLeader() returning without a leader being elected. + PeerId leader = raftNode.getLeaderId(); + if (leader == null) { + throw new ExecutionException(new IllegalStateException("Leader is not ready")); } try { RaftRpcProcessor.GetMemberResponse response = raftRpcClient - .getGrpcAddress(raftNode.getLeaderId().getEndpoint().toString()) + .getGrpcAddress(leader.getEndpoint().toString()) .get(config.getRpcTimeout(), TimeUnit.MILLISECONDS); if (response != null && response.getGrpcAddress() != null) { return response.getGrpcAddress(); } } catch (TimeoutException | ExecutionException e) { - log.warn("Failed to get leader gRPC address via RPC, falling back to endpoint derivation", e); + // TODO: a more complete fix would need a source of truth for the leader's + // actual grpcAddress rather than deriving it from the local node's port config. + throw new ExecutionException( + String.format("Failed to resolve leader gRPC address for %s", leader), e); } - // Fallback: derive from raft endpoint IP + local gRPC port (best effort) - String leaderIp = raftNode.getLeaderId().getEndpoint().getIp(); - return leaderIp + ":" + config.getGrpcPort(); + log.warn("Leader gRPC address field is null in RPC response for {}", leader); + throw new ExecutionException( + new IllegalStateException( + String.format("Leader gRPC address unavailable for %s", leader))); } /** @@ -380,7 +391,8 @@ private boolean peerEquals(PeerId p1, PeerId p2) { if (p1 == null || p2 == null) { return false; } - return Objects.equals(p1.getIp(), p2.getIp()) && Objects.equals(p1.getPort(), p2.getPort()); + return Objects.equals(p1.getIp(), p2.getIp()) && + Objects.equals(p1.getPort(), p2.getPort()); } private Replicator.State getReplicatorState(PeerId peerId) {