From ee03331171bc8ff000e6b7be25d518886354a537 Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Fri, 29 May 2026 13:31:47 +0800 Subject: [PATCH 1/6] RATIS-2548. Stabilize timing-sensitive Ratis tests --- .../InstallSnapshotNotificationTests.java | 2 +- .../apache/ratis/LinearizableReadTests.java | 32 ++++++++++-- .../java/org/apache/ratis/RaftBasicTests.java | 50 ++++++++++--------- .../apache/ratis/RaftLogTruncateTests.java | 16 +++--- ...tLinearizableReadRepliedIndexWithGrpc.java | 9 ++-- .../sh/ElectionCommandIntegrationTest.java | 11 ++-- 6 files changed, 76 insertions(+), 44 deletions(-) diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java index 931bf6317f..47e87352cf 100644 --- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java @@ -309,7 +309,7 @@ private void testRestartFollower(CLUSTER cluster) throws Exception { LOG.info("{}: newLeaderNextIndex = {}", leaderId, newLeaderNextIndex); Assertions.assertTrue(newLeaderNextIndex > oldLeaderNextIndex); Assertions.assertEquals(newLeaderNextIndex, follower.getRaftLog().getNextIndex()); - }, 10, ONE_SECOND, "followerNextIndex", LOG); + }, 60, ONE_SECOND, "followerNextIndex", LOG); } @Test diff --git a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java index 832457e9de..b0397e894b 100644 --- a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java @@ -20,6 +20,8 @@ import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClientConfigKeys; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.exceptions.ReadIndexException; @@ -30,6 +32,7 @@ import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type; import org.apache.ratis.server.impl.MiniRaftCluster; +import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Slf4jUtils; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.function.CheckedConsumer; @@ -179,8 +182,28 @@ public void testFollowerLinearizableReadParallel() throws Exception { runWithNewCluster(LinearizableReadTests::runTestFollowerReadOnlyParallel); } + private static long getLogEntryIndex(RaftServer.Division leader, Message message, long startIndex) throws Exception { + final long nextIndex = leader.getRaftLog().getNextIndex(); + for (long index = startIndex; index < nextIndex; index++) { + final LogEntryProto entry = leader.getRaftLog().get(index); + if (entry != null && entry.hasStateMachineLogEntry() + && message.getContent().equals(entry.getStateMachineLogEntry().getLogData())) { + return index; + } + } + throw new AssertionError("Failed to find " + message + " from index " + startIndex + " to " + nextIndex); + } + + private static void waitForCommitIndex(RaftServer.Division leader, long index) throws Exception { + JavaUtils.attempt(() -> { + final long commitIndex = leader.getRaftLog().getLastCommittedIndex(); + Assertions.assertTrue(commitIndex >= index, () -> "commitIndex=" + commitIndex + " < index=" + index); + }, 10, HUNDRED_MILLIS, "commitIndex >= " + index, null); + } + static void runTestFollowerReadOnlyParallel(C cluster) throws Exception { - final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId(); + final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster); + final RaftPeerId leaderId = leader.getId(); final List followers = cluster.getFollowers(); Assertions.assertEquals(2, followers.size()); @@ -199,9 +222,12 @@ static void runTestFollowerReadOnlyParallel(C cluste assertReplyExact(count, leaderClient.io().send(INCREMENT)); count++; + final long nextIndex = leader.getRaftLog().getNextIndex(); writeReplies.add(new Reply(count, leaderClient.async().send(WAIT_AND_INCREMENT))); - // sleep to let the commitIndex/appliedIndex get updated. - Thread.sleep(100); + final long waitAndIncrementIndex = JavaUtils.attemptRepeatedly( + () -> getLogEntryIndex(leader, WAIT_AND_INCREMENT, nextIndex), + 10, HUNDRED_MILLIS, "WAIT_AND_INCREMENT entry", null); + waitForCommitIndex(leader, waitAndIncrementIndex); // WAIT_AND_INCREMENT will delay 500ms to update the count, the read must wait for it. assertReplyExact(count, f0Client.io().sendReadOnly(QUERY, f0)); f1Replies.add(new Reply(count, f1Client.async().sendReadOnly(QUERY, f1))); diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java index c71b57e826..8e008990da 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java @@ -135,38 +135,40 @@ static void runTestBasicAppendEntries( killAndRestartLeader = CompletableFuture.completedFuture(null); } - log.info(cluster.printServers()); - final SimpleMessage[] messages = SimpleMessage.create(numMessages); - try (final RaftClient client = cluster.createClient()) { - final AtomicInteger asyncReplyCount = new AtomicInteger(); - final CompletableFuture f = new CompletableFuture<>(); + try { + log.info(cluster.printServers()); + + try (final RaftClient client = cluster.createClient()) { + final AtomicInteger asyncReplyCount = new AtomicInteger(); + final CompletableFuture f = new CompletableFuture<>(); - for (SimpleMessage message : messages) { + for (SimpleMessage message : messages) { + if (async) { + client.async().send(message).thenAcceptAsync(reply -> { + if (!reply.isSuccess()) { + f.completeExceptionally( + new AssertionError("Failed with reply " + reply)); + } else if (asyncReplyCount.incrementAndGet() == messages.length) { + f.complete(null); + } + }); + } else { + final RaftClientReply reply = client.io().send(message); + Assertions.assertTrue(reply.isSuccess()); + } + } if (async) { - client.async().send(message).thenAcceptAsync(reply -> { - if (!reply.isSuccess()) { - f.completeExceptionally( - new AssertionError("Failed with reply " + reply)); - } else if (asyncReplyCount.incrementAndGet() == messages.length) { - f.complete(null); - } - }); - } else { - final RaftClientReply reply = client.io().send(message); - Assertions.assertTrue(reply.isSuccess()); + f.join(); + Assertions.assertEquals(messages.length, asyncReplyCount.get()); } } - if (async) { - f.join(); - Assertions.assertEquals(messages.length, asyncReplyCount.get()); - } + Thread.sleep(cluster.getTimeoutMax().toIntExact(TimeUnit.MILLISECONDS) + 100); + } finally { + CompletableFuture.allOf(killAndRestartFollower, killAndRestartLeader).join(); } - Thread.sleep(cluster.getTimeoutMax().toIntExact(TimeUnit.MILLISECONDS) + 100); log.info(cluster.printAllLogs()); - killAndRestartFollower.join(); - killAndRestartLeader.join(); final List divisions = cluster.getServerAliveStream().collect(Collectors.toList()); diff --git a/ratis-test/src/test/java/org/apache/ratis/RaftLogTruncateTests.java b/ratis-test/src/test/java/org/apache/ratis/RaftLogTruncateTests.java index 9ea78e47cd..5c47536881 100644 --- a/ratis-test/src/test/java/org/apache/ratis/RaftLogTruncateTests.java +++ b/ratis-test/src/test/java/org/apache/ratis/RaftLogTruncateTests.java @@ -142,7 +142,7 @@ void runTestLogTruncate(MiniRaftCluster cluster, RaftServer.Division oldLeader, for (RaftPeer peer : cluster.getGroup().getPeers()) { final RaftServer.Division division = cluster.getDivision(peer.getId()); assertLogEntries(division, oldLeaderTerm, firstBatch); - assertEmptyTransactionContextMap(division); + waitForEmptyTransactionContextMap(division); } // kill a majority of followers @@ -221,15 +221,12 @@ void runTestLogTruncate(MiniRaftCluster cluster, RaftServer.Division oldLeader, for (RaftPeer peer : cluster.getGroup().getPeers()) { final RaftServer.Division division = cluster.getDivision(peer.getId()); assertLogEntries(division, oldLeaderTerm, expectedMessages); - final String name = "assertEmptyTransactionContextMap:" + division.getId(); - JavaUtils.attempt(() -> assertEmptyTransactionContextMap(division), - 10, HUNDRED_MILLIS, name, LOG); - - } + waitForEmptyTransactionContextMap(division); + } if (!exceptions.isEmpty()) { LOG.info("{} exceptions", exceptions.size()); - for(int i = 0 ; i < exceptions.size(); i++) { + for (int i = 0; i < exceptions.size(); i++) { LOG.info("exception {})", i, exceptions.get(i)); } Assertions.fail(); @@ -241,6 +238,11 @@ static void assertEmptyTransactionContextMap(RaftServer.Division d) { Assertions.assertTrue(map.isEmpty(), () -> d.getId() + " TransactionContextMap is non-empty: " + map); } + void waitForEmptyTransactionContextMap(RaftServer.Division d) throws InterruptedException { + final String name = "assertEmptyTransactionContextMap:" + d.getId(); + JavaUtils.attempt(() -> assertEmptyTransactionContextMap(d), 10, HUNDRED_MILLIS, name, LOG); + } + static void assertEntriesInTransactionContextMap(RaftServer.Division division, SimpleMessage[] existing, SimpleMessage[] nonExisting) { final RaftLog log = division.getRaftLog(); diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java index f08346fc02..795257d1fb 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java @@ -26,6 +26,7 @@ import org.apache.ratis.server.impl.MiniRaftCluster; import org.apache.ratis.server.impl.ReplyFlusher; import org.apache.ratis.util.CodeInjectionForTesting; +import org.apache.ratis.util.JavaUtils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -89,11 +90,9 @@ static void runTestFollowerReadOnlyParallelRepliedIn f0Replies.add(new Reply(0, f0Client.async().sendReadOnly(QUERY, f0))); f1Replies.add(new Reply(0, f1Client.async().sendReadOnly(QUERY, f1))); - // sleep in order to make sure - // (1) the count is incremented, and - // (2) the reads will wait for the repliedIndex. - Thread.sleep(100); - assertEquals(count, leaderStateMachine.getCount()); + // Wait until the leader state machine has applied the write while the ReplyFlusher remains blocked. + JavaUtils.attempt(() -> assertEquals(count, leaderStateMachine.getCount()), + 10, HUNDRED_MILLIS, "leaderStateMachine count " + count, null); } for (int i = 0; i < n; i++) { diff --git a/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/ElectionCommandIntegrationTest.java b/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/ElectionCommandIntegrationTest.java index a61bb19f73..68c1ff5d4f 100644 --- a/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/ElectionCommandIntegrationTest.java +++ b/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/ElectionCommandIntegrationTest.java @@ -150,13 +150,16 @@ public void testElectionStepDownCommand() throws Exception { void runTestElectionStepDownCommand(MiniRaftCluster cluster) throws Exception { final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster); String sb = getClusterAddress(cluster); - RaftServer.Division newLeader = cluster.getFollowers().get(0); final StringPrintStream out = new StringPrintStream(); RatisShell shell = new RatisShell(out.getPrintStream()); - Assertions.assertNotEquals(cluster.getLeader().getId(), newLeader.getId()); Assertions.assertEquals(2, cluster.getFollowers().size()); - int ret = shell.run("election", "stepDown", "-peers", sb.toString()); + int ret = shell.run("election", "pause", "-peers", sb.toString(), "-address", + leader.getPeer().getAddress()); + Assertions.assertEquals(0, ret); + + ret = shell.run("election", "stepDown", "-peers", sb.toString()); Assertions.assertEquals(0, ret); - Assertions.assertEquals(3, cluster.getFollowers().size()); + JavaUtils.attempt(() -> Assertions.assertNotEquals(leader.getId(), RaftTestUtil.waitForLeader(cluster).getId()), + 10, TimeDuration.valueOf(1, TimeUnit.SECONDS), "testElectionStepDownCommand", LOG); } } From 83627dc0da5315cf4559854032c69845d57934ce Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Fri, 29 May 2026 15:30:39 +0800 Subject: [PATCH 2/6] RATIS-2548. Stabilize async append kill-leader test --- .../java/org/apache/ratis/RaftBasicTests.java | 32 ++++++++----------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java index 8e008990da..2c4160da3c 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java @@ -51,6 +51,7 @@ import org.slf4j.event.Level; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.Set; @@ -127,13 +128,7 @@ static void runTestBasicAppendEntries( final CompletableFuture killAndRestartFollower = killAndRestartServer( cluster.getFollowers().get(0).getId(), 0, 1000, cluster, log); - final CompletableFuture killAndRestartLeader; - if (killLeader) { - log.info("killAndRestart leader " + leader.getId()); - killAndRestartLeader = killAndRestartServer(leader.getId(), 2000, 4000, cluster, log); - } else { - killAndRestartLeader = CompletableFuture.completedFuture(null); - } + CompletableFuture killAndRestartLeader = CompletableFuture.completedFuture(null); final SimpleMessage[] messages = SimpleMessage.create(numMessages); @@ -141,29 +136,28 @@ static void runTestBasicAppendEntries( log.info(cluster.printServers()); try (final RaftClient client = cluster.createClient()) { - final AtomicInteger asyncReplyCount = new AtomicInteger(); - final CompletableFuture f = new CompletableFuture<>(); + final List> asyncReplies = new ArrayList<>(); for (SimpleMessage message : messages) { if (async) { - client.async().send(message).thenAcceptAsync(reply -> { - if (!reply.isSuccess()) { - f.completeExceptionally( - new AssertionError("Failed with reply " + reply)); - } else if (asyncReplyCount.incrementAndGet() == messages.length) { - f.complete(null); - } - }); + asyncReplies.add(client.async().send(message)); } else { final RaftClientReply reply = client.io().send(message); Assertions.assertTrue(reply.isSuccess()); } } if (async) { - f.join(); - Assertions.assertEquals(messages.length, asyncReplyCount.get()); + CompletableFuture.allOf(asyncReplies.toArray(new CompletableFuture[0])).join(); + asyncReplies.forEach(f -> { + final RaftClientReply reply = f.join(); + Assertions.assertTrue(reply.isSuccess(), () -> "Failed with reply " + reply); + }); } } + if (killLeader) { + log.info("killAndRestart leader " + leader.getId()); + killAndRestartLeader = killAndRestartServer(leader.getId(), 0, 4000, cluster, log); + } Thread.sleep(cluster.getTimeoutMax().toIntExact(TimeUnit.MILLISECONDS) + 100); } finally { CompletableFuture.allOf(killAndRestartFollower, killAndRestartLeader).join(); From 0a56010e26f09f4fbae4c8fc936c75ad053122a5 Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Fri, 29 May 2026 15:38:45 +0800 Subject: [PATCH 3/6] RATIS-2548. Preserve single-mode leadership checks --- .../java/org/apache/ratis/server/impl/LeaderStateImpl.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 5e5cdaee25..2c01fccb3d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -1154,6 +1154,10 @@ public boolean checkLeadership() { final RaftConfigurationImpl conf = server.getRaftConf(); + if (conf.isSingleMode(server.getId())) { + return true; + } + if (conf.hasMajority(activePeers, server.getId())) { // leadership check passed return true; From 82718bc2ad38b6f0c0f49137b883b69bb525b720 Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Sun, 31 May 2026 12:36:52 +0800 Subject: [PATCH 4/6] RATIS-2548. Address review comments --- .../ratis/server/impl/LeaderStateImpl.java | 4 - .../java/org/apache/ratis/RaftBasicTests.java | 90 +++++++++++++------ .../sh/ElectionCommandIntegrationTest.java | 4 +- 3 files changed, 66 insertions(+), 32 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 2c01fccb3d..5e5cdaee25 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -1154,10 +1154,6 @@ public boolean checkLeadership() { final RaftConfigurationImpl conf = server.getRaftConf(); - if (conf.isSingleMode(server.getId())) { - return true; - } - if (conf.hasMajority(activePeers, server.getId())) { // leadership check passed return true; diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java index 2c4160da3c..9fff8640a6 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java @@ -115,6 +115,41 @@ static CompletableFuture killAndRestartServer( return future; } + private static List findLogEntriesContaining( + RaftLog log, long expectedTerm, SimpleMessage[] expectedMessages) { + final List entries = RaftTestUtil.getStateMachineLogEntries(log, s -> {}); + final List matched = new ArrayList<>(expectedMessages.length); + int e = 0; + for (SimpleMessage expected : expectedMessages) { + boolean found = false; + for (; e < entries.size(); e++) { + final LogEntryProto entry = entries.get(e); + if (entry.getTerm() >= expectedTerm + && expected.getContent().equals(entry.getStateMachineLogEntry().getLogData())) { + matched.add(entry); + e++; + found = true; + break; + } + } + Assertions.assertTrue(found, () -> "Failed to find " + expected + " in entries " + entries); + } + return matched; + } + + private static void assertLogEntriesContaining( + RaftServer.Division server, long expectedTerm, SimpleMessage[] expectedMessages, int numAttempts, Logger log) + throws Exception { + final String name = server.getId() + " assertLogEntriesContaining"; + JavaUtils.attempt(() -> { + RaftTestUtil.assertLogEntries( + findLogEntriesContaining(server.getRaftLog(), expectedTerm, expectedMessages), + expectedTerm, expectedMessages); + return null; + }, + numAttempts, TimeDuration.ONE_SECOND, () -> name, log); + } + static void runTestBasicAppendEntries( boolean async, boolean killLeader, int numMessages, MiniRaftCluster cluster, Logger log) throws Exception { @@ -128,46 +163,49 @@ static void runTestBasicAppendEntries( final CompletableFuture killAndRestartFollower = killAndRestartServer( cluster.getFollowers().get(0).getId(), 0, 1000, cluster, log); - CompletableFuture killAndRestartLeader = CompletableFuture.completedFuture(null); + final CompletableFuture killAndRestartLeader; + if (killLeader) { + log.info("killAndRestart leader " + leader.getId()); + killAndRestartLeader = killAndRestartServer(leader.getId(), 2000, 4000, cluster, log); + } else { + killAndRestartLeader = CompletableFuture.completedFuture(null); + } - final SimpleMessage[] messages = SimpleMessage.create(numMessages); + log.info(cluster.printServers()); - try { - log.info(cluster.printServers()); + final SimpleMessage[] messages = SimpleMessage.create(numMessages); - try (final RaftClient client = cluster.createClient()) { - final List> asyncReplies = new ArrayList<>(); + try (final RaftClient client = cluster.createClient()) { + final List> asyncReplies = new ArrayList<>(); - for (SimpleMessage message : messages) { - if (async) { - asyncReplies.add(client.async().send(message)); - } else { - final RaftClientReply reply = client.io().send(message); - Assertions.assertTrue(reply.isSuccess()); - } - } + for (SimpleMessage message : messages) { if (async) { - CompletableFuture.allOf(asyncReplies.toArray(new CompletableFuture[0])).join(); - asyncReplies.forEach(f -> { - final RaftClientReply reply = f.join(); - Assertions.assertTrue(reply.isSuccess(), () -> "Failed with reply " + reply); - }); + asyncReplies.add(client.async().send(message)); + } else { + final RaftClientReply reply = client.io().send(message); + Assertions.assertTrue(reply.isSuccess()); } } - if (killLeader) { - log.info("killAndRestart leader " + leader.getId()); - killAndRestartLeader = killAndRestartServer(leader.getId(), 0, 4000, cluster, log); + if (async) { + asyncReplies.forEach(f -> { + final RaftClientReply reply = f.join(); + Assertions.assertTrue(reply.isSuccess(), () -> "Failed with reply " + reply); + }); } - Thread.sleep(cluster.getTimeoutMax().toIntExact(TimeUnit.MILLISECONDS) + 100); - } finally { - CompletableFuture.allOf(killAndRestartFollower, killAndRestartLeader).join(); } + Thread.sleep(cluster.getTimeoutMax().toIntExact(TimeUnit.MILLISECONDS) + 100); + killAndRestartFollower.join(); + killAndRestartLeader.join(); log.info(cluster.printAllLogs()); final List divisions = cluster.getServerAliveStream().collect(Collectors.toList()); for(RaftServer.Division impl: divisions) { - RaftTestUtil.assertLogEntries(impl, term, messages, 50, log); + if (killLeader) { + assertLogEntriesContaining(impl, term, messages, 50, log); + } else { + RaftTestUtil.assertLogEntries(impl, term, messages, 50, log); + } } } diff --git a/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/ElectionCommandIntegrationTest.java b/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/ElectionCommandIntegrationTest.java index 68c1ff5d4f..4a7ec0ef01 100644 --- a/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/ElectionCommandIntegrationTest.java +++ b/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/ElectionCommandIntegrationTest.java @@ -153,11 +153,11 @@ void runTestElectionStepDownCommand(MiniRaftCluster cluster) throws Exception { final StringPrintStream out = new StringPrintStream(); RatisShell shell = new RatisShell(out.getPrintStream()); Assertions.assertEquals(2, cluster.getFollowers().size()); - int ret = shell.run("election", "pause", "-peers", sb.toString(), "-address", + int ret = shell.run("election", "pause", "-peers", sb, "-address", leader.getPeer().getAddress()); Assertions.assertEquals(0, ret); - ret = shell.run("election", "stepDown", "-peers", sb.toString()); + ret = shell.run("election", "stepDown", "-peers", sb); Assertions.assertEquals(0, ret); JavaUtils.attempt(() -> Assertions.assertNotEquals(leader.getId(), RaftTestUtil.waitForLeader(cluster).getId()), 10, TimeDuration.valueOf(1, TimeUnit.SECONDS), "testElectionStepDownCommand", LOG); From 51b60c343223f8d675309ed6ddbc373dd0c5b2d9 Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Sun, 31 May 2026 13:29:32 +0800 Subject: [PATCH 5/6] RATIS-2548. Stabilize test state machine snapshots --- .../impl/SimpleStateMachine4Testing.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java index 1ffbdbcb99..a4a7002c38 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java @@ -268,14 +268,16 @@ public long takeSnapshot() { // TODO: snapshot should be written to a tmp file, then renamed final File snapshotFile = storage.getSnapshotFile(termIndex.getTerm(), endIndex); LOG.debug("Taking a snapshot with {}, file:{}", termIndex, snapshotFile); + final LogEntryProto[] entries; + synchronized (indexMap) { + entries = indexMap.values().stream() + .filter(entry -> entry.getIndex() <= endIndex) + .toArray(LogEntryProto[]::new); + } try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(snapshotFile, false, segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) { - for (final LogEntryProto entry : indexMap.values()) { - if (entry.getIndex() > endIndex) { - break; - } else { - out.write(entry); - } + for (final LogEntryProto entry : entries) { + out.write(entry); } out.flush(); } catch (IOException e) { From fb3ee4eefc2f238194be35e5d4a2f737e0a6ff9c Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Sun, 31 May 2026 14:08:48 +0800 Subject: [PATCH 6/6] RATIS-2548. Stabilize snapshot and leader election tests --- .../server/TestManualRestoreSnapshot.java | 10 ++--- .../server/impl/LeaderElectionTests.java | 43 ++++++++++--------- 2 files changed, 28 insertions(+), 25 deletions(-) diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/counter/server/TestManualRestoreSnapshot.java b/ratis-examples/src/test/java/org/apache/ratis/examples/counter/server/TestManualRestoreSnapshot.java index 0480a5364a..f1075ae08c 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/examples/counter/server/TestManualRestoreSnapshot.java +++ b/ratis-examples/src/test/java/org/apache/ratis/examples/counter/server/TestManualRestoreSnapshot.java @@ -112,15 +112,15 @@ void run(MiniRaftCluster cluster) throws Exception { final StateMachine stateMachine = restartedFollower.getStateMachine(); final SnapshotInfo info = stateMachine.getLatestSnapshot(); LOG.info("{} restarted snapshot info {} from {}", followerId, info, stateMachine); + assertNotNull(info); + assertTrue(info.getTermIndex().equals(applied), () -> info + " != " + applied); JavaUtils.attemptUntilTrue(() -> { System.out.println(cluster.printServers()); - final TermIndex leaderLastApplied = leaderStateMachine.getLastAppliedTermIndex(); - LOG.info("Leader {} last applied {}", leader.getId(), leaderLastApplied); final TermIndex followerLastApplied = stateMachine.getLastAppliedTermIndex(); - LOG.info("Follower {} last applied {}", followerId, followerLastApplied); - return followerLastApplied.equals(leaderLastApplied); - }, 10, TimeDuration.ONE_SECOND, "followerLastApplied", LOG); + LOG.info("Follower {} last applied {}, snapshot {}", followerId, followerLastApplied, applied); + return followerLastApplied != null && followerLastApplied.getIndex() >= applied.getIndex(); + }, 10, TimeDuration.ONE_SECOND, "followerSnapshotApplied", LOG); sendMessages(cluster, 7); } diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java index 3a47d127c5..07b4792844 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java @@ -148,29 +148,32 @@ public void testAddServerForWaitReady() throws IOException, InterruptedException final MiniRaftCluster cluster = newCluster(3); cluster.start(); RaftTestUtil.waitForLeader(cluster); - try (RaftClient client = cluster.createClient()) { - for (int i = 0; i < 10; ++i) { - RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("message_" + i)); - assertTrue(reply.isSuccess()); - } - // add 3 new servers and wait longer time + try { CodeInjectionForTesting.put(RaftServerImpl.START_COMPLETE, new SleepCode(2000)); - final PeerChanges peerChanges = cluster.addNewPeers(2, true); - LOG.info("add new 3 servers"); - LOG.info(cluster.printServers()); - RaftClientReply reply = client.admin().setConfiguration(SetConfigurationRequest.Arguments.newBuilder() - .setServersInNewConf(peerChanges.getAddedPeers()) - .setMode(SetConfigurationRequest.Mode.ADD).build()); - assertTrue(reply.isSuccess()); - for (RaftServer server : cluster.getServers()) { - RaftServerProxy proxy = (RaftServerProxy) server; - proxy.getImpls().forEach(s -> { - assertTrue(s.isRunning()); - }); + try (RaftClient client = cluster.createClient()) { + for (int i = 0; i < 10; ++i) { + RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("message_" + i)); + assertTrue(reply.isSuccess()); + } + // add 3 new servers and wait longer time + final PeerChanges peerChanges = cluster.addNewPeers(2, true); + LOG.info("add new 3 servers"); + LOG.info(cluster.printServers()); + RaftClientReply reply = client.admin().setConfiguration(SetConfigurationRequest.Arguments.newBuilder() + .setServersInNewConf(peerChanges.getAddedPeers()) + .setMode(SetConfigurationRequest.Mode.ADD).build()); + assertTrue(reply.isSuccess()); + JavaUtils.attempt(() -> { + for (RaftServer server : cluster.getServers()) { + RaftServerProxy proxy = (RaftServerProxy) server; + proxy.getImpls().forEach(s -> assertTrue(s.isRunning())); + } + }, 10, HUNDRED_MILLIS, "all server impls running", LOG); } + } finally { + cluster.shutdown(); + CodeInjectionForTesting.remove(RaftServerImpl.START_COMPLETE); } - cluster.shutdown();; - CodeInjectionForTesting.remove(RaftServerImpl.START_COMPLETE); } @Test