diff --git a/docs/design/2026_06_02_idempotent_snapshot_restore.md b/docs/design/2026_06_02_idempotent_snapshot_restore.md index 06d2405e0..f1d3f1c07 100644 --- a/docs/design/2026_06_02_idempotent_snapshot_restore.md +++ b/docs/design/2026_06_02_idempotent_snapshot_restore.md @@ -236,21 +236,37 @@ type AppliedIndexReader interface { } ``` -`kvFSM` exposes its store through a typed accessor so wal_store.go can -reach it without importing the concrete pebble type: +`kvFSM` directly satisfies `raftengine.AppliedIndexReader` by +forwarding to its underlying store (PR #915 round-5 — round-1 had a +factory `AppliedIndexReader() AppliedIndexReader` method intended to +be called through a separate `AppliedIndexReporter` interface, but +that pattern would require the skip gate to know about the reporter +shim; the direct interface satisfaction is simpler and a compile-time +guard catches future signature drift): ```go -type AppliedIndexReporter interface { - AppliedIndexReader() AppliedIndexReader -} -func (f *kvFSM) AppliedIndexReader() AppliedIndexReader { - if r, ok := f.store.(AppliedIndexReader); ok { - return r +// kv/fsm.go +func (f *kvFSM) LastAppliedIndex() (uint64, bool, error) { + r, ok := f.store.(raftengine.AppliedIndexReader) + if !ok { + return 0, false, nil } - return nil + idx, present, err := r.LastAppliedIndex() + if err != nil { + return 0, false, errors.WithStack(err) + } + return idx, present, nil } + +// kv/fsm_applied_index_iface_check.go (compile-time guard) +var _ raftengine.AppliedIndexReader = (*kvFSM)(nil) +var _ raftengine.AppliedIndexWriter = (*kvFSM)(nil) ``` +The compile-time guard means any future rename or signature drift +fails `go build` immediately — the soak investment is protected at +the compiler level. + ### 4. 
Conditional restore (with conservative error fallback) ```go @@ -279,16 +295,20 @@ func restoreSnapshotState(fsm StateMachine, snapshot raftpb.Snapshot, fsmSnapDir // fsmAlreadyAtIndex returns true ONLY when we can prove the FSM is // already at or past `want`. Any uncertainty -- FSM doesn't expose -// the reporter, store doesn't expose the reader, read error, or -// missing meta key -- returns false so we fall back to the full -// restore. A stale-but-incorrect skip is far worse than a wasteful -// full restore; the fallback errs strictly toward restoring. +// the reader interface, read error, or missing meta key -- returns +// false so we fall back to the full restore. A stale-but-incorrect +// skip is far worse than a wasteful full restore; the fallback errs +// strictly toward restoring. +// +// Direct type-assert against raftengine.AppliedIndexReader (PR #915 +// round-5): kvFSM satisfies the interface directly via its +// LastAppliedIndex method, so no separate AppliedIndexReporter shim +// is needed. The compile-time guard in kv/fsm_applied_index_iface_check.go +// keeps this stable. func fsmAlreadyAtIndex(fsm StateMachine, want uint64) bool { - reporter, ok := fsm.(AppliedIndexReporter) + r, ok := fsm.(raftengine.AppliedIndexReader) if !ok { return false } - reader := reporter.AppliedIndexReader() - if reader == nil { return false } - have, present, err := reader.LastAppliedIndex() + have, present, err := r.LastAppliedIndex() if err != nil || !present { return false } return have >= want } @@ -814,8 +834,12 @@ present. - `ApplyIndexAware` is **already** in `main`; this design only adds consumers. - The new opt-in interfaces (`AppliedIndexReader`, - `AppliedIndexReporter`, `SnapshotHeaderApplier`) are additive. + `AppliedIndexWriter`, `SnapshotHeaderApplier`) are additive. FSMs that don't implement them fall back to the current behaviour. 
+ (Round-1 / round-2 of this doc mentioned an `AppliedIndexReporter` + factory-method shim; PR #915 round-5 superseded it by having + `kvFSM` satisfy `AppliedIndexReader` directly via its + `LastAppliedIndex` method.) - `metaAppliedIndexBytes` is new. Older fsm.db files don't have it. The `present=false` branch makes the first restart after upgrade fall back to full restore, populating the meta key from the next @@ -863,7 +887,7 @@ restoreSnapshotState skipped (FSM at index %d, snapshot at %d, ceiling=%d, cutov | Branch | Content | Behaviour change | |---|---|---| | **B1** (this PR) | Design doc | None | -| **B2** | `ApplyMutationsRaftAt` / `DeletePrefixAtRaftAt` overloads + meta-key bundling in both leaves + `pebbleStore.LastAppliedIndex()` (under `dbMu.RLock()`) + `pebbleStore.SetDurableAppliedIndex()` (under `dbMu.RLock()`, **`pebble.Sync` unconditionally**) + `kvFSM.AppliedIndexReader()` accessor + `kvFSM.SetDurableAppliedIndex` forwarding + thread `f.pendingApplyIdx` into the data-Apply leaves + BOTH `persistCreatedSnapshot` (`engine.go:2679`) AND `e.persistLocalSnapshotPayload` (`engine.go:4032`, the SnapshotCount-triggered hot path) call `SetDurableAppliedIndex` BEFORE the corresponding `persist.SaveSnap` | Meta key starts being written on every data Apply AND at every snapshot persist (both config-snapshot and steady-state local-snapshot paths). Skip is still disabled. Soak in production for one release. 
| +| **B2** | `ApplyMutationsRaftAt` / `DeletePrefixAtRaftAt` overloads + meta-key bundling in both leaves + `pebbleStore.LastAppliedIndex()` (under `dbMu.RLock()`) + `pebbleStore.SetDurableAppliedIndex()` (under `dbMu.RLock()` + `applyMu.Lock()` RMW monotonic guard, **`pebble.Sync` unconditionally**) + `kvFSM.LastAppliedIndex()` directly satisfies `raftengine.AppliedIndexReader` (compile-time guard in `kv/fsm_applied_index_iface_check.go`) + `kvFSM.SetDurableAppliedIndex` forwarding + thread `f.pendingApplyIdx` into the data-Apply leaves + BOTH `persistCreatedSnapshot` (`engine.go:2679`) AND `e.persistLocalSnapshotPayload` (`engine.go:4032`, the SnapshotCount-triggered hot path) call `SetDurableAppliedIndex` BEFORE the corresponding `persist.SaveSnap` | Meta key starts being written on every data Apply AND at every snapshot persist (both config-snapshot and steady-state local-snapshot paths). Skip is still disabled. Soak in production for one release. | | **B3** | `restoreSnapshotState` skip gate + `applyHeaderStateOnSkip(snapPath, tok.CRC32C)` orchestrating size + footer-vs-tokenCRC + full-body-CRC verification using `internal/raftengine/etcd`'s existing helpers (matching `openAndRestoreFSMSnapshot`'s safety contract) + two-phase `SnapshotHeaderApplier` seam on `kvFSM` (`ParseSnapshotHeader(r io.Reader) (ceiling, cutover, err)` + pure `ApplySnapshotHeader(ceiling, cutover)`) + metrics + INFO log | **User-visible cold-start win.** | | **B4** | Lower `HEALTH_TIMEOUT_SECONDS` default once production data shows steady-state skip rate ≥ 90 % | Tighter ceiling; the env override remains honoured. 
| diff --git a/go.mod b/go.mod index af038cad9..d3287b9dc 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/aws/smithy-go v1.25.1 github.com/cockroachdb/errors v1.13.0 github.com/cockroachdb/pebble/v2 v2.1.5 + github.com/coreos/go-semver v0.3.1 github.com/emirpasic/gods v1.18.1 github.com/getsentry/sentry-go v0.46.2 github.com/goccy/go-json v0.10.6 @@ -58,7 +59,6 @@ require ( github.com/cockroachdb/redact v1.1.5 // indirect github.com/cockroachdb/swiss v0.0.0-20251224182025-b0f6560f979b // indirect github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect - github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dustin/go-humanize v1.0.1 // indirect diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index 4387f8d25..f8a32dd93 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -1998,6 +1998,15 @@ func (e *Engine) applyReadySnapshot(snapshot raftpb.Snapshot) error { if err != nil { return errors.Wrapf(err, "decode snapshot token index=%d", snapshot.Metadata.Index) } + // TODO(B3 follow-up): also call SetDurableAppliedIndex(tok.Index) + // once the Restore below succeeds, so a peer restored through + // InstallSnapshot populates the meta key. The local-snapshot persist + // path (engine.persistLocalSnapshotPayload) already bumps the live + // store, but the receiving node's restored store inherits the + // pre-bump value embedded in the snapshot artifact. Design Non-Goals + // (docs/design/2026_06_02_idempotent_snapshot_restore.md:71-74) + // scopes this out of Branch 2; see PR #915 round-4/5 codex P2 on + // engine.go:4077 for the rationale. 
if err := openAndRestoreFSMSnapshot(e.fsm, fsmSnapPath(e.fsmSnapDir, tok.Index), tok.CRC32C); err != nil { return errors.Wrapf(err, "restore fsm snapshot file index=%d crc=%08x", tok.Index, tok.CRC32C) } @@ -2676,10 +2685,38 @@ func (e *Engine) createConfigSnapshot(index uint64, confState raftpb.ConfState, } } +// bumpDurableAppliedIndexBeforeSave pins the FSM's durable applied +// index to `index` BEFORE the engine calls persist.SaveSnap, so a +// successful snapshot persist always implies LastAppliedIndex >= +// snap.Metadata.Index — closes the HLC-lease-only / encryption-only +// fallback (PR #910 design §6). +// +// FSMs that do not expose raftengine.AppliedIndexWriter silently +// no-op; the skip optimisation falls back to full restore for them +// (legacy test fakes, in-memory backends). pebble.Sync is forced on +// the writer side regardless of ELASTICKV_FSM_SYNC_MODE — once +// persist.SaveSnap returns, WAL compaction discards every log entry +// at or before snap.Metadata.Index, so there is no source to replay +// the meta key bump from. +// +// Used by BOTH snapshot persist sites: persistCreatedSnapshot (this +// file) and e.persistLocalSnapshotPayload (the steady-state +// SnapshotCount-triggered hot path). 
+func (e *Engine) bumpDurableAppliedIndexBeforeSave(index uint64) error { + w, ok := e.fsm.(raftengine.AppliedIndexWriter) + if !ok { + return nil + } + return errors.WithStack(w.SetDurableAppliedIndex(index)) +} + func (e *Engine) persistCreatedSnapshot(snap raftpb.Snapshot) error { if etcdraft.IsEmptySnap(snap) || e.persist == nil { return nil } + if err := e.bumpDurableAppliedIndexBeforeSave(snap.Metadata.Index); err != nil { + return err + } if err := e.persist.SaveSnap(snap); err != nil { return errors.WithStack(err) } @@ -4044,29 +4081,49 @@ func (e *Engine) persistLocalSnapshotPayload(index uint64, payload []byte) error return nil } + if err := e.bumpDurableAppliedIndexBeforeSave(index); err != nil { + return err + } + _, err = persistLocalSnapshotPayload(e.storage, e.persist, index, payload) + return e.handleLocalSnapshotPersistResult(err) +} + +// handleLocalSnapshotPersistResult collapses the post-SaveSnap error +// switch into a single helper so persistLocalSnapshotPayload stays +// under the cyclomatic-complexity budget. The three raft-side +// 'snapshot already moved on' cases (ErrCompacted / ErrUnavailable / +// ErrSnapOutOfDate) are all treated as no-ops; only the success path +// runs the disk-side purge. 
+func (e *Engine) handleLocalSnapshotPersistResult(err error) error { switch { case err == nil: - snapDir := filepath.Join(e.dataDir, snapDirName) - if purgeErr := purgeOldSnapshotFiles(snapDir, e.fsmSnapDir); purgeErr != nil { - slog.Warn("failed to purge old snap files", "error", purgeErr) - } - walDir := filepath.Join(e.dataDir, walDirName) - if purgeErr := purgeOldWALFiles(walDir, e.walRetention()); purgeErr != nil { - slog.Warn("failed to purge old wal files", "error", purgeErr) - } - return nil - case errors.Is(err, etcdraft.ErrCompacted): - return nil - case errors.Is(err, etcdraft.ErrUnavailable): + e.purgeAfterLocalSnapshot() return nil - case errors.Is(err, etcdraft.ErrSnapOutOfDate): + case errors.Is(err, etcdraft.ErrCompacted), + errors.Is(err, etcdraft.ErrUnavailable), + errors.Is(err, etcdraft.ErrSnapOutOfDate): return nil default: return err } } +// purgeAfterLocalSnapshot runs the disk-side cleanup that follows a +// successful local-snapshot persist: trim old .snap/.fsm files and +// rotate ageing WAL segments. Both calls log on error but do not +// propagate — failing to purge is non-fatal. 
+func (e *Engine) purgeAfterLocalSnapshot() { + snapDir := filepath.Join(e.dataDir, snapDirName) + if purgeErr := purgeOldSnapshotFiles(snapDir, e.fsmSnapDir); purgeErr != nil { + slog.Warn("failed to purge old snap files", "error", purgeErr) + } + walDir := filepath.Join(e.dataDir, walDirName) + if purgeErr := purgeOldWALFiles(walDir, e.walRetention()); purgeErr != nil { + slog.Warn("failed to purge old wal files", "error", purgeErr) + } +} + func encodeReadContext(id uint64) []byte { out := make([]byte, envelopeHeaderSize) out[0] = readContextVersion diff --git a/internal/raftengine/etcd/engine_applied_index_test.go b/internal/raftengine/etcd/engine_applied_index_test.go new file mode 100644 index 000000000..437e10f77 --- /dev/null +++ b/internal/raftengine/etcd/engine_applied_index_test.go @@ -0,0 +1,227 @@ +package etcd + +import ( + "io" + "sync" + "testing" + + "github.com/bootjp/elastickv/internal/raftengine" + "github.com/coreos/go-semver/semver" + "github.com/stretchr/testify/require" + etcdraft "go.etcd.io/raft/v3" + raftpb "go.etcd.io/raft/v3/raftpb" +) + +// applyIndexOrderRecorder is shared between the recording FSM and +// the recording persist storage so the test can assert the +// crash-ordering invariant (SetDurableAppliedIndex MUST run before +// persist.SaveSnap). Both record into a single ordered slice keyed +// by event kind; the test reads it back to verify the sequence. 
+type applyIndexOrderRecorder struct { + mu sync.Mutex + events []orderEvent +} + +type orderEvent struct { + kind string // "bump" | "save" + index uint64 +} + +func (r *applyIndexOrderRecorder) record(kind string, idx uint64) { + r.mu.Lock() + defer r.mu.Unlock() + r.events = append(r.events, orderEvent{kind: kind, index: idx}) +} + +func (r *applyIndexOrderRecorder) snapshot() []orderEvent { + r.mu.Lock() + defer r.mu.Unlock() + out := make([]orderEvent, len(r.events)) + copy(out, r.events) + return out +} + +// recordingAppliedIndexFSM implements StateMachine + +// raftengine.AppliedIndexWriter. It records every +// SetDurableAppliedIndex call into the shared recorder. +type recordingAppliedIndexFSM struct { + rec *applyIndexOrderRecorder + failNext bool + failErr error +} + +func (f *recordingAppliedIndexFSM) Apply(_ []byte) any { return nil } +func (f *recordingAppliedIndexFSM) Snapshot() (Snapshot, error) { return nil, io.EOF } +func (f *recordingAppliedIndexFSM) Restore(_ io.Reader) error { return nil } + +func (f *recordingAppliedIndexFSM) SetDurableAppliedIndex(idx uint64) error { + if f.failNext { + f.failNext = false + return f.failErr + } + f.rec.record("bump", idx) + return nil +} + +// recordingPersistStorage is a minimal etcdstorage.Storage stand-in +// that records SaveSnap calls into the shared recorder. The hook +// only calls SaveSnap + Release; the rest are stubs. 
+type recordingPersistStorage struct { + rec *applyIndexOrderRecorder +} + +func (p *recordingPersistStorage) SaveSnap(snap raftpb.Snapshot) error { + p.rec.record("save", snap.Metadata.Index) + return nil +} + +func (p *recordingPersistStorage) Save(_ raftpb.HardState, _ []raftpb.Entry) error { return nil } +func (p *recordingPersistStorage) Release(_ raftpb.Snapshot) error { return nil } +func (p *recordingPersistStorage) Sync() error { return nil } +func (p *recordingPersistStorage) Close() error { return nil } +func (p *recordingPersistStorage) MinimalEtcdVersion() *semver.Version { return nil } + +// TestRecordingFSM_SatisfiesAppliedIndexWriter is a compile-time- +// adjacent assertion: the recording FSM MUST satisfy the writer +// seam so the engine hook actually fires for it. +func TestRecordingFSM_SatisfiesAppliedIndexWriter(t *testing.T) { + rec := &applyIndexOrderRecorder{} + var f any = &recordingAppliedIndexFSM{rec: rec} + _, ok := f.(raftengine.AppliedIndexWriter) + require.True(t, ok, "recordingAppliedIndexFSM must implement raftengine.AppliedIndexWriter") +} + +// TestPersistCreatedSnapshot_BumpsAppliedIndex exercises Site 1 of +// the persist hook. We invoke (*Engine).persistCreatedSnapshot +// directly; the engine MUST call SetDurableAppliedIndex +// (snap.Metadata.Index) BEFORE SaveSnap. 
+func TestPersistCreatedSnapshot_BumpsAppliedIndex(t *testing.T) { + rec := &applyIndexOrderRecorder{} + fsm := &recordingAppliedIndexFSM{rec: rec} + persist := &recordingPersistStorage{rec: rec} + e := &Engine{fsm: fsm, persist: persist} + + snap := raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 42, Term: 1}} + require.NoError(t, e.persistCreatedSnapshot(snap)) + + require.Equal(t, []orderEvent{ + {kind: "bump", index: 42}, + {kind: "save", index: 42}, + }, rec.snapshot(), + "hook MUST call SetDurableAppliedIndex BEFORE SaveSnap") +} + +// TestPersistCreatedSnapshot_NilFSMNoOp covers the legacy / test- +// fake case: an FSM that does NOT implement AppliedIndexWriter +// silently no-ops; snapshot persist still runs. +func TestPersistCreatedSnapshot_NilFSMNoOp(t *testing.T) { + rec := &applyIndexOrderRecorder{} + persist := &recordingPersistStorage{rec: rec} + // testStateMachine (defined in engine_test.go) is the canonical + // non-AppliedIndexWriter FSM used by other tests in this package. + e := &Engine{fsm: &testStateMachine{}, persist: persist} + + snap := raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 17, Term: 1}} + require.NoError(t, e.persistCreatedSnapshot(snap)) + + require.Equal(t, []orderEvent{ + {kind: "save", index: 17}, + }, rec.snapshot(), + "legacy FSM path: snapshot persist still happens, just without the meta-key bump") +} + +// TestPersistCreatedSnapshot_BumpErrorAborts checks the ordering +// invariant under failure: if SetDurableAppliedIndex returns an +// error, the engine MUST surface it AND NOT call SaveSnap. This +// preserves the (metaAppliedIndex < snapshot pointer impossible) +// crash invariant from PR #910 design §6. 
+func TestPersistCreatedSnapshot_BumpErrorAborts(t *testing.T) { + rec := &applyIndexOrderRecorder{} + fsm := &recordingAppliedIndexFSM{rec: rec, failNext: true, failErr: io.ErrShortBuffer} + persist := &recordingPersistStorage{rec: rec} + e := &Engine{fsm: fsm, persist: persist} + + snap := raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 99, Term: 1}} + err := e.persistCreatedSnapshot(snap) + require.Error(t, err, "bump failure MUST be surfaced to caller") + require.Empty(t, rec.snapshot(), + "failed bump MUST NOT have recorded; SaveSnap MUST NOT have run") +} + +// --- Site 2: persistLocalSnapshotPayload (steady-state hot path) --- +// +// These mirror the Site 1 tests above but exercise the engine's +// SnapshotCount-triggered local-snapshot path. The hook sits inside +// e.persistLocalSnapshotPayload (engine.go:4060), under +// e.snapshotMu.Lock(), BEFORE the free-function persistLocalSnapshotPayload +// (wal_store.go:519) which is what actually calls persist.SaveSnap. + +// localSnapshotEngine constructs an *Engine suitable for testing +// e.persistLocalSnapshotPayload in isolation. We need: +// - e.fsm: implements AppliedIndexWriter so the bump hook fires +// - e.persist: implements etcdstorage.Storage so SaveSnap can run +// (via the free-function persistLocalSnapshotPayload calling it) +// - e.storage: a real *etcdraft.MemoryStorage so e.storage.Snapshot() +// and buildLocalSnapshot return sensible values +// - e.dataDir / e.fsmSnapDir: temp dirs so the post-persist purge +// does not panic on Stat +func localSnapshotEngine(t *testing.T, rec *applyIndexOrderRecorder, fsm *recordingAppliedIndexFSM, applied uint64) *Engine { + t.Helper() + storage := etcdraft.NewMemoryStorage() + // Seed the storage with enough entries so buildLocalSnapshot's + // storage.Term(applied) call succeeds. Without this, the free + // persistLocalSnapshotPayload short-circuits at Term lookup before + // reaching persist.SaveSnap, and the test cannot observe the save. 
+ entries := make([]raftpb.Entry, applied) + for i := uint64(0); i < applied; i++ { + entries[i] = raftpb.Entry{Index: i + 1, Term: 1, Data: []byte{}} + } + require.NoError(t, storage.Append(entries)) + persist := &recordingPersistStorage{rec: rec} + return &Engine{ + fsm: fsm, + persist: persist, + storage: storage, + dataDir: t.TempDir(), + fsmSnapDir: t.TempDir(), + } +} + +// TestPersistLocalSnapshotPayload_BumpsAppliedIndex is the +// happy-path test for Site 2. The engine MUST call +// SetDurableAppliedIndex(index) BEFORE the free-function +// persistLocalSnapshotPayload — which is what eventually invokes +// persist.SaveSnap. +func TestPersistLocalSnapshotPayload_BumpsAppliedIndex(t *testing.T) { + rec := &applyIndexOrderRecorder{} + fsm := &recordingAppliedIndexFSM{rec: rec} + const index uint64 = 123 + e := localSnapshotEngine(t, rec, fsm, index) + + require.NoError(t, e.persistLocalSnapshotPayload(index, []byte("payload-stub"))) + + // Exact slice match (matches Site 1 style + closes claude[bot] + // round-3 note #4 / coderabbit round-2 nit). A spurious third + // event — e.g. an accidental double SaveSnap — would fail this + // assertion; a GreaterOrEqual + positional check would not. + require.Equal(t, []orderEvent{ + {kind: "bump", index: index}, + {kind: "save", index: index}, + }, rec.snapshot(), + "hook MUST call SetDurableAppliedIndex BEFORE SaveSnap exactly once each") +} + +// TestPersistLocalSnapshotPayload_BumpErrorAborts mirrors Site 1's +// crash-ordering test for Site 2: a failed SetDurableAppliedIndex +// MUST surface the error AND prevent persist.SaveSnap from running. 
+func TestPersistLocalSnapshotPayload_BumpErrorAborts(t *testing.T) { + rec := &applyIndexOrderRecorder{} + fsm := &recordingAppliedIndexFSM{rec: rec, failNext: true, failErr: io.ErrShortBuffer} + const index uint64 = 456 + e := localSnapshotEngine(t, rec, fsm, index) + + err := e.persistLocalSnapshotPayload(index, []byte("payload-stub")) + require.Error(t, err, "bump failure MUST be surfaced to caller") + require.Empty(t, rec.snapshot(), + "failed bump MUST NOT have recorded; SaveSnap MUST NOT have run") +} diff --git a/internal/raftengine/statemachine.go b/internal/raftengine/statemachine.go index 28086d2d3..65b6b45f6 100644 --- a/internal/raftengine/statemachine.go +++ b/internal/raftengine/statemachine.go @@ -46,3 +46,47 @@ type StateMachine interface { type ApplyIndexAware interface { SetApplyIndex(idx uint64) } + +// AppliedIndexReader is an OPTIONAL extension that lets the engine +// query the FSM's durable applied-index for the cold-start skip gate. +// See docs/design/2026_06_02_idempotent_snapshot_restore.md §3. +// +// The returned value MUST be the largest Raft entry index whose Apply +// produced a durable mutation on the FSM's primary store (i.e. the +// metaAppliedIndex Pebble meta key, bundled in the same WriteBatch +// as the data mutation). FSMs that cannot self-report return +// (0, false, nil) — the caller treats that as "missing" and falls +// back to the full restore path, preserving the strictly-additive +// invariant. +// +// Returning a non-nil error MUST NOT abort cold start. The caller +// (fsmAlreadyAtIndex, on the restoreSnapshotState path) intentionally +// collapses any (_, _, err != nil) result to "fall back to restore" +// rather than surface the error, because over-restoring on a corrupt +// meta key is strictly safer than skipping incorrectly. 
+type AppliedIndexReader interface { + LastAppliedIndex() (uint64, bool, error) +} + +// AppliedIndexWriter is an OPTIONAL extension that lets the engine +// pin the FSM's durable applied-index to a known value at snapshot +// persist time. See docs/design/2026_06_02_idempotent_snapshot_restore.md +// §6 "HLC lease entries — checkpoint at snapshot persist". +// +// The engine calls SetDurableAppliedIndex(snap.Metadata.Index) +// before it calls persist.SaveSnap, so that on every successful +// snapshot persist the invariant `LastAppliedIndex >= +// snapshot.Metadata.Index` holds unconditionally — closing the +// HLC-lease-only / encryption-only fallback that would otherwise +// leave LastAppliedIndex stuck at the last data-Apply index. +// +// Implementations MUST persist the value with pebble.Sync (or the +// equivalent strong-durability flag for the backing store) +// regardless of ELASTICKV_FSM_SYNC_MODE. The checkpoint is the only +// durable carrier of metaAppliedIndex at this point — once +// persist.SaveSnap returns, WAL compaction discards every log entry +// at or before snap.Metadata.Index, so there is no source to replay +// the meta key bump from. +type AppliedIndexWriter interface { + SetDurableAppliedIndex(idx uint64) error +} diff --git a/kv/fsm.go b/kv/fsm.go index 8cb3f0c4a..ec7afc69c 100644 --- a/kv/fsm.go +++ b/kv/fsm.go @@ -129,6 +129,57 @@ func (f *kvFSM) SetApplyIndex(idx uint64) { f.pendingApplyIdx = idx } +// LastAppliedIndex implements raftengine.AppliedIndexReader by +// forwarding to the underlying store when it supports the reader +// seam (pebbleStore does; the in-memory mvccStore does not). +// +// Round-1 of this PR shipped this as a factory method +// (AppliedIndexReader() AppliedIndexReader) intended to be called +// through a separate AppliedIndexReporter interface. 
Codex P2 on +// round-4 (kv/fsm.go:145) correctly pointed out that the planned +// cold-start skip gate (Branch 3) will type-assert +// fsm.(raftengine.AppliedIndexReader) directly — a factory method +// with a different signature does NOT satisfy the interface, so +// the skip would always fall back even after the meta key is +// populated. Renaming the method to LastAppliedIndex and inlining +// the type-assert forward makes kvFSM directly satisfy +// raftengine.AppliedIndexReader. +// +// (0, false, nil) propagates the strictly-additive fallback when +// the store does not expose the seam — the future skip gate treats +// "missing" as "fall back to full restore." See +// docs/design/2026_06_02_idempotent_snapshot_restore.md §3 / §4. +func (f *kvFSM) LastAppliedIndex() (uint64, bool, error) { + r, ok := f.store.(raftengine.AppliedIndexReader) + if !ok { + return 0, false, nil + } + idx, present, err := r.LastAppliedIndex() + if err != nil { + return 0, false, errors.WithStack(err) + } + return idx, present, nil +} + +// SetDurableAppliedIndex implements raftengine.AppliedIndexWriter by +// forwarding to the underlying store when it supports the writer +// seam. Called by the engine at every snapshot persist site BEFORE +// persist.SaveSnap so a successful snapshot persist implies +// LastAppliedIndex >= snap.Metadata.Index, closing the HLC-lease- +// only / encryption-only fallback (PR #910 design §6). +// +// Returns nil silently when the backing store does not implement +// the writer seam (in-memory mvccStore, test fakes) — the skip +// optimisation simply degrades to "fall back to full restore" for +// those FSMs. 
+func (f *kvFSM) SetDurableAppliedIndex(idx uint64) error { + w, ok := f.store.(raftengine.AppliedIndexWriter) + if !ok { + return nil + } + return errors.WithStack(w.SetDurableAppliedIndex(idx)) +} + type FSM interface { raftengine.StateMachine } @@ -455,7 +506,7 @@ func (f *kvFSM) handleRawRequest(ctx context.Context, r *pb.Request, commitTS ui } // Raw requests always commit against the latest state; use commitTS as both // the validation snapshot and the commit timestamp. - return errors.WithStack(f.store.ApplyMutationsRaft(ctx, muts, nil, commitTS, commitTS)) + return errors.WithStack(f.store.ApplyMutationsRaftAt(ctx, muts, nil, commitTS, commitTS, f.pendingApplyIdx)) } // extractDelPrefix checks if the mutations contain a DEL_PREFIX operation. @@ -472,7 +523,7 @@ func extractDelPrefix(muts []*pb.Mutation) (bool, []byte) { // handleDelPrefix delegates prefix deletion to the store. Transaction-internal // keys are always excluded to preserve transactional integrity. func (f *kvFSM) handleDelPrefix(ctx context.Context, prefix []byte, commitTS uint64) error { - return errors.WithStack(f.store.DeletePrefixAtRaft(ctx, prefix, txnCommonPrefix, commitTS)) + return errors.WithStack(f.store.DeletePrefixAtRaftAt(ctx, prefix, txnCommonPrefix, commitTS, f.pendingApplyIdx)) } var ErrNotImplemented = errors.New("not implemented") @@ -730,7 +781,7 @@ func (f *kvFSM) handlePrepareRequest(ctx context.Context, r *pb.Request) error { return err } - if err := f.store.ApplyMutationsRaft(ctx, storeMuts, r.ReadKeys, startTS, startTS); err != nil { + if err := f.store.ApplyMutationsRaftAt(ctx, storeMuts, r.ReadKeys, startTS, startTS, f.pendingApplyIdx); err != nil { return errors.WithStack(err) } return nil @@ -794,7 +845,7 @@ func (f *kvFSM) handleOnePhaseTxnRequest(ctx context.Context, r *pb.Request, com if err != nil { return err } - return errors.WithStack(f.store.ApplyMutationsRaft(ctx, storeMuts, r.ReadKeys, startTS, commitTS)) + return 
errors.WithStack(f.store.ApplyMutationsRaftAt(ctx, storeMuts, r.ReadKeys, startTS, commitTS, f.pendingApplyIdx)) } // dedupProbeOnePhase decides whether handleOnePhaseTxnRequest should no-op @@ -898,7 +949,7 @@ func (f *kvFSM) commitApplyStartTS(ctx context.Context, primaryKey []byte, start // The secondary-shard LatestCommitTS scan is intentionally deferred to the // write-conflict path so the hot (first-time) commit path pays no extra cost. func (f *kvFSM) applyCommitWithIdempotencyFallback(ctx context.Context, storeMuts []*store.KVPairMutation, uniq []*pb.Mutation, applyStartTS, commitTS uint64) error { - err := f.store.ApplyMutationsRaft(ctx, storeMuts, nil, applyStartTS, commitTS) + err := f.store.ApplyMutationsRaftAt(ctx, storeMuts, nil, applyStartTS, commitTS, f.pendingApplyIdx) if err == nil { return nil } @@ -915,7 +966,7 @@ func (f *kvFSM) applyCommitWithIdempotencyFallback(ctx context.Context, storeMut return errors.WithStack(lErr) } if exists && latestTS >= commitTS { - return errors.WithStack(f.store.ApplyMutationsRaft(ctx, storeMuts, nil, commitTS, commitTS)) + return errors.WithStack(f.store.ApplyMutationsRaftAt(ctx, storeMuts, nil, commitTS, commitTS, f.pendingApplyIdx)) } } return errors.WithStack(err) @@ -966,7 +1017,7 @@ func (f *kvFSM) handleAbortRequest(ctx context.Context, r *pb.Request, abortTS u if len(storeMuts) == 0 { return nil } - return errors.WithStack(f.store.ApplyMutationsRaft(ctx, storeMuts, nil, startTS, abortTS)) + return errors.WithStack(f.store.ApplyMutationsRaftAt(ctx, storeMuts, nil, startTS, abortTS, f.pendingApplyIdx)) } func (f *kvFSM) buildPrepareStoreMutations(ctx context.Context, muts []*pb.Mutation, primaryKey []byte, startTS, expireAt uint64) ([]*store.KVPairMutation, error) { diff --git a/kv/fsm_applied_index_iface_check.go b/kv/fsm_applied_index_iface_check.go new file mode 100644 index 000000000..1447c9198 --- /dev/null +++ b/kv/fsm_applied_index_iface_check.go @@ -0,0 +1,9 @@ +package kv + +import 
"github.com/bootjp/elastickv/internal/raftengine" + +// Round-4 P2 #2 fix: kvFSM must directly implement +// raftengine.AppliedIndexReader so the cold-start skip gate's +// type-assertion (Branch 3) succeeds. +var _ raftengine.AppliedIndexReader = (*kvFSM)(nil) +var _ raftengine.AppliedIndexWriter = (*kvFSM)(nil) diff --git a/kv/leader_routed_store.go b/kv/leader_routed_store.go index cfd935bb1..cdd4f2f4c 100644 --- a/kv/leader_routed_store.go +++ b/kv/leader_routed_store.go @@ -330,6 +330,16 @@ func (s *LeaderRoutedStore) ApplyMutationsRaft(ctx context.Context, mutations [] return errors.WithStack(s.local.ApplyMutationsRaft(ctx, mutations, readKeys, startTS, commitTS)) } +// ApplyMutationsRaftAt forwards to the local store's raft-entry-index- +// aware variant so the underlying pebbleStore can bundle +// metaAppliedIndex with the mutation. See PR #910 design §2. +func (s *LeaderRoutedStore) ApplyMutationsRaftAt(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, startTS, commitTS, appliedIndex uint64) error { + if s == nil || s.local == nil { + return errors.WithStack(store.ErrNotSupported) + } + return errors.WithStack(s.local.ApplyMutationsRaftAt(ctx, mutations, readKeys, startTS, commitTS, appliedIndex)) +} + func (s *LeaderRoutedStore) DeletePrefixAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error { if s == nil || s.local == nil { return errors.WithStack(store.ErrNotSupported) @@ -345,6 +355,64 @@ func (s *LeaderRoutedStore) DeletePrefixAtRaft(ctx context.Context, prefix []byt return errors.WithStack(s.local.DeletePrefixAtRaft(ctx, prefix, excludePrefix, commitTS)) } +// DeletePrefixAtRaftAt forwards to the local store's raft-entry- +// index-aware variant. See PR #910 design §2 "why both leaves". 
+func (s *LeaderRoutedStore) DeletePrefixAtRaftAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS, appliedIndex uint64) error { + if s == nil || s.local == nil { + return errors.WithStack(store.ErrNotSupported) + } + return errors.WithStack(s.local.DeletePrefixAtRaftAt(ctx, prefix, excludePrefix, commitTS, appliedIndex)) +} + +// LastAppliedIndex forwards to the local store when it implements +// raftengine.AppliedIndexReader. Defensive: in production today the +// kvFSM holds a *pebbleStore directly (not a LeaderRoutedStore — that +// wrapper is used by adapter/server code for read routing, not by +// the FSM apply path); so this forward is currently dead code for +// the cold-start skip optimisation. We add it anyway because future +// refactors might wrap the FSM's store, and a silent no-op there +// would degrade the optimisation to full-restore-always with no +// failure signal. +// +// (0, false, nil) returns are the strictly-additive fallback — +// either the wrapper has no local, the local does not implement +// the reader, or the local reports missing/truncated. The caller in +// internal/raftengine/etcd/wal_store.go (Branch 3) treats all of +// these as "fall back to full restore", which is correct. +func (s *LeaderRoutedStore) LastAppliedIndex() (uint64, bool, error) { + if s == nil || s.local == nil { + return 0, false, nil + } + reader, ok := s.local.(interface { + LastAppliedIndex() (uint64, bool, error) + }) + if !ok { + return 0, false, nil + } + idx, present, err := reader.LastAppliedIndex() + if err != nil { + return 0, false, errors.WithStack(err) + } + return idx, present, nil +} + +// SetDurableAppliedIndex forwards to the local store when it +// implements raftengine.AppliedIndexWriter. Symmetric defensive +// no-op when the local store does not expose the writer seam — see +// LastAppliedIndex doc-comment. 
+func (s *LeaderRoutedStore) SetDurableAppliedIndex(idx uint64) error { + if s == nil || s.local == nil { + return nil + } + writer, ok := s.local.(interface { + SetDurableAppliedIndex(idx uint64) error + }) + if !ok { + return nil + } + return errors.WithStack(writer.SetDurableAppliedIndex(idx)) +} + func (s *LeaderRoutedStore) LastCommitTS() uint64 { if s == nil || s.local == nil { return 0 diff --git a/kv/shard_store.go b/kv/shard_store.go index 2e811446c..ce1d7de31 100644 --- a/kv/shard_store.go +++ b/kv/shard_store.go @@ -1207,6 +1207,17 @@ func (s *ShardStore) ApplyMutationsRaft(ctx context.Context, mutations []*store. return errors.WithStack(group.Store.ApplyMutationsRaft(ctx, mutations, readKeys, startTS, commitTS)) } +// ApplyMutationsRaftAt is the raft-entry-index-aware variant. Threads +// appliedIndex through to the single owning shard so the leaf can +// bundle metaAppliedIndex with the mutation. See PR #910 design §2. +func (s *ShardStore) ApplyMutationsRaftAt(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, startTS, commitTS, appliedIndex uint64) error { + group, err := s.resolveSingleShardGroup(mutations) + if err != nil || group == nil { + return err + } + return errors.WithStack(group.Store.ApplyMutationsRaftAt(ctx, mutations, readKeys, startTS, commitTS, appliedIndex)) +} + // resolveSingleShardGroup returns the shard group that owns every // mutation in the batch, or an error if the batch is cross-shard or // references an unknown group. A nil group with nil error means "empty @@ -1257,6 +1268,42 @@ func (s *ShardStore) DeletePrefixAtRaft(ctx context.Context, prefix []byte, excl return nil } +// DeletePrefixAtRaftAt is the raft-entry-index-aware variant. The +// caller's raft entry index applies only to the local group whose +// FSM is driving this apply; on a multi-group ShardStore, fanning +// the SAME index across other groups would corrupt their +// metaAppliedIndex. 
The single-group case (the common case for an +// FSM-local DeletePrefixAtRaft path) gets the correct bundling. There +// is no "passive" multi-group mode: the loop below forwards the same +// appliedIndex to every group, so a caller that spans several groups +// MUST pass appliedIndex=0 to leave each group's meta key untouched. +// +// In practice the FSM call sites that issue raft-DeletePrefix +// operate against a single group's store; the multi-group ShardStore +// is the receiver only when an aggregate (admin / coordinator) path +// is replaying a global FLUSHALL, which is not raft-applied. +func (s *ShardStore) DeletePrefixAtRaftAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS, appliedIndex uint64) error { + for _, g := range s.groups { + if g == nil || g.Store == nil { + continue + } + // Pass appliedIndex through to every group. In the + // single-group call-path (the production raft-apply case) + // this is correct: appliedIndex IS that group's raft entry + // index. In a hypothetical multi-group call, only one group + // would see the matching index and the rest would treat it + // as a non-monotonic stray write — but the rest of the + // raft-apply contract (single FSM per raft log) makes that + // case impossible to reach in production. Tests that + // exercise ShardStore.DeletePrefixAtRaftAt across multiple + // groups MUST pass appliedIndex=0 to opt out. + if err := g.Store.DeletePrefixAtRaftAt(ctx, prefix, excludePrefix, commitTS, appliedIndex); err != nil { + return errors.WithStack(err) + } + } + return nil +} + func (s *ShardStore) LastCommitTS() uint64 { var max uint64 for _, g := range s.groups { @@ -1270,6 +1317,88 @@ func (s *ShardStore) LastCommitTS() uint64 { return max } +// LastAppliedIndex aggregates the durable applied-index across every +// shard group, returning the MIN over all groups that report one.
+// +// MIN is the right aggregator because the kvFSM is per-shard in +// production — each shard's FSM independently asks "is MY group's +// applied index at least as fresh as MY group's snapshot?" — and +// ShardStore is NEVER used as the FSM's f.store in production today +// (the FSM holds a *pebbleStore directly; ShardStore is the +// coordinator-facing fanout wrapper). This method exists as a +// defensive forward in case a future refactor uses ShardStore from +// the apply path; reporting MIN guarantees the cold-start skip gate +// would refuse to skip whenever ANY group lags, matching the +// conservative "over-restore beats under-restore" rule (PR #910 +// design §4). +// +// (0, false, nil) when no group reports a value, or when any group +// that exposes the reader has no meta key — fallback per design §4. +func (s *ShardStore) LastAppliedIndex() (uint64, bool, error) { + var ( + minIdx uint64 + anyReport bool + ) + for _, g := range s.groups { + if g == nil || g.Store == nil { + continue + } + reader, ok := g.Store.(interface { + LastAppliedIndex() (uint64, bool, error) + }) + if !ok { + continue + } + idx, present, err := reader.LastAppliedIndex() + if err != nil { + return 0, false, errors.WithStack(err) + } + if !present { + // One group has no meta key. Conservative: report + // missing so the cold-start skip gate falls back. + return 0, false, nil + } + if !anyReport || idx < minIdx { + minIdx = idx + } + anyReport = true + } + if !anyReport { + return 0, false, nil + } + return minIdx, true, nil +} + +// SetDurableAppliedIndex broadcasts the bump to every group store +// that exposes the writer seam. +// +// This is purely defensive — in production today the FSM holds a +// *pebbleStore directly; ShardStore is never f.store. Were it ever +// wired through the FSM apply path, broadcasting the same idx across +// groups would corrupt their per-group metaAppliedIndex semantics +// (each group has its own raft log with its own entry numbering).
+// For that hypothetical, the test convention from +// DeletePrefixAtRaftAt applies: tests MUST pass idx=0 to opt out, or +// not use ShardStore as the writer at all. Returns the first +// per-group error. +func (s *ShardStore) SetDurableAppliedIndex(idx uint64) error { + for _, g := range s.groups { + if g == nil || g.Store == nil { + continue + } + writer, ok := g.Store.(interface { + SetDurableAppliedIndex(idx uint64) error + }) + if !ok { + continue + } + if err := writer.SetDurableAppliedIndex(idx); err != nil { + return errors.WithStack(err) + } + } + return nil +} + // WriteConflictCountsByPrefix aggregates OCC conflict counts across // every shard group owned by this ShardStore. Per-shard counts share // the same "|" label schema, so a simple sum gives diff --git a/store/lsm_store.go b/store/lsm_store.go index cb1372a08..ffc3de519 100644 --- a/store/lsm_store.go +++ b/store/lsm_store.go @@ -41,7 +41,14 @@ const ( metaLastCommitTS = "_meta_last_commit_ts" metaMinRetainedTS = "_meta_min_retained_ts" metaPendingMinRetainedTS = "_meta_pending_min_retained_ts" - spoolBufSize = 32 * 1024 // buffer size for streaming I/O during restore + // metaAppliedIndex is the durable raft-applied index meta key. + // Bundled atomically with each raft-Apply pebble.Batch (see + // applyMutationsRaftAt / deletePrefixAtRaftAt) and pinned to + // snap.Metadata.Index by SetDurableAppliedIndex at every snapshot + // persist site. See + // docs/design/2026_06_02_idempotent_snapshot_restore.md §3. + metaAppliedIndex = "_meta_applied_index" + spoolBufSize = 32 * 1024 // buffer size for streaming I/O during restore // maxPebbleEncodedKeySize is the limit for encoded Pebble on-disk keys, // which are the user key concatenated with the 8-byte inverted timestamp. 
@@ -156,6 +163,7 @@ func resolvePebbleCacheBytes(envVal string) int64 { var metaLastCommitTSBytes = []byte(metaLastCommitTS) var metaMinRetainedTSBytes = []byte(metaMinRetainedTS) var metaPendingMinRetainedTSBytes = []byte(metaPendingMinRetainedTS) +var metaAppliedIndexBytes = []byte(metaAppliedIndex) // pebbleStore implements MVCCStore using CockroachDB's Pebble LSM tree. // @@ -165,7 +173,13 @@ var metaPendingMinRetainedTSBytes = []byte(metaPendingMinRetainedTS) // 2. dbMu – guards the s.db pointer; held as a write-lock while the // DB is being swapped (Restore/Close), and as a read-lock by every // operation that accesses s.db. -// 3. mtx – guards the in-memory metadata fields +// 3. applyMu – serialises raft-apply conflict-check → batch-commit so +// concurrent ApplyMutationsRaft/At cannot both pass checkConflicts and +// then both commit. Also held by deletePrefixAtWithOpts and by +// SetDurableAppliedIndex's read-modify-write monotonic guard +// (PR #915 round-3) so the snapshot-persist checkpoint cannot rewind +// metaAppliedIndex below a concurrent per-Apply value. +// 4. mtx – guards the in-memory metadata fields // (lastCommitTS, minRetainedTS, pendingMinRetainedTS). type pebbleStore struct { db *pebble.DB @@ -534,7 +548,8 @@ func writeTempDBMetadata(db *pebble.DB, lastCommitTS, minRetainedTS uint64) erro func isPebbleMetaKey(rawKey []byte) bool { return bytes.Equal(rawKey, metaLastCommitTSBytes) || bytes.Equal(rawKey, metaMinRetainedTSBytes) || - bytes.Equal(rawKey, metaPendingMinRetainedTSBytes) + bytes.Equal(rawKey, metaPendingMinRetainedTSBytes) || + bytes.Equal(rawKey, metaAppliedIndexBytes) } func (s *pebbleStore) findMaxCommitTS() (uint64, error) { @@ -553,6 +568,122 @@ func (s *pebbleStore) saveLastCommitTS(ts uint64) error { return writePebbleUint64(s.db, metaLastCommitTSBytes, ts, pebble.NoSync) } +// LastAppliedIndex implements raftengine.AppliedIndexReader. 
Returns +// the largest Raft entry index whose Apply produced a durable +// mutation on this store (via applyMutationsRaftAt / +// deletePrefixAtRaftAt — same WriteBatch as the data — or via +// SetDurableAppliedIndex at a snapshot persist). +// +// (0, false, nil) means the meta key is absent — either a pre-upgrade +// fsm.db that has not yet bumped through a raft-Apply, or a freshly +// restored store. Callers MUST treat this as "missing" and fall back +// to the full restore path; see +// docs/design/2026_06_02_idempotent_snapshot_restore.md §4 fallback +// policy. Any other error propagates. +// +// dbMu.RLock matches the rest of the read path +// (lsm_store.go:153 / :675); without it a concurrent swapInTempDB +// could replace s.db between db.Get and the closer.Close()/value +// access, racing the snapshot install path. +func (s *pebbleStore) LastAppliedIndex() (uint64, bool, error) { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + val, closer, err := s.db.Get(metaAppliedIndexBytes) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return 0, false, nil + } + return 0, false, errors.WithStack(err) + } + defer func() { _ = closer.Close() }() + if len(val) < timestampSize { + // Truncated meta key — treat as missing for strictly-additive + // safety. The caller will fall back to full restore, which + // produces a fresh well-formed value via the next Apply or + // snapshot persist. + return 0, false, nil + } + return binary.LittleEndian.Uint64(val), true, nil +} + +// SetDurableAppliedIndex implements raftengine.AppliedIndexWriter. +// Used at snapshot persist time to pin metaAppliedIndex to +// snap.Metadata.Index BEFORE persist.SaveSnap, so a successful +// snapshot persist always implies LastAppliedIndex >= +// snap.Metadata.Index — closing the HLC-lease-only / encryption-only +// fallback (see design doc §6). 
+// +// The write is single-key and goes through a fresh pebble.Batch with +// pebble.Sync UNCONDITIONALLY — independent of +// ELASTICKV_FSM_SYNC_MODE. The reason is durability boundary: WAL +// compaction following SaveSnap discards every log entry at or +// before snap.Metadata.Index, so there is no source to replay the +// meta key bump from. If we honoured nosync mode here, a crash +// between Pebble's deferred flush and SaveSnap's internal fsync +// would leave snapshot pointer at X but metaAppliedIndex at Y < X +// forever. The +1 fsync per snapshot persist (rare; default +// SnapshotCount=10000) is negligible vs that risk. +// +// Monotonicity (round-2 P2 fix for PR #915): when persistLocalSnapshot +// runs in a background worker, raft apply can continue and the per- +// entry ApplyMutationsRaftAt path can advance metaAppliedIndex past +// `idx` before this method runs. An unconditional write would rewind +// the meta key — defeating the soak/verification invariant and +// causing the future skip gate to fall back unnecessarily. The +// applyMu lock serialises with applyMutationsWithOpts / +// deletePrefixAtWithOpts (both hold it across their batch commit), +// and the read-modify-write keeps the meta key strictly monotonic. +// +// Lock order: dbMu.RLock before applyMu.Lock matches the existing +// discipline in applyMutationsWithOpts (lsm_store.go around :1438 / +// :1444). No deadlock. +func (s *pebbleStore) SetDurableAppliedIndex(idx uint64) error { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + s.applyMu.Lock() + defer s.applyMu.Unlock() + + existing, present, err := s.readAppliedIndexLocked() + if err != nil { + return err + } + if present && existing >= idx { + // Concurrent apply already advanced the meta key past `idx`; + // no work needed. The skip-invariant still holds because the + // snapshot's claim (LastAppliedIndex >= snap.Metadata.Index) + // is satisfied by existing >= idx >= snap.Metadata.Index. 
+ return nil + } + + batch := s.db.NewBatch() + defer func() { _ = batch.Close() }() + if err := setPebbleUint64InBatch(batch, metaAppliedIndexBytes, idx); err != nil { + return err + } + return errors.WithStack(batch.Commit(pebble.Sync)) +} + +// readAppliedIndexLocked decodes the metaAppliedIndex key. Caller +// MUST hold s.dbMu.RLock (so s.db is stable) AND s.applyMu.Lock (so +// the value reflects a consistent snapshot vs concurrent batch +// commits in applyMutationsWithOpts). The body is shared with the +// unlocked LastAppliedIndex() reader; same (0, false, nil) semantics +// for absent / truncated meta keys. +func (s *pebbleStore) readAppliedIndexLocked() (uint64, bool, error) { + val, closer, err := s.db.Get(metaAppliedIndexBytes) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return 0, false, nil + } + return 0, false, errors.WithStack(err) + } + defer func() { _ = closer.Close() }() + if len(val) < timestampSize { + return 0, false, nil + } + return binary.LittleEndian.Uint64(val), true, nil +} + func (s *pebbleStore) saveMinRetainedTS(ts uint64) error { return writePebbleUint64(s.db, metaMinRetainedTSBytes, ts, pebble.NoSync) } @@ -1267,7 +1398,9 @@ func (s *pebbleStore) ApplyMutations(ctx context.Context, mutations []*KVPairMut // bootstrap Save, admin snapshot, migration). Stage 7a-2 refuses to // emit an encrypted envelope here before this load's writer // registration commits. - return s.applyMutationsWithOpts(ctx, mutations, readKeys, startTS, commitTS, s.directApplyWriteOpts(), true) + // appliedIndex=0: direct path has no raft index; the leaf treats 0 as + // "do not write metaAppliedIndex" so the meta key stays unchanged. + return s.applyMutationsWithOpts(ctx, mutations, readKeys, startTS, commitTS, s.directApplyWriteOpts(), true, 0) } // ApplyMutationsRaft is the raft-apply commit path. 
Durability is governed @@ -1280,16 +1413,38 @@ func (s *pebbleStore) ApplyMutations(ctx context.Context, mutations []*KVPairMut // Must only be called from inside the FSM apply loop. All other call sites // must use ApplyMutations so a nosync opt-in cannot silently drop // acknowledged writes that have no raft backstop. +// +// Callers that have a raft entry index in hand (the kvFSM data-Apply +// path via the raftengine.ApplyIndexAware seam) SHOULD prefer +// ApplyMutationsRaftAt so the metaAppliedIndex meta key is bundled +// atomically with the data mutation — see PR #910 / B2. func (s *pebbleStore) ApplyMutationsRaft(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error { // gateRegistration=false: the FSM-apply path replays committed Raft // entries and must stay deterministic. It may legitimately encrypt // before this node's own registration entry commits (design §1); // fail-closing here would halt the apply loop and could deadlock a // node whose storage entry is ordered before its registration entry. - return s.applyMutationsWithOpts(ctx, mutations, readKeys, startTS, commitTS, s.raftApplyWriteOpts(), false) + // appliedIndex=0: callers that have not yet been wired to the + // raftengine.ApplyIndexAware seam (test fakes, legacy FSM impls) + // land here; their LastAppliedIndex() will stay behind the snapshot + // pointer and the skip optimisation will fall back to full restore + // for them. Preferred path is ApplyMutationsRaftAt. 
+ return s.applyMutationsWithOpts(ctx, mutations, readKeys, startTS, commitTS, s.raftApplyWriteOpts(), false, 0) } -func (s *pebbleStore) applyMutationsWithOpts(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS uint64, writeOpts *pebble.WriteOptions, gateRegistration bool) error { +// ApplyMutationsRaftAt is ApplyMutationsRaft with the raft entry +// index threaded through so the leaf can bundle metaAppliedIndex in +// the same pebble.Batch as the data mutation. See PR #910 design §2. +// +// appliedIndex==0 is treated as "no index" — the leaf will not write +// metaAppliedIndex, preserving the ApplyMutationsRaft semantics. +// Production callers (kvFSM.applyXxx with f.pendingApplyIdx) SHOULD +// pass the entry.Index value the engine delivered via SetApplyIndex. +func (s *pebbleStore) ApplyMutationsRaftAt(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS, appliedIndex uint64) error { + return s.applyMutationsWithOpts(ctx, mutations, readKeys, startTS, commitTS, s.raftApplyWriteOpts(), false, appliedIndex) +} + +func (s *pebbleStore) applyMutationsWithOpts(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS uint64, writeOpts *pebble.WriteOptions, gateRegistration bool, appliedIndex uint64) error { s.dbMu.RLock() defer s.dbMu.RUnlock() @@ -1325,6 +1480,16 @@ func (s *pebbleStore) applyMutationsWithOpts(ctx context.Context, mutations []*K s.mtx.Unlock() return err } + // Bundle metaAppliedIndex in the same batch as the data + commitTS + // meta key so a crash either commits all three atomically or none. + // appliedIndex==0 is the legacy / non-raft callers (ApplyMutations + // or ApplyMutationsRaft); they leave the key unchanged. 
+ if appliedIndex > 0 { + if err := setPebbleUint64InBatch(b, metaAppliedIndexBytes, appliedIndex); err != nil { + s.mtx.Unlock() + return err + } + } if err := b.Commit(writeOpts); err != nil { s.mtx.Unlock() return errors.WithStack(err) @@ -1345,17 +1510,32 @@ func (s *pebbleStore) applyMutationsWithOpts(ctx context.Context, mutations []*K // ELASTICKV_FSM_SYNC_MODE=nosync. Raft-apply callers must use // DeletePrefixAtRaft instead. func (s *pebbleStore) DeletePrefixAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error { - return s.deletePrefixAtWithOpts(ctx, prefix, excludePrefix, commitTS, s.directApplyWriteOpts()) + return s.deletePrefixAtWithOpts(ctx, prefix, excludePrefix, commitTS, s.directApplyWriteOpts(), 0) } // DeletePrefixAtRaft is the raft-apply variant of DeletePrefixAt. Durability // is governed by s.fsmApplyWriteOpts (ELASTICKV_FSM_SYNC_MODE). See // ApplyMutationsRaft for the full durability argument. +// +// Callers that have a raft entry index in hand SHOULD prefer +// DeletePrefixAtRaftAt to bundle metaAppliedIndex atomically — see +// PR #910 design §2 "why both leaves". func (s *pebbleStore) DeletePrefixAtRaft(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error { - return s.deletePrefixAtWithOpts(ctx, prefix, excludePrefix, commitTS, s.raftApplyWriteOpts()) + return s.deletePrefixAtWithOpts(ctx, prefix, excludePrefix, commitTS, s.raftApplyWriteOpts(), 0) +} + +// DeletePrefixAtRaftAt is DeletePrefixAtRaft with the raft entry +// index threaded through. handleDelPrefix builds an independent +// pebble.Batch separate from applyMutationsWithOpts, so the meta +// key bundle must happen here too — otherwise DEL_PREFIX entries +// would land without bumping metaAppliedIndex and silently leave +// LastAppliedIndex behind the true applied count for any workload +// that uses DEL_PREFIX. PR #910 design §2. 
+func (s *pebbleStore) DeletePrefixAtRaftAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS, appliedIndex uint64) error { + return s.deletePrefixAtWithOpts(ctx, prefix, excludePrefix, commitTS, s.raftApplyWriteOpts(), appliedIndex) } -func (s *pebbleStore) deletePrefixAtWithOpts(_ context.Context, prefix []byte, excludePrefix []byte, commitTS uint64, writeOpts *pebble.WriteOptions) error { +func (s *pebbleStore) deletePrefixAtWithOpts(_ context.Context, prefix []byte, excludePrefix []byte, commitTS uint64, writeOpts *pebble.WriteOptions, appliedIndex uint64) error { s.dbMu.RLock() defer s.dbMu.RUnlock() @@ -1393,6 +1573,15 @@ func (s *pebbleStore) deletePrefixAtWithOpts(_ context.Context, prefix []byte, e if err := setPebbleUint64InBatch(batch, metaLastCommitTSBytes, newLastTS); err != nil { return err } + // Bundle metaAppliedIndex atomically with the tombstones + commitTS + // — same rationale as applyMutationsWithOpts. appliedIndex==0 means + // the legacy / non-raft caller path (DeletePrefixAt or + // DeletePrefixAtRaft); leave the meta key unchanged. + if appliedIndex > 0 { + if err := setPebbleUint64InBatch(batch, metaAppliedIndexBytes, appliedIndex); err != nil { + return err + } + } if err := batch.Commit(writeOpts); err != nil { return errors.WithStack(err) } diff --git a/store/lsm_store_applied_index_test.go b/store/lsm_store_applied_index_test.go new file mode 100644 index 000000000..ff00ea950 --- /dev/null +++ b/store/lsm_store_applied_index_test.go @@ -0,0 +1,249 @@ +package store + +import ( + "context" + "os" + "testing" + + "github.com/cockroachdb/pebble/v2" + "github.com/stretchr/testify/require" +) + +// pebbleStoreApplied is the local narrowed view of pebbleStore that +// the applied-index tests reach for. We keep the type assertion +// behind a helper so renames of the unexported struct don't cascade +// into every test. 
+func pebbleStoreApplied(t *testing.T, st MVCCStore) *pebbleStore { + t.Helper() + ps, ok := st.(*pebbleStore) + require.True(t, ok, "expected *pebbleStore, got %T", st) + return ps +} + +// newApplyIndexPebbleStore creates a fresh pebbleStore in a temp dir +// for one applied-index test. Caller closes via t.Cleanup. +func newApplyIndexPebbleStore(t *testing.T) MVCCStore { + t.Helper() + dir := t.TempDir() + st, err := NewPebbleStore(dir) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, st.Close()) + require.NoError(t, os.RemoveAll(dir)) + }) + return st +} + +// TestLastAppliedIndex_MissingMetaKey covers the "(0, false, nil)" +// branch — the strictly-additive fallback that lets the cold-start +// skip gate degrade to full restore for fsm.db files that have not +// yet been mutated through a raft-Apply (pre-upgrade fsm.db, freshly +// restored store). +func TestLastAppliedIndex_MissingMetaKey(t *testing.T) { + st := newApplyIndexPebbleStore(t) + ps := pebbleStoreApplied(t, st) + + idx, present, err := ps.LastAppliedIndex() + require.NoError(t, err) + require.False(t, present, "fresh store must report (false) for missing meta key") + require.Equal(t, uint64(0), idx, "missing key must surface as 0") +} + +// TestSetDurableAppliedIndex_RoundTrip pins the meta key via the +// snapshot-persist writer seam and verifies LastAppliedIndex returns +// the same value with present=true. Confirms the +// pebble.Sync-unconditionally commit and the little-endian encoding +// round-trip. 
+func TestSetDurableAppliedIndex_RoundTrip(t *testing.T) { + st := newApplyIndexPebbleStore(t) + ps := pebbleStoreApplied(t, st) + + const wantIdx uint64 = 0xDEAD_BEEF_CAFE_F00D + require.NoError(t, ps.SetDurableAppliedIndex(wantIdx)) + + got, present, err := ps.LastAppliedIndex() + require.NoError(t, err) + require.True(t, present, "after SetDurableAppliedIndex the key must be present") + require.Equal(t, wantIdx, got, "round-trip must preserve the full uint64") +} + +// TestApplyMutationsRaftAt_BundlesMetaAppliedIndex verifies that a +// raft-Apply data mutation bundles metaAppliedIndex in the SAME +// pebble.Batch as the data (and metaLastCommitTS). After the apply, +// LastAppliedIndex must reflect the entry index passed in. +func TestApplyMutationsRaftAt_BundlesMetaAppliedIndex(t *testing.T) { + ctx := context.Background() + st := newApplyIndexPebbleStore(t) + ps := pebbleStoreApplied(t, st) + + const entryIdx uint64 = 42 + muts := []*KVPairMutation{ + {Op: OpTypePut, Key: []byte("k1"), Value: []byte("v1")}, + {Op: OpTypePut, Key: []byte("k2"), Value: []byte("v2")}, + } + const ts uint64 = 100 + require.NoError(t, ps.ApplyMutationsRaftAt(ctx, muts, nil, ts, ts, entryIdx)) + + got, present, err := ps.LastAppliedIndex() + require.NoError(t, err) + require.True(t, present, "ApplyMutationsRaftAt must persist metaAppliedIndex") + require.Equal(t, entryIdx, got) + + // Confirm the data also landed (sanity check the bundling did not + // drop the data path). + val, err := ps.GetAt(ctx, []byte("k1"), ts) + require.NoError(t, err) + require.Equal(t, []byte("v1"), val) +} + +// TestApplyMutationsRaftAt_ZeroIndexLeavesMetaKey covers the +// appliedIndex==0 escape hatch — callers without a raft entry index +// (test fakes, legacy ApplyMutationsRaft) MUST NOT bump the meta key. 
+func TestApplyMutationsRaftAt_ZeroIndexLeavesMetaKey(t *testing.T) { + ctx := context.Background() + st := newApplyIndexPebbleStore(t) + ps := pebbleStoreApplied(t, st) + + // Seed metaAppliedIndex via SetDurableAppliedIndex. + const seeded uint64 = 7 + require.NoError(t, ps.SetDurableAppliedIndex(seeded)) + + muts := []*KVPairMutation{{Op: OpTypePut, Key: []byte("k1"), Value: []byte("v1")}} + // appliedIndex=0 → leaf must NOT touch metaAppliedIndex. + require.NoError(t, ps.ApplyMutationsRaftAt(ctx, muts, nil, 100, 100, 0)) + + got, present, err := ps.LastAppliedIndex() + require.NoError(t, err) + require.True(t, present) + require.Equal(t, seeded, got, "appliedIndex=0 must leave the meta key at its previous value") +} + +// TestDeletePrefixAtRaftAt_BundlesMetaAppliedIndex is the analogous +// round-trip for the DEL_PREFIX leaf (handleDelPrefix builds its own +// pebble.Batch separate from applyMutationsWithOpts). Without this +// hook, DEL_PREFIX entries silently leave LastAppliedIndex behind. +func TestDeletePrefixAtRaftAt_BundlesMetaAppliedIndex(t *testing.T) { + ctx := context.Background() + st := newApplyIndexPebbleStore(t) + ps := pebbleStoreApplied(t, st) + + // Pre-populate so the DEL_PREFIX actually iterates over something. + const seedTS uint64 = 50 + require.NoError(t, ps.ApplyMutations(ctx, []*KVPairMutation{ + {Op: OpTypePut, Key: []byte("p/k1"), Value: []byte("v")}, + {Op: OpTypePut, Key: []byte("p/k2"), Value: []byte("v")}, + }, nil, seedTS, seedTS)) + + const entryIdx uint64 = 99 + require.NoError(t, ps.DeletePrefixAtRaftAt(ctx, []byte("p/"), nil, 200, entryIdx)) + + got, present, err := ps.LastAppliedIndex() + require.NoError(t, err) + require.True(t, present, "DeletePrefixAtRaftAt must persist metaAppliedIndex") + require.Equal(t, entryIdx, got) +} + +// TestSetDurableAppliedIndex_UsesPebbleSync exercises the +// nosync-mode independence claim — even when ELASTICKV_FSM_SYNC_MODE +// is nosync, the checkpoint must use pebble.Sync. 
We can't directly +// observe the sync option from a black-box test, so we settle for +// "the value persists across a store close + reopen". NOTE: a clean +// Close() presumably flushes NoSync writes as well (TODO confirm), so +// this round-trip is a persistence smoke test under nosync mode, not +// proof that pebble.Sync was used; only a crash test could show that. +func TestSetDurableAppliedIndex_UsesPebbleSync(t *testing.T) { + t.Setenv("ELASTICKV_FSM_SYNC_MODE", "nosync") + dir := t.TempDir() + st, err := NewPebbleStore(dir) + require.NoError(t, err) + + ps := pebbleStoreApplied(t, st) + const idx uint64 = 12345 + require.NoError(t, ps.SetDurableAppliedIndex(idx)) + require.NoError(t, st.Close()) + + reopened, err := NewPebbleStore(dir) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, reopened.Close()) }) + + got, present, err := pebbleStoreApplied(t, reopened).LastAppliedIndex() + require.NoError(t, err) + require.True(t, present, "after Close+Open metaAppliedIndex must persist (pebble.Sync forced regardless of nosync)") + require.Equal(t, idx, got) +} + +// TestSetDurableAppliedIndex_Monotonic guards the round-2 fix for +// codex P2: a background snapshot persist (which is what +// SetDurableAppliedIndex services) MUST NOT rewind metaAppliedIndex +// when a concurrent ApplyMutationsRaftAt has already advanced it +// past the snapshot's index. +// +// The test mirrors the production race in single-threaded form: +// 1. ApplyMutationsRaftAt(... appliedIndex=N+k) — meta key = N+k +// 2. SetDurableAppliedIndex(N) — must NO-OP (N < N+k) +// +// After step 2, LastAppliedIndex must still report N+k.
+func TestSetDurableAppliedIndex_Monotonic(t *testing.T) { + ctx := context.Background() + st := newApplyIndexPebbleStore(t) + ps := pebbleStoreApplied(t, st) + + const ( + applied uint64 = 250 // simulated ApplyMutationsRaftAt index + snapIdx uint64 = 100 // simulated older background snapshot persist + ) + require.Less(t, snapIdx, applied, + "test invariant: snapshot index must be older than the applied index for the race to matter") + + // Step 1: ApplyMutationsRaftAt advances metaAppliedIndex to N+k. + muts := []*KVPairMutation{{Op: OpTypePut, Key: []byte("k"), Value: []byte("v")}} + require.NoError(t, ps.ApplyMutationsRaftAt(ctx, muts, nil, 100, 100, applied)) + + // Step 2: background SetDurableAppliedIndex(snapIdx) — the per-Apply + // path has already moved past, so this MUST no-op. + require.NoError(t, ps.SetDurableAppliedIndex(snapIdx)) + + got, present, err := ps.LastAppliedIndex() + require.NoError(t, err) + require.True(t, present) + require.Equal(t, applied, got, + "SetDurableAppliedIndex MUST NOT rewind metaAppliedIndex below the per-Apply value") +} + +// TestSetDurableAppliedIndex_AdvancesOnlyForward complements the +// monotonic test by exercising the inverse case: the snapshot persist +// runs first (no prior apply), then a strictly-greater index lands. +// The forward write MUST take effect. 
+func TestSetDurableAppliedIndex_AdvancesOnlyForward(t *testing.T) { + st := newApplyIndexPebbleStore(t) + ps := pebbleStoreApplied(t, st) + + require.NoError(t, ps.SetDurableAppliedIndex(50)) + require.NoError(t, ps.SetDurableAppliedIndex(100)) // strictly greater → advances + require.NoError(t, ps.SetDurableAppliedIndex(75)) // strictly less → no-op + require.NoError(t, ps.SetDurableAppliedIndex(100)) // equal → no-op + + got, present, err := ps.LastAppliedIndex() + require.NoError(t, err) + require.True(t, present) + require.Equal(t, uint64(100), got, + "SetDurableAppliedIndex must advance strictly-greater values and no-op on equal/lesser") +} + +// TestLastAppliedIndex_CorruptValue exercises the "len(val) < +// timestampSize" fallback — a truncated meta key surfaces as missing +// (NOT as an error) so the caller falls back to full restore rather +// than crashing the cold start. +func TestLastAppliedIndex_CorruptValue(t *testing.T) { + st := newApplyIndexPebbleStore(t) + ps := pebbleStoreApplied(t, st) + + // Write a too-short payload directly under the meta key to simulate + // a partial write. Use s.db.Set so we bypass setPebbleUint64InBatch. + require.NoError(t, ps.db.Set(metaAppliedIndexBytes, []byte{0x01, 0x02}, pebble.Sync)) + + got, present, err := ps.LastAppliedIndex() + require.NoError(t, err, "corrupt value must NOT propagate as an error") + require.False(t, present, "truncated meta key must collapse to missing") + require.Equal(t, uint64(0), got) +} diff --git a/store/mvcc_store.go b/store/mvcc_store.go index 74b6a2295..1886fbf3b 100644 --- a/store/mvcc_store.go +++ b/store/mvcc_store.go @@ -496,6 +496,16 @@ func (s *mvccStore) ApplyMutationsRaft(ctx context.Context, mutations []*KVPairM return s.ApplyMutations(ctx, mutations, readKeys, startTS, commitTS) } +// ApplyMutationsRaftAt satisfies the MVCCStore interface. 
The in-memory +// store has no Pebble batch to bundle metaAppliedIndex in, so the +// appliedIndex argument is discarded — LastAppliedIndex on this +// backend would always read as missing and the skip optimisation +// would fall back to full restore. Acceptable: the in-memory store +// is only used in unit tests and the catalog bootstrap path. +func (s *mvccStore) ApplyMutationsRaftAt(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS, _ uint64) error { + return s.ApplyMutations(ctx, mutations, readKeys, startTS, commitTS) +} + func (s *mvccStore) ApplyMutations(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error { s.mtx.Lock() defer s.mtx.Unlock() @@ -565,6 +575,12 @@ func (s *mvccStore) DeletePrefixAtRaft(ctx context.Context, prefix []byte, exclu return s.DeletePrefixAt(ctx, prefix, excludePrefix, commitTS) } +// DeletePrefixAtRaftAt satisfies the MVCCStore interface — see +// ApplyMutationsRaftAt for the appliedIndex disposition rationale. +func (s *mvccStore) DeletePrefixAtRaftAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS, _ uint64) error { + return s.DeletePrefixAt(ctx, prefix, excludePrefix, commitTS) +} + func (s *mvccStore) DeletePrefixAt(_ context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error { s.mtx.Lock() defer s.mtx.Unlock() diff --git a/store/store.go b/store/store.go index ace186809..3991da19d 100644 --- a/store/store.go +++ b/store/store.go @@ -172,6 +172,15 @@ type MVCCStore interface { // migrations, tests) must use ApplyMutations, which is always // pebble.Sync and therefore safe without raft-log replay. ApplyMutationsRaft(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error + // ApplyMutationsRaftAt is ApplyMutationsRaft with the raft entry + // index threaded through. 
The leaf bundles metaAppliedIndex in the + // same pebble.Batch as the data mutation so a successful Apply + // implies LastAppliedIndex >= appliedIndex; the cold-start + // snapshot-restore skip gate uses this invariant (PR #910 / B2). + // appliedIndex==0 is treated as "no index, do not bump the meta + // key", matching ApplyMutationsRaft semantics for callers that have + // not yet been wired to the raftengine.ApplyIndexAware seam. + ApplyMutationsRaftAt(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS, appliedIndex uint64) error // DeletePrefixAt atomically deletes all visible (non-tombstone, non-expired) // keys matching prefix at commitTS by writing tombstone versions. An empty // prefix means "all keys". Keys matching excludePrefix are preserved. @@ -181,6 +190,12 @@ type MVCCStore interface { // DeletePrefixAtRaft is the raft-apply variant of DeletePrefixAt with // the same durability contract as ApplyMutationsRaft. DeletePrefixAtRaft(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error + // DeletePrefixAtRaftAt is DeletePrefixAtRaft with the raft entry + // index threaded through. handleDelPrefix builds an independent + // pebble.Batch separate from applyMutationsWithOpts; this overload + // bundles metaAppliedIndex in that batch so DEL_PREFIX entries + // also advance the meta key. PR #910 design §2 "why both leaves". + DeletePrefixAtRaftAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS, appliedIndex uint64) error // LastCommitTS returns the highest commit timestamp applied on this node. LastCommitTS() uint64 // WriteConflictCountsByPrefix returns a snapshot of the MVCC