62 changes: 43 additions & 19 deletions docs/design/2026_06_02_idempotent_snapshot_restore.md
@@ -236,21 +236,37 @@ type AppliedIndexReader interface {
}
```

`kvFSM` exposes its store through a typed accessor so wal_store.go can
reach it without importing the concrete pebble type:
`kvFSM` satisfies `raftengine.AppliedIndexReader` directly by
forwarding to its underlying store (PR #915 round-5). Round-1 instead
had a factory method `AppliedIndexReader() AppliedIndexReader`,
intended to be called through a separate `AppliedIndexReporter`
interface, but that pattern would require the skip gate to know about
the reporter shim. Direct interface satisfaction is simpler, and a
compile-time guard catches future signature drift:

```go
type AppliedIndexReporter interface {
AppliedIndexReader() AppliedIndexReader
}
func (f *kvFSM) AppliedIndexReader() AppliedIndexReader {
if r, ok := f.store.(AppliedIndexReader); ok {
return r
// kv/fsm.go
func (f *kvFSM) LastAppliedIndex() (uint64, bool, error) {
r, ok := f.store.(raftengine.AppliedIndexReader)
if !ok {
return 0, false, nil
}
return nil
idx, present, err := r.LastAppliedIndex()
if err != nil {
return 0, false, errors.WithStack(err)
}
return idx, present, nil
}

// kv/fsm_applied_index_iface_check.go (compile-time guard)
var _ raftengine.AppliedIndexReader = (*kvFSM)(nil)
var _ raftengine.AppliedIndexWriter = (*kvFSM)(nil)
```

The compile-time guard means any future rename or signature drift
fails `go build` immediately, so the interface wiring validated during
the soak cannot silently regress.
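
As a minimal sketch of the guard pattern described above: the interface shape follows `LastAppliedIndex() (uint64, bool, error)` from this doc, while `demoFSM` and `memStore` are hypothetical stand-ins for `kvFSM` and its store, not the project's types. The blank-identifier assignment costs nothing at run time; it only forces the compiler to check the method set.

```go
package main

import "fmt"

// AppliedIndexReader mirrors the interface shape used in this doc.
type AppliedIndexReader interface {
	LastAppliedIndex() (uint64, bool, error)
}

// memStore is a hypothetical in-memory store, for illustration only.
type memStore struct {
	idx     uint64
	present bool
}

func (s *memStore) LastAppliedIndex() (uint64, bool, error) {
	return s.idx, s.present, nil
}

// demoFSM is a hypothetical stand-in for kvFSM. Its store field is an
// untyped value, so the reader capability is discovered by type assertion.
type demoFSM struct {
	store any
}

// LastAppliedIndex forwards to the store when it supports the reader
// interface and reports "not present" otherwise.
func (f *demoFSM) LastAppliedIndex() (uint64, bool, error) {
	r, ok := f.store.(AppliedIndexReader)
	if !ok {
		return 0, false, nil
	}
	return r.LastAppliedIndex()
}

// Compile-time guard: if the method above is renamed or its signature
// changes, this line stops compiling.
var _ AppliedIndexReader = (*demoFSM)(nil)

func main() {
	withReader := &demoFSM{store: &memStore{idx: 42, present: true}}
	withoutReader := &demoFSM{store: struct{}{}}

	idx, present, err := withReader.LastAppliedIndex()
	fmt.Println(idx, present, err)

	idx, present, err = withoutReader.LastAppliedIndex()
	fmt.Println(idx, present, err)
}
```

Renaming `demoFSM.LastAppliedIndex` in this sketch makes the `var _` line fail to compile, which is the property the guard file is relied on for.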

### 4. Conditional restore (with conservative error fallback)

```go
@@ -279,16 +295,20 @@ func restoreSnapshotState(fsm StateMachine, snapshot raftpb.Snapshot, fsmSnapDir

// fsmAlreadyAtIndex returns true ONLY when we can prove the FSM is
// already at or past `want`. Any uncertainty -- FSM doesn't expose
// the reporter, store doesn't expose the reader, read error, or
// missing meta key -- returns false so we fall back to the full
// restore. A stale-but-incorrect skip is far worse than a wasteful
// full restore; the fallback errs strictly toward restoring.
// the reader interface, read error, or missing meta key -- returns
// false so we fall back to the full restore. A stale-but-incorrect
// skip is far worse than a wasteful full restore; the fallback errs
// strictly toward restoring.
//
// Direct type-assert against raftengine.AppliedIndexReader (PR #915
// round-5): kvFSM satisfies the interface directly via its
// LastAppliedIndex method, so no separate AppliedIndexReporter shim
// is needed. The compile-time guard in kv/fsm_applied_index_iface_check.go
// keeps this stable.
func fsmAlreadyAtIndex(fsm StateMachine, want uint64) bool {
reporter, ok := fsm.(AppliedIndexReporter)
r, ok := fsm.(raftengine.AppliedIndexReader)
if !ok { return false }
reader := reporter.AppliedIndexReader()
if reader == nil { return false }
have, present, err := reader.LastAppliedIndex()
have, present, err := r.LastAppliedIndex()
if err != nil || !present { return false }
return have >= want
}
@@ -814,8 +834,12 @@ present.
- `ApplyIndexAware` is **already** in `main`; this design only adds
consumers.
- The new opt-in interfaces (`AppliedIndexReader`,
`AppliedIndexReporter`, `SnapshotHeaderApplier`) are additive.
`AppliedIndexWriter`, `SnapshotHeaderApplier`) are additive.
FSMs that don't implement them fall back to the current behaviour.
(Round-1 / round-2 of this doc mentioned an `AppliedIndexReporter`
factory-method shim; PR #915 round-5 superseded it by having
`kvFSM` satisfy `AppliedIndexReader` directly via its
`LastAppliedIndex` method.)
- `metaAppliedIndexBytes` is new. Older fsm.db files don't have it.
The `present=false` branch makes the first restart after upgrade
fall back to full restore, populating the meta key from the next
@@ -863,7 +887,7 @@ restoreSnapshotState skipped (FSM at index %d, snapshot at %d, ceiling=%d, cutov
| Branch | Content | Behaviour change |
|---|---|---|
| **B1** (this PR) | Design doc | None |
| **B2** | `ApplyMutationsRaftAt` / `DeletePrefixAtRaftAt` overloads + meta-key bundling in both leaves + `pebbleStore.LastAppliedIndex()` (under `dbMu.RLock()`) + `pebbleStore.SetDurableAppliedIndex()` (under `dbMu.RLock()`, **`pebble.Sync` unconditionally**) + `kvFSM.AppliedIndexReader()` accessor + `kvFSM.SetDurableAppliedIndex` forwarding + thread `f.pendingApplyIdx` into the data-Apply leaves + BOTH `persistCreatedSnapshot` (`engine.go:2679`) AND `e.persistLocalSnapshotPayload` (`engine.go:4032`, the SnapshotCount-triggered hot path) call `SetDurableAppliedIndex` BEFORE the corresponding `persist.SaveSnap` | Meta key starts being written on every data Apply AND at every snapshot persist (both config-snapshot and steady-state local-snapshot paths). Skip is still disabled. Soak in production for one release. |
| **B2** | `ApplyMutationsRaftAt` / `DeletePrefixAtRaftAt` overloads + meta-key bundling in both leaves + `pebbleStore.LastAppliedIndex()` (under `dbMu.RLock()`) + `pebbleStore.SetDurableAppliedIndex()` (under `dbMu.RLock()` + `applyMu.Lock()` RMW monotonic guard, **`pebble.Sync` unconditionally**) + `kvFSM.LastAppliedIndex()` directly satisfies `raftengine.AppliedIndexReader` (compile-time guard in `kv/fsm_applied_index_iface_check.go`) + `kvFSM.SetDurableAppliedIndex` forwarding + thread `f.pendingApplyIdx` into the data-Apply leaves + BOTH `persistCreatedSnapshot` (`engine.go:2679`) AND `e.persistLocalSnapshotPayload` (`engine.go:4032`, the SnapshotCount-triggered hot path) call `SetDurableAppliedIndex` BEFORE the corresponding `persist.SaveSnap` | Meta key starts being written on every data Apply AND at every snapshot persist (both config-snapshot and steady-state local-snapshot paths). Skip is still disabled. Soak in production for one release. |
| **B3** | `restoreSnapshotState` skip gate + `applyHeaderStateOnSkip(snapPath, tok.CRC32C)` orchestrating size + footer-vs-tokenCRC + full-body-CRC verification using `internal/raftengine/etcd`'s existing helpers (matching `openAndRestoreFSMSnapshot`'s safety contract) + two-phase `SnapshotHeaderApplier` seam on `kvFSM` (`ParseSnapshotHeader(r io.Reader) (ceiling, cutover, err)` + pure `ApplySnapshotHeader(ceiling, cutover)`) + metrics + INFO log | **User-visible cold-start win.** |
| **B4** | Lower `HEALTH_TIMEOUT_SECONDS` default once production data shows steady-state skip rate ≥ 90 % | Tighter ceiling; the env override remains honoured. |

2 changes: 1 addition & 1 deletion go.mod
@@ -13,6 +13,7 @@ require (
github.com/aws/smithy-go v1.25.1
github.com/cockroachdb/errors v1.13.0
github.com/cockroachdb/pebble/v2 v2.1.5
github.com/coreos/go-semver v0.3.1
github.com/emirpasic/gods v1.18.1
github.com/getsentry/sentry-go v0.46.2
github.com/goccy/go-json v0.10.6
@@ -58,7 +59,6 @@ require (
github.com/cockroachdb/redact v1.1.5 // indirect
github.com/cockroachdb/swiss v0.0.0-20251224182025-b0f6560f979b // indirect
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect
github.com/coreos/go-semver v0.3.1 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
83 changes: 70 additions & 13 deletions internal/raftengine/etcd/engine.go
@@ -1998,6 +1998,15 @@ func (e *Engine) applyReadySnapshot(snapshot raftpb.Snapshot) error {
if err != nil {
return errors.Wrapf(err, "decode snapshot token index=%d", snapshot.Metadata.Index)
}
// B3/follow-up: also call SetDurableAppliedIndex(tok.Index) here
// after Restore so peer-after-InstallSnapshot populates the meta
// key. The local-snapshot persist path already bumps the live
// store (engine.persistLocalSnapshotPayload), but the receiving
// node's restored store inherits the pre-bump value embedded in
// the snapshot artifact. Design Non-Goals §
// docs/design/2026_06_02_idempotent_snapshot_restore.md:71-74
// scopes this out of Branch 2; see PR #915 round-4/5 codex P2 on
// engine.go:4077 for the rationale.
if err := openAndRestoreFSMSnapshot(e.fsm, fsmSnapPath(e.fsmSnapDir, tok.Index), tok.CRC32C); err != nil {
return errors.Wrapf(err, "restore fsm snapshot file index=%d crc=%08x", tok.Index, tok.CRC32C)
}
@@ -2676,10 +2685,38 @@ func (e *Engine) createConfigSnapshot(index uint64, confState raftpb.ConfState,
}
}

// bumpDurableAppliedIndexBeforeSave pins the FSM's durable applied
// index to `index` BEFORE the engine calls persist.SaveSnap, so a
// successful snapshot persist always implies LastAppliedIndex >=
// snap.Metadata.Index — closes the HLC-lease-only / encryption-only
// fallback (PR #910 design §6).
//
// FSMs that do not expose raftengine.AppliedIndexWriter silently
// no-op; the skip optimisation falls back to full restore for them
// (legacy test fakes, in-memory backends). pebble.Sync is forced on
// the writer side regardless of ELASTICKV_FSM_SYNC_MODE — once
// persist.SaveSnap returns, WAL compaction discards every log entry
// at or before snap.Metadata.Index, so there is no source to replay
// the meta key bump from.
//
// Used by BOTH snapshot persist sites: persistCreatedSnapshot (this
// file) and e.persistLocalSnapshotPayload (the steady-state
// SnapshotCount-triggered hot path).
func (e *Engine) bumpDurableAppliedIndexBeforeSave(index uint64) error {
w, ok := e.fsm.(raftengine.AppliedIndexWriter)
if !ok {
return nil
}
return errors.WithStack(w.SetDurableAppliedIndex(index))
}

func (e *Engine) persistCreatedSnapshot(snap raftpb.Snapshot) error {
if etcdraft.IsEmptySnap(snap) || e.persist == nil {
return nil
}
if err := e.bumpDurableAppliedIndexBeforeSave(snap.Metadata.Index); err != nil {
return err
}
Comment on lines +2717 to +2719
P2: Include config snapshot checkpoint in payload

When the config-snapshot path reaches this hook, persistConfigSnapshotPayloadLocked has already obtained payload from snapshotPayload, which writes/closes the .fsm file (or serializes the legacy bytes) before calling persistCreatedSnapshot. Bumping _meta_applied_index here only updates the live store, so a peer or restart that restores this saved config snapshot gets the older/missing meta key while the raft snapshot is at snap.Metadata.Index; after WAL entries up to that index are released, the advertised LastAppliedIndex >= snapshot index invariant does not hold for the snapshot artifact. This is separate from the steady-state snapshot site: the same ordering occurs through persistConfigSnapshotPayloadLocked -> snapshotPayload -> persistCreatedSnapshot.
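
The ordering concern in this comment can be illustrated with a toy model. This is only a sketch under stated assumptions: `toyStore`, `serialize`, and `persist` are hypothetical names, a struct copy stands in for writing the `.fsm` file, and the indices 90 and 100 are made up. It compares "serialize, then bump" with "bump, then serialize" and checks the `appliedIndex >= snapIndex` invariant on both the live store and a store restored from the artifact. The second ordering is shown only for contrast, not as the fix this PR adopts.

```go
package main

import "fmt"

// toyStore is a hypothetical stand-in for the FSM's store.
type toyStore struct {
	appliedIndex uint64
	present      bool
}

// serialize copies the store state, standing in for writing the .fsm file.
func (s *toyStore) serialize() toyStore { return *s }

// setDurableAppliedIndex moves the recorded index forward, never backward.
func (s *toyStore) setDurableAppliedIndex(idx uint64) {
	if !s.present || idx > s.appliedIndex {
		s.appliedIndex, s.present = idx, true
	}
}

// persist models one snapshot persist and reports whether the live store
// and a store restored from the artifact each satisfy
// appliedIndex >= snapIndex.
func persist(bumpFirst bool, lastDataApply, snapIndex uint64) (liveOK, restoredOK bool) {
	live := &toyStore{appliedIndex: lastDataApply, present: true}
	var artifact toyStore
	if bumpFirst {
		live.setDurableAppliedIndex(snapIndex)
		artifact = live.serialize()
	} else {
		artifact = live.serialize()
		live.setDurableAppliedIndex(snapIndex)
	}
	restored := artifact // stands in for Restore on a peer or after restart
	return live.appliedIndex >= snapIndex, restored.appliedIndex >= snapIndex
}

func main() {
	liveOK, restoredOK := persist(false, 90, 100) // serialize, then bump
	fmt.Println("serialize-then-bump:", liveOK, restoredOK)

	liveOK, restoredOK = persist(true, 90, 100) // bump, then serialize
	fmt.Println("bump-then-serialize:", liveOK, restoredOK)
}
```

In this model the first ordering leaves the live store at the snapshot index while the restored copy stays at the last data apply, which is the gap the comment describes.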


if err := e.persist.SaveSnap(snap); err != nil {
return errors.WithStack(err)
}
@@ -4044,29 +4081,49 @@ func (e *Engine) persistLocalSnapshotPayload(index uint64, payload []byte) error
return nil
}

if err := e.bumpDurableAppliedIndexBeforeSave(index); err != nil {
return err
}
Comment on lines +4084 to +4086
P2: Include the snapshot checkpoint in the payload

In the steady-state snapshot path, the FSM snapshot has already been serialized in persistLocalSnapshot before this bump runs, so the .fsm file / legacy payload that gets saved with the raft snapshot does not contain the _meta_applied_index value just written here. If that snapshot is later restored on a peer or after restart, especially after HLC/encryption-only entries since the last data apply, the restored store still reports the older or missing applied index and the snapshot-skip invariant promised by this checkpoint does not hold for the snapshot artifact.



_, err = persistLocalSnapshotPayload(e.storage, e.persist, index, payload)
return e.handleLocalSnapshotPersistResult(err)
}

// handleLocalSnapshotPersistResult collapses the post-SaveSnap error
// switch into a single helper so persistLocalSnapshotPayload stays
// under the cyclomatic-complexity budget. The three raft-side
// 'snapshot already moved on' cases (ErrCompacted / ErrUnavailable /
// ErrSnapOutOfDate) are all treated as no-ops; only the success path
// runs the disk-side purge.
func (e *Engine) handleLocalSnapshotPersistResult(err error) error {
switch {
case err == nil:
snapDir := filepath.Join(e.dataDir, snapDirName)
if purgeErr := purgeOldSnapshotFiles(snapDir, e.fsmSnapDir); purgeErr != nil {
slog.Warn("failed to purge old snap files", "error", purgeErr)
}
walDir := filepath.Join(e.dataDir, walDirName)
if purgeErr := purgeOldWALFiles(walDir, e.walRetention()); purgeErr != nil {
slog.Warn("failed to purge old wal files", "error", purgeErr)
}
return nil
case errors.Is(err, etcdraft.ErrCompacted):
return nil
case errors.Is(err, etcdraft.ErrUnavailable):
e.purgeAfterLocalSnapshot()
return nil
case errors.Is(err, etcdraft.ErrSnapOutOfDate):
case errors.Is(err, etcdraft.ErrCompacted),
errors.Is(err, etcdraft.ErrUnavailable),
errors.Is(err, etcdraft.ErrSnapOutOfDate):
return nil
default:
return err
}
}

// purgeAfterLocalSnapshot runs the disk-side cleanup that follows a
// successful local-snapshot persist: trim old .snap/.fsm files and
// rotate ageing WAL segments. Both calls log on error but do not
// propagate — failing to purge is non-fatal.
func (e *Engine) purgeAfterLocalSnapshot() {
snapDir := filepath.Join(e.dataDir, snapDirName)
if purgeErr := purgeOldSnapshotFiles(snapDir, e.fsmSnapDir); purgeErr != nil {
slog.Warn("failed to purge old snap files", "error", purgeErr)
}
walDir := filepath.Join(e.dataDir, walDirName)
if purgeErr := purgeOldWALFiles(walDir, e.walRetention()); purgeErr != nil {
slog.Warn("failed to purge old wal files", "error", purgeErr)
}
}

func encodeReadContext(id uint64) []byte {
out := make([]byte, envelopeHeaderSize)
out[0] = readContextVersion