Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,299 changes: 1,299 additions & 0 deletions docs/superpowers/plans/2026-05-09-f-130-recovery-mode-admin-http-surface.md

Large diffs are not rendered by default.

143 changes: 135 additions & 8 deletions internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/rand"
"database/sql"
"encoding/json"
"errors"
"fmt"
"log/slog"
"os"
Expand Down Expand Up @@ -65,14 +66,19 @@ type Daemon struct {
automationEngine *automation.Engine
configDir string

phase atomic.Int32
recoveryInfo atomic.Pointer[recoveryState]
phase atomic.Int32
recoveryInfo atomic.Pointer[recoveryState]
shutdownCancel atomic.Pointer[context.CancelFunc]
}

// recoveryState is the snapshot stored in Daemon.recoveryInfo when the daemon
// enters recovery mode. It holds the human-readable reason and the failed
// position handed to enterRecovery.
type recoveryState struct {
reason string
reason string
failedPos uint64
}

// Compile-time assertion: *Daemon must satisfy RecoveryProvider.
var _ observability.RecoveryProvider = (*Daemon)(nil)

// Version, Commit, and GoVersion are set via -ldflags at build time.
var (
Version = "dev"
Expand All @@ -88,6 +94,10 @@ func New(cfg Config, logger *slog.Logger, metrics *observability.Metrics) *Daemo

// Run boots through phases 1-5 and blocks until ctx is done.
func (d *Daemon) Run(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
d.shutdownCancel.Store(&cancel)
defer cancel()

d.metrics.SetBuildInfo(Version, Commit, GoVersion)
start := time.Now()

Expand All @@ -114,7 +124,7 @@ func (d *Daemon) Run(ctx context.Context) error {
defer func() { _ = db.Close() }()

go func() {
_ = d.metrics.ServeMetrics(ctx, fmt.Sprintf(":%d", d.cfg.AdminPort), d.healthStatus)
_ = d.metrics.ServeMetrics(ctx, fmt.Sprintf(":%d", d.cfg.AdminPort), d.healthStatus, d)
}()

// Phase 2: construct projectors
Expand Down Expand Up @@ -173,7 +183,12 @@ func (d *Daemon) Run(ctx context.Context) error {
d.metrics.StartupPhase.Set(3)

if err := store.Replay(ctx); err != nil {
d.enterRecovery(err.Error())
var replayErr *eventstore.ReplayError
var failedPos uint64
if errors.As(err, &replayErr) {
failedPos = replayErr.Position
}
d.enterRecovery(ctx, err.Error(), failedPos)
<-ctx.Done()
return nil
}
Expand Down Expand Up @@ -586,12 +601,124 @@ func (d *Daemon) healthStatus() (string, int) {
}
}

// enterRecovery switches the daemon into recovery mode: it bumps the
// RecoveryModeEntered metric, sets both the internal phase and the
// StartupPhase gauge to -1, records the reason (and failed position) in
// recoveryInfo, and logs the transition at error level. It then starts a
// background goroutine that repeats the error log every 30 seconds until ctx
// is done.
func (d *Daemon) enterRecovery(reason string) {
func (d *Daemon) enterRecovery(ctx context.Context, reason string, failedPos uint64) {
d.metrics.RecoveryModeEntered.Inc()
// -1 is the phase value InRecovery checks for.
d.phase.Store(-1)
d.metrics.StartupPhase.Set(-1)
d.recoveryInfo.Store(&recoveryState{reason: reason})
d.logger.Error("entering recovery mode", "reason", reason)
d.recoveryInfo.Store(&recoveryState{reason: reason, failedPos: failedPos})
d.logger.Error("entering recovery mode", "reason", reason, "failed_position", failedPos)
// Periodic reminder so the recovery state stays visible in the logs; the
// goroutine exits (and the ticker is stopped) once ctx is cancelled.
go func() {
t := time.NewTicker(30 * time.Second)
defer t.Stop()
for {
select {
case <-t.C:
d.logger.Error("daemon in recovery mode — operator action required",
"reason", reason,
"failed_position", failedPos,
)
case <-ctx.Done():
return
}
}
}()
}

// InRecovery reports whether the daemon's current phase is the recovery
// phase (-1).
func (d *Daemon) InRecovery() bool {
	current := d.phase.Load()
	return current == -1
}

// RecoveryInfo returns the recorded recovery reason and failed event
// position. When no recovery state has been stored it returns ("", 0).
func (d *Daemon) RecoveryInfo() (string, uint64) {
	state := d.recoveryInfo.Load()
	if state == nil {
		return "", 0
	}
	return state.reason, state.failedPos
}

// QueryEvents returns up to limit events ending at, and including, the event
// at position, converted to observability.RecoveryEvent values.
//
// The query window is (position-limit, position]: position-limit is passed as
// FromPosition, which is treated as an exclusive lower bound, so the last of
// the limit events returned is the one at position itself (the failing event
// during recovery). When position <= limit, or limit is not positive, the
// query starts from position 0.
//
// NOTE(review): the exclusive semantics of QueryOptions.FromPosition are
// taken from this function's original comment — confirm against
// eventstore.Store.Query.
//
// It returns the error from the underlying store query unchanged.
func (d *Daemon) QueryEvents(ctx context.Context, position uint64, limit int) ([]observability.RecoveryEvent, error) {
	var from uint64
	// Guard limit > 0 explicitly: converting a negative int to uint64 would
	// wrap to a huge value. Subtracting exactly limit (not limit+1) keeps the
	// event at position inside the window.
	if limit > 0 && position > uint64(limit) {
		from = position - uint64(limit)
	}
	events, err := d.store.Query(ctx, eventstore.QueryOptions{
		FromPosition: from,
		Limit:        limit,
	})
	if err != nil {
		return nil, err
	}
	out := make([]observability.RecoveryEvent, len(events))
	for i, e := range events {
		out[i] = observability.RecoveryEvent{
			Position:  e.Position,
			Timestamp: e.Timestamp,
			Kind:      e.Kind,
			Entity:    e.Entity,
			Source:    e.Source,
		}
	}
	return out, nil
}

// QueryProjectionCursors reads every row of the projection_cursors table,
// ordered by name. The updated_at column is scanned as an int64 and
// interpreted as nanoseconds since the Unix epoch.
//
// Any query or scan error is returned as-is; an iteration error reported by
// rows.Err is returned together with the rows collected so far.
func (d *Daemon) QueryProjectionCursors(ctx context.Context) ([]observability.ProjectionCursor, error) {
	const query = `SELECT name, position, updated_at FROM projection_cursors ORDER BY name`

	rows, err := d.db.QueryContext(ctx, query)
	if err != nil {
		return nil, err
	}
	// Close errors are deliberately ignored; iteration errors surface via rows.Err.
	defer func() { _ = rows.Close() }()

	var cursors []observability.ProjectionCursor
	for rows.Next() {
		var (
			cursor observability.ProjectionCursor
			nanos  int64
		)
		if scanErr := rows.Scan(&cursor.Name, &cursor.Position, &nanos); scanErr != nil {
			return nil, scanErr
		}
		cursor.UpdatedAt = time.Unix(0, nanos)
		cursors = append(cursors, cursor)
	}
	return cursors, rows.Err()
}

// QuerySkippedEvents reads every row of the skipped_events table, ordered by
// position and then projector. The skipped_at column is scanned as an int64
// and interpreted as nanoseconds since the Unix epoch.
//
// Any query or scan error is returned as-is; an iteration error reported by
// rows.Err is returned together with the rows collected so far.
func (d *Daemon) QuerySkippedEvents(ctx context.Context) ([]observability.SkippedEvent, error) {
	const query = `SELECT position, projector, skipped_at, skipped_by, reason FROM skipped_events ORDER BY position, projector`

	rows, err := d.db.QueryContext(ctx, query)
	if err != nil {
		return nil, err
	}
	// Close errors are deliberately ignored; iteration errors surface via rows.Err.
	defer func() { _ = rows.Close() }()

	var skipped []observability.SkippedEvent
	for rows.Next() {
		var (
			entry observability.SkippedEvent
			nanos int64
		)
		if scanErr := rows.Scan(&entry.Position, &entry.Projector, &nanos, &entry.SkippedBy, &entry.Reason); scanErr != nil {
			return nil, scanErr
		}
		entry.SkippedAt = time.Unix(0, nanos)
		skipped = append(skipped, entry)
	}
	return skipped, rows.Err()
}

// SkipEvent inserts a row into skipped_events for the given position and
// projector, stamped with the current time in Unix nanoseconds along with who
// skipped it and why. It returns whatever error the insert produces.
func (d *Daemon) SkipEvent(ctx context.Context, position uint64, projector, reason, skippedBy string) error {
	skippedAt := time.Now().UnixNano()
	if _, err := d.db.ExecContext(ctx, `
INSERT INTO skipped_events (position, projector, skipped_at, skipped_by, reason)
VALUES (?, ?, ?, ?, ?)`,
		position, projector, skippedAt, skippedBy, reason,
	); err != nil {
		return err
	}
	return nil
}

// ProjectorNames returns the projector names reported by the daemon's event
// store.
func (d *Daemon) ProjectorNames() []string {
	names := d.store.ProjectorNames()
	return names
}

// Shutdown invokes the cancel function stored in shutdownCancel, if one has
// been stored. It is a no-op otherwise.
func (d *Daemon) Shutdown() {
	cancel := d.shutdownCancel.Load()
	if cancel == nil {
		return
	}
	(*cancel)()
}

func expandHome(path string) string {
Expand Down
24 changes: 22 additions & 2 deletions internal/eventstore/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,25 @@ import (

const maxUint64 = ^uint64(0)

// ReplayError is the error Replay returns when a sync projector's Apply call
// fails. It records where replay stopped so recovery tooling can inspect it.
type ReplayError struct {
	Position  uint64 // position of the event being applied when the failure occurred
	Projector string // name of the projector whose Apply failed
	Err       error  // underlying error returned by Apply
}

// Error formats the failing position, projector name, and underlying error
// into a single message.
func (re *ReplayError) Error() string {
	const format = "replay failed at position %d (projector %s): %v"
	return fmt.Sprintf(format, re.Position, re.Projector, re.Err)
}

// Unwrap exposes the underlying error so errors.Is and errors.As can see
// through a ReplayError.
func (re *ReplayError) Unwrap() error {
	return re.Err
}

// Replay restores each sync projector's snapshot, then applies pending
// events in 1000-event batches. Call before Start; not safe after tailer is live.
// On Apply failure, returns a *ReplayError which can be unwrapped with errors.As
// to retrieve the failing position and projector name.
func (s *Store) Replay(ctx context.Context) error {
if s.started.Load() {
return errors.New("Replay: store already started")
Expand Down Expand Up @@ -131,8 +148,11 @@ func (s *Store) replayBatch(ctx context.Context, after uint64, limit int) (uint6
continue
}
if err := reg.p.Apply(ctx, tx, e); err != nil {
return 0, fmt.Errorf("replay projector %s at position %d: %w",
reg.p.Name(), e.Position, err)
return 0, &ReplayError{
Position: e.Position,
Projector: reg.p.Name(),
Err: err,
}
}
}
}
Expand Down
94 changes: 94 additions & 0 deletions internal/eventstore/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package eventstore_test

import (
"context"
"errors"
"strings"
"testing"
"time"

"github.com/fdatoo/switchyard/internal/eventstore"
"github.com/fdatoo/switchyard/internal/observability"
Expand Down Expand Up @@ -49,3 +52,94 @@ func TestReplay_RebuildsStateFromEvents(t *testing.T) {
t.Fatalf("brightness = %d, want 200", s.Attributes.GetLight().Brightness)
}
}

// TestReplay_ReturnsReplayError verifies that a failing sync projector makes
// Replay return a *eventstore.ReplayError carrying the failing position, the
// projector name, and the projector's own error.
func TestReplay_ReturnsReplayError(t *testing.T) {
	ctx := context.Background()

	// Seed the database with a single event through a store that has no
	// failing projector registered.
	seed := newStoreFixture(t)
	if err := seed.store.Start(ctx); err != nil {
		t.Fatal(err)
	}
	wantPos, err := seed.store.Append(ctx, testutil.StateChanged("light.x", 1))
	if err != nil {
		t.Fatal(err)
	}
	_ = seed.store.Close(ctx)

	// Open a second store on the same DB whose projector fails on its first
	// Apply call, then replay.
	replayer := newStoreFixtureOnDB(t, seed.db)
	failing := &countingProjector{name: "boom", failAt: 1}
	if err := replayer.store.RegisterProjector(failing, eventstore.ProjectorModeSync); err != nil {
		t.Fatal(err)
	}
	err = replayer.store.Replay(ctx)
	if err == nil {
		t.Fatal("expected replay to fail")
	}

	var got *eventstore.ReplayError
	if !errors.As(err, &got) {
		t.Fatalf("expected *eventstore.ReplayError, got %T: %v", err, err)
	}

	// Checks run in order; the first failing one aborts the test.
	switch {
	case got.Position != wantPos:
		t.Fatalf("ReplayError.Position = %d, want %d", got.Position, wantPos)
	case got.Projector != "boom":
		t.Fatalf("ReplayError.Projector = %q, want %q", got.Projector, "boom")
	case got.Err == nil:
		t.Fatal("ReplayError.Err must not be nil")
	case !strings.Contains(got.Err.Error(), "intentional failure"):
		// The projector's own message must survive on the Err field.
		t.Fatalf("expected inner error to contain 'intentional failure', got: %v", got.Err)
	}
}

// TestReplay_SkipEventAllowsReplayToProceed verifies that once a
// skipped_events row exists for the failing (position, projector) pair, a
// replay that previously failed with a ReplayError now succeeds.
func TestReplay_SkipEventAllowsReplayToProceed(t *testing.T) {
	ctx := context.Background()

	// Phase A: seed the DB with one event via a store with no failing projector.
	seed := newStoreFixture(t)
	if err := seed.store.Start(ctx); err != nil {
		t.Fatal(err)
	}
	pos, err := seed.store.Append(ctx, testutil.StateChanged("light.z", 1))
	if err != nil {
		t.Fatal(err)
	}
	t.Logf("appended event at position %d", pos)
	_ = seed.store.Close(ctx)

	// replayWithBoom opens a fresh store on the seeded DB, registers a
	// projector that fails on its first Apply, and runs Replay.
	replayWithBoom := func() error {
		fx := newStoreFixtureOnDB(t, seed.db)
		if regErr := fx.store.RegisterProjector(&countingProjector{name: "boom", failAt: 1}, eventstore.ProjectorModeSync); regErr != nil {
			t.Fatal(regErr)
		}
		return fx.store.Replay(ctx)
	}

	// Phase B: the first replay must fail with a ReplayError.
	firstErr := replayWithBoom()
	if firstErr == nil {
		t.Fatal("expected replay to fail")
	}
	var failure *eventstore.ReplayError
	if !errors.As(firstErr, &failure) {
		t.Fatalf("expected *eventstore.ReplayError, got %T", firstErr)
	}

	// Phase C: record a skip for (position, "boom") — simulates POST /events/{pos}/skip.
	skippedAt := time.Now().UnixNano()
	if _, err = seed.db.ExecContext(ctx, `
INSERT INTO skipped_events (position, projector, skipped_at, skipped_by, reason)
VALUES (?, ?, ?, ?, ?)`,
		failure.Position, "boom", skippedAt, "integration-test", "skip to unblock replay",
	); err != nil {
		t.Fatalf("insert skipped_events: %v", err)
	}

	// Phase D: with the skip row in place, replay must now succeed.
	if secondErr := replayWithBoom(); secondErr != nil {
		t.Fatalf("replay after skip failed: %v", secondErr)
	}
}
9 changes: 9 additions & 0 deletions internal/eventstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,15 @@ func (s *Store) RegisterProjector(p Projector, mode ProjectorMode) error {
return nil
}

// ProjectorNames collects the Name of every registered projector, in the
// order the registrations are stored. The result is always a non-nil slice.
func (s *Store) ProjectorNames() []string {
	names := make([]string, 0, len(s.projectors))
	for _, reg := range s.projectors {
		names = append(names, reg.p.Name())
	}
	return names
}

func (s *Store) LatestPosition() uint64 {
s.mu.RLock()
defer s.mu.RUnlock()
Expand Down
20 changes: 20 additions & 0 deletions internal/eventstore/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,23 @@ func TestAppend_SyncProjectorFailureRollsBack(t *testing.T) {
t.Fatalf("rolled-back event still present: %+v", got)
}
}

// TestStore_ProjectorNames registers two sync projectors and checks that
// ProjectorNames reports them in registration order.
func TestStore_ProjectorNames(t *testing.T) {
	fixture := newStoreFixture(t)
	want := []string{"alpha", "beta"}

	for _, name := range want {
		if err := fixture.store.RegisterProjector(&countingProjector{name: name}, eventstore.ProjectorModeSync); err != nil {
			t.Fatal(err)
		}
	}

	got := fixture.store.ProjectorNames()
	if len(got) != len(want) {
		t.Fatalf("ProjectorNames() = %v, want %v", got, want)
	}
	for idx, expected := range want {
		if actual := got[idx]; actual != expected {
			t.Fatalf("ProjectorNames()[%d] = %q, want %q", idx, actual, expected)
		}
	}
}
Loading
Loading