Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions internal/raftengine/cold_start.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package raftengine

// ColdStartObserver receives cold-start snapshot-restore lifecycle
// events from restoreSnapshotState. Implementations live in the
// monitoring package and wire to Prometheus counters/gauges; the
// engine receives a value through OpenConfig and treats nil as
// "no metrics emitted" (preserves the byte-for-byte cold-start
// behaviour for tests and callers that do not wire monitoring).
//
// Three outcomes match the design's strictly-additive policy
// (docs/design/2026_06_02_idempotent_snapshot_restore.md §9):
//
// - RestoreSkipped: the gate fired. `gap = haveAppliedIndex -
// snapshot.Metadata.Index` (how far ahead the live store was).
// This is the user-visible perf win.
//
// - RestoreExecuted: the gate did NOT fire because the live store
// was genuinely stale (haveAppliedIndex < snapshot.Metadata.Index).
// `gap = snapshot.Metadata.Index - haveAppliedIndex` (the work
// the full restore re-did).
//
// - RestoreFallback: the strictly-additive fallback path — the
// FSM did not expose AppliedIndexReader, LastAppliedIndex
// reported the meta key missing, or it returned an error. The
// full restore runs but the skip was never even attempted.
// `reason` carries a stable short label so Prometheus can
// surface why the optimisation could not engage:
//
// not_reader — FSM does not implement AppliedIndexReader
// missing_meta — meta key absent (pre-upgrade fsm.db)
// read_err — LastAppliedIndex returned an error
//
// Implementations MUST NOT block; the engine calls these on the
// cold-start critical path. Treat all label/string arguments as
// untrusted enum values from the engine's enumeration above.
type ColdStartObserver interface {
RestoreSkipped(snapIndex, haveAppliedIndex uint64)
RestoreExecuted(snapIndex, haveAppliedIndex uint64)
RestoreFallback(snapIndex uint64, reason string)
}
6 changes: 6 additions & 0 deletions internal/raftengine/etcd/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@ type OpenConfig struct {
// has been observed yet, equivalent to "raft envelope hook
// off".
RaftCutoverIndex RaftCutoverIndex
// ColdStartObserver receives the cold-start snapshot-restore
// skip-gate lifecycle events (skipped / executed / fallback).
// nil disables metrics; the skip itself still runs. See
// docs/design/2026_06_02_idempotent_snapshot_restore.md §9 and
// internal/raftengine/cold_start.go for the contract.
ColdStartObserver raftengine.ColdStartObserver
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Plumb the cold-start observer through the factory

In the production startup path this observer is never populated: main.go creates monitoring.NewRegistry(...), but engines are opened through internal/raftengine/etcd.Factory.Create, and that method builds OpenConfig without a ColdStartObserver field because raftengine.FactoryConfig has no way to carry it. As a result loadWalState always receives obs == nil in normal server runs, so the new cold-start counters/gauges remain empty even when skips/executions/fallbacks happen. Please thread Registry.ColdStartObserver() through the factory config/open call.

Useful? React with 👍 / 👎.

}

type Engine struct {
Expand Down
35 changes: 22 additions & 13 deletions internal/raftengine/etcd/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ type FactoryConfig struct {
ElectionTick int
MaxSizePerMsg uint64
MaxInflightMsg int
// ColdStartObserver receives cold-start snapshot-restore
// lifecycle events for the Branch 3 skip gate
// (raftengine.ColdStartObserver). The Factory threads it into
// every OpenConfig it builds; nil disables metrics emission
// (the skip itself still runs). Wire via main.go from
// monitoring.Registry.ColdStartObserver(). See PR #934 round-1
// codex P2 for the plumbing rationale.
ColdStartObserver raftengine.ColdStartObserver
}

// Factory creates etcd raft engine instances.
Expand Down Expand Up @@ -44,19 +52,20 @@ func (f *Factory) Create(cfg raftengine.FactoryConfig) (*raftengine.FactoryResul
}

engine, err := Open(context.Background(), OpenConfig{
LocalID: cfg.LocalID,
LocalAddress: cfg.LocalAddress,
DataDir: cfg.DataDir,
Peers: peers,
Bootstrap: cfg.Bootstrap,
JoinAsLearner: cfg.JoinAsLearner,
Transport: transport,
StateMachine: cfg.StateMachine,
TickInterval: f.cfg.TickInterval,
HeartbeatTick: f.cfg.HeartbeatTick,
ElectionTick: f.cfg.ElectionTick,
MaxSizePerMsg: f.cfg.MaxSizePerMsg,
MaxInflightMsg: f.cfg.MaxInflightMsg,
LocalID: cfg.LocalID,
LocalAddress: cfg.LocalAddress,
DataDir: cfg.DataDir,
Peers: peers,
Bootstrap: cfg.Bootstrap,
JoinAsLearner: cfg.JoinAsLearner,
Transport: transport,
StateMachine: cfg.StateMachine,
TickInterval: f.cfg.TickInterval,
HeartbeatTick: f.cfg.HeartbeatTick,
ElectionTick: f.cfg.ElectionTick,
MaxSizePerMsg: f.cfg.MaxSizePerMsg,
MaxInflightMsg: f.cfg.MaxInflightMsg,
ColdStartObserver: f.cfg.ColdStartObserver,
})
if err != nil {
var closeErr error
Expand Down
2 changes: 1 addition & 1 deletion internal/raftengine/etcd/grpc_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func TestReceiveSnapshotStream_StreamingTokenWhenFSMSnapDirSet(t *testing.T) {
// materializing the payload as []byte. Verify the receiver FSM ends up
// with exactly the entries the sender serialized.
receiverFSM := &testStateMachine{}
require.NoError(t, restoreSnapshotState(receiverFSM, *msg.Snapshot, fsmSnapDir))
require.NoError(t, restoreSnapshotState(receiverFSM, *msg.Snapshot, fsmSnapDir, nil, nil))
require.Equal(t, senderFSM.Applied(), receiverFSM.Applied())
}

Expand Down
224 changes: 219 additions & 5 deletions internal/raftengine/etcd/wal_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package etcd

import (
"bytes"
"hash/crc32"
"io"
"os"
"path/filepath"

"github.com/bootjp/elastickv/internal/raftengine"
"github.com/cockroachdb/errors"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
etcdstorage "go.etcd.io/etcd/server/v3/storage"
Expand Down Expand Up @@ -38,7 +40,7 @@ func openDiskState(cfg OpenConfig, peers []Peer) (*diskState, error) {
}

if wal.Exist(walDir) {
return loadWalState(logger, walDir, snapDir, fsmSnapDir, cfg.StateMachine)
return loadWalState(logger, walDir, snapDir, fsmSnapDir, cfg.StateMachine, cfg.ColdStartObserver)
}

legacy, legacyErr := loadLegacyOrSplitState(cfg.DataDir)
Expand Down Expand Up @@ -114,7 +116,7 @@ func bootstrapWalState(logger *zap.Logger, walDir, snapDir, fsmSnapDir string, f
return persistBootState(logger, walDir, snapDir, fsmSnapDir, fsm, boot)
}

func loadWalState(logger *zap.Logger, walDir, snapDir, fsmSnapDir string, fsm StateMachine) (*diskState, error) {
func loadWalState(logger *zap.Logger, walDir, snapDir, fsmSnapDir string, fsm StateMachine, obs raftengine.ColdStartObserver) (*diskState, error) {
// Scope the repair retry tightly to WAL-only reads: both
// loadPersistedSnapshot (scans WAL via wal.ValidSnapshotEntries)
// and openAndReadWAL's ReadAll can surface io.ErrUnexpectedEOF
Expand All @@ -130,7 +132,7 @@ func loadWalState(logger *zap.Logger, walDir, snapDir, fsmSnapDir string, fsm St
if err != nil {
return nil, err
}
if err := restoreSnapshotState(fsm, snapshot, fsmSnapDir); err != nil {
if err := restoreSnapshotState(fsm, snapshot, fsmSnapDir, obs, logger); err != nil {
return nil, err
}

Expand Down Expand Up @@ -243,7 +245,7 @@ func loadPersistedSnapshot(logger *zap.Logger, walDir string, snapshotter *snap.
}
}

func restoreSnapshotState(fsm StateMachine, snapshot raftpb.Snapshot, fsmSnapDir string) error {
func restoreSnapshotState(fsm StateMachine, snapshot raftpb.Snapshot, fsmSnapDir string, obs raftengine.ColdStartObserver, logger *zap.Logger) error {
if etcdraft.IsEmptySnap(snapshot) || len(snapshot.Data) == 0 || fsm == nil {
return nil
}
Expand All @@ -252,12 +254,219 @@ func restoreSnapshotState(fsm StateMachine, snapshot raftpb.Snapshot, fsmSnapDir
if err != nil {
return err
}
// Branch 3 (PR #910 design §4): skip the multi-GiB body
// restore when the on-disk FSM is already at least as fresh
// as the snapshot pointer. The skip path still consumes the
// v1/v2 header so the HLC ceiling + Stage 8a cutover are
// preserved (PR #910 §5), and runs the same three-step CRC
// verification (size + footer-vs-tokenCRC + full-body-CRC)
// as openAndRestoreFSMSnapshot before mutating any FSM state.
decision, have := decideSkipOutcome(fsm, tok.Index)
reportColdStart(obs, logger, decision, tok.Index, have)
if decision == coldStartSkip {
return applyHeaderStateOnSkip(fsm, fsmSnapPath(fsmSnapDir, tok.Index), tok.CRC32C)
Comment on lines +264 to +267
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Prevent skipping when WAL will replay newer entries

When the FSM has applied entries after the latest persisted snapshot (the normal interval between snapshots, since metaAppliedIndex advances on each Apply), LastAppliedIndex() > tok.Index takes this skip path even though Open still initializes the raft applied floor from LocalSnap.Metadata.Index and openAndReadWAL reloads the post-snapshot entries. Those entries are then replayed into a Pebble store that already contains them; transaction replays can hit OCC conflicts before kvFSM observes their commit timestamps, leaving the restarted node’s HLC below timestamps already present on disk. The skip needs to advance/suppress replay through have, or only skip when there are no post-snapshot WAL entries to replay.

Useful? React with 👍 / 👎.

}
return openAndRestoreFSMSnapshot(fsm, fsmSnapPath(fsmSnapDir, tok.Index), tok.CRC32C)
}
// Legacy format: full FSM payload embedded in snapshot.Data.
return errors.WithStack(fsm.Restore(bytes.NewReader(snapshot.Data)))
}

// coldStartDecision enumerates the three outcomes the skip gate
// distinguishes. Used together with ColdStartObserver labels to
// keep the metrics + log emitter centralised.
type coldStartDecision int

const (
coldStartSkip coldStartDecision = iota
coldStartExecute
coldStartFallbackNotReader
coldStartFallbackMissingMeta
coldStartFallbackReadErr
)

func (d coldStartDecision) fallbackReason() string {
switch d { //nolint:exhaustive // skip / execute return "" via default
case coldStartFallbackNotReader:
return "not_reader"
case coldStartFallbackMissingMeta:
return "missing_meta"
case coldStartFallbackReadErr:
return "read_err"
default:
return ""
}
}

// decideSkipOutcome reads the FSM's durable applied index and
// classifies into one of the five outcomes. Returns (decision,
// haveIndex). haveIndex is meaningful only for skip / execute
// outcomes; the three fallback outcomes leave it at 0 because the
// store could not authoritatively report a value.
func decideSkipOutcome(fsm StateMachine, want uint64) (coldStartDecision, uint64) {
r, ok := fsm.(raftengine.AppliedIndexReader)
if !ok {
return coldStartFallbackNotReader, 0
}
have, present, err := r.LastAppliedIndex()
switch {
case err != nil:
return coldStartFallbackReadErr, 0
case !present:
return coldStartFallbackMissingMeta, 0
case have < want:
return coldStartExecute, have
default:
return coldStartSkip, have
}
}

// reportColdStart dispatches the outcome to the observer + the
// engine logger. nil observer / nil logger no-op individually.
func reportColdStart(obs raftengine.ColdStartObserver, logger *zap.Logger, d coldStartDecision, snapIndex, have uint64) {
switch d { //nolint:exhaustive // default groups the three fallback variants
case coldStartSkip:
if obs != nil {
obs.RestoreSkipped(snapIndex, have)
}
if logger != nil {
logger.Info("restoreSnapshotState skipped",
zap.Uint64("fsm_applied", have),
zap.Uint64("snapshot_index", snapIndex),
zap.Uint64("gap_ahead", have-snapIndex),
)
}
case coldStartExecute:
if obs != nil {
obs.RestoreExecuted(snapIndex, have)
}
if logger != nil {
logger.Info("restoreSnapshotState executed (FSM behind snapshot)",
zap.Uint64("fsm_applied", have),
zap.Uint64("snapshot_index", snapIndex),
zap.Uint64("gap_behind", snapIndex-have),
)
}
default:
// Fallback variants: the strictly-additive policy. We could
// not even attempt the skip; the full restore runs.
reason := d.fallbackReason()
if obs != nil {
obs.RestoreFallback(snapIndex, reason)
}
if logger != nil {
logger.Info("restoreSnapshotState fallback to full restore",
zap.Uint64("snapshot_index", snapIndex),
zap.String("reason", reason),
)
}
}
}

// applyHeaderStateOnSkip mirrors openAndRestoreFSMSnapshot's safety
// contract (size + footer-vs-tokenCRC + full-body-CRC) but applies
// only the header side-effects (HLC ceiling + Stage 8a cutover)
// instead of running the body restore. The body bytes are read for
// CRC coverage but discarded -- fsm.db already holds equivalent
// state, which is precisely the reason we're skipping the restore.
//
// FSMs that do not implement raftengine.SnapshotHeaderApplier
// silently no-op the apply phase -- the FSM has no header state to
// carry forward, and the CRC verification still runs (with no
// observable side-effect on success). On any verification failure
// the typed error propagates and FSM state stays untouched.
//
// See PR #910 design §5 round-7 (two-phase seam) + round-6
// (three-step CRC mirroring openAndRestoreFSMSnapshot).
func applyHeaderStateOnSkip(fsm StateMachine, snapPath string, tokenCRC uint32) error {
file, err := os.Open(snapPath)
if err != nil {
return statFSMFileError(err)
}
defer func() { _ = file.Close() }()

info, err := file.Stat()
if err != nil {
return errors.WithStack(err)
}
footer, err := verifyFSMSnapshotPrefix(file, info.Size(), snapPath, tokenCRC)
if err != nil {
return err
}

// Step 3: full-body CRC. Wrap the payload in a crc32 TeeReader
// and hand it to the FSM's ParseSnapshotHeader for header parse
// + drain. Every payload byte flows through h, matching
// restoreAndComputeCRC's boundary in openAndRestoreFSMSnapshot.
if _, err := file.Seek(0, io.SeekStart); err != nil {
return errors.WithStack(err)
}
payloadSize := info.Size() - fsmFooterSize
h := crc32.New(crc32cTable)
tee := io.TeeReader(io.LimitReader(file, payloadSize), h)

setter, hasSetter := fsm.(raftengine.SnapshotHeaderApplier)
ceiling, cutover, err := readSnapshotHeaderOrDrain(setter, hasSetter, tee)
if err != nil {
return err
}

if h.Sum32() != footer {
return errors.Wrapf(ErrFSMSnapshotFileCRC,
"path=%s footer=%08x computed=%08x", snapPath, footer, h.Sum32())
}

// All three checks passed; apply side-effects (pure assignment
// in the FSM). Skipped silently when the FSM does not expose
// the seam.
if hasSetter {
setter.ApplySnapshotHeader(ceiling, cutover)
}
return nil
}

// verifyFSMSnapshotPrefix runs the first two cheap checks of
// openAndRestoreFSMSnapshot's three-step contract: size and
// footer-vs-tokenCRC. Returns the on-disk footer value (caller
// reuses it for the step-3 full-body CRC compare). Typed errors
// surface unchanged.
func verifyFSMSnapshotPrefix(file *os.File, fileSize int64, snapPath string, tokenCRC uint32) (uint32, error) {
if fileSize < fsmMinFileSize {
return 0, errors.Wrapf(ErrFSMSnapshotTooSmall,
"file too small: %d bytes (minimum %d)", fileSize, fsmMinFileSize)
}
footer, err := readFSMFooter(file, fileSize)
if err != nil {
return 0, err
}
if footer != tokenCRC {
return 0, errors.Wrapf(ErrFSMSnapshotTokenCRC,
"path=%s footer=%08x token=%08x", snapPath, footer, tokenCRC)
}
return footer, nil
}

// readSnapshotHeaderOrDrain branches on whether the FSM exposes the
// SnapshotHeaderApplier seam: when present, delegate to
// ParseSnapshotHeader (which parses the header AND drains the rest);
// otherwise drain the entire payload through the tee'd reader so the
// CRC pass covers every byte. The (ceiling, cutover) tuple is zero
// in the no-seam case -- the caller's ApplySnapshotHeader branch
// short-circuits on hasSetter, so the zero values are inert.
func readSnapshotHeaderOrDrain(setter raftengine.SnapshotHeaderApplier, hasSetter bool, tee io.Reader) (uint64, uint64, error) {
if hasSetter {
ceiling, cutover, err := setter.ParseSnapshotHeader(tee)
if err != nil {
return 0, 0, errors.WithStack(err)
}
return ceiling, cutover, nil
}
if _, err := io.Copy(io.Discard, tee); err != nil {
return 0, 0, errors.WithStack(err)
}
return 0, 0, nil
}

func walSnapshotFor(snapshot raftpb.Snapshot) walpb.Snapshot {
return walpb.Snapshot{
Index: snapshot.Metadata.Index,
Expand Down Expand Up @@ -310,7 +519,12 @@ func persistBootState(logger *zap.Logger, walDir, snapDir, fsmSnapDir string, fs
return nil, err
}
if wal.Exist(walDir) {
return loadWalState(logger, walDir, snapDir, fsmSnapDir, fsm)
// Recursive load after bootstrap-style setup: no observer
// needed because the engine has not handed one to us
// (bootstrap path runs before OpenConfig wiring reaches
// this point) and a fresh-bootstrap restore will be a no-op
// anyway (the WAL was just created).
return loadWalState(logger, walDir, snapDir, fsmSnapDir, fsm, nil)
}

w, err := wal.Create(logger, walDir, nil)
Expand Down
Loading
Loading