Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1252,18 +1252,17 @@ void replyPendingRequest(TermIndex termIndex, RaftClientReply reply, RetryCacheI
final PendingRequest pending = pendingRequests.remove(termIndex);
leaderTracer.removePendingRequest(pending);

final LongSupplier replyMethod = () -> {
final Runnable replyMethod = () -> {
cacheEntry.updateResult(reply);
if (pending != null) {
pending.setReply(reply);
}
return termIndex.getIndex();
};

if (readIndexType == Type.REPLIED_INDEX) {
replyFlusher.hold(replyMethod);
replyFlusher.hold(termIndex.getIndex(), replyMethod);
} else {
replyMethod.getAsLong();
replyMethod.run();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ void tracePendingRequest(PendingRequest pending) {
}

void removePendingRequest(PendingRequest pending) {
if (pending == null) {
return;
}
appendEntriesSpans.remove(pending.getTermIndex().getIndex());
}

Expand All @@ -96,4 +99,4 @@ SpanContextProto traceAppendEntries(List<LogEntryProto> entries) {
void close() {
appendEntriesSpans.clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@

import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/**
* Implements the reply flush logic as part of the leader batch write when RepliedIndex is used.
Expand All @@ -39,16 +38,33 @@ public class ReplyFlusher {
private static final String CLASS_NAME = JavaUtils.getClassSimpleName(RaftServerImpl.class);
public static final String FLUSH = CLASS_NAME + ".flush";

private static class HeldReply {
private final long logIndex;
private final Runnable replyMethod;

HeldReply(long logIndex, Runnable replyMethod) {
this.logIndex = logIndex;
this.replyMethod = replyMethod;
}

long getLogIndex() {
return logIndex;
}

void complete() {
replyMethod.run();
}
}

static class Replies {
/** When a {@link LongSupplier} is invoked, it completes a write reply and return the log index. */
private LinkedList<LongSupplier> list = new LinkedList<>();
private LinkedList<HeldReply> list = new LinkedList<>();

synchronized void add(LongSupplier replyMethod) {
list.add(replyMethod);
synchronized void add(HeldReply reply) {
list.add(reply);
}

synchronized LinkedList<LongSupplier> getAndSetNewList() {
final LinkedList<LongSupplier> old = list;
synchronized LinkedList<HeldReply> getAndSetNewList() {
final LinkedList<HeldReply> old = list;
list = new LinkedList<>();
return old;
}
Expand Down Expand Up @@ -79,8 +95,8 @@ long getRepliedIndex() {
}

/** Hold a write reply for later batch flushing */
void hold(LongSupplier replyMethod) {
replies.add(replyMethod);
void hold(long logIndex, Runnable replyMethod) {
replies.add(new HeldReply(logIndex, replyMethod));
}

void start(long startIndex) {
Expand Down Expand Up @@ -112,16 +128,24 @@ private void run() {
private void flush() {
CodeInjectionForTesting.execute(FLUSH, id, null);

final LinkedList<LongSupplier> toFlush = replies.getAndSetNewList();
final LinkedList<HeldReply> toFlush = replies.getAndSetNewList();
if (toFlush.isEmpty()) {
return;
}
long maxIndex = toFlush.removeLast().getAsLong();
for (LongSupplier held : toFlush) {
maxIndex = Math.max(maxIndex, held.getAsLong());

final int numReplies = toFlush.size();
final HeldReply last = toFlush.removeLast();
long maxIndex = last.getLogIndex();
for (HeldReply held : toFlush) {
maxIndex = Math.max(maxIndex, held.getLogIndex());
}
repliedIndex.updateToMax(maxIndex, s ->
LOG.debug("{}: flushed {} replies, {}", id, toFlush.size(), s));
LOG.debug("{}: flushed {} replies, {}", id, numReplies, s));

last.complete();
for (HeldReply held : toFlush) {
held.complete();
}
}

/** Stop the reply flusher daemon. */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.impl;

import org.apache.ratis.util.TimeDuration;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class TestReplyFlusher {
@Test
public void testAdvanceRepliedIndexBeforeCompletingReplies() throws Exception {
final ReplyFlusher flusher = new ReplyFlusher(getClass().getSimpleName(), 0,
TimeDuration.valueOf(10, TimeUnit.MILLISECONDS));
final CompletableFuture<Long> repliedIndexWhenCompleted = new CompletableFuture<>();

flusher.hold(7, () -> repliedIndexWhenCompleted.complete(flusher.getRepliedIndex()));
flusher.start(0);
try {
Assertions.assertEquals(7, repliedIndexWhenCompleted.get(5, TimeUnit.SECONDS));
} finally {
flusher.stop();
}
}
}