diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java index e70ac92340..a98aa0be3a 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java @@ -26,6 +26,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -40,7 +42,6 @@ import com.alipay.sofa.jraft.JRaftUtils; import com.alipay.sofa.jraft.Node; import com.alipay.sofa.jraft.RaftGroupService; -import com.alipay.sofa.jraft.ReplicatorGroup; import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.conf.Configuration; import com.alipay.sofa.jraft.core.Replicator; @@ -48,13 +49,11 @@ import com.alipay.sofa.jraft.entity.Task; import com.alipay.sofa.jraft.error.RaftError; import com.alipay.sofa.jraft.option.NodeOptions; -import com.alipay.sofa.jraft.option.RaftOptions; import com.alipay.sofa.jraft.option.RpcOptions; import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory; import com.alipay.sofa.jraft.rpc.RpcServer; import com.alipay.sofa.jraft.rpc.impl.BoltRpcServer; import com.alipay.sofa.jraft.util.Endpoint; -import com.alipay.sofa.jraft.util.ThreadId; import com.alipay.sofa.jraft.util.internal.ThrowUtil; import io.netty.channel.ChannelHandler; @@ -86,8 +85,12 @@ public synchronized boolean init(PDConfig.Raft config) { } this.config = config; + // Wire configured rpc timeout into RaftRpcClient so the Bolt transport + // timeout and the future.get() caller timeout in getLeaderGrpcAddress() are consistent. raftRpcClient = new RaftRpcClient(); - raftRpcClient.init(new RpcOptions()); + RpcOptions rpcOptions = new RpcOptions(); + rpcOptions.setRpcDefaultTimeout(config.getRpcTimeout()); + raftRpcClient.init(rpcOptions); String raftPath = config.getDataPath() + "/" + groupId; new File(raftPath).mkdirs(); @@ -119,8 +122,7 @@ public synchronized boolean init(PDConfig.Raft config) { nodeOptions.setRpcConnectTimeoutMs(config.getRpcTimeout()); nodeOptions.setRpcDefaultTimeout(config.getRpcTimeout()); nodeOptions.setRpcInstallSnapshotTimeout(config.getRpcTimeout()); - // Set the raft configuration - RaftOptions raftOptions = nodeOptions.getRaftOptions(); + // TODO: tune RaftOptions for PD (see hugegraph-store PartitionEngine for reference) nodeOptions.setEnableMetrics(true); @@ -228,7 +230,7 @@ public PeerId getLeader() { } /** - * Send a message to the leader to get the grpc address; + * Send a message to the leader to get the grpc address. */ public String getLeaderGrpcAddress() throws ExecutionException, InterruptedException { if (isLeader()) { @@ -236,11 +238,34 @@ public String getLeaderGrpcAddress() throws ExecutionException, InterruptedExcep } if (raftNode.getLeaderId() == null) { - waitingForLeader(10000); + waitingForLeader(config.getRpcTimeout()); } - return raftRpcClient.getGrpcAddress(raftNode.getLeaderId().getEndpoint().toString()).get() - .getGrpcAddress(); + // Cache leader to avoid repeated getLeaderId() calls and guard against + // waitingForLeader() returning without a leader being elected. + PeerId leader = raftNode.getLeaderId(); + if (leader == null) { + throw new ExecutionException(new IllegalStateException("Leader is not ready")); + } + + try { + RaftRpcProcessor.GetMemberResponse response = raftRpcClient + .getGrpcAddress(leader.getEndpoint().toString()) + .get(config.getRpcTimeout(), TimeUnit.MILLISECONDS); + if (response != null && response.getGrpcAddress() != null) { + return response.getGrpcAddress(); + } + } catch (TimeoutException | ExecutionException e) { + // TODO: a more complete fix would need a source of truth for the leader's + // actual grpcAddress rather than deriving it from the local node's port config. + throw new ExecutionException( + String.format("Failed to resolve leader gRPC address for %s", leader), e); + } + + log.warn("Leader gRPC address field is null in RPC response for {}", leader); + throw new ExecutionException( + new IllegalStateException( + String.format("Leader gRPC address unavailable for %s", leader))); } /** @@ -366,7 +391,8 @@ private boolean peerEquals(PeerId p1, PeerId p2) { if (p1 == null || p2 == null) { return false; } - return Objects.equals(p1.getIp(), p2.getIp()) && Objects.equals(p1.getPort(), p2.getPort()); + return Objects.equals(p1.getIp(), p2.getIp()) && + Objects.equals(p1.getPort(), p2.getPort()); } private Replicator.State getReplicatorState(PeerId peerId) {