From ab07122cb006899f7728ee650540459ab95c445c Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Fri, 29 May 2026 13:27:39 +0800 Subject: [PATCH] RATIS-2547. Advance repliedIndex before completing write replies --- .../ratis/server/impl/LeaderStateImpl.java | 7 ++- .../ratis/server/impl/LeaderTracer.java | 5 +- .../ratis/server/impl/ReplyFlusher.java | 52 ++++++++++++++----- .../ratis/server/impl/TestReplyFlusher.java | 42 +++++++++++++++ 4 files changed, 87 insertions(+), 19 deletions(-) create mode 100644 ratis-server/src/test/java/org/apache/ratis/server/impl/TestReplyFlusher.java diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 5e5cdaee25..7e4c97e82b 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -1252,18 +1252,17 @@ void replyPendingRequest(TermIndex termIndex, RaftClientReply reply, RetryCacheI final PendingRequest pending = pendingRequests.remove(termIndex); leaderTracer.removePendingRequest(pending); - final LongSupplier replyMethod = () -> { + final Runnable replyMethod = () -> { cacheEntry.updateResult(reply); if (pending != null) { pending.setReply(reply); } - return termIndex.getIndex(); }; if (readIndexType == Type.REPLIED_INDEX) { - replyFlusher.hold(replyMethod); + replyFlusher.hold(termIndex.getIndex(), replyMethod); } else { - replyMethod.getAsLong(); + replyMethod.run(); } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderTracer.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderTracer.java index 0f4a43e055..766566cb53 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderTracer.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderTracer.java @@ -81,6 +81,9 @@ void tracePendingRequest(PendingRequest pending) { } void removePendingRequest(PendingRequest pending) { + if (pending == null) { + return; + } appendEntriesSpans.remove(pending.getTermIndex().getIndex()); } @@ -96,4 +99,4 @@ SpanContextProto traceAppendEntries(List entries) { void close() { appendEntriesSpans.clear(); } -} \ No newline at end of file +} diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java index 47e9967c11..109fbe5cf8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java @@ -28,7 +28,6 @@ import java.util.LinkedList; import java.util.concurrent.TimeUnit; -import java.util.function.LongSupplier; /** * Implements the reply flush logic as part of the leader batch write when RepliedIndex is used. @@ -39,16 +38,33 @@ public class ReplyFlusher { private static final String CLASS_NAME = JavaUtils.getClassSimpleName(RaftServerImpl.class); public static final String FLUSH = CLASS_NAME + ".flush"; + private static class HeldReply { + private final long logIndex; + private final Runnable replyMethod; + + HeldReply(long logIndex, Runnable replyMethod) { + this.logIndex = logIndex; + this.replyMethod = replyMethod; + } + + long getLogIndex() { + return logIndex; + } + + void complete() { + replyMethod.run(); + } + } + static class Replies { - /** When a {@link LongSupplier} is invoked, it completes a write reply and return the log index. */ - private LinkedList list = new LinkedList<>(); + private LinkedList list = new LinkedList<>(); - synchronized void add(LongSupplier replyMethod) { - list.add(replyMethod); + synchronized void add(HeldReply reply) { + list.add(reply); } - synchronized LinkedList getAndSetNewList() { - final LinkedList old = list; + synchronized LinkedList getAndSetNewList() { + final LinkedList old = list; list = new LinkedList<>(); return old; } @@ -79,8 +95,8 @@ long getRepliedIndex() { } /** Hold a write reply for later batch flushing */ - void hold(LongSupplier replyMethod) { - replies.add(replyMethod); + void hold(long logIndex, Runnable replyMethod) { + replies.add(new HeldReply(logIndex, replyMethod)); } void start(long startIndex) { @@ -112,16 +128,24 @@ private void run() { private void flush() { CodeInjectionForTesting.execute(FLUSH, id, null); - final LinkedList toFlush = replies.getAndSetNewList(); + final LinkedList toFlush = replies.getAndSetNewList(); if (toFlush.isEmpty()) { return; } - long maxIndex = toFlush.removeLast().getAsLong(); - for (LongSupplier held : toFlush) { - maxIndex = Math.max(maxIndex, held.getAsLong()); + + final int numReplies = toFlush.size(); + final HeldReply last = toFlush.removeLast(); + long maxIndex = last.getLogIndex(); + for (HeldReply held : toFlush) { + maxIndex = Math.max(maxIndex, held.getLogIndex()); } repliedIndex.updateToMax(maxIndex, s -> - LOG.debug("{}: flushed {} replies, {}", id, toFlush.size(), s)); + LOG.debug("{}: flushed {} replies, {}", id, numReplies, s)); + + last.complete(); + for (HeldReply held : toFlush) { + held.complete(); + } } /** Stop the reply flusher daemon. */ diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestReplyFlusher.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestReplyFlusher.java new file mode 100644 index 0000000000..2be5c75f75 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestReplyFlusher.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; + +import org.apache.ratis.util.TimeDuration; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +public class TestReplyFlusher { + @Test + public void testAdvanceRepliedIndexBeforeCompletingReplies() throws Exception { + final ReplyFlusher flusher = new ReplyFlusher(getClass().getSimpleName(), 0, + TimeDuration.valueOf(10, TimeUnit.MILLISECONDS)); + final CompletableFuture repliedIndexWhenCompleted = new CompletableFuture<>(); + + flusher.hold(7, () -> repliedIndexWhenCompleted.complete(flusher.getRepliedIndex())); + flusher.start(0); + try { + Assertions.assertEquals(7, repliedIndexWhenCompleted.get(5, TimeUnit.SECONDS)); + } finally { + flusher.stop(); + } + } +}