-
Notifications
You must be signed in to change notification settings - Fork 2
feat(snapshot-skip B3): cold-start restoreSnapshotState skip gate + CRC verifier + metrics #934
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
8312f4b
024eb43
933dab8
6ada77c
ce277e6
320d06e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
| package raftengine | ||
|
|
||
// ColdStartObserver is the sink for cold-start snapshot-restore
// lifecycle events emitted by restoreSnapshotState. Implementations
// live in the monitoring package and wire to Prometheus
// counters/gauges. The engine is handed a value through OpenConfig;
// a nil value means "no metrics emitted", which keeps cold-start
// behaviour byte-for-byte unchanged for tests and for callers that
// do not wire monitoring.
//
// The three methods correspond to the outcomes of the design's
// strictly-additive policy
// (docs/design/2026_06_02_idempotent_snapshot_restore.md §9):
//
//   - RestoreSkipped: the gate fired. `gap = haveAppliedIndex -
//     snapshot.Metadata.Index` (how far ahead the live store was).
//     This is the user-visible perf win.
//
//   - RestoreExecuted: the gate did NOT fire because the live store
//     was genuinely stale (haveAppliedIndex < snapshot.Metadata.Index).
//     `gap = snapshot.Metadata.Index - haveAppliedIndex` (the work
//     the full restore re-did).
//
//   - RestoreFallback: the strictly-additive fallback path. The FSM
//     did not expose AppliedIndexReader, LastAppliedIndex reported
//     the meta key missing, or it returned an error. The full
//     restore runs and the skip was never attempted. `reason` is a
//     stable short label so Prometheus can surface why the
//     optimisation could not engage:
//
//     not_reader   — FSM does not implement AppliedIndexReader
//     missing_meta — meta key absent (pre-upgrade fsm.db)
//     read_err     — LastAppliedIndex returned an error
//
// Implementations MUST NOT block: the engine invokes these methods
// on the cold-start critical path. Treat every label/string argument
// as an untrusted enum value drawn from the engine's enumeration
// above.
type ColdStartObserver interface {
	// RestoreSkipped reports that the body restore was skipped.
	RestoreSkipped(snapIndex uint64, haveAppliedIndex uint64)
	// RestoreExecuted reports that the full restore ran because the
	// live store was behind the snapshot.
	RestoreExecuted(snapIndex uint64, haveAppliedIndex uint64)
	// RestoreFallback reports that the skip gate could not be
	// evaluated; reason is one of the labels listed above.
	RestoreFallback(snapIndex uint64, reason string)
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,10 +2,12 @@ package etcd | |
|
|
||
| import ( | ||
| "bytes" | ||
| "hash/crc32" | ||
| "io" | ||
| "os" | ||
| "path/filepath" | ||
|
|
||
| "github.com/bootjp/elastickv/internal/raftengine" | ||
| "github.com/cockroachdb/errors" | ||
| "go.etcd.io/etcd/server/v3/etcdserver/api/snap" | ||
| etcdstorage "go.etcd.io/etcd/server/v3/storage" | ||
|
|
@@ -38,7 +40,7 @@ func openDiskState(cfg OpenConfig, peers []Peer) (*diskState, error) { | |
| } | ||
|
|
||
| if wal.Exist(walDir) { | ||
| return loadWalState(logger, walDir, snapDir, fsmSnapDir, cfg.StateMachine) | ||
| return loadWalState(logger, walDir, snapDir, fsmSnapDir, cfg.StateMachine, cfg.ColdStartObserver) | ||
| } | ||
|
|
||
| legacy, legacyErr := loadLegacyOrSplitState(cfg.DataDir) | ||
|
|
@@ -114,7 +116,7 @@ func bootstrapWalState(logger *zap.Logger, walDir, snapDir, fsmSnapDir string, f | |
| return persistBootState(logger, walDir, snapDir, fsmSnapDir, fsm, boot) | ||
| } | ||
|
|
||
| func loadWalState(logger *zap.Logger, walDir, snapDir, fsmSnapDir string, fsm StateMachine) (*diskState, error) { | ||
| func loadWalState(logger *zap.Logger, walDir, snapDir, fsmSnapDir string, fsm StateMachine, obs raftengine.ColdStartObserver) (*diskState, error) { | ||
| // Scope the repair retry tightly to WAL-only reads: both | ||
| // loadPersistedSnapshot (scans WAL via wal.ValidSnapshotEntries) | ||
| // and openAndReadWAL's ReadAll can surface io.ErrUnexpectedEOF | ||
|
|
@@ -130,7 +132,7 @@ func loadWalState(logger *zap.Logger, walDir, snapDir, fsmSnapDir string, fsm St | |
| if err != nil { | ||
| return nil, err | ||
| } | ||
| if err := restoreSnapshotState(fsm, snapshot, fsmSnapDir); err != nil { | ||
| if err := restoreSnapshotState(fsm, snapshot, fsmSnapDir, obs, logger); err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
|
|
@@ -243,7 +245,7 @@ func loadPersistedSnapshot(logger *zap.Logger, walDir string, snapshotter *snap. | |
| } | ||
| } | ||
|
|
||
| func restoreSnapshotState(fsm StateMachine, snapshot raftpb.Snapshot, fsmSnapDir string) error { | ||
| func restoreSnapshotState(fsm StateMachine, snapshot raftpb.Snapshot, fsmSnapDir string, obs raftengine.ColdStartObserver, logger *zap.Logger) error { | ||
| if etcdraft.IsEmptySnap(snapshot) || len(snapshot.Data) == 0 || fsm == nil { | ||
| return nil | ||
| } | ||
|
|
@@ -252,12 +254,219 @@ func restoreSnapshotState(fsm StateMachine, snapshot raftpb.Snapshot, fsmSnapDir | |
| if err != nil { | ||
| return err | ||
| } | ||
| // Branch 3 (PR #910 design §4): skip the multi-GiB body | ||
| // restore when the on-disk FSM is already at least as fresh | ||
| // as the snapshot pointer. The skip path still consumes the | ||
| // v1/v2 header so the HLC ceiling + Stage 8a cutover are | ||
| // preserved (PR #910 §5), and runs the same three-step CRC | ||
| // verification (size + footer-vs-tokenCRC + full-body-CRC) | ||
| // as openAndRestoreFSMSnapshot before mutating any FSM state. | ||
| decision, have := decideSkipOutcome(fsm, tok.Index) | ||
| reportColdStart(obs, logger, decision, tok.Index, have) | ||
| if decision == coldStartSkip { | ||
| return applyHeaderStateOnSkip(fsm, fsmSnapPath(fsmSnapDir, tok.Index), tok.CRC32C) | ||
|
Comment on lines
+264
to
+267
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
When the FSM has applied entries after the latest persisted snapshot (the normal interval between snapshots, since Useful? React with 👍 / 👎. |
||
| } | ||
| return openAndRestoreFSMSnapshot(fsm, fsmSnapPath(fsmSnapDir, tok.Index), tok.CRC32C) | ||
| } | ||
| // Legacy format: full FSM payload embedded in snapshot.Data. | ||
| return errors.WithStack(fsm.Restore(bytes.NewReader(snapshot.Data))) | ||
| } | ||
|
|
||
// coldStartDecision enumerates the five outcomes the skip gate can
// produce: skip, execute, and three fallback variants (one per
// reason the gate could not be evaluated). It is used together with
// the ColdStartObserver labels so the metrics + log emitter stays
// centralised.
type coldStartDecision int

const (
	// coldStartSkip: the body restore is skipped.
	coldStartSkip coldStartDecision = iota
	// coldStartExecute: the full restore runs because the store is
	// behind the snapshot.
	coldStartExecute
	// coldStartFallbackNotReader: the FSM does not implement
	// AppliedIndexReader.
	coldStartFallbackNotReader
	// coldStartFallbackMissingMeta: the applied-index meta key is
	// absent.
	coldStartFallbackMissingMeta
	// coldStartFallbackReadErr: LastAppliedIndex returned an error.
	coldStartFallbackReadErr
)

// fallbackReason returns the stable short label reported for a
// fallback decision. Skip, execute, and any unknown value yield the
// empty string.
func (d coldStartDecision) fallbackReason() string {
	switch d {
	case coldStartFallbackReadErr:
		return "read_err"
	case coldStartFallbackMissingMeta:
		return "missing_meta"
	case coldStartFallbackNotReader:
		return "not_reader"
	case coldStartSkip, coldStartExecute:
		return ""
	}
	return ""
}
|
|
||
| // decideSkipOutcome reads the FSM's durable applied index and | ||
| // classifies into one of the five outcomes. Returns (decision, | ||
| // haveIndex). haveIndex is meaningful only for skip / execute | ||
| // outcomes; the three fallback outcomes leave it at 0 because the | ||
| // store could not authoritatively report a value. | ||
| func decideSkipOutcome(fsm StateMachine, want uint64) (coldStartDecision, uint64) { | ||
| r, ok := fsm.(raftengine.AppliedIndexReader) | ||
| if !ok { | ||
| return coldStartFallbackNotReader, 0 | ||
| } | ||
| have, present, err := r.LastAppliedIndex() | ||
| switch { | ||
| case err != nil: | ||
| return coldStartFallbackReadErr, 0 | ||
| case !present: | ||
| return coldStartFallbackMissingMeta, 0 | ||
| case have < want: | ||
| return coldStartExecute, have | ||
| default: | ||
| return coldStartSkip, have | ||
| } | ||
| } | ||
|
|
||
| // reportColdStart dispatches the outcome to the observer + the | ||
| // engine logger. nil observer / nil logger no-op individually. | ||
| func reportColdStart(obs raftengine.ColdStartObserver, logger *zap.Logger, d coldStartDecision, snapIndex, have uint64) { | ||
| switch d { //nolint:exhaustive // default groups the three fallback variants | ||
| case coldStartSkip: | ||
| if obs != nil { | ||
| obs.RestoreSkipped(snapIndex, have) | ||
| } | ||
| if logger != nil { | ||
| logger.Info("restoreSnapshotState skipped", | ||
| zap.Uint64("fsm_applied", have), | ||
| zap.Uint64("snapshot_index", snapIndex), | ||
| zap.Uint64("gap_ahead", have-snapIndex), | ||
| ) | ||
| } | ||
| case coldStartExecute: | ||
| if obs != nil { | ||
| obs.RestoreExecuted(snapIndex, have) | ||
| } | ||
| if logger != nil { | ||
| logger.Info("restoreSnapshotState executed (FSM behind snapshot)", | ||
| zap.Uint64("fsm_applied", have), | ||
| zap.Uint64("snapshot_index", snapIndex), | ||
| zap.Uint64("gap_behind", snapIndex-have), | ||
| ) | ||
| } | ||
| default: | ||
| // Fallback variants: the strictly-additive policy. We could | ||
| // not even attempt the skip; the full restore runs. | ||
| reason := d.fallbackReason() | ||
| if obs != nil { | ||
| obs.RestoreFallback(snapIndex, reason) | ||
| } | ||
| if logger != nil { | ||
| logger.Info("restoreSnapshotState fallback to full restore", | ||
| zap.Uint64("snapshot_index", snapIndex), | ||
| zap.String("reason", reason), | ||
| ) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // applyHeaderStateOnSkip mirrors openAndRestoreFSMSnapshot's safety | ||
| // contract (size + footer-vs-tokenCRC + full-body-CRC) but applies | ||
| // only the header side-effects (HLC ceiling + Stage 8a cutover) | ||
| // instead of running the body restore. The body bytes are read for | ||
| // CRC coverage but discarded -- fsm.db already holds equivalent | ||
| // state, which is precisely the reason we're skipping the restore. | ||
| // | ||
| // FSMs that do not implement raftengine.SnapshotHeaderApplier | ||
| // silently no-op the apply phase -- the FSM has no header state to | ||
| // carry forward, and the CRC verification still runs (with no | ||
| // observable side-effect on success). On any verification failure | ||
| // the typed error propagates and FSM state stays untouched. | ||
| // | ||
| // See PR #910 design §5 round-7 (two-phase seam) + round-6 | ||
| // (three-step CRC mirroring openAndRestoreFSMSnapshot). | ||
| func applyHeaderStateOnSkip(fsm StateMachine, snapPath string, tokenCRC uint32) error { | ||
| file, err := os.Open(snapPath) | ||
| if err != nil { | ||
| return statFSMFileError(err) | ||
| } | ||
| defer func() { _ = file.Close() }() | ||
|
|
||
| info, err := file.Stat() | ||
| if err != nil { | ||
| return errors.WithStack(err) | ||
| } | ||
| footer, err := verifyFSMSnapshotPrefix(file, info.Size(), snapPath, tokenCRC) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Step 3: full-body CRC. Wrap the payload in a crc32 TeeReader | ||
| // and hand it to the FSM's ParseSnapshotHeader for header parse | ||
| // + drain. Every payload byte flows through h, matching | ||
| // restoreAndComputeCRC's boundary in openAndRestoreFSMSnapshot. | ||
| if _, err := file.Seek(0, io.SeekStart); err != nil { | ||
| return errors.WithStack(err) | ||
| } | ||
| payloadSize := info.Size() - fsmFooterSize | ||
| h := crc32.New(crc32cTable) | ||
| tee := io.TeeReader(io.LimitReader(file, payloadSize), h) | ||
|
|
||
| setter, hasSetter := fsm.(raftengine.SnapshotHeaderApplier) | ||
| ceiling, cutover, err := readSnapshotHeaderOrDrain(setter, hasSetter, tee) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| if h.Sum32() != footer { | ||
| return errors.Wrapf(ErrFSMSnapshotFileCRC, | ||
| "path=%s footer=%08x computed=%08x", snapPath, footer, h.Sum32()) | ||
| } | ||
|
|
||
| // All three checks passed; apply side-effects (pure assignment | ||
| // in the FSM). Skipped silently when the FSM does not expose | ||
| // the seam. | ||
| if hasSetter { | ||
| setter.ApplySnapshotHeader(ceiling, cutover) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // verifyFSMSnapshotPrefix runs the first two cheap checks of | ||
| // openAndRestoreFSMSnapshot's three-step contract: size and | ||
| // footer-vs-tokenCRC. Returns the on-disk footer value (caller | ||
| // reuses it for the step-3 full-body CRC compare). Typed errors | ||
| // surface unchanged. | ||
| func verifyFSMSnapshotPrefix(file *os.File, fileSize int64, snapPath string, tokenCRC uint32) (uint32, error) { | ||
| if fileSize < fsmMinFileSize { | ||
| return 0, errors.Wrapf(ErrFSMSnapshotTooSmall, | ||
| "file too small: %d bytes (minimum %d)", fileSize, fsmMinFileSize) | ||
| } | ||
| footer, err := readFSMFooter(file, fileSize) | ||
| if err != nil { | ||
| return 0, err | ||
| } | ||
| if footer != tokenCRC { | ||
| return 0, errors.Wrapf(ErrFSMSnapshotTokenCRC, | ||
| "path=%s footer=%08x token=%08x", snapPath, footer, tokenCRC) | ||
| } | ||
| return footer, nil | ||
| } | ||
|
|
||
| // readSnapshotHeaderOrDrain branches on whether the FSM exposes the | ||
| // SnapshotHeaderApplier seam: when present, delegate to | ||
| // ParseSnapshotHeader (which parses the header AND drains the rest); | ||
| // otherwise drain the entire payload through the tee'd reader so the | ||
| // CRC pass covers every byte. The (ceiling, cutover) tuple is zero | ||
| // in the no-seam case -- the caller's ApplySnapshotHeader branch | ||
| // short-circuits on hasSetter, so the zero values are inert. | ||
| func readSnapshotHeaderOrDrain(setter raftengine.SnapshotHeaderApplier, hasSetter bool, tee io.Reader) (uint64, uint64, error) { | ||
| if hasSetter { | ||
| ceiling, cutover, err := setter.ParseSnapshotHeader(tee) | ||
| if err != nil { | ||
| return 0, 0, errors.WithStack(err) | ||
| } | ||
| return ceiling, cutover, nil | ||
| } | ||
| if _, err := io.Copy(io.Discard, tee); err != nil { | ||
| return 0, 0, errors.WithStack(err) | ||
| } | ||
| return 0, 0, nil | ||
| } | ||
|
|
||
| func walSnapshotFor(snapshot raftpb.Snapshot) walpb.Snapshot { | ||
| return walpb.Snapshot{ | ||
| Index: snapshot.Metadata.Index, | ||
|
|
@@ -310,7 +519,12 @@ func persistBootState(logger *zap.Logger, walDir, snapDir, fsmSnapDir string, fs | |
| return nil, err | ||
| } | ||
| if wal.Exist(walDir) { | ||
| return loadWalState(logger, walDir, snapDir, fsmSnapDir, fsm) | ||
| // Recursive load after bootstrap-style setup: no observer | ||
| // needed because the engine has not handed one to us | ||
| // (bootstrap path runs before OpenConfig wiring reaches | ||
| // this point) and a fresh-bootstrap restore will be a no-op | ||
| // anyway (the WAL was just created). | ||
| return loadWalState(logger, walDir, snapDir, fsmSnapDir, fsm, nil) | ||
| } | ||
|
|
||
| w, err := wal.Create(logger, walDir, nil) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the production startup path this observer is never populated:
`main.go` creates `monitoring.NewRegistry(...)`, but engines are opened through `internal/raftengine/etcd.Factory.Create`, and that method builds `OpenConfig` without a `ColdStartObserver` field because `raftengine.FactoryConfig` has no way to carry it. As a result `loadWalState` always receives `obs == nil` in normal server runs, so the new cold-start counters/gauges remain empty even when skips/executions/fallbacks happen. Please thread `Registry.ColdStartObserver()` through the factory config/open call. Useful? React with 👍 / 👎.