-
Notifications
You must be signed in to change notification settings - Fork 2
feat(snapshot-skip B2): plumb metaAppliedIndex through raft-Apply + both snapshot persist sites #915
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(snapshot-skip B2): plumb metaAppliedIndex through raft-Apply + both snapshot persist sites #915
Changes from all commits
2339a6f
525fc15
aa9b8ac
7cd72bd
f1e8748
2c42f7d
7001a93
b957e1a
5684857
a8e2cf5
3951337
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1998,6 +1998,15 @@ func (e *Engine) applyReadySnapshot(snapshot raftpb.Snapshot) error { | |
| if err != nil { | ||
| return errors.Wrapf(err, "decode snapshot token index=%d", snapshot.Metadata.Index) | ||
| } | ||
| // B3/follow-up: also call SetDurableAppliedIndex(tok.Index) here | ||
| // after Restore so peer-after-InstallSnapshot populates the meta | ||
| // key. The local-snapshot persist path already bumps the live | ||
| // store (engine.persistLocalSnapshotPayload), but the receiving | ||
| // node's restored store inherits the pre-bump value embedded in | ||
| // the snapshot artifact. Design Non-Goals § | ||
| // docs/design/2026_06_02_idempotent_snapshot_restore.md:71-74 | ||
| // scopes this out of Branch 2; see PR #915 round-4/5 codex P2 on | ||
| // engine.go:4077 for the rationale. | ||
| if err := openAndRestoreFSMSnapshot(e.fsm, fsmSnapPath(e.fsmSnapDir, tok.Index), tok.CRC32C); err != nil { | ||
| return errors.Wrapf(err, "restore fsm snapshot file index=%d crc=%08x", tok.Index, tok.CRC32C) | ||
| } | ||
|
|
@@ -2676,10 +2685,38 @@ func (e *Engine) createConfigSnapshot(index uint64, confState raftpb.ConfState, | |
| } | ||
| } | ||
|
|
||
| // bumpDurableAppliedIndexBeforeSave pins the FSM's durable applied | ||
| // index to `index` BEFORE the engine calls persist.SaveSnap, so a | ||
| // successful snapshot persist always implies LastAppliedIndex >= | ||
| // snap.Metadata.Index — closes the HLC-lease-only / encryption-only | ||
| // fallback (PR #910 design §6). | ||
| // | ||
| // FSMs that do not expose raftengine.AppliedIndexWriter silently | ||
| // no-op; the skip optimisation falls back to full restore for them | ||
| // (legacy test fakes, in-memory backends). pebble.Sync is forced on | ||
| // the writer side regardless of ELASTICKV_FSM_SYNC_MODE — once | ||
| // persist.SaveSnap returns, WAL compaction discards every log entry | ||
| // at or before snap.Metadata.Index, so there is no source to replay | ||
| // the meta key bump from. | ||
| // | ||
| // Used by BOTH snapshot persist sites: persistCreatedSnapshot (this | ||
| // file) and e.persistLocalSnapshotPayload (the steady-state | ||
| // SnapshotCount-triggered hot path). | ||
| func (e *Engine) bumpDurableAppliedIndexBeforeSave(index uint64) error { | ||
| w, ok := e.fsm.(raftengine.AppliedIndexWriter) | ||
| if !ok { | ||
| return nil | ||
| } | ||
| return errors.WithStack(w.SetDurableAppliedIndex(index)) | ||
| } | ||
|
|
||
| func (e *Engine) persistCreatedSnapshot(snap raftpb.Snapshot) error { | ||
| if etcdraft.IsEmptySnap(snap) || e.persist == nil { | ||
| return nil | ||
| } | ||
| if err := e.bumpDurableAppliedIndexBeforeSave(snap.Metadata.Index); err != nil { | ||
| return err | ||
| } | ||
| if err := e.persist.SaveSnap(snap); err != nil { | ||
| return errors.WithStack(err) | ||
| } | ||
|
|
@@ -4044,29 +4081,49 @@ func (e *Engine) persistLocalSnapshotPayload(index uint64, payload []byte) error | |
| return nil | ||
| } | ||
|
|
||
| if err := e.bumpDurableAppliedIndexBeforeSave(index); err != nil { | ||
| return err | ||
| } | ||
|
Comment on lines
+4084
to
+4086
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
In the steady-state snapshot path, the FSM snapshot has already been serialized in […] (the rest of this review comment is truncated in this capture). Useful? React with 👍 / 👎. |
||
|
|
||
| _, err = persistLocalSnapshotPayload(e.storage, e.persist, index, payload) | ||
| return e.handleLocalSnapshotPersistResult(err) | ||
| } | ||
|
|
||
| // handleLocalSnapshotPersistResult collapses the post-SaveSnap error | ||
| // switch into a single helper so persistLocalSnapshotPayload stays | ||
| // under the cyclomatic-complexity budget. The three raft-side | ||
| // 'snapshot already moved on' cases (ErrCompacted / ErrUnavailable / | ||
| // ErrSnapOutOfDate) are all treated as no-ops; only the success path | ||
| // runs the disk-side purge. | ||
| func (e *Engine) handleLocalSnapshotPersistResult(err error) error { | ||
| switch { | ||
| case err == nil: | ||
| snapDir := filepath.Join(e.dataDir, snapDirName) | ||
| if purgeErr := purgeOldSnapshotFiles(snapDir, e.fsmSnapDir); purgeErr != nil { | ||
| slog.Warn("failed to purge old snap files", "error", purgeErr) | ||
| } | ||
| walDir := filepath.Join(e.dataDir, walDirName) | ||
| if purgeErr := purgeOldWALFiles(walDir, e.walRetention()); purgeErr != nil { | ||
| slog.Warn("failed to purge old wal files", "error", purgeErr) | ||
| } | ||
| return nil | ||
| case errors.Is(err, etcdraft.ErrCompacted): | ||
| return nil | ||
| case errors.Is(err, etcdraft.ErrUnavailable): | ||
| e.purgeAfterLocalSnapshot() | ||
| return nil | ||
| case errors.Is(err, etcdraft.ErrSnapOutOfDate): | ||
| case errors.Is(err, etcdraft.ErrCompacted), | ||
| errors.Is(err, etcdraft.ErrUnavailable), | ||
| errors.Is(err, etcdraft.ErrSnapOutOfDate): | ||
| return nil | ||
| default: | ||
| return err | ||
| } | ||
| } | ||
|
|
||
| // purgeAfterLocalSnapshot runs the disk-side cleanup that follows a | ||
| // successful local-snapshot persist: trim old .snap/.fsm files and | ||
| // rotate ageing WAL segments. Both calls log on error but do not | ||
| // propagate — failing to purge is non-fatal. | ||
| func (e *Engine) purgeAfterLocalSnapshot() { | ||
| snapDir := filepath.Join(e.dataDir, snapDirName) | ||
| if purgeErr := purgeOldSnapshotFiles(snapDir, e.fsmSnapDir); purgeErr != nil { | ||
| slog.Warn("failed to purge old snap files", "error", purgeErr) | ||
| } | ||
| walDir := filepath.Join(e.dataDir, walDirName) | ||
| if purgeErr := purgeOldWALFiles(walDir, e.walRetention()); purgeErr != nil { | ||
| slog.Warn("failed to purge old wal files", "error", purgeErr) | ||
| } | ||
| } | ||
|
|
||
| func encodeReadContext(id uint64) []byte { | ||
| out := make([]byte, envelopeHeaderSize) | ||
| out[0] = readContextVersion | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When the config-snapshot path reaches this hook,
`persistConfigSnapshotPayloadLocked` has already obtained `payload` from `snapshotPayload`, which writes/closes the `.fsm` file (or serializes the legacy bytes) before calling `persistCreatedSnapshot`. Bumping `_meta_applied_index` here only updates the live store, so a peer or restart that restores this saved config snapshot gets the older/missing meta key while the raft snapshot is at `snap.Metadata.Index`; after WAL entries up to that index are released, the advertised `LastAppliedIndex >= snapshot index` invariant does not hold for the snapshot artifact. This is separate from the steady-state snapshot site: the same ordering occurs through `persistConfigSnapshotPayloadLocked -> snapshotPayload -> persistCreatedSnapshot`. Useful? React with 👍 / 👎.