From 9a72d972e5c85308f0c1e033ee92e8500e2b5a01 Mon Sep 17 00:00:00 2001 From: Janko Date: Fri, 13 Mar 2026 15:15:06 +0100 Subject: [PATCH] refactor(dev): split igord into product and research binaries Separate the product CLI (igord) from the research/P2P CLI (igord-lab) so Phase 2 product features can be built on a clean foundation without dragging in libp2p, migration, replay, and pricing dependencies. - igord: product CLI with run/resume/verify/inspect subcommands (0 libp2p deps) - igord-lab: research CLI preserving legacy flag-based P2P interface - internal/runner/research/: subpackage for research-specific runner functions - Makefile: add build-lab target, update run-agent to use igord-lab Co-Authored-By: Claude Opus 4.6 --- .gitignore | 3 +- Makefile | 16 +- cmd/igord-lab/main.go | 493 ++++++++++++++++++ cmd/igord/main.go | 583 +++------------------- internal/runner/research/research.go | 189 +++++++ internal/runner/research/research_test.go | 63 +++ internal/runner/runner.go | 179 +------ internal/runner/runner_test.go | 51 -- 8 files changed, 839 insertions(+), 738 deletions(-) create mode 100644 cmd/igord-lab/main.go create mode 100644 internal/runner/research/research.go create mode 100644 internal/runner/research/research_test.go diff --git a/.gitignore b/.gitignore index 65dbaa4..cd72463 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,8 @@ # Igor v0 .gitignore # Build artifacts -igord +/igord +/igord-lab bin/ dist/ build/ diff --git a/Makefile b/Makefile index b9ed3a5..f0b561f 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: help bootstrap build clean test lint vet fmt fmt-check tidy agent agent-heartbeat agent-reconciliation run-agent demo demo-portable gh-check gh-metadata gh-release +.PHONY: help bootstrap build build-lab clean test lint vet fmt fmt-check tidy agent agent-heartbeat agent-reconciliation run-agent demo demo-portable gh-check gh-metadata gh-release .DEFAULT_GOAL := help @@ -33,12 +33,18 @@ bootstrap: ## Install development toolchain and verify environment @echo "Running developer environment bootstrap..." @./scripts/bootstrap.sh -build: ## Build igord binary +build: ## Build igord binary (product CLI) @echo "Building $(BINARY_NAME)..." @mkdir -p $(BINARY_DIR) $(GOBUILD) -o $(BINARY_DIR)/$(BINARY_NAME) ./cmd/igord @echo "Built $(BINARY_DIR)/$(BINARY_NAME)" +build-lab: ## Build igord-lab binary (research/P2P CLI) + @echo "Building igord-lab..." + @mkdir -p $(BINARY_DIR) + $(GOBUILD) -o $(BINARY_DIR)/igord-lab ./cmd/igord-lab + @echo "Built $(BINARY_DIR)/igord-lab" + clean: ## Remove build artifacts @echo "Cleaning build artifacts..." $(GOCLEAN) @@ -62,7 +68,7 @@ lint: ## Run golangci-lint vet: ## Run go vet @echo "Running go vet..." - $(GOVET) ./cmd/... ./internal/... ./pkg/... + $(GOVET) ./cmd/... ./internal/... ./pkg/... ./sdk/... fmt: ## Format code with gofmt and goimports @echo "Formatting code..." @@ -94,9 +100,9 @@ agent: ## Build example agent WASM cd $(AGENT_DIR) && $(MAKE) build @echo "Agent built: $(AGENT_DIR)/agent.wasm" -run-agent: build agent ## Build and run example agent locally +run-agent: build-lab agent ## Build and run example agent locally (uses igord-lab) @echo "Running agent with default budget (1.0)..." - ./$(BINARY_DIR)/$(BINARY_NAME) --run-agent $(AGENT_DIR)/agent.wasm --budget 1.0 + ./$(BINARY_DIR)/igord-lab --run-agent $(AGENT_DIR)/agent.wasm --budget 1.0 agent-heartbeat: ## Build heartbeat demo agent WASM @echo "Building heartbeat agent..." diff --git a/cmd/igord-lab/main.go b/cmd/igord-lab/main.go new file mode 100644 index 0000000..374aa39 --- /dev/null +++ b/cmd/igord-lab/main.go @@ -0,0 +1,493 @@ +// SPDX-License-Identifier: Apache-2.0 + +// igord-lab is the research/P2P CLI for Igor. It preserves the legacy flag-based +// interface with full migration, replay verification, lease management, and +// pricing support. For the product CLI, use igord instead. +package main + +import ( + "context" + "flag" + "fmt" + "log/slog" + "os" + "os/signal" + "syscall" + "time" + + "github.com/simonovic86/igor/internal/agent" + "github.com/simonovic86/igor/internal/authority" + "github.com/simonovic86/igor/internal/config" + "github.com/simonovic86/igor/internal/inspector" + "github.com/simonovic86/igor/internal/logging" + "github.com/simonovic86/igor/internal/migration" + "github.com/simonovic86/igor/internal/p2p" + "github.com/simonovic86/igor/internal/pricing" + "github.com/simonovic86/igor/internal/replay" + "github.com/simonovic86/igor/internal/runner" + "github.com/simonovic86/igor/internal/runner/research" + "github.com/simonovic86/igor/internal/runtime" + "github.com/simonovic86/igor/internal/settlement" + "github.com/simonovic86/igor/internal/simulator" + "github.com/simonovic86/igor/internal/storage" + "github.com/simonovic86/igor/pkg/budget" + "github.com/simonovic86/igor/pkg/identity" +) + +func main() { + // Parse CLI flags + runAgent := flag.String("run-agent", "", "Path to WASM agent to run locally") + budgetFlag := flag.Float64("budget", 1.0, "Initial budget for agent execution") + manifestPath := flag.String("manifest", "", "Path to capability manifest JSON (default: .manifest.json)") + migrateAgent := flag.String("migrate-agent", "", "Agent ID to migrate") + targetPeer := flag.String("to", "", "Target peer multiaddr for migration") + wasmPath := flag.String("wasm", "", "WASM binary path for migration") + replayWindow := flag.Int("replay-window", 0, "Number of recent tick snapshots to retain for verification (0 = use config default)") + verifyInterval := flag.Int("verify-interval", 0, "Ticks between self-verification passes (0 = use config default)") + replayMode := flag.String("replay-mode", "", "Replay verification mode: off, periodic, on-migrate, full (default: full)") + replayCostLog := flag.Bool("replay-cost-log", false, "Log replay compute duration for economic observability") + replayOnDivergence := flag.String("replay-on-divergence", "", "Escalation policy on replay divergence: log, pause, intensify, migrate (default: log)") + inspectCheckpoint := flag.String("inspect-checkpoint", "", "Path to checkpoint file to inspect") + inspectWASM := flag.String("inspect-wasm", "", "Optional WASM binary to verify against checkpoint hash") + simulate := flag.Bool("simulate", false, "Run agent in local simulator mode (no P2P)") + simTicks := flag.Int("ticks", 0, "Number of ticks to simulate (0 = until budget exhausted)") + simVerify := flag.Bool("verify", false, "Per-tick replay verification during simulation") + simDeterministic := flag.Bool("deterministic", false, "Use fixed clock and seeded rand for reproducible simulation") + simSeed := flag.Uint64("seed", 0, "Random seed for deterministic simulation") + leaseDuration := flag.Duration("lease-duration", 60*time.Second, "Lease validity period (0 = disabled)") + leaseGrace := flag.Duration("lease-grace", 10*time.Second, "Grace period after lease expiry") + migrationRetries := flag.Int("migration-retries", 0, "Max retries per migration target (0 = use config default)") + migrationRetryDelay := flag.Duration("migration-retry-delay", 0, "Initial backoff delay between migration retries (0 = use config default)") + flag.Parse() + + // Checkpoint inspector — standalone, no config/P2P/engine needed + if *inspectCheckpoint != "" { + runInspector(*inspectCheckpoint, *inspectWASM) + return + } + + // Local simulator — standalone, no config/P2P needed + if *simulate && *runAgent != "" { + runSimulator(*runAgent, *manifestPath, *budgetFlag, *simTicks, *simVerify, *simDeterministic, *simSeed) + return + } + + // Create context + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Load configuration + cfg, err := config.Load() + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to load config: %v\n", err) + os.Exit(1) + } + + // Apply CLI overrides + applyCLIOverrides(cfg, *replayWindow, *verifyInterval, *replayMode, *replayCostLog, + *replayOnDivergence, *leaseDuration, *leaseGrace, *migrationRetries, *migrationRetryDelay) + + // Initialize logging + logger := logging.NewLogger() + + // Print startup banner + logger.Info("Igor Lab Node starting...") + logger.Info("NodeID: " + cfg.NodeID) + + // Initialize P2P node + node, err := p2p.NewNode(ctx, cfg, logger) + if err != nil { + logger.Error("Failed to create P2P node", "error", err) + os.Exit(1) + } + defer node.Close() + + // Create storage provider + storageProvider, err := storage.NewFSProvider(cfg.CheckpointDir, logger) + if err != nil { + logger.Error("Failed to create storage provider", "error", err) + os.Exit(1) + } + + // Create WASM runtime engine for migration service + engine, err := runtime.NewEngine(ctx, logger) + if err != nil { + logger.Error("Failed to create runtime engine", "error", err) + os.Exit(1) + } + defer engine.Close(ctx) + + // Initialize migration service + leaseCfg := authority.LeaseConfig{ + Duration: cfg.LeaseDuration, + RenewalWindow: cfg.LeaseRenewalWindow, + GracePeriod: cfg.LeaseGracePeriod, + } + migrationSvc := migration.NewService(node.Host, engine, storageProvider, cfg.ReplayMode, cfg.ReplayCostLog, cfg.PricePerSecond, leaseCfg, logger) + + // Initialize pricing service for inter-node price discovery + _ = pricing.NewService(node.Host, cfg.PricePerSecond, logger) + + // If --migrate-agent flag is provided, perform migration + if *migrateAgent != "" { + if *targetPeer == "" { + logger.Error("Migration requires --to flag with target peer address") + os.Exit(1) + } + if *wasmPath == "" { + logger.Error("Migration requires --wasm flag with WASM binary path") + os.Exit(1) + } + + logger.Info("Initiating agent migration", + "agent_id", *migrateAgent, + "target", *targetPeer, + ) + + if err := migrationSvc.MigrateAgent(ctx, *migrateAgent, *wasmPath, *targetPeer); err != nil { + logger.Error("Migration failed", "error", err) + os.Exit(1) + } + + logger.Info("Migration completed successfully") + return + } + + // If --run-agent flag is provided, run agent locally + if *runAgent != "" { + budgetMicrocents := budget.FromFloat(*budgetFlag) + if err := runLocalAgent(ctx, cfg, engine, storageProvider, *runAgent, budgetMicrocents, *manifestPath, migrationSvc, node, logger); err != nil { + logger.Error("Failed to run agent", "error", err) + os.Exit(1) + } + return + } + + logger.Info("Igor Lab Node ready") + + // Block until interrupted + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + <-sigChan + + logger.Info("Igor Lab Node shutting down...") +} + +// runLocalAgent loads and executes an agent locally with tick loop and checkpointing. +func runLocalAgent( + ctx context.Context, + cfg *config.Config, + engine *runtime.Engine, + storageProvider storage.Provider, + wasmPath string, + budgetMicrocents int64, + manifestPathFlag string, + migrationSvc *migration.Service, + node *p2p.Node, + logger *slog.Logger, +) error { + manifestData := runner.LoadManifestData(wasmPath, manifestPathFlag, logger) + + signingKey, nodeID := research.ExtractSigningKey(node) + + // Load or generate agent cryptographic identity for signed checkpoint lineage. + agentIdent, err := loadOrGenerateIdentity(ctx, storageProvider, "local-agent", logger) + if err != nil { + return fmt.Errorf("agent identity: %w", err) + } + + // Load agent with budget and manifest + instance, err := agent.LoadAgent( + ctx, + engine, + wasmPath, + "local-agent", + storageProvider, + budgetMicrocents, + cfg.PricePerSecond, + manifestData, + signingKey, + nodeID, + agentIdent, + logger, + ) + if err != nil { + return fmt.Errorf("failed to load agent: %w", err) + } + defer instance.Close(ctx) + + if err := initLocalAgent(ctx, cfg, instance, migrationSvc, budgetMicrocents, logger); err != nil { + return err + } + + // Setup signal handling + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + // Create replay engine for CM-4 verification + replayEngine := replay.NewEngine(logger) + defer replayEngine.Close(ctx) + + // Adaptive tick loop constants. + const ( + normalTickInterval = 1 * time.Second + minTickInterval = 10 * time.Millisecond + ) + + checkpointTicker := time.NewTicker(5 * time.Second) + defer checkpointTicker.Stop() + + // Self-verification state (CM-4: Observation Determinism) + var ticksSinceVerify int + var lastVerifiedTick uint64 + + periodicVerify := cfg.ReplayMode == "periodic" || cfg.ReplayMode == "full" + + logger.Info("Starting agent tick loop", + "replay_window", cfg.ReplayWindowSize, + "verify_interval", cfg.VerifyInterval, + "replay_mode", cfg.ReplayMode, + ) + + tickTimer := time.NewTimer(normalTickInterval) + defer tickTimer.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + + case <-sigChan: + logger.Info("Received interrupt signal, checkpointing and shutting down...") + + // Final checkpoint before exit + if err := instance.SaveCheckpointToStorage(ctx); err != nil { + logger.Error("Failed to save checkpoint on shutdown", "error", err) + } + return nil + + case <-tickTimer.C: + result, err := handleTick(ctx, instance, cfg, replayEngine, periodicVerify, + &ticksSinceVerify, &lastVerifiedTick, logger) + if err != nil { + return err + } + switch result { + case tickRecovered: + tickTimer.Reset(normalTickInterval) + continue + case tickStopped: + return nil + case tickFastPath: + tickTimer.Reset(minTickInterval) + default: + tickTimer.Reset(normalTickInterval) + } + + case <-checkpointTicker.C: + // Periodic checkpoint + if err := instance.SaveCheckpointToStorage(ctx); err != nil { + logger.Error("Failed to save checkpoint", "error", err) + } + } + } +} + +// initLocalAgent configures, initializes, and resumes a local agent instance. +func initLocalAgent( + ctx context.Context, + cfg *config.Config, + instance *agent.Instance, + migrationSvc *migration.Service, + budgetMicrocents int64, + logger *slog.Logger, +) error { + instance.SetReplayWindowSize(cfg.ReplayWindowSize) + instance.BudgetAdapter = settlement.NewMockAdapter(logger) + migrationSvc.RegisterAgent("local-agent", instance) + + logger.Info("Agent loaded with budget", + "budget", budget.Format(budgetMicrocents), + "price_per_second", budget.Format(cfg.PricePerSecond), + ) + + if err := instance.Init(ctx); err != nil { + return fmt.Errorf("failed to initialize agent: %w", err) + } + + if err := instance.LoadCheckpointFromStorage(ctx); err != nil { + logger.Error("Failed to load checkpoint", "error", err) + } + + if cfg.LeaseDuration > 0 { + leaseCfg := authority.LeaseConfig{ + Duration: cfg.LeaseDuration, + RenewalWindow: cfg.LeaseRenewalWindow, + GracePeriod: cfg.LeaseGracePeriod, + } + instance.Lease = authority.NewLease(leaseCfg) + logger.Info("Lease granted", + "epoch", instance.Lease.Epoch, + "expiry", instance.Lease.Expiry, + "duration", cfg.LeaseDuration, + ) + } + return nil +} + +// tickResult indicates the outcome of a single tick iteration. +type tickResult int + +const ( + tickNormal tickResult = iota // Normal tick, use standard interval. + tickFastPath // Agent has more work, use fast interval. + tickRecovered // Lease recovered, continue immediately. + tickStopped // Divergence action requires stopping. +) + +// handleTick processes a single tick: lease check, agent tick, verification. +func handleTick( + ctx context.Context, + instance *agent.Instance, + cfg *config.Config, + replayEngine *replay.Engine, + periodicVerify bool, + ticksSinceVerify *int, + lastVerifiedTick *uint64, + logger *slog.Logger, +) (tickResult, error) { + // Pre-tick lease validation (EI-6: safety over liveness) + if leaseErr := research.CheckAndRenewLease(instance, logger); leaseErr != nil { + if instance.Lease != nil && instance.Lease.State == authority.StateRecoveryRequired { + if recoverErr := research.AttemptLeaseRecovery(ctx, instance, logger); recoverErr == nil { + return tickRecovered, nil + } + } + return tickNormal, research.HandleLeaseExpiry(ctx, instance, leaseErr, logger) + } + + hasMoreWork, tickErr := runner.SafeTick(ctx, instance) + if tickErr != nil { + return tickNormal, runner.HandleTickFailure(ctx, instance, tickErr, logger) + } + + *ticksSinceVerify++ + if periodicVerify && cfg.VerifyInterval > 0 && *ticksSinceVerify >= cfg.VerifyInterval { + *ticksSinceVerify = 0 + var action runner.DivergenceAction + *lastVerifiedTick, action = research.VerifyNextTick(ctx, instance, replayEngine, *lastVerifiedTick, cfg.ReplayCostLog, cfg.ReplayOnDivergence, logger) + if stop := research.HandleDivergenceAction(ctx, instance, cfg, action, nil, logger); stop { + return tickStopped, nil + } + } + + if hasMoreWork { + return tickFastPath, nil + } + return tickNormal, nil +} + +// applyCLIOverrides applies command-line flag values to the configuration. +func applyCLIOverrides(cfg *config.Config, replayWindow, verifyInterval int, replayMode string, + replayCostLog bool, replayOnDivergence string, leaseDuration, leaseGrace time.Duration, + migrationRetries int, migrationRetryDelay time.Duration) { + if replayWindow > 0 { + cfg.ReplayWindowSize = replayWindow + } + if verifyInterval > 0 { + cfg.VerifyInterval = verifyInterval + } + if replayMode != "" { + cfg.ReplayMode = replayMode + } + if replayCostLog { + cfg.ReplayCostLog = true + } + if replayOnDivergence != "" { + cfg.ReplayOnDivergence = replayOnDivergence + } + cfg.LeaseDuration = leaseDuration + cfg.LeaseGracePeriod = leaseGrace + if migrationRetries > 0 { + cfg.MigrationMaxRetries = migrationRetries + } + if migrationRetryDelay > 0 { + cfg.MigrationRetryDelay = migrationRetryDelay + } +} + +// runInspector parses and displays a checkpoint file. +func runInspector(checkpointPath, wasmPath string) { + result, err := inspector.InspectFile(checkpointPath) + if err != nil { + fmt.Fprintf(os.Stderr, "Error: %v\n", err) + os.Exit(1) + } + if wasmPath != "" { + if verr := result.VerifyWASM(wasmPath); verr != nil { + fmt.Fprintf(os.Stderr, "Warning: %v\n", verr) + } + } + result.Print(os.Stdout) +} + +// runSimulator executes an agent in local simulator mode (no P2P). +func runSimulator(wasmPath, manifestPath string, budgetVal float64, ticks int, verify, deterministic bool, seed uint64) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + logger := logging.NewLogger() + cfg := simulator.Config{ + WASMPath: wasmPath, + ManifestPath: manifestPath, + Budget: budgetVal, + Ticks: ticks, + Verify: verify, + Deterministic: deterministic, + RandSeed: seed, + } + result, err := simulator.Run(ctx, cfg, logger) + if err != nil { + logger.Error("Simulation failed", "error", err) + os.Exit(1) + } + simulator.PrintSummary(result, logger) + if len(result.Errors) > 0 { + os.Exit(1) + } +} + +// loadOrGenerateIdentity loads an existing agent identity from storage, +// or generates a new one and persists it. +func loadOrGenerateIdentity( + ctx context.Context, + storageProvider storage.Provider, + agentID string, + logger *slog.Logger, +) (*identity.AgentIdentity, error) { + data, err := storageProvider.LoadIdentity(ctx, agentID) + if err == nil { + id, parseErr := identity.UnmarshalBinary(data) + if parseErr != nil { + logger.Warn("Corrupted agent identity, generating new", "error", parseErr) + } else { + logger.Info("Agent identity loaded", + "agent_id", agentID, + "pub_key_size", len(id.PublicKey), + ) + return id, nil + } + } + + id, err := identity.Generate() + if err != nil { + return nil, fmt.Errorf("generate identity: %w", err) + } + + if err := storageProvider.SaveIdentity(ctx, agentID, id.MarshalBinary()); err != nil { + return nil, fmt.Errorf("save identity: %w", err) + } + + logger.Info("Agent identity generated and saved", "agent_id", agentID) + return id, nil +} diff --git a/cmd/igord/main.go b/cmd/igord/main.go index d764e97..75ac043 100644 --- a/cmd/igord/main.go +++ b/cmd/igord/main.go @@ -1,5 +1,10 @@ // SPDX-License-Identifier: Apache-2.0 +// igord is the product CLI for Igor — the runtime for portable, immortal +// software agents. Subcommands: run, resume, verify, inspect. +// +// For the research/P2P CLI with migration, replay verification, and lease +// management, use igord-lab instead. package main import ( @@ -14,510 +19,55 @@ import ( "time" "github.com/simonovic86/igor/internal/agent" - "github.com/simonovic86/igor/internal/authority" - "github.com/simonovic86/igor/internal/config" "github.com/simonovic86/igor/internal/inspector" "github.com/simonovic86/igor/internal/logging" - "github.com/simonovic86/igor/internal/migration" - "github.com/simonovic86/igor/internal/p2p" - "github.com/simonovic86/igor/internal/pricing" - "github.com/simonovic86/igor/internal/replay" "github.com/simonovic86/igor/internal/runner" "github.com/simonovic86/igor/internal/runtime" "github.com/simonovic86/igor/internal/settlement" - "github.com/simonovic86/igor/internal/simulator" "github.com/simonovic86/igor/internal/storage" "github.com/simonovic86/igor/pkg/budget" "github.com/simonovic86/igor/pkg/identity" ) func main() { - // Handle subcommands before flag parsing. - if len(os.Args) > 1 { - switch os.Args[1] { - case "run": - subcmdRun(os.Args[2:]) - return - case "resume": - subcmdResume(os.Args[2:]) - return - case "verify": - subcmdVerify(os.Args[2:]) - return - case "inspect": - subcmdInspect(os.Args[2:]) - return - } - } - - // Legacy flag-based CLI (backwards compatible). - // Parse CLI flags - runAgent := flag.String("run-agent", "", "Path to WASM agent to run locally") - budgetFlag := flag.Float64("budget", 1.0, "Initial budget for agent execution") - manifestPath := flag.String("manifest", "", "Path to capability manifest JSON (default: .manifest.json)") - migrateAgent := flag.String("migrate-agent", "", "Agent ID to migrate") - targetPeer := flag.String("to", "", "Target peer multiaddr for migration") - wasmPath := flag.String("wasm", "", "WASM binary path for migration") - replayWindow := flag.Int("replay-window", 0, "Number of recent tick snapshots to retain for verification (0 = use config default)") - verifyInterval := flag.Int("verify-interval", 0, "Ticks between self-verification passes (0 = use config default)") - replayMode := flag.String("replay-mode", "", "Replay verification mode: off, periodic, on-migrate, full (default: full)") - replayCostLog := flag.Bool("replay-cost-log", false, "Log replay compute duration for economic observability") - replayOnDivergence := flag.String("replay-on-divergence", "", "Escalation policy on replay divergence: log, pause, intensify, migrate (default: log)") - inspectCheckpoint := flag.String("inspect-checkpoint", "", "Path to checkpoint file to inspect") - inspectWASM := flag.String("inspect-wasm", "", "Optional WASM binary to verify against checkpoint hash") - simulate := flag.Bool("simulate", false, "Run agent in local simulator mode (no P2P)") - simTicks := flag.Int("ticks", 0, "Number of ticks to simulate (0 = until budget exhausted)") - simVerify := flag.Bool("verify", false, "Per-tick replay verification during simulation") - simDeterministic := flag.Bool("deterministic", false, "Use fixed clock and seeded rand for reproducible simulation") - simSeed := flag.Uint64("seed", 0, "Random seed for deterministic simulation") - leaseDuration := flag.Duration("lease-duration", 60*time.Second, "Lease validity period (0 = disabled)") - leaseGrace := flag.Duration("lease-grace", 10*time.Second, "Grace period after lease expiry") - migrationRetries := flag.Int("migration-retries", 0, "Max retries per migration target (0 = use config default)") - migrationRetryDelay := flag.Duration("migration-retry-delay", 0, "Initial backoff delay between migration retries (0 = use config default)") - flag.Parse() - - // Checkpoint inspector — standalone, no config/P2P/engine needed - if *inspectCheckpoint != "" { - runInspector(*inspectCheckpoint, *inspectWASM) - return - } - - // Local simulator — standalone, no config/P2P needed - if *simulate && *runAgent != "" { - runSimulator(*runAgent, *manifestPath, *budgetFlag, *simTicks, *simVerify, *simDeterministic, *simSeed) - return - } - - // Create context - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // Load configuration - cfg, err := config.Load() - if err != nil { - fmt.Fprintf(os.Stderr, "Failed to load config: %v\n", err) + if len(os.Args) < 2 { + printUsage() os.Exit(1) } - // Apply CLI overrides - applyCLIOverrides(cfg, *replayWindow, *verifyInterval, *replayMode, *replayCostLog, - *replayOnDivergence, *leaseDuration, *leaseGrace, *migrationRetries, *migrationRetryDelay) - - // Initialize logging - logger := logging.NewLogger() - - // Print startup banner - logger.Info("Igor Node starting...") - logger.Info("NodeID: " + cfg.NodeID) - - // Initialize P2P node - node, err := p2p.NewNode(ctx, cfg, logger) - if err != nil { - logger.Error("Failed to create P2P node", "error", err) + switch os.Args[1] { + case "run": + subcmdRun(os.Args[2:]) + case "resume": + subcmdResume(os.Args[2:]) + case "verify": + subcmdVerify(os.Args[2:]) + case "inspect": + subcmdInspect(os.Args[2:]) + default: + fmt.Fprintf(os.Stderr, "Unknown command: %s\n\n", os.Args[1]) + printUsage() os.Exit(1) } - defer node.Close() - - // Create storage provider - storageProvider, err := storage.NewFSProvider(cfg.CheckpointDir, logger) - if err != nil { - logger.Error("Failed to create storage provider", "error", err) - os.Exit(1) - } - - // Create WASM runtime engine for migration service - engine, err := runtime.NewEngine(ctx, logger) - if err != nil { - logger.Error("Failed to create runtime engine", "error", err) - os.Exit(1) - } - defer engine.Close(ctx) - - // Initialize migration service - leaseCfg := authority.LeaseConfig{ - Duration: cfg.LeaseDuration, - RenewalWindow: cfg.LeaseRenewalWindow, - GracePeriod: cfg.LeaseGracePeriod, - } - migrationSvc := migration.NewService(node.Host, engine, storageProvider, cfg.ReplayMode, cfg.ReplayCostLog, cfg.PricePerSecond, leaseCfg, logger) - - // Initialize pricing service for inter-node price discovery - _ = pricing.NewService(node.Host, cfg.PricePerSecond, logger) - - // If --migrate-agent flag is provided, perform migration - if *migrateAgent != "" { - if *targetPeer == "" { - logger.Error("Migration requires --to flag with target peer address") - os.Exit(1) - } - if *wasmPath == "" { - logger.Error("Migration requires --wasm flag with WASM binary path") - os.Exit(1) - } - - logger.Info("Initiating agent migration", - "agent_id", *migrateAgent, - "target", *targetPeer, - ) - - if err := migrationSvc.MigrateAgent(ctx, *migrateAgent, *wasmPath, *targetPeer); err != nil { - logger.Error("Migration failed", "error", err) - os.Exit(1) - } - - logger.Info("Migration completed successfully") - return - } - - // If --run-agent flag is provided, run agent locally - if *runAgent != "" { - budgetMicrocents := budget.FromFloat(*budgetFlag) - if err := runLocalAgent(ctx, cfg, engine, storageProvider, *runAgent, budgetMicrocents, *manifestPath, migrationSvc, node, logger); err != nil { - logger.Error("Failed to run agent", "error", err) - os.Exit(1) - } - return - } - - logger.Info("Igor Node ready") - - // Block until interrupted - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - <-sigChan - - logger.Info("Igor Node shutting down...") } -// runLocalAgent loads and executes an agent locally with tick loop and checkpointing. -func runLocalAgent( - ctx context.Context, - cfg *config.Config, - engine *runtime.Engine, - storageProvider storage.Provider, - wasmPath string, - budgetMicrocents int64, - manifestPathFlag string, - migrationSvc *migration.Service, - node *p2p.Node, - logger *slog.Logger, -) error { - manifestData := runner.LoadManifestData(wasmPath, manifestPathFlag, logger) - - signingKey, nodeID := runner.ExtractSigningKey(node) - - // Load or generate agent cryptographic identity for signed checkpoint lineage. - agentIdent, err := loadOrGenerateIdentity(ctx, storageProvider, "local-agent", logger) - if err != nil { - return fmt.Errorf("agent identity: %w", err) - } - - // Load agent with budget and manifest - instance, err := agent.LoadAgent( - ctx, - engine, - wasmPath, - "local-agent", - storageProvider, - budgetMicrocents, - cfg.PricePerSecond, - manifestData, - signingKey, - nodeID, - agentIdent, - logger, - ) - if err != nil { - return fmt.Errorf("failed to load agent: %w", err) - } - defer instance.Close(ctx) - - if err := initLocalAgent(ctx, cfg, instance, migrationSvc, budgetMicrocents, logger); err != nil { - return err - } - - // Setup signal handling - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - - // Create replay engine for CM-4 verification - replayEngine := replay.NewEngine(logger) - defer replayEngine.Close(ctx) - - // Adaptive tick loop constants. - const ( - normalTickInterval = 1 * time.Second - minTickInterval = 10 * time.Millisecond - ) - - checkpointTicker := time.NewTicker(5 * time.Second) - defer checkpointTicker.Stop() - - // Self-verification state (CM-4: Observation Determinism) - var ticksSinceVerify int - var lastVerifiedTick uint64 - - periodicVerify := cfg.ReplayMode == "periodic" || cfg.ReplayMode == "full" - - logger.Info("Starting agent tick loop", - "replay_window", cfg.ReplayWindowSize, - "verify_interval", cfg.VerifyInterval, - "replay_mode", cfg.ReplayMode, - ) - - tickTimer := time.NewTimer(normalTickInterval) - defer tickTimer.Stop() - - for { - select { - case <-ctx.Done(): - return ctx.Err() - - case <-sigChan: - logger.Info("Received interrupt signal, checkpointing and shutting down...") - - // Final checkpoint before exit - if err := instance.SaveCheckpointToStorage(ctx); err != nil { - logger.Error("Failed to save checkpoint on shutdown", "error", err) - } - return nil - - case <-tickTimer.C: - result, err := handleTick(ctx, instance, cfg, replayEngine, periodicVerify, - &ticksSinceVerify, &lastVerifiedTick, logger) - if err != nil { - return err - } - switch result { - case tickRecovered: - tickTimer.Reset(normalTickInterval) - continue - case tickStopped: - return nil - case tickFastPath: - tickTimer.Reset(minTickInterval) - default: - tickTimer.Reset(normalTickInterval) - } - - case <-checkpointTicker.C: - // Periodic checkpoint - if err := instance.SaveCheckpointToStorage(ctx); err != nil { - logger.Error("Failed to save checkpoint", "error", err) - } - } - } -} - -// initLocalAgent configures, initializes, and resumes a local agent instance. -func initLocalAgent( - ctx context.Context, - cfg *config.Config, - instance *agent.Instance, - migrationSvc *migration.Service, - budgetMicrocents int64, - logger *slog.Logger, -) error { - instance.SetReplayWindowSize(cfg.ReplayWindowSize) - instance.BudgetAdapter = settlement.NewMockAdapter(logger) - migrationSvc.RegisterAgent("local-agent", instance) - - logger.Info("Agent loaded with budget", - "budget", budget.Format(budgetMicrocents), - "price_per_second", budget.Format(cfg.PricePerSecond), - ) - - if err := instance.Init(ctx); err != nil { - return fmt.Errorf("failed to initialize agent: %w", err) - } - - if err := instance.LoadCheckpointFromStorage(ctx); err != nil { - logger.Error("Failed to load checkpoint", "error", err) - } - - if cfg.LeaseDuration > 0 { - leaseCfg := authority.LeaseConfig{ - Duration: cfg.LeaseDuration, - RenewalWindow: cfg.LeaseRenewalWindow, - GracePeriod: cfg.LeaseGracePeriod, - } - instance.Lease = authority.NewLease(leaseCfg) - logger.Info("Lease granted", - "epoch", instance.Lease.Epoch, - "expiry", instance.Lease.Expiry, - "duration", cfg.LeaseDuration, - ) - } - return nil -} - -// tickResult indicates the outcome of a single tick iteration. -type tickResult int - -const ( - tickNormal tickResult = iota // Normal tick, use standard interval. - tickFastPath // Agent has more work, use fast interval. - tickRecovered // Lease recovered, continue immediately. - tickStopped // Divergence action requires stopping. -) - -// handleTick processes a single tick: lease check, agent tick, verification. -func handleTick( - ctx context.Context, - instance *agent.Instance, - cfg *config.Config, - replayEngine *replay.Engine, - periodicVerify bool, - ticksSinceVerify *int, - lastVerifiedTick *uint64, - logger *slog.Logger, -) (tickResult, error) { - // Pre-tick lease validation (EI-6: safety over liveness) - if leaseErr := runner.CheckAndRenewLease(instance, logger); leaseErr != nil { - if instance.Lease != nil && instance.Lease.State == authority.StateRecoveryRequired { - if recoverErr := runner.AttemptLeaseRecovery(ctx, instance, logger); recoverErr == nil { - return tickRecovered, nil - } - } - return tickNormal, runner.HandleLeaseExpiry(ctx, instance, leaseErr, logger) - } - - hasMoreWork, tickErr := runner.SafeTick(ctx, instance) - if tickErr != nil { - return tickNormal, runner.HandleTickFailure(ctx, instance, tickErr, logger) - } - - *ticksSinceVerify++ - if periodicVerify && cfg.VerifyInterval > 0 && *ticksSinceVerify >= cfg.VerifyInterval { - *ticksSinceVerify = 0 - var action runner.DivergenceAction - *lastVerifiedTick, action = runner.VerifyNextTick(ctx, instance, replayEngine, *lastVerifiedTick, cfg.ReplayCostLog, cfg.ReplayOnDivergence, logger) - if stop := runner.HandleDivergenceAction(ctx, instance, cfg, action, nil, logger); stop { - return tickStopped, nil - } - } - - if hasMoreWork { - return tickFastPath, nil - } - return tickNormal, nil -} - -// applyCLIOverrides applies command-line flag values to the configuration. -func applyCLIOverrides(cfg *config.Config, replayWindow, verifyInterval int, replayMode string, - replayCostLog bool, replayOnDivergence string, leaseDuration, leaseGrace time.Duration, - migrationRetries int, migrationRetryDelay time.Duration) { - if replayWindow > 0 { - cfg.ReplayWindowSize = replayWindow - } - if verifyInterval > 0 { - cfg.VerifyInterval = verifyInterval - } - if replayMode != "" { - cfg.ReplayMode = replayMode - } - if replayCostLog { - cfg.ReplayCostLog = true - } - if replayOnDivergence != "" { - cfg.ReplayOnDivergence = replayOnDivergence - } - cfg.LeaseDuration = leaseDuration - cfg.LeaseGracePeriod = leaseGrace - if migrationRetries > 0 { - cfg.MigrationMaxRetries = migrationRetries - } - if migrationRetryDelay > 0 { - cfg.MigrationRetryDelay = migrationRetryDelay - } -} - -// runInspector parses and displays a checkpoint file. -func runInspector(checkpointPath, wasmPath string) { - result, err := inspector.InspectFile(checkpointPath) - if err != nil { - fmt.Fprintf(os.Stderr, "Error: %v\n", err) - os.Exit(1) - } - if wasmPath != "" { - if verr := result.VerifyWASM(wasmPath); verr != nil { - fmt.Fprintf(os.Stderr, "Warning: %v\n", verr) - } - } - result.Print(os.Stdout) -} - -// runSimulator executes an agent in local simulator mode (no P2P). -func runSimulator(wasmPath, manifestPath string, budgetVal float64, ticks int, verify, deterministic bool, seed uint64) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - logger := logging.NewLogger() - cfg := simulator.Config{ - WASMPath: wasmPath, - ManifestPath: manifestPath, - Budget: budgetVal, - Ticks: ticks, - Verify: verify, - Deterministic: deterministic, - RandSeed: seed, - } - result, err := simulator.Run(ctx, cfg, logger) - if err != nil { - logger.Error("Simulation failed", "error", err) - os.Exit(1) - } - simulator.PrintSummary(result, logger) - if len(result.Errors) > 0 { - os.Exit(1) - } -} - -// loadOrGenerateIdentity loads an existing agent identity from storage, -// or generates a new one and persists it. The identity is the agent's Ed25519 -// keypair used for signed checkpoint lineage (Task 13). -func loadOrGenerateIdentity( - ctx context.Context, - storageProvider storage.Provider, - agentID string, - logger *slog.Logger, -) (*identity.AgentIdentity, error) { - data, err := storageProvider.LoadIdentity(ctx, agentID) - if err == nil { - id, parseErr := identity.UnmarshalBinary(data) - if parseErr != nil { - logger.Warn("Corrupted agent identity, generating new", "error", parseErr) - } else { - logger.Info("Agent identity loaded", - "agent_id", agentID, - "pub_key_size", len(id.PublicKey), - ) - return id, nil - } - } - - id, err := identity.Generate() - if err != nil { - return nil, fmt.Errorf("generate identity: %w", err) - } - - if err := storageProvider.SaveIdentity(ctx, agentID, id.MarshalBinary()); err != nil { - return nil, fmt.Errorf("save identity: %w", err) - } - - logger.Info("Agent identity generated and saved", "agent_id", agentID) - return id, nil +func printUsage() { + fmt.Fprintf(os.Stderr, "Usage: igord [flags]\n\n") + fmt.Fprintf(os.Stderr, "Commands:\n") + fmt.Fprintf(os.Stderr, " run Run a WASM agent with a new identity\n") + fmt.Fprintf(os.Stderr, " resume Resume an agent from a checkpoint file\n") + fmt.Fprintf(os.Stderr, " verify Verify checkpoint lineage chain\n") + fmt.Fprintf(os.Stderr, " inspect Display checkpoint details\n") + fmt.Fprintf(os.Stderr, "\nRun 'igord -h' for help on a specific command.\n") } // subcmdRun implements "igord run [--budget N] [--manifest path]". -// Runs an agent locally with a simplified setup (no P2P, no migration). func subcmdRun(args []string) { fs := flag.NewFlagSet("run", flag.ExitOnError) budgetFlag := fs.Float64("budget", 1.0, "Initial budget for agent execution") manifestPath := fs.String("manifest", "", "Path to capability manifest JSON") checkpointDir := fs.String("checkpoint-dir", "checkpoints", "Directory for checkpoint storage") agentID := fs.String("agent-id", "", "Agent ID (default: derived from WASM filename)") - leaseDuration := fs.Duration("lease-duration", 0, "Lease validity period (0 = disabled)") fs.Usage = func() { fmt.Fprintf(os.Stderr, "Usage: igord run [flags]\n\n") fmt.Fprintf(os.Stderr, "Run a WASM agent locally. The agent gets a DID identity,\n") @@ -538,11 +88,10 @@ func subcmdRun(args []string) { aid = agentIDFromPath(wasmPath) } - runStandalone(wasmPath, aid, *budgetFlag, *manifestPath, *checkpointDir, *leaseDuration) + runStandalone(wasmPath, aid, *budgetFlag, *manifestPath, *checkpointDir) } // subcmdResume implements "igord resume --checkpoint --wasm ". -// Resumes an agent from a checkpoint file. func subcmdResume(args []string) { fs := flag.NewFlagSet("resume", flag.ExitOnError) checkpointPath := fs.String("checkpoint", "", "Path to checkpoint file") @@ -616,11 +165,28 @@ func subcmdInspect(args []string) { fs.Usage() os.Exit(1) } - runInspectorWithDID(fs.Arg(0), *wasmPath) + + result, err := inspector.InspectFile(fs.Arg(0)) + if err != nil { + fmt.Fprintf(os.Stderr, "Error: %v\n", err) + os.Exit(1) + } + if *wasmPath != "" { + if verr := result.VerifyWASM(*wasmPath); verr != nil { + fmt.Fprintf(os.Stderr, "Warning: %v\n", verr) + } + } + result.Print(os.Stdout) + + // Show DID if checkpoint has lineage. + if result.HasLineage && len(result.AgentPubKey) == 32 { + id := &identity.AgentIdentity{PublicKey: result.AgentPubKey} + fmt.Fprintf(os.Stdout, "Agent DID: %s\n", id.DID()) + } } // runStandalone runs an agent without P2P, migration, or leases. -func runStandalone(wasmPath, agentID string, budgetVal float64, manifestPath, checkpointDir string, leaseDuration time.Duration) { +func runStandalone(wasmPath, agentID string, budgetVal float64, manifestPath, checkpointDir string) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -677,15 +243,6 @@ func runStandalone(wasmPath, agentID string, budgetVal float64, manifestPath, ch logger.Info("No existing checkpoint, starting fresh") } - if leaseDuration > 0 { - leaseCfg := authority.LeaseConfig{ - Duration: leaseDuration, - RenewalWindow: 0.5, - GracePeriod: 10 * time.Second, - } - instance.Lease = authority.NewLease(leaseCfg) - } - logger.Info("Agent started", "agent_id", agentID, "did", agentIdent.DIDShort(), @@ -703,7 +260,6 @@ func resumeFromCheckpoint(checkpointPath, wasmPath, agentID string, budgetVal fl logger := logging.NewLogger() - // Copy checkpoint file to storage directory so the agent can find it. storageProvider, err := storage.NewFSProvider(checkpointDir, logger) if err != nil { logger.Error("Failed to create storage provider", "error", err) @@ -847,25 +403,39 @@ func runTickLoop(ctx context.Context, instance *agent.Instance, logger *slog.Log } } -// runInspectorWithDID inspects a checkpoint and displays DID identity. -func runInspectorWithDID(checkpointPath, wasmPath string) { - result, err := inspector.InspectFile(checkpointPath) - if err != nil { - fmt.Fprintf(os.Stderr, "Error: %v\n", err) - os.Exit(1) - } - if wasmPath != "" { - if verr := result.VerifyWASM(wasmPath); verr != nil { - fmt.Fprintf(os.Stderr, "Warning: %v\n", verr) +// loadOrGenerateIdentity loads an existing agent identity from storage, +// or generates a new one and persists it. +func loadOrGenerateIdentity( + ctx context.Context, + storageProvider storage.Provider, + agentID string, + logger *slog.Logger, +) (*identity.AgentIdentity, error) { + data, err := storageProvider.LoadIdentity(ctx, agentID) + if err == nil { + id, parseErr := identity.UnmarshalBinary(data) + if parseErr != nil { + logger.Warn("Corrupted agent identity, generating new", "error", parseErr) + } else { + logger.Info("Agent identity loaded", + "agent_id", agentID, + "pub_key_size", len(id.PublicKey), + ) + return id, nil } } - result.Print(os.Stdout) - // Show DID if checkpoint has lineage. - if result.HasLineage && len(result.AgentPubKey) == 32 { - id := &identity.AgentIdentity{PublicKey: result.AgentPubKey} - fmt.Fprintf(os.Stdout, "Agent DID: %s\n", id.DID()) + id, err := identity.Generate() + if err != nil { + return nil, fmt.Errorf("generate identity: %w", err) + } + + if err := storageProvider.SaveIdentity(ctx, agentID, id.MarshalBinary()); err != nil { + return nil, fmt.Errorf("save identity: %w", err) } + + logger.Info("Agent identity generated and saved", "agent_id", agentID) + return id, nil } // agentIDFromPath derives an agent ID from a WASM file path. @@ -874,7 +444,6 @@ func agentIDFromPath(wasmPath string) string { ext := filepath.Ext(base) name := base[:len(base)-len(ext)] if name == "" || name == "agent" { - // Use parent directory name. name = filepath.Base(filepath.Dir(wasmPath)) } if name == "" || name == "." { diff --git a/internal/runner/research/research.go b/internal/runner/research/research.go new file mode 100644 index 0000000..3ab5499 --- /dev/null +++ b/internal/runner/research/research.go @@ -0,0 +1,189 @@ +// SPDX-License-Identifier: Apache-2.0 + +// Package research provides research-specific tick loop functions that depend +// on P2P, replay verification, and lease management. These are used by +// igord-lab but not by the product igord binary. +package research + +import ( + "context" + "crypto/ed25519" + "crypto/sha256" + "fmt" + "log/slog" + + "github.com/simonovic86/igor/internal/agent" + "github.com/simonovic86/igor/internal/config" + "github.com/simonovic86/igor/internal/p2p" + "github.com/simonovic86/igor/internal/replay" + "github.com/simonovic86/igor/internal/runner" +) + +// HandleDivergenceAction acts on the escalation policy returned by VerifyNextTick. +// migrateFn is optional — passing nil preserves the existing "fall through to pause" +// behavior for DivergenceMigrate. +// Returns true if the tick loop should exit. +func HandleDivergenceAction(ctx context.Context, instance *agent.Instance, cfg *config.Config, action runner.DivergenceAction, migrateFn runner.MigrationTrigger, logger *slog.Logger) bool { + switch action { + case runner.DivergencePause: + logger.Info("Agent paused due to replay divergence (EI-6), saving checkpoint") + trySaveCheckpoint(ctx, instance, logger) + return true + case runner.DivergenceIntensify: + logger.Info("Verification frequency intensified to every tick") + cfg.VerifyInterval = 1 + case runner.DivergenceMigrate: + if migrateFn != nil { + logger.Info("Divergence-triggered migration starting", + "agent_id", instance.AgentID, + ) + if err := migrateFn(ctx, instance.AgentID); err != nil { + logger.Error("Divergence migration failed, pausing", + "error", err, + ) + } else { + logger.Info("Divergence migration succeeded") + return true // exit tick loop — agent is on another node + } + } else { + logger.Info("Migration escalation triggered — pausing (no migration function available)") + } + trySaveCheckpoint(ctx, instance, logger) + return true + } + return false +} + +// CheckAndRenewLease validates the lease before a tick and renews if needed. +// Returns nil if ticking is allowed, or an error if the lease has expired. +// No-op if leases are disabled (instance.Lease == nil). +func CheckAndRenewLease(instance *agent.Instance, logger *slog.Logger) error { + if instance.Lease == nil { + return nil + } + if err := instance.Lease.ValidateForTick(); err != nil { + return err + } + if instance.Lease.NeedsRenewal() { + if err := instance.Lease.Renew(); err != nil { + logger.Error("Lease renewal failed", "error", err) + } else { + logger.Info("Lease renewed", + "epoch", instance.Lease.Epoch, + "expiry", instance.Lease.Expiry, + ) + } + } + return nil +} + +// HandleLeaseExpiry saves a final checkpoint and returns the lease error. +// EI-6: safety over liveness — we stop rather than tick without authority. +func HandleLeaseExpiry(ctx context.Context, instance *agent.Instance, leaseErr error, logger *slog.Logger) error { + logger.Error("Lease expired, halting agent", + "agent_id", instance.AgentID, + "error", leaseErr, + ) + trySaveCheckpoint(ctx, instance, logger) + return leaseErr +} + +// AttemptLeaseRecovery tries to recover from RECOVERY_REQUIRED state. +func AttemptLeaseRecovery(ctx context.Context, instance *agent.Instance, logger *slog.Logger) error { + if instance.Lease == nil { + return fmt.Errorf("lease recovery: no lease configured") + } + if err := instance.Lease.Recover(); err != nil { + return fmt.Errorf("lease recovery: %w", err) + } + logger.Info("Lease recovered", + "agent_id", instance.AgentID, + "epoch", instance.Lease.Epoch, + "expiry", instance.Lease.Expiry, + ) + trySaveCheckpoint(ctx, instance, logger) + return nil +} + +// VerifyNextTick replays the oldest unverified tick in the replay window. +// Returns the tick number of the verified tick (for tracking) and an escalation +// action if divergence is detected. +func VerifyNextTick( + ctx context.Context, + instance *agent.Instance, + replayEngine *replay.Engine, + lastVerified uint64, + logCost bool, + policy string, + logger *slog.Logger, +) (uint64, runner.DivergenceAction) { + for _, snap := range instance.ReplayWindow { + if snap.TickNumber <= lastVerified { + continue + } + if snap.TickLog == nil || len(snap.TickLog.Entries) == 0 { + continue + } + + result := replayEngine.ReplayTick( + ctx, + instance.WASMBytes, + instance.Manifest, + snap.PreState, + snap.TickLog, + nil, + ) + + if result.Error != nil { + logger.Error("Replay verification failed", + "tick", result.TickNumber, + "error", result.Error, + ) + return snap.TickNumber, runner.EscalationForPolicy(policy) + } + + replayedHash := sha256.Sum256(result.ReplayedState) + if replayedHash != snap.PostStateHash { + logger.Error("Replay divergence detected", + "tick", result.TickNumber, + "state_bytes", len(result.ReplayedState), + ) + return snap.TickNumber, runner.EscalationForPolicy(policy) + } + + attrs := []any{ + "tick", result.TickNumber, + "state_bytes", len(result.ReplayedState), + } + if logCost { + attrs = append(attrs, "replay_duration", result.Duration) + } + logger.Info("Replay verified", attrs...) + return snap.TickNumber, runner.DivergenceNone + } + return lastVerified, runner.DivergenceNone +} + +// ExtractSigningKey returns the Ed25519 private key and peer ID from the node, +// or nil/"" if the key is not available or not Ed25519. +func ExtractSigningKey(node *p2p.Node) (ed25519.PrivateKey, string) { + privKey := node.Host.Peerstore().PrivKey(node.Host.ID()) + if privKey == nil { + return nil, "" + } + raw, err := privKey.Raw() + if err != nil || len(raw) != ed25519.PrivateKeySize { + return nil, "" + } + return ed25519.PrivateKey(raw), node.Host.ID().String() +} + +// trySaveCheckpoint attempts to save a checkpoint, logging errors. +func trySaveCheckpoint(ctx context.Context, instance *agent.Instance, logger *slog.Logger) { + if instance == nil || instance.Storage == nil { + return + } + if err := instance.SaveCheckpointToStorage(ctx); err != nil { + logger.Error("Failed to save checkpoint", "error", err) + } +} diff --git a/internal/runner/research/research_test.go b/internal/runner/research/research_test.go new file mode 100644 index 0000000..cdcb62d --- /dev/null +++ b/internal/runner/research/research_test.go @@ -0,0 +1,63 @@ +// SPDX-License-Identifier: Apache-2.0 + +package research + +import ( + "context" + "fmt" + "log/slog" + "os" + "testing" + + "github.com/simonovic86/igor/internal/agent" + "github.com/simonovic86/igor/internal/runner" +) + +func testLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) +} + +func TestHandleDivergenceAction_None(t *testing.T) { + ctx := context.Background() + stop := HandleDivergenceAction(ctx, nil, nil, runner.DivergenceNone, nil, testLogger()) + if stop { + t.Error("DivergenceNone should not stop the loop") + } +} + +func TestHandleDivergenceAction_Log(t *testing.T) { + ctx := context.Background() + stop := HandleDivergenceAction(ctx, nil, nil, runner.DivergenceLog, nil, testLogger()) + if stop { + t.Error("DivergenceLog should not stop the loop") + } +} + +func TestHandleDivergenceAction_MigrateWithNilFn(t *testing.T) { + ctx := context.Background() + inst := &agent.Instance{AgentID: "test-agent"} + stop := HandleDivergenceAction(ctx, inst, nil, runner.DivergenceMigrate, nil, testLogger()) + if !stop { + t.Error("DivergenceMigrate with nil migrateFn should stop the loop") + } +} + +func TestHandleDivergenceAction_MigrateWithFn_Success(t *testing.T) { + ctx := context.Background() + inst := &agent.Instance{AgentID: "test-agent"} + migrateFn := func(_ context.Context, _ string) error { return nil } + stop := HandleDivergenceAction(ctx, inst, nil, runner.DivergenceMigrate, migrateFn, testLogger()) + if !stop { + t.Error("successful migration should stop the loop") + } +} + +func TestHandleDivergenceAction_MigrateWithFn_Failure(t *testing.T) { + ctx := context.Background() + inst := &agent.Instance{AgentID: "test-agent"} + migrateFn := func(_ context.Context, _ string) error { return fmt.Errorf("no peers") } + stop := HandleDivergenceAction(ctx, inst, nil, runner.DivergenceMigrate, migrateFn, testLogger()) + if !stop { + t.Error("failed migration should still stop the loop (pause fallback)") + } +} diff --git a/internal/runner/runner.go b/internal/runner/runner.go index 04c778e..99a636b 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -7,15 +7,10 @@ package runner import ( "context" - "crypto/ed25519" - "crypto/sha256" "fmt" "log/slog" "github.com/simonovic86/igor/internal/agent" - "github.com/simonovic86/igor/internal/config" - "github.com/simonovic86/igor/internal/p2p" - "github.com/simonovic86/igor/internal/replay" "github.com/simonovic86/igor/pkg/manifest" ) @@ -30,6 +25,11 @@ const ( DivergenceMigrate // trigger migration ) +// MigrationTrigger is called when divergence escalation requests migration. +// The implementation should attempt migration with retry and fallback. +// Returns nil if migration succeeded, error if it failed. +type MigrationTrigger func(ctx context.Context, agentID string) error + // SafeTick executes one tick with panic recovery (EI-6: Safety Over Liveness). // A WASM trap or runtime bug must not crash the node. func SafeTick(ctx context.Context, instance *agent.Instance) (hasMore bool, err error) { @@ -41,40 +41,6 @@ func SafeTick(ctx context.Context, instance *agent.Instance) (hasMore bool, err return instance.Tick(ctx) } -// CheckAndRenewLease validates the lease before a tick and renews if needed. -// Returns nil if ticking is allowed, or an error if the lease has expired. -// No-op if leases are disabled (instance.Lease == nil). -func CheckAndRenewLease(instance *agent.Instance, logger *slog.Logger) error { - if instance.Lease == nil { - return nil - } - if err := instance.Lease.ValidateForTick(); err != nil { - return err - } - if instance.Lease.NeedsRenewal() { - if err := instance.Lease.Renew(); err != nil { - logger.Error("Lease renewal failed", "error", err) - } else { - logger.Info("Lease renewed", - "epoch", instance.Lease.Epoch, - "expiry", instance.Lease.Expiry, - ) - } - } - return nil -} - -// HandleLeaseExpiry saves a final checkpoint and returns the lease error. -// EI-6: safety over liveness — we stop rather than tick without authority. -func HandleLeaseExpiry(ctx context.Context, instance *agent.Instance, leaseErr error, logger *slog.Logger) error { - logger.Error("Lease expired, halting agent", - "agent_id", instance.AgentID, - "error", leaseErr, - ) - trySaveCheckpoint(ctx, instance, logger) - return leaseErr -} - // HandleTickFailure logs the failure reason, saves a final checkpoint, and returns the error. func HandleTickFailure(ctx context.Context, instance *agent.Instance, tickErr error, logger *slog.Logger) error { if instance.Budget <= 0 { @@ -100,127 +66,6 @@ func trySaveCheckpoint(ctx context.Context, instance *agent.Instance, logger *sl } } -// MigrationTrigger is called when divergence escalation requests migration. -// The implementation should attempt migration with retry and fallback. -// Returns nil if migration succeeded, error if it failed. -type MigrationTrigger func(ctx context.Context, agentID string) error - -// HandleDivergenceAction acts on the escalation policy returned by VerifyNextTick. -// migrateFn is optional — passing nil preserves the existing "fall through to pause" -// behavior for DivergenceMigrate. -// Returns true if the tick loop should exit. -func HandleDivergenceAction(ctx context.Context, instance *agent.Instance, cfg *config.Config, action DivergenceAction, migrateFn MigrationTrigger, logger *slog.Logger) bool { - switch action { - case DivergencePause: - logger.Info("Agent paused due to replay divergence (EI-6), saving checkpoint") - trySaveCheckpoint(ctx, instance, logger) - return true - case DivergenceIntensify: - logger.Info("Verification frequency intensified to every tick") - cfg.VerifyInterval = 1 - case DivergenceMigrate: - if migrateFn != nil { - logger.Info("Divergence-triggered migration starting", - "agent_id", instance.AgentID, - ) - if err := migrateFn(ctx, instance.AgentID); err != nil { - logger.Error("Divergence migration failed, pausing", - "error", err, - ) - } else { - logger.Info("Divergence migration succeeded") - return true // exit tick loop — agent is on another node - } - } else { - logger.Info("Migration escalation triggered — pausing (no migration function available)") - } - trySaveCheckpoint(ctx, instance, logger) - return true - } - return false -} - -// AttemptLeaseRecovery tries to recover from RECOVERY_REQUIRED state. -// For v0, this is a local-only operation: the node re-grants itself a fresh -// lease with an incremented major version, ensuring stale leases are superseded. -// Returns nil on success, error if recovery is not possible. -func AttemptLeaseRecovery(ctx context.Context, instance *agent.Instance, logger *slog.Logger) error { - if instance.Lease == nil { - return fmt.Errorf("lease recovery: no lease configured") - } - if err := instance.Lease.Recover(); err != nil { - return fmt.Errorf("lease recovery: %w", err) - } - logger.Info("Lease recovered", - "agent_id", instance.AgentID, - "epoch", instance.Lease.Epoch, - "expiry", instance.Lease.Expiry, - ) - trySaveCheckpoint(ctx, instance, logger) - return nil -} - -// VerifyNextTick replays the oldest unverified tick in the replay window. -// Returns the tick number of the verified tick (for tracking) and an escalation -// action if divergence is detected. Returns DivergenceNone when verification passes. -func VerifyNextTick( - ctx context.Context, - instance *agent.Instance, - replayEngine *replay.Engine, - lastVerified uint64, - logCost bool, - policy string, - logger *slog.Logger, -) (uint64, DivergenceAction) { - for _, snap := range instance.ReplayWindow { - if snap.TickNumber <= lastVerified { - continue - } - if snap.TickLog == nil || len(snap.TickLog.Entries) == 0 { - continue - } - - // Pass nil expectedState — hash-based verification (IMPROVEMENTS #2). - result := replayEngine.ReplayTick( - ctx, - instance.WASMBytes, - instance.Manifest, - snap.PreState, - snap.TickLog, - nil, - ) - - if result.Error != nil { - logger.Error("Replay verification failed", - "tick", result.TickNumber, - "error", result.Error, - ) - return snap.TickNumber, EscalationForPolicy(policy) - } - - // Hash-based post-state comparison (IMPROVEMENTS #2). - replayedHash := sha256.Sum256(result.ReplayedState) - if replayedHash != snap.PostStateHash { - logger.Error("Replay divergence detected", - "tick", result.TickNumber, - "state_bytes", len(result.ReplayedState), - ) - return snap.TickNumber, EscalationForPolicy(policy) - } - - attrs := []any{ - "tick", result.TickNumber, - "state_bytes", len(result.ReplayedState), - } - if logCost { - attrs = append(attrs, "replay_duration", result.Duration) - } - logger.Info("Replay verified", attrs...) - return snap.TickNumber, DivergenceNone - } - return lastVerified, DivergenceNone -} - // EscalationForPolicy maps a policy string to a DivergenceAction. func EscalationForPolicy(policy string) DivergenceAction { switch policy { @@ -240,17 +85,3 @@ func EscalationForPolicy(policy string) DivergenceAction { func LoadManifestData(wasmPath, manifestPathFlag string, logger *slog.Logger) []byte { return manifest.LoadSidecarData(wasmPath, manifestPathFlag, logger) } - -// ExtractSigningKey returns the Ed25519 private key and peer ID from the node, -// or nil/"" if the key is not available or not Ed25519. -func ExtractSigningKey(node *p2p.Node) (ed25519.PrivateKey, string) { - privKey := node.Host.Peerstore().PrivKey(node.Host.ID()) - if privKey == nil { - return nil, "" - } - raw, err := privKey.Raw() - if err != nil || len(raw) != ed25519.PrivateKeySize { - return nil, "" - } - return ed25519.PrivateKey(raw), node.Host.ID().String() -} diff --git a/internal/runner/runner_test.go b/internal/runner/runner_test.go index 6e4988e..5d8fef7 100644 --- a/internal/runner/runner_test.go +++ b/internal/runner/runner_test.go @@ -3,14 +3,10 @@ package runner import ( - "context" - "fmt" "log/slog" "os" "path/filepath" "testing" - - "github.com/simonovic86/igor/internal/agent" ) func testLogger() *slog.Logger { @@ -77,50 +73,3 @@ func TestLoadManifestData_NonWASMPath(t *testing.T) { t.Fatalf("expected empty JSON for non-.wasm path, got: %s", data) } } - -func TestHandleDivergenceAction_None(t *testing.T) { - ctx := context.Background() - stop := HandleDivergenceAction(ctx, nil, nil, DivergenceNone, nil, testLogger()) - if stop { - t.Error("DivergenceNone should not stop the loop") - } -} - -func TestHandleDivergenceAction_Log(t *testing.T) { - ctx := context.Background() - stop := HandleDivergenceAction(ctx, nil, nil, DivergenceLog, nil, testLogger()) - if stop { - t.Error("DivergenceLog should not stop the loop") - } -} - -func TestHandleDivergenceAction_MigrateWithNilFn(t *testing.T) { - ctx := context.Background() - // With nil migrateFn, DivergenceMigrate should still stop the loop (pause fallback). - // We use a minimal instance to avoid nil dereference in SaveCheckpointToStorage. - inst := &agent.Instance{AgentID: "test-agent"} - stop := HandleDivergenceAction(ctx, inst, nil, DivergenceMigrate, nil, testLogger()) - if !stop { - t.Error("DivergenceMigrate with nil migrateFn should stop the loop") - } -} - -func TestHandleDivergenceAction_MigrateWithFn_Success(t *testing.T) { - ctx := context.Background() - inst := &agent.Instance{AgentID: "test-agent"} - migrateFn := func(_ context.Context, _ string) error { return nil } - stop := HandleDivergenceAction(ctx, inst, nil, DivergenceMigrate, migrateFn, testLogger()) - if !stop { - t.Error("successful migration should stop the loop") - } -} - -func TestHandleDivergenceAction_MigrateWithFn_Failure(t *testing.T) { - ctx := context.Background() - inst := &agent.Instance{AgentID: "test-agent"} - migrateFn := func(_ context.Context, _ string) error { return fmt.Errorf("no peers") } - stop := HandleDivergenceAction(ctx, inst, nil, DivergenceMigrate, migrateFn, testLogger()) - if !stop { - t.Error("failed migration should still stop the loop (pause fallback)") - } -}