From 2339a6f26042f1fc9999f5d1bbd3015b71d24242 Mon Sep 17 00:00:00 2001 From: Yoshiaki Ueda Date: Wed, 3 Jun 2026 21:10:51 +0900 Subject: [PATCH 01/11] feat(raftengine): add AppliedIndexReader/Writer opt-in interfaces MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two new opt-in extensions to raftengine.StateMachine for the cold-start snapshot-restore skip optimisation (docs/design/2026_06_02_idempotent_snapshot_restore.md): - AppliedIndexReader.LastAppliedIndex() (uint64, bool, error) lets the engine query the FSM's durable applied-index for the fsmAlreadyAtIndex gate. (0, false, nil) means 'missing' and falls back to the full restore path. Any error MUST NOT abort cold start; the caller collapses (false, _, err) into 'restore'. - AppliedIndexWriter.SetDurableAppliedIndex(idx uint64) error lets the engine pin the FSM's durable applied-index to snap.Metadata.Index before calling persist.SaveSnap, so a successful snapshot persist always implies LastAppliedIndex >= snapshot.Metadata.Index. Implementations MUST use pebble.Sync (or equivalent) unconditionally — the checkpoint is the only durable carrier of the meta key at this point, because WAL compaction starts at the snapshot index after SaveSnap. Interface only; no implementations or wirings yet. Branch 2 of the design will: - Add metaAppliedIndex Pebble meta key + pebbleStore impls - Thread f.pendingApplyIdx through ApplyMutationsRaftAt / DeletePrefixAtRaftAt overloads into both batch-bundle sites - Wire kvFSM accessors that forward to the underlying pebbleStore - Hook persistCreatedSnapshot AND e.persistLocalSnapshotPayload (both snapshot persist sites) Refs PR #910 (design), part of Branch 2 implementation series. 
--- internal/raftengine/statemachine.go | 44 +++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/internal/raftengine/statemachine.go b/internal/raftengine/statemachine.go index 28086d2d3..65b6b45f6 100644 --- a/internal/raftengine/statemachine.go +++ b/internal/raftengine/statemachine.go @@ -46,3 +46,47 @@ type StateMachine interface { type ApplyIndexAware interface { SetApplyIndex(idx uint64) } + +// AppliedIndexReader is an OPTIONAL extension that lets the engine +// query the FSM's durable applied-index for the cold-start skip gate. +// See docs/design/2026_06_02_idempotent_snapshot_restore.md §3. +// +// The returned value MUST be the largest Raft entry index whose Apply +// produced a durable mutation on the FSM's primary store (i.e. the +// metaAppliedIndex Pebble meta key, bundled in the same WriteBatch +// as the data mutation). FSMs that cannot self-report return +// (0, false, nil) — the caller treats that as "missing" and falls +// back to the full restore path, preserving the strictly-additive +// invariant. +// +// Returning a non-nil error MUST NOT abort cold start. The +// fsmAlreadyAtIndex caller (restoreSnapshotState) intentionally +// collapses (false, _, err) to "fall back to restore" rather than +// surface the error, because over-restoring on a corrupt meta key is +// strictly safer than skipping incorrectly. +type AppliedIndexReader interface { + LastAppliedIndex() (uint64, bool, error) +} + +// AppliedIndexWriter is an OPTIONAL extension that lets the engine +// pin the FSM's durable applied-index to a known value at snapshot +// persist time. See docs/design/2026_06_02_idempotent_snapshot_restore.md +// §6 "HLC lease entries — checkpoint at snapshot persist". 
+// +// The engine calls SetDurableAppliedIndex(snap.Metadata.Index) +// before it calls persist.SaveSnap, so that on every successful +// snapshot persist the invariant `LastAppliedIndex >= +// snapshot.Metadata.Index` holds unconditionally — closing the +// HLC-lease-only / encryption-only fallback that would otherwise +// leave LastAppliedIndex stuck at the last data-Apply index. +// +// Implementations MUST persist the value with pebble.Sync (or the +// equivalent strong-durability flag for the backing store) +// regardless of ELASTICKV_FSM_SYNC_MODE. The checkpoint is the only +// durable carrier of metaAppliedIndex at this point — once +// persist.SaveSnap returns, WAL compaction discards every log entry +// at or before snap.Metadata.Index, so there is no source to replay +// the meta key bump from. +type AppliedIndexWriter interface { + SetDurableAppliedIndex(idx uint64) error +} From 525fc15277cade215db285786330fdb9564c2a88 Mon Sep 17 00:00:00 2001 From: Yoshiaki Ueda Date: Wed, 3 Jun 2026 21:12:36 +0900 Subject: [PATCH 02/11] feat(store): pebbleStore.LastAppliedIndex + SetDurableAppliedIndex MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two new methods on pebbleStore for the cold-start snapshot-restore skip optimisation (PR #910 design, Branch 2 step 2): - metaAppliedIndex / metaAppliedIndexBytes — new Pebble meta key, 'sibling' to metaLastCommitTS in layout and locking discipline. Bundled in the same WriteBatch as the data mutation by the raft-Apply path (next commit). - isPebbleMetaKey extended to include the new key so the snapshot encoder skips it (matches the precedent for the other meta keys). - LastAppliedIndex() (uint64, bool, error) — read under dbMu.RLock. (0, false, nil) for absent / truncated meta key so callers fall back to the full restore path. 
ErrNotFound is the expected outcome on a pre-upgrade fsm.db or freshly restored store; anything else propagates as a real I/O error (the caller in wal_store.go fsmAlreadyAtIndex collapses to 'false → restore' regardless, but this layer surfaces the typed error so other consumers can distinguish). - SetDurableAppliedIndex(idx) error — write under dbMu.RLock with pebble.Sync UNCONDITIONALLY (even under ELASTICKV_FSM_SYNC_MODE= nosync). Rationale documented inline: the snapshot-persist checkpoint is the only durable carrier of the meta key at this point because WAL compaction following persist.SaveSnap discards every log entry <= snap.Metadata.Index — so an OS-buffered Pebble flush dropped on crash would leave metaAppliedIndex behind the snapshot pointer permanently. +1 fsync per snapshot persist (rare; default SnapshotCount=10000) is the right price. These are the pebbleStore-side implementations of raftengine.AppliedIndexReader and raftengine.AppliedIndexWriter added in the preceding commit. The kvFSM accessors that delegate to these will land in a follow-up step alongside the engine hook sites. Refs PR #910 (design), part of Branch 2 implementation series. --- store/lsm_store.go | 84 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 83 insertions(+), 1 deletion(-) diff --git a/store/lsm_store.go b/store/lsm_store.go index cb1372a08..d502aa5d8 100644 --- a/store/lsm_store.go +++ b/store/lsm_store.go @@ -41,6 +41,13 @@ const ( metaLastCommitTS = "_meta_last_commit_ts" metaMinRetainedTS = "_meta_min_retained_ts" metaPendingMinRetainedTS = "_meta_pending_min_retained_ts" + // metaAppliedIndex is the durable raft-applied index meta key. + // Bundled atomically with each raft-Apply pebble.Batch (see + // applyMutationsRaftAt / deletePrefixAtRaftAt) and pinned to + // snap.Metadata.Index by SetDurableAppliedIndex at every snapshot + // persist site. See + // docs/design/2026_06_02_idempotent_snapshot_restore.md §3. 
+ metaAppliedIndex = "_meta_applied_index" spoolBufSize = 32 * 1024 // buffer size for streaming I/O during restore // maxPebbleEncodedKeySize is the limit for encoded Pebble on-disk keys, @@ -156,6 +163,7 @@ func resolvePebbleCacheBytes(envVal string) int64 { var metaLastCommitTSBytes = []byte(metaLastCommitTS) var metaMinRetainedTSBytes = []byte(metaMinRetainedTS) var metaPendingMinRetainedTSBytes = []byte(metaPendingMinRetainedTS) +var metaAppliedIndexBytes = []byte(metaAppliedIndex) // pebbleStore implements MVCCStore using CockroachDB's Pebble LSM tree. // @@ -534,7 +542,8 @@ func writeTempDBMetadata(db *pebble.DB, lastCommitTS, minRetainedTS uint64) erro func isPebbleMetaKey(rawKey []byte) bool { return bytes.Equal(rawKey, metaLastCommitTSBytes) || bytes.Equal(rawKey, metaMinRetainedTSBytes) || - bytes.Equal(rawKey, metaPendingMinRetainedTSBytes) + bytes.Equal(rawKey, metaPendingMinRetainedTSBytes) || + bytes.Equal(rawKey, metaAppliedIndexBytes) } func (s *pebbleStore) findMaxCommitTS() (uint64, error) { @@ -553,6 +562,79 @@ func (s *pebbleStore) saveLastCommitTS(ts uint64) error { return writePebbleUint64(s.db, metaLastCommitTSBytes, ts, pebble.NoSync) } +// LastAppliedIndex implements raftengine.AppliedIndexReader. Returns +// the largest Raft entry index whose Apply produced a durable +// mutation on this store (via applyMutationsRaftAt / +// deletePrefixAtRaftAt — same WriteBatch as the data — or via +// SetDurableAppliedIndex at a snapshot persist). +// +// (0, false, nil) means the meta key is absent — either a pre-upgrade +// fsm.db that has not yet bumped through a raft-Apply, or a freshly +// restored store. Callers MUST treat this as "missing" and fall back +// to the full restore path; see +// docs/design/2026_06_02_idempotent_snapshot_restore.md §4 fallback +// policy. Any other error propagates. 
+// +// dbMu.RLock matches the rest of the read path +// (lsm_store.go:153 / :675); without it a concurrent swapInTempDB +// could replace s.db between db.Get and the closer.Close()/value +// access, racing the snapshot install path. +func (s *pebbleStore) LastAppliedIndex() (uint64, bool, error) { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + val, closer, err := s.db.Get(metaAppliedIndexBytes) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return 0, false, nil + } + return 0, false, errors.WithStack(err) + } + defer func() { _ = closer.Close() }() + if len(val) < timestampSize { + // Truncated meta key — treat as missing for strictly-additive + // safety. The caller will fall back to full restore, which + // produces a fresh well-formed value via the next Apply or + // snapshot persist. + return 0, false, nil + } + return binary.LittleEndian.Uint64(val), true, nil +} + +// SetDurableAppliedIndex implements raftengine.AppliedIndexWriter. +// Used at snapshot persist time to pin metaAppliedIndex to +// snap.Metadata.Index BEFORE persist.SaveSnap, so a successful +// snapshot persist always implies LastAppliedIndex >= +// snap.Metadata.Index — closing the HLC-lease-only / encryption-only +// fallback (see design doc §6). +// +// The write is single-key and goes through a fresh pebble.Batch with +// pebble.Sync UNCONDITIONALLY — independent of +// ELASTICKV_FSM_SYNC_MODE. The reason is durability boundary: WAL +// compaction following SaveSnap discards every log entry at or +// before snap.Metadata.Index, so there is no source to replay the +// meta key bump from. If we honoured nosync mode here, a crash +// between Pebble's deferred flush and SaveSnap's internal fsync +// would leave snapshot pointer at X but metaAppliedIndex at Y < X +// forever. The +1 fsync per snapshot persist (rare; default +// SnapshotCount=10000) is negligible vs that risk. +// +// dbMu.RLock is sufficient because we only mutate s.db, not its +// pointer. 
The raft-apply loop is serial at the engine boundary, so +// no concurrent applyMutationsWithOpts can race this single-key +// write at the Pebble level beyond what last-writer-wins already +// gives us (and last-writer-wins on metaAppliedIndex always +// satisfies have >= want for any want we already passed). +func (s *pebbleStore) SetDurableAppliedIndex(idx uint64) error { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + batch := s.db.NewBatch() + defer func() { _ = batch.Close() }() + if err := setPebbleUint64InBatch(batch, metaAppliedIndexBytes, idx); err != nil { + return err + } + return errors.WithStack(batch.Commit(pebble.Sync)) +} + func (s *pebbleStore) saveMinRetainedTS(ts uint64) error { return writePebbleUint64(s.db, metaMinRetainedTSBytes, ts, pebble.NoSync) } From aa9b8acc5b4b103732909d4c83bd716ef15d3544 Mon Sep 17 00:00:00 2001 From: Yoshiaki Ueda Date: Wed, 3 Jun 2026 21:18:17 +0900 Subject: [PATCH 03/11] feat(store/kv): ApplyMutationsRaftAt + DeletePrefixAtRaftAt overloads MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Plumbing for the raft entry index to reach the leaf pebble.Batch so metaAppliedIndex can be bundled atomically with the data mutation (PR #910 design §2, Branch 2 step 3). Public surface (store.MVCCStore extension): - ApplyMutationsRaftAt(ctx, muts, readKeys, startTS, commitTS, appliedIndex) - DeletePrefixAtRaftAt(ctx, prefix, exclude, commitTS, appliedIndex) appliedIndex==0 means 'no index, do not bump the meta key' so existing callers that still use the non-At variants keep their current behaviour byte-for-byte. Implementations: - pebbleStore: bundles setPebbleUint64InBatch(metaAppliedIndex) in both applyMutationsWithOpts and deletePrefixAtWithOpts when appliedIndex > 0. +16 bytes per batch, zero extra fsync. - mvccStore: in-memory store discards appliedIndex. LastAppliedIndex on this backend would always be 'missing'; OK because it is only used by tests / catalog bootstrap. 
- LeaderRoutedStore: forwards to local. - ShardStore.ApplyMutationsRaftAt: routes to single owning shard. - ShardStore.DeletePrefixAtRaftAt: broadcasts to every group; the single-group production raft-apply path gets correct bundling; the multi-group broadcast case is admin-driven FLUSHALL only, never raft-applied. No call sites are switched over yet; the next commit threads f.pendingApplyIdx into the kvFSM data-Apply path through these overloads. Tests: go vet ./store/ ./kv/ ./internal/raftengine/... clean; go test ./store/ -short ok 29.4s. Refs PR #910 (design), part of Branch 2 implementation series. --- kv/leader_routed_store.go | 19 +++++++++++ kv/shard_store.go | 47 ++++++++++++++++++++++++++ store/lsm_store.go | 70 +++++++++++++++++++++++++++++++++++---- store/mvcc_store.go | 16 +++++++++ store/store.go | 15 +++++++++ 5 files changed, 161 insertions(+), 6 deletions(-) diff --git a/kv/leader_routed_store.go b/kv/leader_routed_store.go index cfd935bb1..9c3643abc 100644 --- a/kv/leader_routed_store.go +++ b/kv/leader_routed_store.go @@ -330,6 +330,16 @@ func (s *LeaderRoutedStore) ApplyMutationsRaft(ctx context.Context, mutations [] return errors.WithStack(s.local.ApplyMutationsRaft(ctx, mutations, readKeys, startTS, commitTS)) } +// ApplyMutationsRaftAt forwards to the local store's raft-entry-index- +// aware variant so the underlying pebbleStore can bundle +// metaAppliedIndex with the mutation. See PR #910 design §2. 
+func (s *LeaderRoutedStore) ApplyMutationsRaftAt(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, startTS, commitTS, appliedIndex uint64) error { + if s == nil || s.local == nil { + return errors.WithStack(store.ErrNotSupported) + } + return errors.WithStack(s.local.ApplyMutationsRaftAt(ctx, mutations, readKeys, startTS, commitTS, appliedIndex)) +} + func (s *LeaderRoutedStore) DeletePrefixAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error { if s == nil || s.local == nil { return errors.WithStack(store.ErrNotSupported) @@ -345,6 +355,15 @@ func (s *LeaderRoutedStore) DeletePrefixAtRaft(ctx context.Context, prefix []byt return errors.WithStack(s.local.DeletePrefixAtRaft(ctx, prefix, excludePrefix, commitTS)) } +// DeletePrefixAtRaftAt forwards to the local store's raft-entry- +// index-aware variant. See PR #910 design §2 "why both leaves". +func (s *LeaderRoutedStore) DeletePrefixAtRaftAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS, appliedIndex uint64) error { + if s == nil || s.local == nil { + return errors.WithStack(store.ErrNotSupported) + } + return errors.WithStack(s.local.DeletePrefixAtRaftAt(ctx, prefix, excludePrefix, commitTS, appliedIndex)) +} + func (s *LeaderRoutedStore) LastCommitTS() uint64 { if s == nil || s.local == nil { return 0 diff --git a/kv/shard_store.go b/kv/shard_store.go index 2e811446c..85b28db7f 100644 --- a/kv/shard_store.go +++ b/kv/shard_store.go @@ -1207,6 +1207,17 @@ func (s *ShardStore) ApplyMutationsRaft(ctx context.Context, mutations []*store. return errors.WithStack(group.Store.ApplyMutationsRaft(ctx, mutations, readKeys, startTS, commitTS)) } +// ApplyMutationsRaftAt is the raft-entry-index-aware variant. Threads +// appliedIndex through to the single owning shard so the leaf can +// bundle metaAppliedIndex with the mutation. See PR #910 design §2. 
+func (s *ShardStore) ApplyMutationsRaftAt(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, startTS, commitTS, appliedIndex uint64) error { + group, err := s.resolveSingleShardGroup(mutations) + if err != nil || group == nil { + return err + } + return errors.WithStack(group.Store.ApplyMutationsRaftAt(ctx, mutations, readKeys, startTS, commitTS, appliedIndex)) +} + // resolveSingleShardGroup returns the shard group that owns every // mutation in the batch, or an error if the batch is cross-shard or // references an unknown group. A nil group with nil error means "empty @@ -1257,6 +1268,42 @@ func (s *ShardStore) DeletePrefixAtRaft(ctx context.Context, prefix []byte, excl return nil } +// DeletePrefixAtRaftAt is the raft-entry-index-aware variant. The +// caller's raft entry index applies only to the local group whose +// FSM is driving this apply; on a multi-group ShardStore, fanning +// the SAME index across other groups would corrupt their +// metaAppliedIndex. The single-group case (the common case for an +// FSM-local DeletePrefixAtRaft path) gets the correct bundling. This +// method does NOT filter by group: appliedIndex is forwarded +// verbatim to every group, so a multi-group caller MUST pass +// appliedIndex=0 to leave the peer groups' meta key unchanged. +// +// In practice the FSM call sites that issue raft-DeletePrefix +// operate against a single group's store; the multi-group ShardStore +// is the receiver only when an aggregate (admin / coordinator) path +// is replaying a global FLUSHALL, which is not raft-applied. +func (s *ShardStore) DeletePrefixAtRaftAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS, appliedIndex uint64) error { + for _, g := range s.groups { + if g == nil || g.Store == nil { + continue + } + // Pass appliedIndex through to every group. In the + // single-group call-path (the production raft-apply case) + // this is correct: appliedIndex IS that group's raft entry + // index.
In a hypothetical multi-group call, only one group + // would see the matching index and the rest would treat it + // as a non-monotonic stray write — but the rest of the + // raft-apply contract (single FSM per raft log) makes that + // case impossible to reach in production. Tests that + // exercise ShardStore.DeletePrefixAtRaftAt across multiple + // groups MUST pass appliedIndex=0 to opt out. + if err := g.Store.DeletePrefixAtRaftAt(ctx, prefix, excludePrefix, commitTS, appliedIndex); err != nil { + return errors.WithStack(err) + } + } + return nil +} + func (s *ShardStore) LastCommitTS() uint64 { var max uint64 for _, g := range s.groups { diff --git a/store/lsm_store.go b/store/lsm_store.go index d502aa5d8..2fe612ee3 100644 --- a/store/lsm_store.go +++ b/store/lsm_store.go @@ -1349,7 +1349,9 @@ func (s *pebbleStore) ApplyMutations(ctx context.Context, mutations []*KVPairMut // bootstrap Save, admin snapshot, migration). Stage 7a-2 refuses to // emit an encrypted envelope here before this load's writer // registration commits. - return s.applyMutationsWithOpts(ctx, mutations, readKeys, startTS, commitTS, s.directApplyWriteOpts(), true) + // appliedIndex=0: direct path has no raft index; the leaf treats 0 as + // "do not write metaAppliedIndex" so the meta key stays unchanged. + return s.applyMutationsWithOpts(ctx, mutations, readKeys, startTS, commitTS, s.directApplyWriteOpts(), true, 0) } // ApplyMutationsRaft is the raft-apply commit path. Durability is governed @@ -1362,16 +1364,38 @@ func (s *pebbleStore) ApplyMutations(ctx context.Context, mutations []*KVPairMut // Must only be called from inside the FSM apply loop. All other call sites // must use ApplyMutations so a nosync opt-in cannot silently drop // acknowledged writes that have no raft backstop. 
+// +// Callers that have a raft entry index in hand (the kvFSM data-Apply +// path via the raftengine.ApplyIndexAware seam) SHOULD prefer +// ApplyMutationsRaftAt so the metaAppliedIndex meta key is bundled +// atomically with the data mutation — see PR #910 / B2. func (s *pebbleStore) ApplyMutationsRaft(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error { // gateRegistration=false: the FSM-apply path replays committed Raft // entries and must stay deterministic. It may legitimately encrypt // before this node's own registration entry commits (design §1); // fail-closing here would halt the apply loop and could deadlock a // node whose storage entry is ordered before its registration entry. - return s.applyMutationsWithOpts(ctx, mutations, readKeys, startTS, commitTS, s.raftApplyWriteOpts(), false) + // appliedIndex=0: callers that have not yet been wired to the + // raftengine.ApplyIndexAware seam (test fakes, legacy FSM impls) + // land here; their LastAppliedIndex() will stay behind the snapshot + // pointer and the skip optimisation will fall back to full restore + // for them. Preferred path is ApplyMutationsRaftAt. + return s.applyMutationsWithOpts(ctx, mutations, readKeys, startTS, commitTS, s.raftApplyWriteOpts(), false, 0) +} + +// ApplyMutationsRaftAt is ApplyMutationsRaft with the raft entry +// index threaded through so the leaf can bundle metaAppliedIndex in +// the same pebble.Batch as the data mutation. See PR #910 design §2. +// +// appliedIndex==0 is treated as "no index" — the leaf will not write +// metaAppliedIndex, preserving the ApplyMutationsRaft semantics. +// Production callers (kvFSM.applyXxx with f.pendingApplyIdx) SHOULD +// pass the entry.Index value the engine delivered via SetApplyIndex. 
+func (s *pebbleStore) ApplyMutationsRaftAt(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS, appliedIndex uint64) error { + return s.applyMutationsWithOpts(ctx, mutations, readKeys, startTS, commitTS, s.raftApplyWriteOpts(), false, appliedIndex) } -func (s *pebbleStore) applyMutationsWithOpts(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS uint64, writeOpts *pebble.WriteOptions, gateRegistration bool) error { +func (s *pebbleStore) applyMutationsWithOpts(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS uint64, writeOpts *pebble.WriteOptions, gateRegistration bool, appliedIndex uint64) error { s.dbMu.RLock() defer s.dbMu.RUnlock() @@ -1407,6 +1431,16 @@ func (s *pebbleStore) applyMutationsWithOpts(ctx context.Context, mutations []*K s.mtx.Unlock() return err } + // Bundle metaAppliedIndex in the same batch as the data + commitTS + // meta key so a crash either commits all three atomically or none. + // appliedIndex==0 is the legacy / non-raft callers (ApplyMutations + // or ApplyMutationsRaft); they leave the key unchanged. + if appliedIndex > 0 { + if err := setPebbleUint64InBatch(b, metaAppliedIndexBytes, appliedIndex); err != nil { + s.mtx.Unlock() + return err + } + } if err := b.Commit(writeOpts); err != nil { s.mtx.Unlock() return errors.WithStack(err) @@ -1427,17 +1461,32 @@ func (s *pebbleStore) applyMutationsWithOpts(ctx context.Context, mutations []*K // ELASTICKV_FSM_SYNC_MODE=nosync. Raft-apply callers must use // DeletePrefixAtRaft instead. func (s *pebbleStore) DeletePrefixAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error { - return s.deletePrefixAtWithOpts(ctx, prefix, excludePrefix, commitTS, s.directApplyWriteOpts()) + return s.deletePrefixAtWithOpts(ctx, prefix, excludePrefix, commitTS, s.directApplyWriteOpts(), 0) } // DeletePrefixAtRaft is the raft-apply variant of DeletePrefixAt. 
Durability // is governed by s.fsmApplyWriteOpts (ELASTICKV_FSM_SYNC_MODE). See // ApplyMutationsRaft for the full durability argument. +// +// Callers that have a raft entry index in hand SHOULD prefer +// DeletePrefixAtRaftAt to bundle metaAppliedIndex atomically — see +// PR #910 design §2 "why both leaves". func (s *pebbleStore) DeletePrefixAtRaft(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error { - return s.deletePrefixAtWithOpts(ctx, prefix, excludePrefix, commitTS, s.raftApplyWriteOpts()) + return s.deletePrefixAtWithOpts(ctx, prefix, excludePrefix, commitTS, s.raftApplyWriteOpts(), 0) +} + +// DeletePrefixAtRaftAt is DeletePrefixAtRaft with the raft entry +// index threaded through. handleDelPrefix builds an independent +// pebble.Batch separate from applyMutationsWithOpts, so the meta +// key bundle must happen here too — otherwise DEL_PREFIX entries +// would land without bumping metaAppliedIndex and silently leave +// LastAppliedIndex behind the true applied count for any workload +// that uses DEL_PREFIX. PR #910 design §2. 
+func (s *pebbleStore) DeletePrefixAtRaftAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS, appliedIndex uint64) error { + return s.deletePrefixAtWithOpts(ctx, prefix, excludePrefix, commitTS, s.raftApplyWriteOpts(), appliedIndex) } -func (s *pebbleStore) deletePrefixAtWithOpts(_ context.Context, prefix []byte, excludePrefix []byte, commitTS uint64, writeOpts *pebble.WriteOptions) error { +func (s *pebbleStore) deletePrefixAtWithOpts(_ context.Context, prefix []byte, excludePrefix []byte, commitTS uint64, writeOpts *pebble.WriteOptions, appliedIndex uint64) error { s.dbMu.RLock() defer s.dbMu.RUnlock() @@ -1475,6 +1524,15 @@ func (s *pebbleStore) deletePrefixAtWithOpts(_ context.Context, prefix []byte, e if err := setPebbleUint64InBatch(batch, metaLastCommitTSBytes, newLastTS); err != nil { return err } + // Bundle metaAppliedIndex atomically with the tombstones + commitTS + // — same rationale as applyMutationsWithOpts. appliedIndex==0 means + // the legacy / non-raft caller path (DeletePrefixAt or + // DeletePrefixAtRaft); leave the meta key unchanged. + if appliedIndex > 0 { + if err := setPebbleUint64InBatch(batch, metaAppliedIndexBytes, appliedIndex); err != nil { + return err + } + } if err := batch.Commit(writeOpts); err != nil { return errors.WithStack(err) } diff --git a/store/mvcc_store.go b/store/mvcc_store.go index 74b6a2295..1886fbf3b 100644 --- a/store/mvcc_store.go +++ b/store/mvcc_store.go @@ -496,6 +496,16 @@ func (s *mvccStore) ApplyMutationsRaft(ctx context.Context, mutations []*KVPairM return s.ApplyMutations(ctx, mutations, readKeys, startTS, commitTS) } +// ApplyMutationsRaftAt satisfies the MVCCStore interface. The in-memory +// store has no Pebble batch to bundle metaAppliedIndex in, so the +// appliedIndex argument is discarded — LastAppliedIndex on this +// backend would always read as missing and the skip optimisation +// would fall back to full restore. 
Acceptable: the in-memory store +// is only used in unit tests and the catalog bootstrap path. +func (s *mvccStore) ApplyMutationsRaftAt(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS, _ uint64) error { + return s.ApplyMutations(ctx, mutations, readKeys, startTS, commitTS) +} + func (s *mvccStore) ApplyMutations(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error { s.mtx.Lock() defer s.mtx.Unlock() @@ -565,6 +575,12 @@ func (s *mvccStore) DeletePrefixAtRaft(ctx context.Context, prefix []byte, exclu return s.DeletePrefixAt(ctx, prefix, excludePrefix, commitTS) } +// DeletePrefixAtRaftAt satisfies the MVCCStore interface — see +// ApplyMutationsRaftAt for the appliedIndex disposition rationale. +func (s *mvccStore) DeletePrefixAtRaftAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS, _ uint64) error { + return s.DeletePrefixAt(ctx, prefix, excludePrefix, commitTS) +} + func (s *mvccStore) DeletePrefixAt(_ context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error { s.mtx.Lock() defer s.mtx.Unlock() diff --git a/store/store.go b/store/store.go index ace186809..3991da19d 100644 --- a/store/store.go +++ b/store/store.go @@ -172,6 +172,15 @@ type MVCCStore interface { // migrations, tests) must use ApplyMutations, which is always // pebble.Sync and therefore safe without raft-log replay. ApplyMutationsRaft(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error + // ApplyMutationsRaftAt is ApplyMutationsRaft with the raft entry + // index threaded through. The leaf bundles metaAppliedIndex in the + // same pebble.Batch as the data mutation so a successful Apply + // implies LastAppliedIndex >= appliedIndex; the cold-start + // snapshot-restore skip gate uses this invariant (PR #910 / B2). 
+ // appliedIndex==0 is treated as "no index, do not bump the meta + // key", matching ApplyMutationsRaft semantics for callers that have + // not yet been wired to the raftengine.ApplyIndexAware seam. + ApplyMutationsRaftAt(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS, appliedIndex uint64) error // DeletePrefixAt atomically deletes all visible (non-tombstone, non-expired) // keys matching prefix at commitTS by writing tombstone versions. An empty // prefix means "all keys". Keys matching excludePrefix are preserved. @@ -181,6 +190,12 @@ type MVCCStore interface { // DeletePrefixAtRaft is the raft-apply variant of DeletePrefixAt with // the same durability contract as ApplyMutationsRaft. DeletePrefixAtRaft(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error + // DeletePrefixAtRaftAt is DeletePrefixAtRaft with the raft entry + // index threaded through. handleDelPrefix builds an independent + // pebble.Batch separate from applyMutationsWithOpts; this overload + // bundles metaAppliedIndex in that batch so DEL_PREFIX entries + // also advance the meta key. PR #910 design §2 "why both leaves". + DeletePrefixAtRaftAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS, appliedIndex uint64) error // LastCommitTS returns the highest commit timestamp applied on this node. LastCommitTS() uint64 // WriteConflictCountsByPrefix returns a snapshot of the MVCC From 7cd72bdad3872e01ccccae00402d29a6577c5017 Mon Sep 17 00:00:00 2001 From: Yoshiaki Ueda Date: Wed, 3 Jun 2026 21:22:27 +0900 Subject: [PATCH 04/11] feat(kv/fsm): kvFSM AppliedIndexReader/Writer + thread pendingApplyIdx MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wires the raft applied-index seam through the kvFSM data-Apply path so metaAppliedIndex is bundled atomically with every data mutation (PR #910 design §2 + §3, Branch 2 step 4). 
New methods on kvFSM: - AppliedIndexReader() raftengine.AppliedIndexReader — exposes the underlying store's reader when it implements the seam (pebbleStore does; in-memory mvccStore does not). nil propagates to the fsmAlreadyAtIndex caller and triggers the conservative full-restore fallback. - SetDurableAppliedIndex(idx) error — forwards to the underlying store's writer seam (pebbleStore does pebble.Sync; the in-memory store's no-op default is preserved). The engine calls this at each snapshot persist site BEFORE persist.SaveSnap. Threaded f.pendingApplyIdx through every kv/fsm.go data-Apply leaf (7 call sites): - handleNonTxnRequest → ApplyMutationsRaftAt - handleDelPrefix → DeletePrefixAtRaftAt - handlePrepareRequest (single-shard fast path) - handleOnePhaseTxnRequest (with-readkeys path) - applyCommitWithIdempotencyFallback (commit path) - applyCommitWithIdempotencyFallback (idempotency replay fallback) - handleAbortRequest (rollback marker write) Each site passes f.pendingApplyIdx — the value the engine stashed via SetApplyIndex immediately before Apply through the raftengine.ApplyIndexAware seam. Engine-bypassing tests that drive Apply directly without setting the index will pass 0, which the leaf treats as 'do not bump the meta key' — keeping their behaviour byte-for-byte identical to before. Encryption opcode dispatch is unchanged: applyReservedOpcode still threads pendingApplyIdx into applyEncryption (its existing consumer for the WriteSidecar RaftAppliedIndex slot). The meta key bump above is purely additive on the data-Apply leaves; encryption-only entries continue to leave metaAppliedIndex unchanged (PR #910 design §6 'encryption opcodes' subsection). Tests: go vet ./kv/ ./store/ ./internal/raftengine/... clean; go test ./kv/ -short ok 10.4s. Refs PR #910 (design), part of Branch 2 implementation series. 
--- kv/fsm.go | 48 +++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 41 insertions(+), 7 deletions(-) diff --git a/kv/fsm.go b/kv/fsm.go index 8cb3f0c4a..d51db42e7 100644 --- a/kv/fsm.go +++ b/kv/fsm.go @@ -129,6 +129,40 @@ func (f *kvFSM) SetApplyIndex(idx uint64) { f.pendingApplyIdx = idx } +// AppliedIndexReader implements raftengine.AppliedIndexReporter. It +// exposes the underlying store's durable applied-index when the +// store implements raftengine.AppliedIndexReader (pebbleStore does; +// the in-memory mvccStore does not, in which case this method +// returns nil instead of a reader). A nil return means +// "not supported on this backend" and triggers the conservative +// full-restore fallback at the cold-start skip gate. See +// docs/design/2026_06_02_idempotent_snapshot_restore.md §3. +func (f *kvFSM) AppliedIndexReader() raftengine.AppliedIndexReader { + if r, ok := f.store.(raftengine.AppliedIndexReader); ok { + return r + } + return nil +} + +// SetDurableAppliedIndex implements raftengine.AppliedIndexWriter by +// forwarding to the underlying store when it supports the writer +// seam. Called by the engine at every snapshot persist site BEFORE +// persist.SaveSnap so a successful snapshot persist implies +// LastAppliedIndex >= snap.Metadata.Index, closing the HLC-lease- +// only / encryption-only fallback (PR #910 design §6). +// +// Returns nil silently when the backing store does not implement +// the writer seam (in-memory mvccStore, test fakes) — the skip +// optimisation simply degrades to "fall back to full restore" for +// those FSMs. 
+func (f *kvFSM) SetDurableAppliedIndex(idx uint64) error { + w, ok := f.store.(raftengine.AppliedIndexWriter) + if !ok { + return nil + } + return w.SetDurableAppliedIndex(idx) +} + type FSM interface { raftengine.StateMachine } @@ -455,7 +489,7 @@ func (f *kvFSM) handleRawRequest(ctx context.Context, r *pb.Request, commitTS ui } // Raw requests always commit against the latest state; use commitTS as both // the validation snapshot and the commit timestamp. - return errors.WithStack(f.store.ApplyMutationsRaft(ctx, muts, nil, commitTS, commitTS)) + return errors.WithStack(f.store.ApplyMutationsRaftAt(ctx, muts, nil, commitTS, commitTS, f.pendingApplyIdx)) } // extractDelPrefix checks if the mutations contain a DEL_PREFIX operation. @@ -472,7 +506,7 @@ func extractDelPrefix(muts []*pb.Mutation) (bool, []byte) { // handleDelPrefix delegates prefix deletion to the store. Transaction-internal // keys are always excluded to preserve transactional integrity. func (f *kvFSM) handleDelPrefix(ctx context.Context, prefix []byte, commitTS uint64) error { - return errors.WithStack(f.store.DeletePrefixAtRaft(ctx, prefix, txnCommonPrefix, commitTS)) + return errors.WithStack(f.store.DeletePrefixAtRaftAt(ctx, prefix, txnCommonPrefix, commitTS, f.pendingApplyIdx)) } var ErrNotImplemented = errors.New("not implemented") @@ -730,7 +764,7 @@ func (f *kvFSM) handlePrepareRequest(ctx context.Context, r *pb.Request) error { return err } - if err := f.store.ApplyMutationsRaft(ctx, storeMuts, r.ReadKeys, startTS, startTS); err != nil { + if err := f.store.ApplyMutationsRaftAt(ctx, storeMuts, r.ReadKeys, startTS, startTS, f.pendingApplyIdx); err != nil { return errors.WithStack(err) } return nil @@ -794,7 +828,7 @@ func (f *kvFSM) handleOnePhaseTxnRequest(ctx context.Context, r *pb.Request, com if err != nil { return err } - return errors.WithStack(f.store.ApplyMutationsRaft(ctx, storeMuts, r.ReadKeys, startTS, commitTS)) + return errors.WithStack(f.store.ApplyMutationsRaftAt(ctx, 
storeMuts, r.ReadKeys, startTS, commitTS, f.pendingApplyIdx)) } // dedupProbeOnePhase decides whether handleOnePhaseTxnRequest should no-op @@ -898,7 +932,7 @@ func (f *kvFSM) commitApplyStartTS(ctx context.Context, primaryKey []byte, start // The secondary-shard LatestCommitTS scan is intentionally deferred to the // write-conflict path so the hot (first-time) commit path pays no extra cost. func (f *kvFSM) applyCommitWithIdempotencyFallback(ctx context.Context, storeMuts []*store.KVPairMutation, uniq []*pb.Mutation, applyStartTS, commitTS uint64) error { - err := f.store.ApplyMutationsRaft(ctx, storeMuts, nil, applyStartTS, commitTS) + err := f.store.ApplyMutationsRaftAt(ctx, storeMuts, nil, applyStartTS, commitTS, f.pendingApplyIdx) if err == nil { return nil } @@ -915,7 +949,7 @@ func (f *kvFSM) applyCommitWithIdempotencyFallback(ctx context.Context, storeMut return errors.WithStack(lErr) } if exists && latestTS >= commitTS { - return errors.WithStack(f.store.ApplyMutationsRaft(ctx, storeMuts, nil, commitTS, commitTS)) + return errors.WithStack(f.store.ApplyMutationsRaftAt(ctx, storeMuts, nil, commitTS, commitTS, f.pendingApplyIdx)) } } return errors.WithStack(err) @@ -966,7 +1000,7 @@ func (f *kvFSM) handleAbortRequest(ctx context.Context, r *pb.Request, abortTS u if len(storeMuts) == 0 { return nil } - return errors.WithStack(f.store.ApplyMutationsRaft(ctx, storeMuts, nil, startTS, abortTS)) + return errors.WithStack(f.store.ApplyMutationsRaftAt(ctx, storeMuts, nil, startTS, abortTS, f.pendingApplyIdx)) } func (f *kvFSM) buildPrepareStoreMutations(ctx context.Context, muts []*pb.Mutation, primaryKey []byte, startTS, expireAt uint64) ([]*store.KVPairMutation, error) { From f1e8748c7ac38a26e55b87c72cbeecfd43413396 Mon Sep 17 00:00:00 2001 From: Yoshiaki Ueda Date: Wed, 3 Jun 2026 21:24:22 +0900 Subject: [PATCH 05/11] feat(raftengine/etcd): hook SetDurableAppliedIndex at both persist sites MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 
Content-Transfer-Encoding: 8bit Two new hooks pinning metaAppliedIndex to the snapshot index BEFORE the corresponding persist.SaveSnap call. After a successful snapshot persist, LastAppliedIndex >= snap.Metadata.Index holds unconditionally (PR #910 design §6, Branch 2 step 5). Site 1 - persistCreatedSnapshot (engine.go:2679) — drives config snapshots (membership-change snapshots). Site 2 - e.persistLocalSnapshotPayload (engine.go:4048) — the steady-state SnapshotCount-triggered hot path: maybePersistLocalSnapshot -> e.persistLocalSnapshotPayload (this method) -> free persistLocalSnapshotPayload (wal_store.go:519) -> persist.SaveSnap (wal_store.go:524) The hook lives in the engine wrapper, not the free function, so the free function stays signature-stable and the call holds e.snapshotMu.Lock() before invoking SetDurableAppliedIndex (serializing against follower-snapshot restore). Both sites call e.fsm.(raftengine.AppliedIndexWriter) and silently no-op when the FSM does not implement the seam (legacy test fakes, in-memory backends). pebble.Sync is forced on the writer side (lsm_store.SetDurableAppliedIndex inline comment) regardless of ELASTICKV_FSM_SYNC_MODE — the checkpoint is the only durable carrier of metaAppliedIndex at this point because WAL compaction following SaveSnap discards every entry <= snap.Metadata.Index. Crash ordering (matches design §6 table): the bump is fsynced before SaveSnap is called. The only observable states are (metaAppliedIndex, snapshot pointer) in {(Y, X'), (X, X'), (X, X)} — before the bump, after the bump but before SaveSnap, and after SaveSnap — where Y and X' are the previous meta value and the previous snapshot pointer. None of these can yield 'snapshot pointer = X but metaAppliedIndex < X'. Over-restore impossible; round-3 P2 + round-4 P2 closed. Tests: go vet ./internal/raftengine/etcd/ ./kv/ ./store/ clean; go test ./internal/raftengine/... -short ok 32.8s. Refs PR #910 (design), part of Branch 2 implementation series. 
--- internal/raftengine/etcd/engine.go | 34 ++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index 4387f8d25..6535d268e 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -2680,6 +2680,22 @@ func (e *Engine) persistCreatedSnapshot(snap raftpb.Snapshot) error { if etcdraft.IsEmptySnap(snap) || e.persist == nil { return nil } + // Pin metaAppliedIndex to snap.Metadata.Index BEFORE SaveSnap so a + // successful snapshot persist always implies LastAppliedIndex >= + // snap.Metadata.Index — closes the HLC-lease-only / encryption-only + // fallback (PR #910 design §6). FSMs that do not expose + // raftengine.AppliedIndexWriter silently no-op; the skip + // optimisation falls back to full restore for them. pebble.Sync + // is forced on the writer side (see lsm_store.SetDurableAppliedIndex) + // regardless of ELASTICKV_FSM_SYNC_MODE — once SaveSnap returns, + // WAL compaction discards every log entry at or before + // snap.Metadata.Index, so there is no source to replay the meta + // key bump from. + if w, ok := e.fsm.(raftengine.AppliedIndexWriter); ok { + if err := w.SetDurableAppliedIndex(snap.Metadata.Index); err != nil { + return errors.WithStack(err) + } + } if err := e.persist.SaveSnap(snap); err != nil { return errors.WithStack(err) } @@ -4044,6 +4060,24 @@ func (e *Engine) persistLocalSnapshotPayload(index uint64, payload []byte) error return nil } + // Pin metaAppliedIndex to `index` BEFORE the free-function + // persistLocalSnapshotPayload (which calls persist.SaveSnap at + // wal_store.go:524). This is the steady-state SnapshotCount-triggered + // snapshot path — the hot path the cold-start skip optimisation + // depends on. Without this hook the round-3 P2 fallback (HLC + // leases / encryption ops keep snapshot.Metadata.Index ahead of the + // last data-Apply index forever) recurs permanently. 
See PR #910 + // design §6 'HLC lease entries — checkpoint at snapshot persist' + // and the round-5 retraction documenting why + // persistCreatedSnapshot alone is insufficient. pebble.Sync is + // forced on the writer side regardless of ELASTICKV_FSM_SYNC_MODE + // (lsm_store.SetDurableAppliedIndex inline comment). + if w, ok := e.fsm.(raftengine.AppliedIndexWriter); ok { + if err := w.SetDurableAppliedIndex(index); err != nil { + return errors.WithStack(err) + } + } + _, err = persistLocalSnapshotPayload(e.storage, e.persist, index, payload) switch { case err == nil: From 2c42f7d6cd2e72014bc2f71e3dab2e93cffdfe7b Mon Sep 17 00:00:00 2001 From: Yoshiaki Ueda Date: Wed, 3 Jun 2026 21:28:07 +0900 Subject: [PATCH 06/11] test(store/raftengine): Branch 2 applied-index round-trip + hook ordering MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR #910 design B2 test list, step 6 of the implementation series. store/lsm_store_applied_index_test.go (7 tests): - TestLastAppliedIndex_MissingMetaKey — fresh store reports (0, false, nil); strictly-additive fallback to full restore is intact for pre-upgrade fsm.db files. - TestSetDurableAppliedIndex_RoundTrip — round-trips an arbitrary uint64 (0xDEAD_BEEF_CAFE_F00D) through Set + Get. - TestApplyMutationsRaftAt_BundlesMetaAppliedIndex — verifies the leaf bundles metaAppliedIndex when appliedIndex > 0, AND the data mutation also lands (sanity check the bundling did not drop the data path). - TestApplyMutationsRaftAt_ZeroIndexLeavesMetaKey — escape hatch: appliedIndex=0 MUST NOT touch metaAppliedIndex (preserves ApplyMutationsRaft byte-compat for callers not yet wired). - TestDeletePrefixAtRaftAt_BundlesMetaAppliedIndex — analogous round-trip for the DEL_PREFIX leaf, which builds its own pebble.Batch separate from applyMutationsWithOpts. 
- TestSetDurableAppliedIndex_UsesPebbleSync — sets ELASTICKV_FSM_SYNC_MODE=nosync, writes via the checkpoint, closes + reopens the store, asserts the value persists. Black-box regression guard for the pebble.Sync-unconditionally claim (a NoSync write would not deterministically survive a Close+Open). - TestLastAppliedIndex_CorruptValue — directly writes a 2-byte payload under the meta key and verifies LastAppliedIndex collapses to (0, false, nil) rather than propagating an error. Keeps cold start safe under partial-write corruption. internal/raftengine/etcd/engine_applied_index_test.go (4 tests): - TestRecordingFSM_SatisfiesAppliedIndexWriter — compile-time adjacency check that the recording FSM implements the writer seam. - TestPersistCreatedSnapshot_BumpsAppliedIndex — drives the persistCreatedSnapshot hook directly, asserts the ordered event stream is exactly [bump@42, save@42]. - TestPersistCreatedSnapshot_NilFSMNoOp — legacy FSM (no writer seam) path: SaveSnap still runs, no bump emitted; preserves backward compat with test fakes. - TestPersistCreatedSnapshot_BumpErrorAborts — if SetDurableAppliedIndex returns an error, the engine MUST surface it AND NOT call SaveSnap. Preserves the crash-ordering invariant 'snapshot pointer never durable without metaAppliedIndex first'. go.mod: github.com/coreos/go-semver promoted to direct (was indirect) because the test stubs out etcdstorage.Storage.MinimalEtcdVersion, which returns *semver.Version. Refs PR #910 (design), B2 step 6 — final step before opening PR. All tests: ok 0.028s (engine) + 1.6s (store). 
--- go.mod | 2 +- .../etcd/engine_applied_index_test.go | 148 ++++++++++++++ store/lsm_store_applied_index_test.go | 191 ++++++++++++++++++ 3 files changed, 340 insertions(+), 1 deletion(-) create mode 100644 internal/raftengine/etcd/engine_applied_index_test.go create mode 100644 store/lsm_store_applied_index_test.go diff --git a/go.mod b/go.mod index af038cad9..d3287b9dc 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/aws/smithy-go v1.25.1 github.com/cockroachdb/errors v1.13.0 github.com/cockroachdb/pebble/v2 v2.1.5 + github.com/coreos/go-semver v0.3.1 github.com/emirpasic/gods v1.18.1 github.com/getsentry/sentry-go v0.46.2 github.com/goccy/go-json v0.10.6 @@ -58,7 +59,6 @@ require ( github.com/cockroachdb/redact v1.1.5 // indirect github.com/cockroachdb/swiss v0.0.0-20251224182025-b0f6560f979b // indirect github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect - github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dustin/go-humanize v1.0.1 // indirect diff --git a/internal/raftengine/etcd/engine_applied_index_test.go b/internal/raftengine/etcd/engine_applied_index_test.go new file mode 100644 index 000000000..089c1dab8 --- /dev/null +++ b/internal/raftengine/etcd/engine_applied_index_test.go @@ -0,0 +1,148 @@ +package etcd + +import ( + "io" + "sync" + "testing" + + "github.com/bootjp/elastickv/internal/raftengine" + "github.com/coreos/go-semver/semver" + "github.com/stretchr/testify/require" + raftpb "go.etcd.io/raft/v3/raftpb" +) + +// applyIndexOrderRecorder is shared between the recording FSM and +// the recording persist storage so the test can assert the +// crash-ordering invariant (SetDurableAppliedIndex MUST run before +// persist.SaveSnap). Both record into a single ordered slice keyed +// by event kind; the test reads it back to verify the sequence. 
+type applyIndexOrderRecorder struct { + mu sync.Mutex + events []orderEvent +} + +type orderEvent struct { + kind string // "bump" | "save" + index uint64 +} + +func (r *applyIndexOrderRecorder) record(kind string, idx uint64) { + r.mu.Lock() + defer r.mu.Unlock() + r.events = append(r.events, orderEvent{kind: kind, index: idx}) +} + +func (r *applyIndexOrderRecorder) snapshot() []orderEvent { + r.mu.Lock() + defer r.mu.Unlock() + out := make([]orderEvent, len(r.events)) + copy(out, r.events) + return out +} + +// recordingAppliedIndexFSM implements StateMachine + +// raftengine.AppliedIndexWriter. It records every +// SetDurableAppliedIndex call into the shared recorder. +type recordingAppliedIndexFSM struct { + rec *applyIndexOrderRecorder + failNext bool + failErr error +} + +func (f *recordingAppliedIndexFSM) Apply(_ []byte) any { return nil } +func (f *recordingAppliedIndexFSM) Snapshot() (Snapshot, error) { return nil, io.EOF } +func (f *recordingAppliedIndexFSM) Restore(_ io.Reader) error { return nil } + +func (f *recordingAppliedIndexFSM) SetDurableAppliedIndex(idx uint64) error { + if f.failNext { + f.failNext = false + return f.failErr + } + f.rec.record("bump", idx) + return nil +} + +// recordingPersistStorage is a minimal etcdstorage.Storage stand-in +// that records SaveSnap calls into the shared recorder. The hook +// only calls SaveSnap + Release; the rest are stubs. 
+type recordingPersistStorage struct { + rec *applyIndexOrderRecorder +} + +func (p *recordingPersistStorage) SaveSnap(snap raftpb.Snapshot) error { + p.rec.record("save", snap.Metadata.Index) + return nil +} + +func (p *recordingPersistStorage) Save(_ raftpb.HardState, _ []raftpb.Entry) error { return nil } +func (p *recordingPersistStorage) Release(_ raftpb.Snapshot) error { return nil } +func (p *recordingPersistStorage) Sync() error { return nil } +func (p *recordingPersistStorage) Close() error { return nil } +func (p *recordingPersistStorage) MinimalEtcdVersion() *semver.Version { return nil } + +// TestRecordingFSM_SatisfiesAppliedIndexWriter is a compile-time- +// adjacent assertion: the recording FSM MUST satisfy the writer +// seam so the engine hook actually fires for it. +func TestRecordingFSM_SatisfiesAppliedIndexWriter(t *testing.T) { + rec := &applyIndexOrderRecorder{} + var f any = &recordingAppliedIndexFSM{rec: rec} + _, ok := f.(raftengine.AppliedIndexWriter) + require.True(t, ok, "recordingAppliedIndexFSM must implement raftengine.AppliedIndexWriter") +} + +// TestPersistCreatedSnapshot_BumpsAppliedIndex exercises Site 1 of +// the persist hook. We invoke (*Engine).persistCreatedSnapshot +// directly; the engine MUST call SetDurableAppliedIndex +// (snap.Metadata.Index) BEFORE SaveSnap. 
+func TestPersistCreatedSnapshot_BumpsAppliedIndex(t *testing.T) { + rec := &applyIndexOrderRecorder{} + fsm := &recordingAppliedIndexFSM{rec: rec} + persist := &recordingPersistStorage{rec: rec} + e := &Engine{fsm: fsm, persist: persist} + + snap := raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 42, Term: 1}} + require.NoError(t, e.persistCreatedSnapshot(snap)) + + require.Equal(t, []orderEvent{ + {kind: "bump", index: 42}, + {kind: "save", index: 42}, + }, rec.snapshot(), + "hook MUST call SetDurableAppliedIndex BEFORE SaveSnap") +} + +// TestPersistCreatedSnapshot_NilFSMNoOp covers the legacy / test- +// fake case: an FSM that does NOT implement AppliedIndexWriter +// silently no-ops; snapshot persist still runs. +func TestPersistCreatedSnapshot_NilFSMNoOp(t *testing.T) { + rec := &applyIndexOrderRecorder{} + persist := &recordingPersistStorage{rec: rec} + // testStateMachine (defined in engine_test.go) is the canonical + // non-AppliedIndexWriter FSM used by other tests in this package. + e := &Engine{fsm: &testStateMachine{}, persist: persist} + + snap := raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 17, Term: 1}} + require.NoError(t, e.persistCreatedSnapshot(snap)) + + require.Equal(t, []orderEvent{ + {kind: "save", index: 17}, + }, rec.snapshot(), + "legacy FSM path: snapshot persist still happens, just without the meta-key bump") +} + +// TestPersistCreatedSnapshot_BumpErrorAborts checks the ordering +// invariant under failure: if SetDurableAppliedIndex returns an +// error, the engine MUST surface it AND NOT call SaveSnap. This +// preserves the (metaAppliedIndex < snapshot pointer impossible) +// crash invariant from PR #910 design §6. 
+func TestPersistCreatedSnapshot_BumpErrorAborts(t *testing.T) { + rec := &applyIndexOrderRecorder{} + fsm := &recordingAppliedIndexFSM{rec: rec, failNext: true, failErr: io.ErrShortBuffer} + persist := &recordingPersistStorage{rec: rec} + e := &Engine{fsm: fsm, persist: persist} + + snap := raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 99, Term: 1}} + err := e.persistCreatedSnapshot(snap) + require.Error(t, err, "bump failure MUST be surfaced to caller") + require.Empty(t, rec.snapshot(), + "failed bump MUST NOT have recorded; SaveSnap MUST NOT have run") +} diff --git a/store/lsm_store_applied_index_test.go b/store/lsm_store_applied_index_test.go new file mode 100644 index 000000000..9261affa2 --- /dev/null +++ b/store/lsm_store_applied_index_test.go @@ -0,0 +1,191 @@ +package store + +import ( + "context" + "os" + "testing" + + "github.com/cockroachdb/pebble/v2" + "github.com/stretchr/testify/require" +) + +// pebbleStoreApplied is the local narrowed view of pebbleStore that +// the applied-index tests reach for. We keep the type assertion +// behind a helper so renames of the unexported struct don't cascade +// into every test. +func pebbleStoreApplied(t *testing.T, st MVCCStore) *pebbleStore { + t.Helper() + ps, ok := st.(*pebbleStore) + require.True(t, ok, "expected *pebbleStore, got %T", st) + return ps +} + +// newApplyIndexPebbleStore creates a fresh pebbleStore in a temp dir +// for one applied-index test. Close + dir removal run via t.Cleanup. 
+func newApplyIndexPebbleStore(t *testing.T) MVCCStore { + t.Helper() + dir := t.TempDir() + st, err := NewPebbleStore(dir) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, st.Close()) + require.NoError(t, os.RemoveAll(dir)) + }) + return st +} + +// TestLastAppliedIndex_MissingMetaKey covers the "(0, false, nil)" +// branch — the strictly-additive fallback that lets the cold-start +// skip gate degrade to full restore for fsm.db files that have not +// yet been mutated through a raft-Apply (pre-upgrade fsm.db, freshly +// restored store). +func TestLastAppliedIndex_MissingMetaKey(t *testing.T) { + st := newApplyIndexPebbleStore(t) + ps := pebbleStoreApplied(t, st) + + idx, present, err := ps.LastAppliedIndex() + require.NoError(t, err) + require.False(t, present, "fresh store must report (false) for missing meta key") + require.Equal(t, uint64(0), idx, "missing key must surface as 0") +} + +// TestSetDurableAppliedIndex_RoundTrip pins the meta key via the +// snapshot-persist writer seam and verifies LastAppliedIndex returns +// the same value with present=true. Confirms the +// pebble.Sync-unconditionally commit and the little-endian encoding +// round-trip. +func TestSetDurableAppliedIndex_RoundTrip(t *testing.T) { + st := newApplyIndexPebbleStore(t) + ps := pebbleStoreApplied(t, st) + + const wantIdx uint64 = 0xDEAD_BEEF_CAFE_F00D + require.NoError(t, ps.SetDurableAppliedIndex(wantIdx)) + + got, present, err := ps.LastAppliedIndex() + require.NoError(t, err) + require.True(t, present, "after SetDurableAppliedIndex the key must be present") + require.Equal(t, wantIdx, got, "round-trip must preserve the full uint64") +} + +// TestApplyMutationsRaftAt_BundlesMetaAppliedIndex verifies that a +// raft-Apply data mutation bundles metaAppliedIndex in the SAME +// pebble.Batch as the data (and metaLastCommitTS). After the apply, +// LastAppliedIndex must reflect the entry index passed in. 
+func TestApplyMutationsRaftAt_BundlesMetaAppliedIndex(t *testing.T) { + ctx := context.Background() + st := newApplyIndexPebbleStore(t) + ps := pebbleStoreApplied(t, st) + + const entryIdx uint64 = 42 + muts := []*KVPairMutation{ + {Op: OpTypePut, Key: []byte("k1"), Value: []byte("v1")}, + {Op: OpTypePut, Key: []byte("k2"), Value: []byte("v2")}, + } + const ts uint64 = 100 + require.NoError(t, ps.ApplyMutationsRaftAt(ctx, muts, nil, ts, ts, entryIdx)) + + got, present, err := ps.LastAppliedIndex() + require.NoError(t, err) + require.True(t, present, "ApplyMutationsRaftAt must persist metaAppliedIndex") + require.Equal(t, entryIdx, got) + + // Confirm the data also landed (sanity check the bundling did not + // drop the data path). + val, err := ps.GetAt(ctx, []byte("k1"), ts) + require.NoError(t, err) + require.Equal(t, []byte("v1"), val) +} + +// TestApplyMutationsRaftAt_ZeroIndexLeavesMetaKey covers the +// appliedIndex==0 escape hatch — callers without a raft entry index +// (test fakes, legacy ApplyMutationsRaft) MUST NOT bump the meta key. +func TestApplyMutationsRaftAt_ZeroIndexLeavesMetaKey(t *testing.T) { + ctx := context.Background() + st := newApplyIndexPebbleStore(t) + ps := pebbleStoreApplied(t, st) + + // Seed metaAppliedIndex via SetDurableAppliedIndex. + const seeded uint64 = 7 + require.NoError(t, ps.SetDurableAppliedIndex(seeded)) + + muts := []*KVPairMutation{{Op: OpTypePut, Key: []byte("k1"), Value: []byte("v1")}} + // appliedIndex=0 → leaf must NOT touch metaAppliedIndex. + require.NoError(t, ps.ApplyMutationsRaftAt(ctx, muts, nil, 100, 100, 0)) + + got, present, err := ps.LastAppliedIndex() + require.NoError(t, err) + require.True(t, present) + require.Equal(t, seeded, got, "appliedIndex=0 must leave the meta key at its previous value") +} + +// TestDeletePrefixAtRaftAt_BundlesMetaAppliedIndex is the analogous +// round-trip for the DEL_PREFIX leaf (handleDelPrefix builds its own +// pebble.Batch separate from applyMutationsWithOpts). 
Without this +// hook, DEL_PREFIX entries silently leave LastAppliedIndex behind. +func TestDeletePrefixAtRaftAt_BundlesMetaAppliedIndex(t *testing.T) { + ctx := context.Background() + st := newApplyIndexPebbleStore(t) + ps := pebbleStoreApplied(t, st) + + // Pre-populate so the DEL_PREFIX actually iterates over something. + const seedTS uint64 = 50 + require.NoError(t, ps.ApplyMutations(ctx, []*KVPairMutation{ + {Op: OpTypePut, Key: []byte("p/k1"), Value: []byte("v")}, + {Op: OpTypePut, Key: []byte("p/k2"), Value: []byte("v")}, + }, nil, seedTS, seedTS)) + + const entryIdx uint64 = 99 + require.NoError(t, ps.DeletePrefixAtRaftAt(ctx, []byte("p/"), nil, 200, entryIdx)) + + got, present, err := ps.LastAppliedIndex() + require.NoError(t, err) + require.True(t, present, "DeletePrefixAtRaftAt must persist metaAppliedIndex") + require.Equal(t, entryIdx, got) +} + +// TestSetDurableAppliedIndex_UsesPebbleSync exercises the +// nosync-mode independence claim — even when ELASTICKV_FSM_SYNC_MODE +// is nosync, the checkpoint must use pebble.Sync. We can't directly +// observe the sync option from a black-box test, so we settle for +// "the value persists across a store close + reopen". A NoSync write +// would not necessarily survive (Pebble's WAL flush is opportunistic), +// so a deterministic round-trip after Close()+Open() is a useful +// regression guard for the sync-mode flip. 
+func TestSetDurableAppliedIndex_UsesPebbleSync(t *testing.T) { + t.Setenv("ELASTICKV_FSM_SYNC_MODE", "nosync") + dir := t.TempDir() + st, err := NewPebbleStore(dir) + require.NoError(t, err) + + ps := pebbleStoreApplied(t, st) + const idx uint64 = 12345 + require.NoError(t, ps.SetDurableAppliedIndex(idx)) + require.NoError(t, st.Close()) + + reopened, err := NewPebbleStore(dir) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, reopened.Close()) }) + + got, present, err := pebbleStoreApplied(t, reopened).LastAppliedIndex() + require.NoError(t, err) + require.True(t, present, "after Close+Open metaAppliedIndex must persist (pebble.Sync forced regardless of nosync)") + require.Equal(t, idx, got) +} + +// TestLastAppliedIndex_CorruptValue exercises the "len(val) < +// timestampSize" fallback — a truncated meta key surfaces as missing +// (NOT as an error) so the caller falls back to full restore rather +// than crashing the cold start. +func TestLastAppliedIndex_CorruptValue(t *testing.T) { + st := newApplyIndexPebbleStore(t) + ps := pebbleStoreApplied(t, st) + + // Write a too-short payload directly under the meta key to simulate + // a partial write. Use ps.db.Set so we bypass setPebbleUint64InBatch. 
+ require.NoError(t, ps.db.Set(metaAppliedIndexBytes, []byte{0x01, 0x02}, pebble.Sync)) + + got, present, err := ps.LastAppliedIndex() + require.NoError(t, err, "corrupt value must NOT propagate as an error") + require.False(t, present, "truncated meta key must collapse to missing") + require.Equal(t, uint64(0), got) +} From 7001a934f1629cead297cb94fa3fff8efddb5b0b Mon Sep 17 00:00:00 2001 From: Yoshiaki Ueda Date: Thu, 4 Jun 2026 02:24:07 +0900 Subject: [PATCH 07/11] fix(snapshot-skip B2 round-2): Site 2 tests + lint + defensive forwards MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address PR #915 round-1 feedback: 1) claude[bot] BLOCKER — Site 2 (persistLocalSnapshotPayload) crash- ordering test gap: internal/raftengine/etcd/engine_applied_index_test.go gains: - localSnapshotEngine helper seeding etcdraft.MemoryStorage so buildLocalSnapshot's storage.Term(applied) lookup succeeds (without seeding, the free persistLocalSnapshotPayload short- circuits at Term and SaveSnap is never reached). - TestPersistLocalSnapshotPayload_BumpsAppliedIndex — happy path: asserts the recorded event sequence is exactly [bump@123, save@123]. - TestPersistLocalSnapshotPayload_BumpErrorAborts — mirror of Site 1: a failed SetDurableAppliedIndex MUST surface the error AND prevent the free-function from running (so SaveSnap is never called). Preserves the crash invariant for Site 2. These close the only blocker from claude[bot]'s round-1 review. 2) reviewdog golangci-lint findings (3 issues → 0): - lsm_store.go gci: re-aligned the metaAppliedIndex const block; golangci-lint fmt picked it up. - kv/fsm.go wrapcheck on SetDurableAppliedIndex return: wrapped the interface-method return in errors.WithStack so the lint allow-list is honoured. 
- engine.go cyclop on persistLocalSnapshotPayload (12 > 10): extracted two helpers: * bumpDurableAppliedIndexBeforeSave(index) — shared by BOTH persist sites, deduplicates the writer-seam type assertion and the inline doc-comment (round-1 had it twice). * handleLocalSnapshotPersistResult(err) + purgeAfterLocalSnapshot() — collapses the four-arm error switch into a helper. The three ErrCompacted / ErrUnavailable / ErrSnapOutOfDate branches were already a single logical case so they fold into one switch arm. Cyclop now reports 0 issues; the post-extract function body is also easier to read. 3) Defensive forwards (gemini HIGH false-positive — pushed back in the round-2 PR comment with grep evidence): kv/leader_routed_store.go and kv/shard_store.go gain LastAppliedIndex / SetDurableAppliedIndex methods. Both forwards are currently DEAD CODE for the cold-start skip optimisation — verified by grep on origin/main that the kvFSM holds a *pebbleStore directly in every production path (LeaderRoutedStore is the adapter/server read-routing wrapper; ShardStore is the coordinator-facing fanout wrapper; neither is f.store). They are added defensively so a future refactor that wraps the FSM store does not silently degrade the optimisation to full- restore-always. ShardStore aggregates LastAppliedIndex as MIN over groups so the cold-start skip gate refuses to skip whenever ANY group lags (conservative 'over-restore beats under-restore' rule from design §4). Tests: go vet ./store/ ./kv/ ./internal/raftengine/... clean; go test ./store/ ./kv/ ./internal/raftengine/... -short ok 53s total; new Site 2 tests pass on first run after seeding storage. golangci-lint run ./store/ ./kv/ ./internal/raftengine/etcd/ → 0 issues. Refs PR #915 round-1 (claude[bot] verdict + gemini-code-assist HIGH + reviewdog lint). 
--- internal/raftengine/etcd/engine.go | 102 ++++++++++-------- .../etcd/engine_applied_index_test.go | 77 +++++++++++++ kv/fsm.go | 2 +- kv/leader_routed_store.go | 49 +++++++++ kv/shard_store.go | 82 ++++++++++++++ store/lsm_store.go | 4 +- 6 files changed, 269 insertions(+), 47 deletions(-) diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index 6535d268e..e1b684219 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -2676,25 +2676,37 @@ func (e *Engine) createConfigSnapshot(index uint64, confState raftpb.ConfState, } } +// bumpDurableAppliedIndexBeforeSave pins the FSM's durable applied +// index to `index` BEFORE the engine calls persist.SaveSnap, so a +// successful snapshot persist always implies LastAppliedIndex >= +// snap.Metadata.Index — closes the HLC-lease-only / encryption-only +// fallback (PR #910 design §6). +// +// FSMs that do not expose raftengine.AppliedIndexWriter silently +// no-op; the skip optimisation falls back to full restore for them +// (legacy test fakes, in-memory backends). pebble.Sync is forced on +// the writer side regardless of ELASTICKV_FSM_SYNC_MODE — once +// persist.SaveSnap returns, WAL compaction discards every log entry +// at or before snap.Metadata.Index, so there is no source to replay +// the meta key bump from. +// +// Used by BOTH snapshot persist sites: persistCreatedSnapshot (this +// file) and e.persistLocalSnapshotPayload (the steady-state +// SnapshotCount-triggered hot path). 
+func (e *Engine) bumpDurableAppliedIndexBeforeSave(index uint64) error { + w, ok := e.fsm.(raftengine.AppliedIndexWriter) + if !ok { + return nil + } + return errors.WithStack(w.SetDurableAppliedIndex(index)) +} + func (e *Engine) persistCreatedSnapshot(snap raftpb.Snapshot) error { if etcdraft.IsEmptySnap(snap) || e.persist == nil { return nil } - // Pin metaAppliedIndex to snap.Metadata.Index BEFORE SaveSnap so a - // successful snapshot persist always implies LastAppliedIndex >= - // snap.Metadata.Index — closes the HLC-lease-only / encryption-only - // fallback (PR #910 design §6). FSMs that do not expose - // raftengine.AppliedIndexWriter silently no-op; the skip - // optimisation falls back to full restore for them. pebble.Sync - // is forced on the writer side (see lsm_store.SetDurableAppliedIndex) - // regardless of ELASTICKV_FSM_SYNC_MODE — once SaveSnap returns, - // WAL compaction discards every log entry at or before - // snap.Metadata.Index, so there is no source to replay the meta - // key bump from. - if w, ok := e.fsm.(raftengine.AppliedIndexWriter); ok { - if err := w.SetDurableAppliedIndex(snap.Metadata.Index); err != nil { - return errors.WithStack(err) - } + if err := e.bumpDurableAppliedIndexBeforeSave(snap.Metadata.Index); err != nil { + return err } if err := e.persist.SaveSnap(snap); err != nil { return errors.WithStack(err) @@ -4060,47 +4072,49 @@ func (e *Engine) persistLocalSnapshotPayload(index uint64, payload []byte) error return nil } - // Pin metaAppliedIndex to `index` BEFORE the free-function - // persistLocalSnapshotPayload (which calls persist.SaveSnap at - // wal_store.go:524). This is the steady-state SnapshotCount-triggered - // snapshot path — the hot path the cold-start skip optimisation - // depends on. Without this hook the round-3 P2 fallback (HLC - // leases / encryption ops keep snapshot.Metadata.Index ahead of the - // last data-Apply index forever) recurs permanently. 
See PR #910 - // design §6 'HLC lease entries — checkpoint at snapshot persist' - // and the round-5 retraction documenting why - // persistCreatedSnapshot alone is insufficient. pebble.Sync is - // forced on the writer side regardless of ELASTICKV_FSM_SYNC_MODE - // (lsm_store.SetDurableAppliedIndex inline comment). - if w, ok := e.fsm.(raftengine.AppliedIndexWriter); ok { - if err := w.SetDurableAppliedIndex(index); err != nil { - return errors.WithStack(err) - } + if err := e.bumpDurableAppliedIndexBeforeSave(index); err != nil { + return err } _, err = persistLocalSnapshotPayload(e.storage, e.persist, index, payload) + return e.handleLocalSnapshotPersistResult(err) +} + +// handleLocalSnapshotPersistResult collapses the post-SaveSnap error +// switch into a single helper so persistLocalSnapshotPayload stays +// under the cyclomatic-complexity budget. The three raft-side +// 'snapshot already moved on' cases (ErrCompacted / ErrUnavailable / +// ErrSnapOutOfDate) are all treated as no-ops; only the success path +// runs the disk-side purge. 
+func (e *Engine) handleLocalSnapshotPersistResult(err error) error { switch { case err == nil: - snapDir := filepath.Join(e.dataDir, snapDirName) - if purgeErr := purgeOldSnapshotFiles(snapDir, e.fsmSnapDir); purgeErr != nil { - slog.Warn("failed to purge old snap files", "error", purgeErr) - } - walDir := filepath.Join(e.dataDir, walDirName) - if purgeErr := purgeOldWALFiles(walDir, e.walRetention()); purgeErr != nil { - slog.Warn("failed to purge old wal files", "error", purgeErr) - } + e.purgeAfterLocalSnapshot() return nil - case errors.Is(err, etcdraft.ErrCompacted): - return nil - case errors.Is(err, etcdraft.ErrUnavailable): - return nil - case errors.Is(err, etcdraft.ErrSnapOutOfDate): + case errors.Is(err, etcdraft.ErrCompacted), + errors.Is(err, etcdraft.ErrUnavailable), + errors.Is(err, etcdraft.ErrSnapOutOfDate): return nil default: return err } } +// purgeAfterLocalSnapshot runs the disk-side cleanup that follows a +// successful local-snapshot persist: trim old .snap/.fsm files and +// rotate ageing WAL segments. Both calls log on error but do not +// propagate — failing to purge is non-fatal. 
+func (e *Engine) purgeAfterLocalSnapshot() { + snapDir := filepath.Join(e.dataDir, snapDirName) + if purgeErr := purgeOldSnapshotFiles(snapDir, e.fsmSnapDir); purgeErr != nil { + slog.Warn("failed to purge old snap files", "error", purgeErr) + } + walDir := filepath.Join(e.dataDir, walDirName) + if purgeErr := purgeOldWALFiles(walDir, e.walRetention()); purgeErr != nil { + slog.Warn("failed to purge old wal files", "error", purgeErr) + } +} + func encodeReadContext(id uint64) []byte { out := make([]byte, envelopeHeaderSize) out[0] = readContextVersion diff --git a/internal/raftengine/etcd/engine_applied_index_test.go b/internal/raftengine/etcd/engine_applied_index_test.go index 089c1dab8..44e28d323 100644 --- a/internal/raftengine/etcd/engine_applied_index_test.go +++ b/internal/raftengine/etcd/engine_applied_index_test.go @@ -8,6 +8,7 @@ import ( "github.com/bootjp/elastickv/internal/raftengine" "github.com/coreos/go-semver/semver" "github.com/stretchr/testify/require" + etcdraft "go.etcd.io/raft/v3" raftpb "go.etcd.io/raft/v3/raftpb" ) @@ -146,3 +147,79 @@ func TestPersistCreatedSnapshot_BumpErrorAborts(t *testing.T) { require.Empty(t, rec.snapshot(), "failed bump MUST NOT have recorded; SaveSnap MUST NOT have run") } + +// --- Site 2: persistLocalSnapshotPayload (steady-state hot path) --- +// +// These mirror the Site 1 tests above but exercise the engine's +// SnapshotCount-triggered local-snapshot path. The hook sits inside +// e.persistLocalSnapshotPayload (engine.go:4060), under +// e.snapshotMu.Lock(), BEFORE the free-function persistLocalSnapshotPayload +// (wal_store.go:519) which is what actually calls persist.SaveSnap. + +// localSnapshotEngine constructs an *Engine suitable for testing +// e.persistLocalSnapshotPayload in isolation. 
We need: +// - e.fsm: implements AppliedIndexWriter so the bump hook fires +// - e.persist: implements etcdstorage.Storage so SaveSnap can run +// (via the free-function persistLocalSnapshotPayload calling it) +// - e.storage: a real *etcdraft.MemoryStorage so e.storage.Snapshot() +// and buildLocalSnapshot return sensible values +// - e.dataDir / e.fsmSnapDir: temp dirs so the post-persist purge +// does not panic on Stat +func localSnapshotEngine(t *testing.T, rec *applyIndexOrderRecorder, fsm *recordingAppliedIndexFSM, applied uint64) *Engine { + t.Helper() + storage := etcdraft.NewMemoryStorage() + // Seed the storage with enough entries so buildLocalSnapshot's + // storage.Term(applied) call succeeds. Without this, the free + // persistLocalSnapshotPayload short-circuits at Term lookup before + // reaching persist.SaveSnap, and the test cannot observe the save. + entries := make([]raftpb.Entry, applied) + for i := uint64(0); i < applied; i++ { + entries[i] = raftpb.Entry{Index: i + 1, Term: 1, Data: []byte{}} + } + require.NoError(t, storage.Append(entries)) + persist := &recordingPersistStorage{rec: rec} + return &Engine{ + fsm: fsm, + persist: persist, + storage: storage, + dataDir: t.TempDir(), + fsmSnapDir: t.TempDir(), + } +} + +// TestPersistLocalSnapshotPayload_BumpsAppliedIndex is the +// happy-path test for Site 2. The engine MUST call +// SetDurableAppliedIndex(index) BEFORE the free-function +// persistLocalSnapshotPayload — which is what eventually invokes +// persist.SaveSnap. 
+func TestPersistLocalSnapshotPayload_BumpsAppliedIndex(t *testing.T) { + rec := &applyIndexOrderRecorder{} + fsm := &recordingAppliedIndexFSM{rec: rec} + const index uint64 = 123 + e := localSnapshotEngine(t, rec, fsm, index) + + require.NoError(t, e.persistLocalSnapshotPayload(index, []byte("payload-stub"))) + + events := rec.snapshot() + require.GreaterOrEqual(t, len(events), 2, + "both bump and save events MUST be recorded; got %d", len(events)) + require.Equal(t, "bump", events[0].kind, "bump MUST be first") + require.Equal(t, index, events[0].index) + require.Equal(t, "save", events[1].kind, "save MUST be second") + require.Equal(t, index, events[1].index) +} + +// TestPersistLocalSnapshotPayload_BumpErrorAborts mirrors Site 1's +// crash-ordering test for Site 2: a failed SetDurableAppliedIndex +// MUST surface the error AND prevent persist.SaveSnap from running. +func TestPersistLocalSnapshotPayload_BumpErrorAborts(t *testing.T) { + rec := &applyIndexOrderRecorder{} + fsm := &recordingAppliedIndexFSM{rec: rec, failNext: true, failErr: io.ErrShortBuffer} + const index uint64 = 456 + e := localSnapshotEngine(t, rec, fsm, index) + + err := e.persistLocalSnapshotPayload(index, []byte("payload-stub")) + require.Error(t, err, "bump failure MUST be surfaced to caller") + require.Empty(t, rec.snapshot(), + "failed bump MUST NOT have recorded; SaveSnap MUST NOT have run") +} diff --git a/kv/fsm.go b/kv/fsm.go index d51db42e7..bef27cc77 100644 --- a/kv/fsm.go +++ b/kv/fsm.go @@ -160,7 +160,7 @@ func (f *kvFSM) SetDurableAppliedIndex(idx uint64) error { if !ok { return nil } - return w.SetDurableAppliedIndex(idx) + return errors.WithStack(w.SetDurableAppliedIndex(idx)) } type FSM interface { diff --git a/kv/leader_routed_store.go b/kv/leader_routed_store.go index 9c3643abc..cdd4f2f4c 100644 --- a/kv/leader_routed_store.go +++ b/kv/leader_routed_store.go @@ -364,6 +364,55 @@ func (s *LeaderRoutedStore) DeletePrefixAtRaftAt(ctx context.Context, prefix []b return 
errors.WithStack(s.local.DeletePrefixAtRaftAt(ctx, prefix, excludePrefix, commitTS, appliedIndex)) } +// LastAppliedIndex forwards to the local store when it implements +// raftengine.AppliedIndexReader. Defensive: in production today the +// kvFSM holds a *pebbleStore directly (not a LeaderRoutedStore — that +// wrapper is used by adapter/server code for read routing, not by +// the FSM apply path); so this forward is currently dead code for +// the cold-start skip optimisation. We add it anyway because future +// refactors might wrap the FSM's store, and a silent no-op there +// would degrade the optimisation to full-restore-always with no +// failure signal. +// +// (0, false, nil) returns are the strictly-additive fallback — +// either the wrapper has no local, the local does not implement +// the reader, or the local reports missing/truncated. The caller in +// internal/raftengine/etcd/wal_store.go (Branch 3) treats all of +// these as "fall back to full restore", which is correct. +func (s *LeaderRoutedStore) LastAppliedIndex() (uint64, bool, error) { + if s == nil || s.local == nil { + return 0, false, nil + } + reader, ok := s.local.(interface { + LastAppliedIndex() (uint64, bool, error) + }) + if !ok { + return 0, false, nil + } + idx, present, err := reader.LastAppliedIndex() + if err != nil { + return 0, false, errors.WithStack(err) + } + return idx, present, nil +} + +// SetDurableAppliedIndex forwards to the local store when it +// implements raftengine.AppliedIndexWriter. Symmetric defensive +// no-op when the local store does not expose the writer seam — see +// LastAppliedIndex doc-comment. 
+func (s *LeaderRoutedStore) SetDurableAppliedIndex(idx uint64) error { + if s == nil || s.local == nil { + return nil + } + writer, ok := s.local.(interface { + SetDurableAppliedIndex(idx uint64) error + }) + if !ok { + return nil + } + return errors.WithStack(writer.SetDurableAppliedIndex(idx)) +} + func (s *LeaderRoutedStore) LastCommitTS() uint64 { if s == nil || s.local == nil { return 0 diff --git a/kv/shard_store.go b/kv/shard_store.go index 85b28db7f..ce1d7de31 100644 --- a/kv/shard_store.go +++ b/kv/shard_store.go @@ -1317,6 +1317,88 @@ func (s *ShardStore) LastCommitTS() uint64 { return max } +// LastAppliedIndex aggregates the durable applied-index across every +// shard group, returning the MIN over all groups that report one. +// +// MIN is the right aggregator because the kvFSM is per-shard in +// production — each shard's FSM independently asks "is MY group's +// applied index at least as fresh as MY group's snapshot?" — and +// ShardStore is NEVER used as the FSM's f.store in production today +// (the FSM holds a *pebbleStore directly; ShardStore is the +// coordinator-facing fanout wrapper). This method exists as a +// defensive forward in case a future refactor uses ShardStore from +// the apply path; reporting MIN guarantees the cold-start skip gate +// would refuse to skip whenever ANY group lags, matching the +// conservative "over-restore beats under-restore" rule (PR #910 +// design §4). +// +// (0, false, nil) when no group reports a value — strictly-additive +// fallback per design §4. +func (s *ShardStore) LastAppliedIndex() (uint64, bool, error) { + var ( + minIdx uint64 + anyReport bool + ) + for _, g := range s.groups { + if g == nil || g.Store == nil { + continue + } + reader, ok := g.Store.(interface { + LastAppliedIndex() (uint64, bool, error) + }) + if !ok { + continue + } + idx, present, err := reader.LastAppliedIndex() + if err != nil { + return 0, false, errors.WithStack(err) + } + if !present { + // One group has no meta key. 
Conservative: report + // missing so the cold-start skip gate falls back. + return 0, false, nil + } + if !anyReport || idx < minIdx { + minIdx = idx + } + anyReport = true + } + if !anyReport { + return 0, false, nil + } + return minIdx, true, nil +} + +// SetDurableAppliedIndex broadcasts the bump to every group store +// that exposes the writer seam. +// +// This is purely defensive — in production today the FSM holds a +// *pebbleStore directly; ShardStore is never f.store. Were it ever +// wired through the FSM apply path, broadcasting the same idx across +// groups would corrupt their per-group metaAppliedIndex semantics +// (each group has its own raft log with its own entry numbering). +// For that hypothetical, the test convention from +// DeletePrefixAtRaftAt applies: tests MUST pass idx=0 to opt out, or +// not use ShardStore as the writer at all. Returns the first +// per-group error. +func (s *ShardStore) SetDurableAppliedIndex(idx uint64) error { + for _, g := range s.groups { + if g == nil || g.Store == nil { + continue + } + writer, ok := g.Store.(interface { + SetDurableAppliedIndex(idx uint64) error + }) + if !ok { + continue + } + if err := writer.SetDurableAppliedIndex(idx); err != nil { + return errors.WithStack(err) + } + } + return nil +} + // WriteConflictCountsByPrefix aggregates OCC conflict counts across // every shard group owned by this ShardStore. Per-shard counts share // the same "|" label schema, so a simple sum gives diff --git a/store/lsm_store.go b/store/lsm_store.go index 2fe612ee3..c0ede0062 100644 --- a/store/lsm_store.go +++ b/store/lsm_store.go @@ -47,8 +47,8 @@ const ( // snap.Metadata.Index by SetDurableAppliedIndex at every snapshot // persist site. See // docs/design/2026_06_02_idempotent_snapshot_restore.md §3. 
- metaAppliedIndex = "_meta_applied_index" - spoolBufSize = 32 * 1024 // buffer size for streaming I/O during restore + metaAppliedIndex = "_meta_applied_index" + spoolBufSize = 32 * 1024 // buffer size for streaming I/O during restore // maxPebbleEncodedKeySize is the limit for encoded Pebble on-disk keys, // which are the user key concatenated with the 8-byte inverted timestamp. From b957e1aa540169c40a445a75809864df9050e001 Mon Sep 17 00:00:00 2001 From: Yoshiaki Ueda Date: Thu, 4 Jun 2026 21:26:08 +0900 Subject: [PATCH 08/11] fix(snapshot-skip B2 round-3): monotonic SetDurableAppliedIndex (codex P2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Round-2 of PR #915 review surfaced a P2 (codex) on the non-monotonic write that claude[bot] had flagged as a 'design note' in round-1. The race: a background SnapshotCount-triggered persistLocalSnapshot worker calls SetDurableAppliedIndex(snap.Metadata.Index). Concurrently, the raft-apply loop's ApplyMutationsRaftAt has already advanced the per-Apply metaAppliedIndex past that snapshot's index. The unconditional write rewinds the meta key — defeating the soak/verification invariant and causing the future cold-start skip gate to fall back unnecessarily until another data Apply repairs the key. Round-3 fix: read-modify-write under applyMu serialisation. - SetDurableAppliedIndex now acquires dbMu.RLock THEN applyMu.Lock (same order applyMutationsWithOpts / deletePrefixAtWithOpts use, so no deadlock). - Reads the existing meta key via the new readAppliedIndexLocked helper (shares its decode logic with the unlocked LastAppliedIndex(); same (0, false, nil) semantics for absent / truncated meta keys). - If existing >= idx, no-op. The skip invariant still holds: LastAppliedIndex >= idx >= snap.Metadata.Index, so the snapshot's own claim is satisfied by the larger-already-present value. - Otherwise, batch.Commit(pebble.Sync) as before. 
Tests (store/lsm_store_applied_index_test.go): - TestSetDurableAppliedIndex_Monotonic — simulates the exact race in single-threaded form: ApplyMutationsRaftAt(applied=250) then SetDurableAppliedIndex(snapIdx=100). Asserts LastAppliedIndex still reads 250 after the older snapshot write. - TestSetDurableAppliedIndex_AdvancesOnlyForward — exercises the forward-only contract: Set(50) -> Set(100, advances) -> Set(75, no-op) -> Set(100, no-op). LastAppliedIndex finishes at 100. Tests: go vet ./store/ + golangci-lint ./store/ clean; go test ./store/ -run 'TestSetDurable...|TestLast...|TestApply...|TestDelete...' ok 0.59s; go test ./store/ ./kv/ ./internal/raftengine/... -short ok ~53s. Refs PR #915 round-2 codex review (P2 'Avoid lowering the durable apply index'). --- store/lsm_store.go | 55 +++++++++++++++++++++++--- store/lsm_store_applied_index_test.go | 57 +++++++++++++++++++++++++++ 2 files changed, 106 insertions(+), 6 deletions(-) diff --git a/store/lsm_store.go b/store/lsm_store.go index c0ede0062..0943c7425 100644 --- a/store/lsm_store.go +++ b/store/lsm_store.go @@ -618,15 +618,37 @@ func (s *pebbleStore) LastAppliedIndex() (uint64, bool, error) { // forever. The +1 fsync per snapshot persist (rare; default // SnapshotCount=10000) is negligible vs that risk. // -// dbMu.RLock is sufficient because we only mutate s.db, not its -// pointer. The raft-apply loop is serial at the engine boundary, so -// no concurrent applyMutationsWithOpts can race this single-key -// write at the Pebble level beyond what last-writer-wins already -// gives us (and last-writer-wins on metaAppliedIndex always -// satisfies have >= want for any want we already passed). +// Monotonicity (round-2 P2 fix for PR #915): when persistLocalSnapshot +// runs in a background worker, raft apply can continue and the per- +// entry ApplyMutationsRaftAt path can advance metaAppliedIndex past +// `idx` before this method runs. 
An unconditional write would rewind +// the meta key — defeating the soak/verification invariant and +// causing the future skip gate to fall back unnecessarily. The +// applyMu lock serialises with applyMutationsWithOpts / +// deletePrefixAtWithOpts (both hold it across their batch commit), +// and the read-modify-write keeps the meta key strictly monotonic. +// +// Lock order: dbMu.RLock before applyMu.Lock matches the existing +// discipline in applyMutationsWithOpts (lsm_store.go around :1438 / +// :1444). No deadlock. func (s *pebbleStore) SetDurableAppliedIndex(idx uint64) error { s.dbMu.RLock() defer s.dbMu.RUnlock() + s.applyMu.Lock() + defer s.applyMu.Unlock() + + existing, present, err := s.readAppliedIndexLocked() + if err != nil { + return err + } + if present && existing >= idx { + // Concurrent apply already advanced the meta key past `idx`; + // no work needed. The skip-invariant still holds because the + // snapshot's claim (LastAppliedIndex >= snap.Metadata.Index) + // is satisfied by existing >= idx >= snap.Metadata.Index. + return nil + } + batch := s.db.NewBatch() defer func() { _ = batch.Close() }() if err := setPebbleUint64InBatch(batch, metaAppliedIndexBytes, idx); err != nil { @@ -635,6 +657,27 @@ func (s *pebbleStore) SetDurableAppliedIndex(idx uint64) error { return errors.WithStack(batch.Commit(pebble.Sync)) } +// readAppliedIndexLocked decodes the metaAppliedIndex key. Caller +// MUST hold s.dbMu.RLock (so s.db is stable) AND s.applyMu.Lock (so +// the value reflects a consistent snapshot vs concurrent batch +// commits in applyMutationsWithOpts). The body is shared with the +// unlocked LastAppliedIndex() reader; same (0, false, nil) semantics +// for absent / truncated meta keys. 
+func (s *pebbleStore) readAppliedIndexLocked() (uint64, bool, error) { + val, closer, err := s.db.Get(metaAppliedIndexBytes) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return 0, false, nil + } + return 0, false, errors.WithStack(err) + } + defer func() { _ = closer.Close() }() + if len(val) < timestampSize { + return 0, false, nil + } + return binary.LittleEndian.Uint64(val), true, nil +} + func (s *pebbleStore) saveMinRetainedTS(ts uint64) error { return writePebbleUint64(s.db, metaMinRetainedTSBytes, ts, pebble.NoSync) } diff --git a/store/lsm_store_applied_index_test.go b/store/lsm_store_applied_index_test.go index 9261affa2..96828971f 100644 --- a/store/lsm_store_applied_index_test.go +++ b/store/lsm_store_applied_index_test.go @@ -172,6 +172,63 @@ func TestSetDurableAppliedIndex_UsesPebbleSync(t *testing.T) { require.Equal(t, idx, got) } +// TestSetDurableAppliedIndex_Monotonic guards the round-2 fix for +// codex P2: a background snapshot persist (which is what +// SetDurableAppliedIndex services) MUST NOT rewind metaAppliedIndex +// when a concurrent ApplyMutationsRaftAt has already advanced it +// past the snapshot's index. +// +// The test mirrors the production race in single-threaded form: +// 1. ApplyMutationsRaftAt(... appliedIndex=N+k) — meta key = N+k +// 2. SetDurableAppliedIndex(N) — must NO-OP (N < N+k) +// After step 2, LastAppliedIndex must still report N+k. +func TestSetDurableAppliedIndex_Monotonic(t *testing.T) { + ctx := context.Background() + st := newApplyIndexPebbleStore(t) + ps := pebbleStoreApplied(t, st) + + const ( + applied uint64 = 250 // simulated ApplyMutationsRaftAt index + snapIdx uint64 = 100 // simulated older background snapshot persist + ) + require.Less(t, snapIdx, applied, + "test invariant: snapshot index must be older than the applied index for the race to matter") + + // Step 1: ApplyMutationsRaftAt advances metaAppliedIndex to N+k. 
+ muts := []*KVPairMutation{{Op: OpTypePut, Key: []byte("k"), Value: []byte("v")}} + require.NoError(t, ps.ApplyMutationsRaftAt(ctx, muts, nil, 100, 100, applied)) + + // Step 2: background SetDurableAppliedIndex(snapIdx) — the per-Apply + // path has already moved past, so this MUST no-op. + require.NoError(t, ps.SetDurableAppliedIndex(snapIdx)) + + got, present, err := ps.LastAppliedIndex() + require.NoError(t, err) + require.True(t, present) + require.Equal(t, applied, got, + "SetDurableAppliedIndex MUST NOT rewind metaAppliedIndex below the per-Apply value") +} + +// TestSetDurableAppliedIndex_AdvancesOnlyForward complements the +// monotonic test by exercising the inverse case: the snapshot persist +// runs first (no prior apply), then a strictly-greater index lands. +// The forward write MUST take effect. +func TestSetDurableAppliedIndex_AdvancesOnlyForward(t *testing.T) { + st := newApplyIndexPebbleStore(t) + ps := pebbleStoreApplied(t, st) + + require.NoError(t, ps.SetDurableAppliedIndex(50)) + require.NoError(t, ps.SetDurableAppliedIndex(100)) // strictly greater → advances + require.NoError(t, ps.SetDurableAppliedIndex(75)) // strictly less → no-op + require.NoError(t, ps.SetDurableAppliedIndex(100)) // equal → no-op + + got, present, err := ps.LastAppliedIndex() + require.NoError(t, err) + require.True(t, present) + require.Equal(t, uint64(100), got, + "SetDurableAppliedIndex must advance strictly-greater values and no-op on equal/lesser") +} + // TestLastAppliedIndex_CorruptValue exercises the "len(val) < // timestampSize" fallback — a truncated meta key surfaces as missing // (NOT as an error) so the caller falls back to full restore rather From 56848574e42559b7a2f4e8922081f1e3f3d89461 Mon Sep 17 00:00:00 2001 From: Yoshiaki Ueda Date: Thu, 4 Jun 2026 21:44:23 +0900 Subject: [PATCH 09/11] fix(snapshot-skip B2 round-4): claude[bot] nits + gci on test file MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 
claude[bot] round-3 verdict was 'Ready to merge' with 4 non-blocking notes. Round-4 closes three of them and addresses the round-3 reviewdog gci finding. The fourth note (LastAppliedIndex duplicates readAppliedIndexLocked body) is pre-existing and explicitly called out by claude[bot] as 'not a round-3 regression'; deferred to a follow-up to keep this round focused. Codex round-3 P2 ('deletePrefixAtWithOpts never takes that lock') is factually wrong — store/lsm_store.go:1536 shows the function DOES acquire s.applyMu.Lock(). The function holds applyMu across its entire batch.Commit, so it serialises against SetDurableAppliedIndex through the same mutex. The race codex describes cannot occur. Push-back posted in the PR comment with grep evidence; no code change for this finding. Round-4 changes: 1) reviewdog gci on store/lsm_store_applied_index_test.go:182. golangci-lint fmt auto-corrected the alignment (whitespace-only diff in the test's comment block). 2) Site 2 test (engine_applied_index_test.go: TestPersistLocalSnapshotPayload_BumpsAppliedIndex) switched from GreaterOrEqual+positional checks to exact slice comparison — matches Site 1's style and catches a spurious third event (e.g. double SaveSnap) that the previous assertion would miss. Closes claude[bot] note #4 and the coderabbit round-2 nit. 3) pebbleStore struct doc-comment lock-ordering table extended to include applyMu between dbMu and mtx (lsm_store.go:170-181). The ordering already matched the code; the comment was stale before round-3 added the SetDurableAppliedIndex serialisation. Closes claude[bot] note #1. Tests: go vet + golangci-lint clean across store/ kv/ internal/raftengine/etcd/; targeted test sweep ok (store 0.82s + etcd 0.40s). Refs PR #915 round-3 claude[bot] verdict (Ready-to-merge with 4 notes) + round-3 codex P2 (rejected as false-positive). 
--- .../raftengine/etcd/engine_applied_index_test.go | 16 +++++++++------- store/lsm_store.go | 8 +++++++- store/lsm_store_applied_index_test.go | 5 +++-- 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/internal/raftengine/etcd/engine_applied_index_test.go b/internal/raftengine/etcd/engine_applied_index_test.go index 44e28d323..437e10f77 100644 --- a/internal/raftengine/etcd/engine_applied_index_test.go +++ b/internal/raftengine/etcd/engine_applied_index_test.go @@ -200,13 +200,15 @@ func TestPersistLocalSnapshotPayload_BumpsAppliedIndex(t *testing.T) { require.NoError(t, e.persistLocalSnapshotPayload(index, []byte("payload-stub"))) - events := rec.snapshot() - require.GreaterOrEqual(t, len(events), 2, - "both bump and save events MUST be recorded; got %d", len(events)) - require.Equal(t, "bump", events[0].kind, "bump MUST be first") - require.Equal(t, index, events[0].index) - require.Equal(t, "save", events[1].kind, "save MUST be second") - require.Equal(t, index, events[1].index) + // Exact slice match (matches Site 1 style + closes claude[bot] + // round-3 note #4 / coderabbit round-2 nit). A spurious third + // event — e.g. an accidental double SaveSnap — would fail this + // assertion; a GreaterOrEqual + positional check would not. + require.Equal(t, []orderEvent{ + {kind: "bump", index: index}, + {kind: "save", index: index}, + }, rec.snapshot(), + "hook MUST call SetDurableAppliedIndex BEFORE SaveSnap exactly once each") } // TestPersistLocalSnapshotPayload_BumpErrorAborts mirrors Site 1's diff --git a/store/lsm_store.go b/store/lsm_store.go index 0943c7425..ffc3de519 100644 --- a/store/lsm_store.go +++ b/store/lsm_store.go @@ -173,7 +173,13 @@ var metaAppliedIndexBytes = []byte(metaAppliedIndex) // 2. dbMu – guards the s.db pointer; held as a write-lock while the // DB is being swapped (Restore/Close), and as a read-lock by every // operation that accesses s.db. -// 3. mtx – guards the in-memory metadata fields +// 3. 
applyMu – serialises raft-apply conflict-check → batch-commit so +// concurrent ApplyMutationsRaft/At cannot both pass checkConflicts and +// then both commit. Also held by deletePrefixAtWithOpts and by +// SetDurableAppliedIndex's read-modify-write monotonic guard +// (PR #915 round-3) so the snapshot-persist checkpoint cannot rewind +// metaAppliedIndex below a concurrent per-Apply value. +// 4. mtx – guards the in-memory metadata fields // (lastCommitTS, minRetainedTS, pendingMinRetainedTS). type pebbleStore struct { db *pebble.DB diff --git a/store/lsm_store_applied_index_test.go b/store/lsm_store_applied_index_test.go index 96828971f..ff00ea950 100644 --- a/store/lsm_store_applied_index_test.go +++ b/store/lsm_store_applied_index_test.go @@ -179,8 +179,9 @@ func TestSetDurableAppliedIndex_UsesPebbleSync(t *testing.T) { // past the snapshot's index. // // The test mirrors the production race in single-threaded form: -// 1. ApplyMutationsRaftAt(... appliedIndex=N+k) — meta key = N+k -// 2. SetDurableAppliedIndex(N) — must NO-OP (N < N+k) +// 1. ApplyMutationsRaftAt(... appliedIndex=N+k) — meta key = N+k +// 2. SetDurableAppliedIndex(N) — must NO-OP (N < N+k) +// // After step 2, LastAppliedIndex must still report N+k. func TestSetDurableAppliedIndex_Monotonic(t *testing.T) { ctx := context.Background() From a8e2cf52def077c18c45e827ae45a135d21f864e Mon Sep 17 00:00:00 2001 From: Yoshiaki Ueda Date: Thu, 4 Jun 2026 21:54:22 +0900 Subject: [PATCH 10/11] fix(kv/fsm): kvFSM directly satisfies raftengine.AppliedIndexReader (codex P2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex round-4 P2 on kv/fsm.go:145 caught a real wiring bug: the round-1 method was named AppliedIndexReader() returning the interface (factory pattern), intended to be called through a separate AppliedIndexReporter interface. 
But that AppliedIndexReporter was never added to the raftengine package, and the planned Branch 3 cold-start skip gate type-asserts fsm.(raftengine.AppliedIndexReader) directly. A factory method with a different signature does NOT satisfy the interface, so the skip would always fall back to full restore even after meta keys are populated on every apply. Fix: rename kvFSM.AppliedIndexReader() to LastAppliedIndex() and inline the type-assert forward. kvFSM now directly satisfies raftengine.AppliedIndexReader. Added kv/fsm_applied_index_iface_check.go with compile-time guards: var _ raftengine.AppliedIndexReader = (*kvFSM)(nil) var _ raftengine.AppliedIndexWriter = (*kvFSM)(nil) so a future rename or signature drift fails at build time rather than silently degrading the skip optimisation to full-restore-always. No production callers of the old AppliedIndexReader() factory exist (verified by grep) — the consumer is the future Branch 3 skip gate which is not yet wired. Safe rename. Tests: go vet + golangci-lint clean across all touched packages; go test ./kv/ ./store/ ./internal/raftengine/... -short ok 56s. Refs PR #915 round-4 codex P2 (kv/fsm.go:145). Codex P2 on engine.go:4077 (snapshot artifact does not contain post-bump metaAppliedIndex) is acknowledged but out of B2 scope — push-back explanation in the PR comment. --- kv/fsm.go | 41 ++++++++++++++++++++--------- kv/fsm_applied_index_iface_check.go | 9 +++++++ 2 files changed, 38 insertions(+), 12 deletions(-) create mode 100644 kv/fsm_applied_index_iface_check.go diff --git a/kv/fsm.go b/kv/fsm.go index bef27cc77..ec7afc69c 100644 --- a/kv/fsm.go +++ b/kv/fsm.go @@ -129,19 +129,36 @@ func (f *kvFSM) SetApplyIndex(idx uint64) { f.pendingApplyIdx = idx } -// AppliedIndexReader implements raftengine.AppliedIndexReporter. 
It -// exposes the underlying store's durable applied-index when the -// store implements raftengine.AppliedIndexReader (pebbleStore does; -// the in-memory mvccStore does not, in which case (0, false, nil) -// from the missing-key path will land at the caller). nil means -// "not supported on this backend" and triggers the conservative -// full-restore fallback at the cold-start skip gate. See -// docs/design/2026_06_02_idempotent_snapshot_restore.md §3. -func (f *kvFSM) AppliedIndexReader() raftengine.AppliedIndexReader { - if r, ok := f.store.(raftengine.AppliedIndexReader); ok { - return r +// LastAppliedIndex implements raftengine.AppliedIndexReader by +// forwarding to the underlying store when it supports the reader +// seam (pebbleStore does; the in-memory mvccStore does not). +// +// Round-1 of this PR shipped this as a factory method +// (AppliedIndexReader() AppliedIndexReader) intended to be called +// through a separate AppliedIndexReporter interface. Codex P2 on +// round-4 (kv/fsm.go:145) correctly pointed out that the planned +// cold-start skip gate (Branch 3) will type-assert +// fsm.(raftengine.AppliedIndexReader) directly — a factory method +// with a different signature does NOT satisfy the interface, so +// the skip would always fall back even after the meta key is +// populated. Renaming the method to LastAppliedIndex and inlining +// the type-assert forward makes kvFSM directly satisfy +// raftengine.AppliedIndexReader. +// +// (0, false, nil) propagates the strictly-additive fallback when +// the store does not expose the seam — the future skip gate treats +// "missing" as "fall back to full restore." See +// docs/design/2026_06_02_idempotent_snapshot_restore.md §3 / §4. 
+func (f *kvFSM) LastAppliedIndex() (uint64, bool, error) { + r, ok := f.store.(raftengine.AppliedIndexReader) + if !ok { + return 0, false, nil } - return nil + idx, present, err := r.LastAppliedIndex() + if err != nil { + return 0, false, errors.WithStack(err) + } + return idx, present, nil } // SetDurableAppliedIndex implements raftengine.AppliedIndexWriter by diff --git a/kv/fsm_applied_index_iface_check.go b/kv/fsm_applied_index_iface_check.go new file mode 100644 index 000000000..1447c9198 --- /dev/null +++ b/kv/fsm_applied_index_iface_check.go @@ -0,0 +1,9 @@ +package kv + +import "github.com/bootjp/elastickv/internal/raftengine" + +// Round-4 P2 #2 fix: kvFSM must directly implement +// raftengine.AppliedIndexReader so the cold-start skip gate's +// type-assertion (Branch 3) succeeds. +var _ raftengine.AppliedIndexReader = (*kvFSM)(nil) +var _ raftengine.AppliedIndexWriter = (*kvFSM)(nil) From 39513379c867b2630c950701c6cb5cd5e12016a3 Mon Sep 17 00:00:00 2001 From: Yoshiaki Ueda Date: Thu, 4 Jun 2026 22:02:27 +0900 Subject: [PATCH 11/11] docs(snapshot-skip B2 round-6): claude[bot] round-5 conditional LGTM items MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit claude[bot] round-5 verdict was 'Ready to merge pending design doc update'. Two non-code items: 1) engine.go:2001 TODO sentinel — the round-5 PR comment claimed a 'sentinel comment in engine.applySnapshot' would land, but no such comment was added. Round-6 adds it directly above the openAndRestoreFSMSnapshot call so a future B3 / follow-up implementor can find the planned SetDurableAppliedIndex hook site without re-discovering the rationale. Cites the design doc Non-Goals scope-out and PR #915 round-4/5 codex P2. 2) docs/design/2026_06_02_idempotent_snapshot_restore.md — three stale references to AppliedIndexReporter / factory-method pattern, superseded by PR #915 round-5 (kvFSM directly satisfies raftengine.AppliedIndexReader via LastAppliedIndex). 
The stale pseudocode would mislead the B3 implementor into adding a reporter shim that was already superseded. Updates: - §3 'kvFSM exposes its store' block rewritten to show direct interface satisfaction + the compile-time guard. - §4 fsmAlreadyAtIndex pseudocode rewritten to use direct type-assert against raftengine.AppliedIndexReader. - Implementation Plan B2 row updated to reflect the actual kvFSM.LastAppliedIndex() shape + the round-5 monotonic applyMu.Lock() RMW guard on SetDurableAppliedIndex. - §8 compatibility list: AppliedIndexReporter removed; replaced by AppliedIndexWriter (the actually-shipped interface) plus a retrospective sentence explaining the round-5 supersession. Three retrospective mentions of AppliedIndexReporter remain (as 'why this is NOT what we use') so future readers understand the design history without re-deriving it. No code semantics change in round-6. Doc + comment-only. Tests: go vet + golangci-lint clean across kv/, store/, internal/raftengine/etcd/. Refs PR #915 round-5 claude[bot] verdict ('Ready to merge pending design doc update'). 
--- .../2026_06_02_idempotent_snapshot_restore.md | 62 +++++++++++++------ internal/raftengine/etcd/engine.go | 9 +++ 2 files changed, 52 insertions(+), 19 deletions(-) diff --git a/docs/design/2026_06_02_idempotent_snapshot_restore.md b/docs/design/2026_06_02_idempotent_snapshot_restore.md index 06d2405e0..f1d3f1c07 100644 --- a/docs/design/2026_06_02_idempotent_snapshot_restore.md +++ b/docs/design/2026_06_02_idempotent_snapshot_restore.md @@ -236,21 +236,37 @@ type AppliedIndexReader interface { } ``` -`kvFSM` exposes its store through a typed accessor so wal_store.go can -reach it without importing the concrete pebble type: +`kvFSM` directly satisfies `raftengine.AppliedIndexReader` by +forwarding to its underlying store (PR #915 round-5 — round-1 had a +factory `AppliedIndexReader() AppliedIndexReader` method intended to +be called through a separate `AppliedIndexReporter` interface, but +that pattern would require the skip gate to know about the reporter +shim; the direct interface satisfaction is simpler and a compile-time +guard catches future signature drift): ```go -type AppliedIndexReporter interface { - AppliedIndexReader() AppliedIndexReader -} -func (f *kvFSM) AppliedIndexReader() AppliedIndexReader { - if r, ok := f.store.(AppliedIndexReader); ok { - return r +// kv/fsm.go +func (f *kvFSM) LastAppliedIndex() (uint64, bool, error) { + r, ok := f.store.(raftengine.AppliedIndexReader) + if !ok { + return 0, false, nil } - return nil + idx, present, err := r.LastAppliedIndex() + if err != nil { + return 0, false, errors.WithStack(err) + } + return idx, present, nil } + +// kv/fsm_applied_index_iface_check.go (compile-time guard) +var _ raftengine.AppliedIndexReader = (*kvFSM)(nil) +var _ raftengine.AppliedIndexWriter = (*kvFSM)(nil) ``` +The compile-time guard means any future rename or signature drift +fails `go build` immediately — the soak investment is protected at +the compiler level. + ### 4. 
Conditional restore (with conservative error fallback) ```go @@ -279,16 +295,20 @@ func restoreSnapshotState(fsm StateMachine, snapshot raftpb.Snapshot, fsmSnapDir // fsmAlreadyAtIndex returns true ONLY when we can prove the FSM is // already at or past `want`. Any uncertainty -- FSM doesn't expose -// the reporter, store doesn't expose the reader, read error, or -// missing meta key -- returns false so we fall back to the full -// restore. A stale-but-incorrect skip is far worse than a wasteful -// full restore; the fallback errs strictly toward restoring. +// the reader interface, read error, or missing meta key -- returns +// false so we fall back to the full restore. A stale-but-incorrect +// skip is far worse than a wasteful full restore; the fallback errs +// strictly toward restoring. +// +// Direct type-assert against raftengine.AppliedIndexReader (PR #915 +// round-5): kvFSM satisfies the interface directly via its +// LastAppliedIndex method, so no separate AppliedIndexReporter shim +// is needed. The compile-time guard in kv/fsm_applied_index_iface_check.go +// keeps this stable. func fsmAlreadyAtIndex(fsm StateMachine, want uint64) bool { - reporter, ok := fsm.(AppliedIndexReporter) + r, ok := fsm.(raftengine.AppliedIndexReader) if !ok { return false } - reader := reporter.AppliedIndexReader() - if reader == nil { return false } - have, present, err := reader.LastAppliedIndex() + have, present, err := r.LastAppliedIndex() if err != nil || !present { return false } return have >= want } @@ -814,8 +834,12 @@ present. - `ApplyIndexAware` is **already** in `main`; this design only adds consumers. - The new opt-in interfaces (`AppliedIndexReader`, - `AppliedIndexReporter`, `SnapshotHeaderApplier`) are additive. + `AppliedIndexWriter`, `SnapshotHeaderApplier`) are additive. FSMs that don't implement them fall back to the current behaviour. 
+ (Round-1 / round-2 of this doc mentioned an `AppliedIndexReporter` + factory-method shim; PR #915 round-5 superseded it by having + `kvFSM` satisfy `AppliedIndexReader` directly via its + `LastAppliedIndex` method.) - `metaAppliedIndexBytes` is new. Older fsm.db files don't have it. The `present=false` branch makes the first restart after upgrade fall back to full restore, populating the meta key from the next @@ -863,7 +887,7 @@ restoreSnapshotState skipped (FSM at index %d, snapshot at %d, ceiling=%d, cutov | Branch | Content | Behaviour change | |---|---|---| | **B1** (this PR) | Design doc | None | -| **B2** | `ApplyMutationsRaftAt` / `DeletePrefixAtRaftAt` overloads + meta-key bundling in both leaves + `pebbleStore.LastAppliedIndex()` (under `dbMu.RLock()`) + `pebbleStore.SetDurableAppliedIndex()` (under `dbMu.RLock()`, **`pebble.Sync` unconditionally**) + `kvFSM.AppliedIndexReader()` accessor + `kvFSM.SetDurableAppliedIndex` forwarding + thread `f.pendingApplyIdx` into the data-Apply leaves + BOTH `persistCreatedSnapshot` (`engine.go:2679`) AND `e.persistLocalSnapshotPayload` (`engine.go:4032`, the SnapshotCount-triggered hot path) call `SetDurableAppliedIndex` BEFORE the corresponding `persist.SaveSnap` | Meta key starts being written on every data Apply AND at every snapshot persist (both config-snapshot and steady-state local-snapshot paths). Skip is still disabled. Soak in production for one release. 
| +| **B2** | `ApplyMutationsRaftAt` / `DeletePrefixAtRaftAt` overloads + meta-key bundling in both leaves + `pebbleStore.LastAppliedIndex()` (under `dbMu.RLock()`) + `pebbleStore.SetDurableAppliedIndex()` (under `dbMu.RLock()` + `applyMu.Lock()` RMW monotonic guard, **`pebble.Sync` unconditionally**) + `kvFSM.LastAppliedIndex()` directly satisfies `raftengine.AppliedIndexReader` (compile-time guard in `kv/fsm_applied_index_iface_check.go`) + `kvFSM.SetDurableAppliedIndex` forwarding + thread `f.pendingApplyIdx` into the data-Apply leaves + BOTH `persistCreatedSnapshot` (`engine.go:2679`) AND `e.persistLocalSnapshotPayload` (`engine.go:4032`, the SnapshotCount-triggered hot path) call `SetDurableAppliedIndex` BEFORE the corresponding `persist.SaveSnap` | Meta key starts being written on every data Apply AND at every snapshot persist (both config-snapshot and steady-state local-snapshot paths). Skip is still disabled. Soak in production for one release. | | **B3** | `restoreSnapshotState` skip gate + `applyHeaderStateOnSkip(snapPath, tok.CRC32C)` orchestrating size + footer-vs-tokenCRC + full-body-CRC verification using `internal/raftengine/etcd`'s existing helpers (matching `openAndRestoreFSMSnapshot`'s safety contract) + two-phase `SnapshotHeaderApplier` seam on `kvFSM` (`ParseSnapshotHeader(r io.Reader) (ceiling, cutover, err)` + pure `ApplySnapshotHeader(ceiling, cutover)`) + metrics + INFO log | **User-visible cold-start win.** | | **B4** | Lower `HEALTH_TIMEOUT_SECONDS` default once production data shows steady-state skip rate ≥ 90 % | Tighter ceiling; the env override remains honoured. 
| diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index e1b684219..f8a32dd93 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -1998,6 +1998,15 @@ func (e *Engine) applyReadySnapshot(snapshot raftpb.Snapshot) error { if err != nil { return errors.Wrapf(err, "decode snapshot token index=%d", snapshot.Metadata.Index) } + // B3/follow-up: once the Restore below succeeds, also call + // SetDurableAppliedIndex(tok.Index) here so a peer restored via + // InstallSnapshot populates the meta key. The local-snapshot persist + // path already bumps the live store (engine.persistLocalSnapshotPayload), + // but the receiving node's restored store inherits the pre-bump value + // embedded in the snapshot artifact. The design doc's Non-Goals section + // (docs/design/2026_06_02_idempotent_snapshot_restore.md:71-74) scopes + // this out of Branch 2; see PR #915 round-4/5 codex P2 on + // engine.go:4077 for the rationale. if err := openAndRestoreFSMSnapshot(e.fsm, fsmSnapPath(e.fsmSnapDir, tok.Index), tok.CRC32C); err != nil { return errors.Wrapf(err, "restore fsm snapshot file index=%d crc=%08x", tok.Index, tok.CRC32C) }