From e5e3801d5fae1f36f466e2b0b68a47c06b2cf0af Mon Sep 17 00:00:00 2001 From: David Mora Date: Sun, 7 Jun 2026 12:44:50 -0600 Subject: [PATCH] feat: add agy backend, RunFirstTurn, ContentBlock/BlockSender, engine fixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New Antigravity CLI (agy) backend - engine/cli/agy: Spawner + Parser + Resumer (spawn-per-turn). Wraps agy in an injection-safe `sh -c '"$0" "$@" ...'` shell that emits a JSON result sentinel on clean exit; captures the conversation ID from agy's --log-file and resumes via --conversation. Maps Model/AddDirs/HITL/sandbox/skip-permissions options. - cmd/agentrun-mcp: register agy in newCLIBackend, cliBackendNames, validBackends. RunFirstTurn helper (fixes spawn-per-turn first-turn double-prompt) - Root RunFirstTurn drains the Start-initiated turn for SequentialSender (spawn-per-turn) backends and sends for streaming/ACP backends, so callers no longer branch on backend type. Previously Start(prompt)+RunTurn(prompt) ran a redundant turn-2 resume before the session ID was captured, breaking codex, opencode, and agy under run_turn / session_start / probe_backend. - Use it in cmd/agentrun-mcp doRunTurn/doSessionStart/doProbe and examples/interactive. CLI engine turn-boundary race fix - Close output/done channels under p.mu (finishLocked/finalizeTurn) and have replaceSubprocess re-check done under the same lock, falling back to fresh channels (spawnCleanResume) — and returning ErrTerminated rather than silently resuming a turn that finalized with an error. Fixes a Send-right-after-result race that dropped the resumed turn. ContentBlock / BlockSender (multi-modal input) - Root BlockSender/ContentBlock + RunTurnBlocks; cli.BlockFormatter with Claude implementation; ACP SendBlocks; text-degradation fallback. Tests: deterministic regression tests for the race fix and RunFirstTurn (engine/cli/race_test.go, firstturn_test.go, runfirstturn_test.go). make qa and make mcp-qa green. Co-Authored-By: Claude Opus 4.8 (1M context) --- block_sender.go | 155 ++++++ block_sender_test.go | 347 ++++++++++++ cmd/agentrun-mcp/engine.go | 8 +- cmd/agentrun-mcp/tools.go | 28 +- ...unify-context-fill-and-turn-abstraction.md | 521 ++++++++++++++++++ .../2026-03-12-issue-40-remaining-gaps.md | 462 ++++++++++++++++ engine/acp/engine_test.go | 31 ++ engine/acp/process.go | 39 +- engine/acp/protocol.go | 7 +- engine/cli/agy/agy.go | 180 ++++++ engine/cli/agy/agy_test.go | 123 +++++ engine/cli/agy/compliance_test.go | 29 + engine/cli/agy/doc.go | 2 + engine/cli/agy/parse.go | 34 ++ engine/cli/agy/parse_test.go | 87 +++ engine/cli/claude/args_test.go | 88 ++- engine/cli/claude/claude.go | 26 + engine/cli/engine_test.go | 106 ++++ engine/cli/firstturn_test.go | 112 ++++ engine/cli/interfaces.go | 14 + engine/cli/process.go | 103 +++- engine/cli/race_test.go | 131 +++++ examples/interactive/main.go | 24 +- runfirstturn_test.go | 89 +++ runturn.go | 26 + 25 files changed, 2725 insertions(+), 47 deletions(-) create mode 100644 block_sender.go create mode 100644 block_sender_test.go create mode 100644 docs/designs/2026-03-09-unify-context-fill-and-turn-abstraction.md create mode 100644 docs/designs/2026-03-12-issue-40-remaining-gaps.md create mode 100644 engine/cli/agy/agy.go create mode 100644 engine/cli/agy/agy_test.go create mode 100644 engine/cli/agy/compliance_test.go create mode 100644 engine/cli/agy/doc.go create mode 100644 engine/cli/agy/parse.go create mode 100644 engine/cli/agy/parse_test.go create mode 100644 engine/cli/firstturn_test.go create mode 100644 engine/cli/race_test.go create mode 100644 runfirstturn_test.go diff --git a/block_sender.go b/block_sender.go new file mode 100644 index 0000000..b99d3f2 --- /dev/null +++ b/block_sender.go @@ -0,0 +1,155 @@ +package agentrun + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strings" +) + +// ContentBlock is a single content element in a prompt. +// +// Modern LLMs support multi-modal inputs. Backends that support multi-modal +// input accept ContentBlock arrays. +type ContentBlock struct { + Type string `json:"type"` + Text string `json:"text,omitempty"` + Source json.RawMessage `json:"source,omitempty"` // image source (base64/URL) — format follows Anthropic API + MimeType string `json:"mime_type,omitempty"` +} + +// TextBlock creates a text-only ContentBlock. +func TextBlock(s string) ContentBlock { + return ContentBlock{ + Type: "text", + Text: s, + } +} + +// ImageBase64Block creates an image ContentBlock from base64 data. +// mediaType should be one of "image/jpeg", "image/png", "image/gif", "image/webp". +// data is the raw base64 data string. +func ImageBase64Block(mediaType, data string) ContentBlock { + src := map[string]string{ + "type": "base64", + "media_type": mediaType, + "data": data, + } + srcJSON, _ := json.Marshal(src) // marshalling simple map cannot fail + return ContentBlock{ + Type: "image", + Source: json.RawMessage(srcJSON), + } +} + +// TextFromBlocks extracts concatenated text from blocks. +// Backends that only support text use this for graceful degradation. +func TextFromBlocks(blocks []ContentBlock) string { + var sb strings.Builder + first := true + for _, b := range blocks { + if b.Type == "text" && b.Text != "" { + if !first { + sb.WriteByte('\n') + } + sb.WriteString(b.Text) + first = false + } + } + return sb.String() +} + +// MaxBase64Size is the maximum allowed size for base64 image payload (15 MiB). +const MaxBase64Size = 15 * 1024 * 1024 + +// ValidateBlocks checks the validity of the content blocks. +// Rejects blocks with empty Type and enforces a 15 MiB limit on base64 data. +func ValidateBlocks(blocks []ContentBlock) error { + if len(blocks) == 0 { + return errors.New("no content blocks provided") + } + for i, b := range blocks { + if strings.TrimSpace(b.Type) == "" { + return fmt.Errorf("block %d: empty type", i) + } + if b.Type == "image" { + if err := validateImageBlock(b, i); err != nil { + return err + } + } + } + return nil +} + +func validateImageBlock(b ContentBlock, index int) error { + if len(b.Source) == 0 { + return fmt.Errorf("block %d: image block missing source", index) + } + var src struct { + Type string `json:"type"` + MediaType string `json:"media_type"` + Data string `json:"data"` + } + if err := json.Unmarshal(b.Source, &src); err != nil { + return fmt.Errorf("block %d: invalid image source JSON: %w", index, err) + } + if src.Type != "base64" { + return fmt.Errorf("block %d: unsupported image source type %q", index, src.Type) + } + switch src.MediaType { + case "image/jpeg", "image/png", "image/gif", "image/webp": + // valid media type + default: + return fmt.Errorf("block %d: unsupported media type %q", index, src.MediaType) + } + if len(src.Data) > MaxBase64Size { + return fmt.Errorf("block %d: base64 image data exceeds 15 MiB limit", index) + } + return nil +} + +// BlockSender is an optional interface for processes that support structured content. +// Discovered via type assertion on Process: +// +// if bs, ok := proc.(BlockSender); ok { +// bs.SendBlocks(ctx, TextBlock("describe this"), ImageBase64Block("image/png", data)) +// } +type BlockSender interface { + Process + SendBlocks(ctx context.Context, blocks ...ContentBlock) error +} + +// RunTurnBlocks sends structured content blocks and drains Output() until MessageResult +// or channel close. handler is called for each message (including MessageResult). +// Safe for all engine types. +// +// If the process satisfies [BlockSender], it uses SendBlocks to transmit the blocks. +// If the process does not satisfy [BlockSender], it degrades gracefully by converting +// the blocks to text using [TextFromBlocks] and calling the process's Send method. +// +// Like [RunTurn], it respects [SequentialSender] to select the send/drain strategy. +func RunTurnBlocks(ctx context.Context, proc Process, blocks []ContentBlock, handler func(Message) error) error { + if _, ok := proc.(SequentialSender); ok { + // Send must complete before draining Output (spawn-per-turn backends). + if err := sendBlocksOrDegrade(ctx, proc, blocks); err != nil { + return err + } + return drainOutput(ctx, proc, nil, handler) + } + // Default: concurrent Send + drain (ACP, streaming CLI). + sendCh := make(chan error, 1) + go func() { + sendCh <- sendBlocksOrDegrade(ctx, proc, blocks) + }() + + return drainOutput(ctx, proc, sendCh, handler) +} + +func sendBlocksOrDegrade(ctx context.Context, proc Process, blocks []ContentBlock) error { + if bs, ok := proc.(BlockSender); ok { + return bs.SendBlocks(ctx, blocks...) + } + // Fallback to text Send + return proc.Send(ctx, TextFromBlocks(blocks)) +} diff --git a/block_sender_test.go b/block_sender_test.go new file mode 100644 index 0000000..b351dcf --- /dev/null +++ b/block_sender_test.go @@ -0,0 +1,347 @@ +package agentrun + +import ( + "context" + "encoding/json" + "strings" + "testing" + "time" +) + +func TestTextBlock(t *testing.T) { + b := TextBlock("hello world") + if b.Type != "text" { + t.Errorf("expected type 'text', got %q", b.Type) + } + if b.Text != "hello world" { + t.Errorf("expected text 'hello world', got %q", b.Text) + } + if b.Source != nil { + t.Errorf("expected nil Source, got %s", b.Source) + } +} + +func TestImageBase64Block(t *testing.T) { + b := ImageBase64Block("image/png", "SGVsbG8=") + if b.Type != "image" { + t.Errorf("expected type 'image', got %q", b.Type) + } + if b.Text != "" { + t.Errorf("expected empty text, got %q", b.Text) + } + if b.Source == nil { + t.Fatal("expected non-nil Source") + } + + var src struct { + Type string `json:"type"` + MediaType string `json:"media_type"` + Data string `json:"data"` + } + if err := json.Unmarshal(b.Source, &src); err != nil { + t.Fatalf("failed to unmarshal source: %v", err) + } + if src.Type != "base64" { + t.Errorf("expected source type 'base64', got %q", src.Type) + } + if src.MediaType != "image/png" { + t.Errorf("expected source media_type 'image/png', got %q", src.MediaType) + } + if src.Data != "SGVsbG8=" { + t.Errorf("expected source data 'SGVsbG8=', got %q", src.Data) + } +} + +func TestTextFromBlocks(t *testing.T) { + tests := []struct { + name string + blocks []ContentBlock + want string + }{ + { + name: "empty", + blocks: nil, + want: "", + }, + { + name: "text only", + blocks: []ContentBlock{ + TextBlock("hello"), + TextBlock("world"), + }, + want: "hello\nworld", + }, + { + name: "mixed with image", + blocks: []ContentBlock{ + TextBlock("hello"), + ImageBase64Block("image/png", "data"), + TextBlock("world"), + }, + want: "hello\nworld", + }, + { + name: "empty blocks ignored", + blocks: []ContentBlock{ + TextBlock(""), + TextBlock("hello"), + }, + want: "hello", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := TextFromBlocks(tt.blocks) + if got != tt.want { + t.Errorf("got %q, want %q", got, tt.want) + } + }) + } +} + +func TestValidateBlocks(t *testing.T) { + longData := strings.Repeat("a", MaxBase64Size+1) + + tests := []struct { + name string + blocks []ContentBlock + wantErr bool + errMsg string + }{ + { + name: "nil blocks", + blocks: nil, + wantErr: true, + errMsg: "no content blocks", + }, + { + name: "empty blocks", + blocks: []ContentBlock{}, + wantErr: true, + errMsg: "no content blocks", + }, + { + name: "valid text", + blocks: []ContentBlock{ + TextBlock("hello"), + }, + wantErr: false, + }, + { + name: "empty type", + blocks: []ContentBlock{ + {Type: ""}, + }, + wantErr: true, + errMsg: "empty type", + }, + { + name: "valid image", + blocks: []ContentBlock{ + ImageBase64Block("image/png", "SGVsbG8="), + }, + wantErr: false, + }, + { + name: "image missing source", + blocks: []ContentBlock{ + {Type: "image"}, + }, + wantErr: true, + errMsg: "missing source", + }, + { + name: "image invalid source JSON", + blocks: []ContentBlock{ + {Type: "image", Source: []byte("{invalid JSON")}, + }, + wantErr: true, + errMsg: "invalid image source JSON", + }, + { + name: "image unsupported source type", + blocks: []ContentBlock{ + {Type: "image", Source: []byte(`{"type": "url", "url": "http://example.com"}`)}, + }, + wantErr: true, + errMsg: "unsupported image source type", + }, + { + name: "image unsupported media type", + blocks: []ContentBlock{ + ImageBase64Block("image/tiff", "data"), + }, + wantErr: true, + errMsg: "unsupported media type", + }, + { + name: "image exceeds size limit", + blocks: []ContentBlock{ + ImageBase64Block("image/png", longData), + }, + wantErr: true, + errMsg: "exceeds 15 MiB limit", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := ValidateBlocks(tt.blocks) + if (err != nil) != tt.wantErr { + t.Errorf("ValidateBlocks() error = %v, wantErr %v", err, tt.wantErr) + return + } + if err != nil && tt.errMsg != "" && !strings.Contains(err.Error(), tt.errMsg) { + t.Errorf("expected error containing %q, got %q", tt.errMsg, err.Error()) + } + }) + } +} + +type mockBlockSender struct { + *mockProcess + sendBlocksFn func(ctx context.Context, blocks ...ContentBlock) error +} + +func (m *mockBlockSender) SendBlocks(ctx context.Context, blocks ...ContentBlock) error { + if m.sendBlocksFn != nil { + return m.sendBlocksFn(ctx, blocks...) + } + return nil +} + +var _ BlockSender = (*mockBlockSender)(nil) + +type sequentialMockBlockSender struct { + *mockBlockSender +} + +func (s *sequentialMockBlockSender) SequentialSend() {} + +var _ SequentialSender = (*sequentialMockBlockSender)(nil) +var _ BlockSender = (*sequentialMockBlockSender)(nil) + +func TestRunTurnBlocks_Concurrent_BlockSender(t *testing.T) { + mp := &mockBlockSender{ + mockProcess: newMockProcess(), + } + blocks := []ContentBlock{TextBlock("hello"), TextBlock("world")} + gotBlocks := make(chan []ContentBlock, 1) + mp.sendBlocksFn = func(_ context.Context, blks ...ContentBlock) error { + gotBlocks <- blks + return nil + } + + mp.output <- Message{Type: MessageText, Content: "response"} + mp.output <- Message{Type: MessageResult, Content: "done"} + + var msgs []Message + err := RunTurnBlocks(context.Background(), mp, blocks, func(msg Message) error { + msgs = append(msgs, msg) + return nil + }) + if err != nil { + t.Fatalf("RunTurnBlocks error: %v", err) + } + + if len(msgs) != 2 { + t.Errorf("got %d messages, want 2", len(msgs)) + } + + select { + case blks := <-gotBlocks: + if len(blks) != 2 || blks[0].Text != "hello" || blks[1].Text != "world" { + t.Errorf("SendBlocks received unexpected blocks: %v", blks) + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for SendBlocks to be called") + } +} + +func TestRunTurnBlocks_Concurrent_Fallback(t *testing.T) { + mp := newMockProcess() + blocks := []ContentBlock{TextBlock("hello"), TextBlock("world")} + gotText := make(chan string, 1) + mp.sendFn = func(_ context.Context, text string) error { + gotText <- text + return nil + } + + mp.output <- Message{Type: MessageText, Content: "response"} + mp.output <- Message{Type: MessageResult, Content: "done"} + + var msgs []Message + err := RunTurnBlocks(context.Background(), mp, blocks, func(msg Message) error { + msgs = append(msgs, msg) + return nil + }) + if err != nil { + t.Fatalf("RunTurnBlocks error: %v", err) + } + + if len(msgs) != 2 { + t.Errorf("got %d messages, want 2", len(msgs)) + } + + select { + case txt := <-gotText: + if txt != "hello\nworld" { + t.Errorf("Send received unexpected text: %q, want 'hello\\nworld'", txt) + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for Send to be called") + } +} + +func TestRunTurnBlocks_Sequential_BlockSender(t *testing.T) { + smp := &sequentialMockBlockSender{ + mockBlockSender: &mockBlockSender{ + mockProcess: newMockProcess(), + }, + } + blocks := []ContentBlock{TextBlock("hello")} + smp.sendBlocksFn = func(_ context.Context, _ ...ContentBlock) error { + smp.output = make(chan Message, 16) + smp.output <- Message{Type: MessageText, Content: "sequential block response"} + smp.output <- Message{Type: MessageResult, Content: "done"} + return nil + } + + var msgs []Message + err := RunTurnBlocks(context.Background(), smp, blocks, func(msg Message) error { + msgs = append(msgs, msg) + return nil + }) + if err != nil { + t.Fatalf("RunTurnBlocks error: %v", err) + } + + if len(msgs) != 2 || msgs[0].Content != "sequential block response" { + t.Errorf("msgs = %v, want 2 messages", msgs) + } +} + +func TestRunTurnBlocks_Sequential_Fallback(t *testing.T) { + smp := newSequentialMockProcess() + blocks := []ContentBlock{TextBlock("hello")} + smp.sendFn = func(_ context.Context, _ string) error { + smp.output = make(chan Message, 16) + smp.output <- Message{Type: MessageText, Content: "sequential text response"} + smp.output <- Message{Type: MessageResult, Content: "done"} + return nil + } + + var msgs []Message + err := RunTurnBlocks(context.Background(), smp, blocks, func(msg Message) error { + msgs = append(msgs, msg) + return nil + }) + if err != nil { + t.Fatalf("RunTurnBlocks error: %v", err) + } + + if len(msgs) != 2 || msgs[0].Content != "sequential text response" { + t.Errorf("msgs = %v, want 2 messages", msgs) + } +} diff --git a/cmd/agentrun-mcp/engine.go b/cmd/agentrun-mcp/engine.go index 2adc211..f583ccc 100644 --- a/cmd/agentrun-mcp/engine.go +++ b/cmd/agentrun-mcp/engine.go @@ -12,10 +12,10 @@ import ( ) // cliBackendNames lists CLI backend names (excludes ACP). -var cliBackendNames = []string{backendClaude, backendCodex, backendOpenCode} +var cliBackendNames = []string{backendClaude, backendCodex, backendOpenCode, backendAgy} // validBackends lists all known backend names for validation and enumeration. -var validBackends = []string{backendClaude, backendCodex, backendOpenCode, backendACP} +var validBackends = []string{backendClaude, backendCodex, backendOpenCode, backendAgy, backendACP} // makeEngine creates an engine for the given backend name. // CLI backends are constructed via newCLIBackend (single source of truth). @@ -35,6 +35,6 @@ func makeEngine(backend string, stderrW io.Writer, acpBin string, acpExtraArgs [ } return acp.NewEngine(opts...), nil } - return nil, fmt.Errorf("unknown backend %q (valid: %s, %s, %s, %s)", - backend, backendClaude, backendCodex, backendOpenCode, backendACP) + return nil, fmt.Errorf("unknown backend %q (valid: %s, %s, %s, %s, %s)", + backend, backendClaude, backendCodex, backendOpenCode, backendAgy, backendACP) } diff --git a/cmd/agentrun-mcp/tools.go b/cmd/agentrun-mcp/tools.go index aac8fef..f0684c8 100644 --- a/cmd/agentrun-mcp/tools.go +++ b/cmd/agentrun-mcp/tools.go @@ -15,6 +15,7 @@ import ( "github.com/dmora/agentrun" "github.com/dmora/agentrun/engine/cli" + "github.com/dmora/agentrun/engine/cli/agy" "github.com/dmora/agentrun/engine/cli/claude" "github.com/dmora/agentrun/engine/cli/codex" "github.com/dmora/agentrun/engine/cli/opencode" @@ -29,6 +30,7 @@ const ( backendClaude = "claude" backendCodex = "codex" backendOpenCode = "opencode" + backendAgy = "agy" backendACP = "acp" ) @@ -64,8 +66,10 @@ func newCLIBackend(name string) (cli.Backend, error) { return codex.New(), nil case backendOpenCode: return opencode.New(), nil + case backendAgy: + return agy.New(), nil default: - return nil, fmt.Errorf("unknown CLI backend %q (valid: %s, %s, %s)", name, backendClaude, backendCodex, backendOpenCode) + return nil, fmt.Errorf("unknown CLI backend %q (valid: %s, %s, %s, %s)", name, backendClaude, backendCodex, backendOpenCode, backendAgy) } } @@ -163,7 +167,7 @@ func detectCapabilities(name string) []string { // --- Tool 3: parse_line --- type parseLineInput struct { - Backend string `json:"backend" jsonschema:"Backend name (claude or codex or opencode)"` + Backend string `json:"backend" jsonschema:"Backend name (claude or codex or opencode or agy)"` Line string `json:"line" jsonschema:"Raw JSONL line to parse"` } @@ -291,7 +295,7 @@ func toSummaryJSON(s *agentrun.TurnSummary) *turnSummaryJSON { // --- Tool 5: run_turn --- type runTurnInput struct { - Backend string `json:"backend" jsonschema:"Backend name (claude or codex or opencode or acp)"` + Backend string `json:"backend" jsonschema:"Backend name (claude or codex or opencode or agy or acp)"` Prompt string `json:"prompt" jsonschema:"User prompt"` Model string `json:"model,omitempty" jsonschema:"Model override"` CWD string `json:"cwd,omitempty" jsonschema:"Working directory (must be under workspace)"` @@ -403,9 +407,13 @@ func doRunTurn(ctx context.Context, input runTurnInput) (*runTurnOutput, error) return nil } - turnErr := agentrun.RunTurn(ctx, proc, input.Prompt, handler) + // RunFirstTurn handles both execution models: spawn-per-turn backends + // already ran turn 1 at Start (drain only); streaming/ACP backends need the + // prompt sent. Using RunTurn here would spawn a redundant second turn (and + // fail outright on resume-only backends whose session ID isn't captured yet). + turnErr := agentrun.RunFirstTurn(ctx, proc, input.Prompt, handler) if turnErr != nil { - log.Warn("RunTurn error", "error", turnErr) + log.Warn("RunFirstTurn error", "error", turnErr) } out := collectTerminalState(proc, &summary, start, stderrW) @@ -422,7 +430,7 @@ func doRunTurn(ctx context.Context, input runTurnInput) (*runTurnOutput, error) // --- Tool 6: session_start --- type sessionStartInput struct { - Backend string `json:"backend" jsonschema:"Backend name (claude or codex or opencode or acp)"` + Backend string `json:"backend" jsonschema:"Backend name (claude or codex or opencode or agy or acp)"` Prompt string `json:"prompt" jsonschema:"Initial prompt"` Model string `json:"model,omitempty" jsonschema:"Model override"` CWD string `json:"cwd,omitempty" jsonschema:"Working directory (must be under workspace)"` @@ -489,7 +497,8 @@ func doSessionStart(ctx context.Context, input sessionStartInput) (*sessionStart summary.Add(msg) return nil } - turnErr := agentrun.RunTurn(ctx, proc, input.Prompt, handler) + // RunFirstTurn handles both execution models (see doRunTurn). + turnErr := agentrun.RunFirstTurn(ctx, proc, input.Prompt, handler) if turnErr != nil { log.Warn("first turn error", "error", turnErr) } @@ -726,7 +735,10 @@ func doProbe(ctx context.Context, backend string) (*probeOutput, error) { defer stopProcess(proc) out := &probeOutput{SpawnsOK: true} - turnErr := agentrun.RunTurn(ctx, proc, "Say 'hello'.", func(msg agentrun.Message) error { + // RunFirstTurn (not RunTurn): for spawn-per-turn backends Start already ran + // turn 1, so sending here would trigger a premature resume (no session ID + // captured yet) — the same double-prompt bug fixed in doRunTurn. + turnErr := agentrun.RunFirstTurn(ctx, proc, "Say 'hello'.", func(msg agentrun.Message) error { applyProbeInit(msg, out) return nil }) diff --git a/docs/designs/2026-03-09-unify-context-fill-and-turn-abstraction.md b/docs/designs/2026-03-09-unify-context-fill-and-turn-abstraction.md new file mode 100644 index 0000000..07cc86f --- /dev/null +++ b/docs/designs/2026-03-09-unify-context-fill-and-turn-abstraction.md @@ -0,0 +1,521 @@ +# Design: Unify Real-Time Context Fill Signal and Eliminate Turn Abstraction Leaks + +## 4.1 Context + +### Problem Statement + +Callers of agentrun building real-time context fill indicators (fuel gauges) must handle backend-specific differences that the library should abstract. A review of the MCP diagnostic server — the first non-trivial consumer — exposed four abstraction leaks that force every consumer to manage engine-level complexity: + +1. **`spawnPerTurn` branching**: duplicated in every consumer (MCP server, interactive example, Foundry) +2. **`makeEngine` returns a boolean**: consumers must track `spawnPerTurn` externally +3. **No unified turn summary**: consumers reimplement message-iteration logic +4. **Context fill requires filtering on 3+ message types**: CLI piggybacks on `MessageText`/`MessageThinking`; ACP emits dedicated `MessageContextWindow` + +### Current Architecture State + +**Known** (from codebase): + +- `Process` interface (`process.go:18-51`) defines `Output()`, `Send()`, `Stop()`, `Wait()`, `Err()` — no metadata about send semantics. +- `RunTurn` (`runturn.go:21-28`) always calls `proc.Send(ctx, message)` in a goroutine and drains `Output()` concurrently. Works for ACP (blocking Send + concurrent Output) and CLI streaming (stdin pipe), but for spawn-per-turn backends, `Send()` spawns a new subprocess and `Output()` must be called fresh after Send. +- CLI engine's `process.Send()` (`engine/cli/process.go:122-159`) has three internal paths: stdin pipe (Streamer), `replaceSubprocess` (Resumer while running), and `resumeAfterCleanExit` (Resumer after clean exit). All paths work, but the Output channel is replaced on resume (`process.go:618-620`), meaning `RunTurn`'s concurrent drain starts on the wrong channel. +- MCP server's `makeEngine()` (`cmd/agentrun-mcp/engine.go:25-42`) returns `(Engine, bool, error)` where the bool is `spawnPerTurn`, derived from `cli.Streamer` type assertion. +- `spawnPerTurn` branching appears in **5 consumer call sites** across MCP server and interactive example (`cmd/agentrun-mcp/tools.go:371,479,605`; `examples/interactive/main.go:110,115`). +- `sessionEntry` struct (`cmd/agentrun-mcp/session.go:31`) stores `spawnPerTurn bool` for later dispatch. +- `drainSpawnPerTurn()` (`cmd/agentrun-mcp/tools.go:393-407`) is a simplified drain loop without Send — functionally equivalent to `RunTurn` if `RunTurn` could handle Send-then-drain ordering. +- CLI engine populates `ContextUsedTokens` via `applyContextFill()` (`engine/cli/process.go:468-489`) on mid-turn messages (per-call fill) and `MessageResult` (peak fill). No `ContextSizeTokens` — Claude CLI wire format lacks `contextWindow` in mid-turn events. +- ACP engine emits `MessageContextWindow` via `parseUsageUpdate()` (`engine/acp/update.go:284-328`) with both `ContextSizeTokens` and `ContextUsedTokens`. +- Claude CLI parser `extractTokenUsage()` (`engine/cli/claude/parse.go:299-331`) does NOT capture any `contextWindow` field. No evidence of `contextWindow` in Claude CLI's mid-turn wire format (grep found zero matches across the claude backend). +- `filter` package (`filter/filter.go`) provides `Filter`, `Completed`, `ResultOnly`, `IsDelta` — composable channel middleware. No context-fill-specific filter exists. + +**Assumed:** + +- Claude CLI's result event may include `modelUsage..contextWindow` — referenced in the issue but not confirmed in the wire format captured by the parser. **Assumption is that this field exists on result events but not mid-turn events.** +- Foundry (dors-orchestrator) will have the same `spawnPerTurn` branching pattern once integrated. Currently has ~10 lines for `"opencode-acp"` backend in `internal/agent/agentrun.go:buildEngine`. +- Future backends (API-based like ADK) will have persistent-session semantics similar to ACP (Send blocks, Output streams concurrently). + +**Missing:** + +- Confirmation of Claude CLI's exact `contextWindow` wire field location and availability per event type. +- Whether other CLI backends (Codex, OpenCode) will ever emit per-call usage mid-turn (currently they don't — usage only on result). +- The exact number of Foundry call sites that branch on `spawnPerTurn`. + +### Context Manifest + +Files examined: +- `process.go` (lines 1-51) — Process interface +- `message.go` (lines 1-338) — Message, Usage, MessageType constants +- `runturn.go` (lines 1-76) — RunTurn, drainOutput implementation +- `engine/cli/engine.go` (lines 52-124) — CLI Engine.Start, capability resolution +- `engine/cli/process.go` (lines 100-519) — CLI process.Send, applyContextFill, scanLines +- `engine/cli/interfaces.go` (lines 1-90) — Spawner, Parser, Resumer, Streamer, InputFormatter, Backend +- `engine/acp/update.go` (lines 270-336) — parseUsageUpdate, MessageContextWindow production +- `engine/acp/process.go` (lines 90-188) — ACP process.Send blocking semantics +- `engine/cli/claude/parse.go` (lines 280-348) — extractTokenUsage, no contextWindow capture +- `cmd/agentrun-mcp/engine.go` (lines 1-43) — makeEngine, spawnPerTurn derivation +- `cmd/agentrun-mcp/tools.go` (lines 330-630) — doRunTurn, doSessionStart, doSessionSend branching +- `cmd/agentrun-mcp/session.go` (line 31) — sessionEntry.spawnPerTurn field +- `examples/interactive/main.go` (lines 50-126) — consumer spawnPerTurn branching +- `filter/filter.go` (lines 1-88) — composable channel middleware + +--- + +## 4.2 Solution Pool + +The issue identifies four abstraction leaks. They decompose into two independent design axes: + +- **Axis A**: Turn execution abstraction (`spawnPerTurn` branching, `makeEngine` boolean) +- **Axis B**: Context fill unification (multiple message types, missing `ContextSizeTokens`) + +A third concern — TurnSummary — is orthogonal and can be addressed independently. + +### Axis A: Turn Execution + +#### Candidate A1: Make `RunTurn` Handle All Engine Types Internally + +`RunTurn` currently calls `Send()` in a goroutine and drains `Output()` concurrently. For spawn-per-turn backends, this fails because `Send()` replaces the subprocess and `Output()` channel. The fix: make `RunTurn` aware of send semantics. + +**Approach:** Add an optional interface (e.g., `SendSemantics` or `TurnRunner`) to `Process` that `RunTurn` type-asserts on. When the process signals "send-then-drain" semantics, `RunTurn` calls `Send()` first, then drains `Output()` sequentially. When absent (or "concurrent" semantics), current behavior is preserved. + +**Strengths:** +- Consumers never see `spawnPerTurn` — `RunTurn` is the only call site for all backends. +- No boolean return from engine construction. `makeEngine` returns `(Engine, error)`. +- The semantic knowledge stays inside the process, where it belongs. +- Backward-compatible: existing `RunTurn` callers keep working. + +**Weaknesses:** +- Adds a new interface to the root package (public API surface cost). +- `RunTurn` becomes more complex internally — two code paths in a critical function. +- Process implementations must opt into the interface correctly; a wrong implementation silently breaks. + +**Fit:** +- Matches agentrun's "capabilities via type assertion" pattern (Resumer, Streamer, InputFormatter). +- Root package already has `RunTurn`; enhancing it is natural. + +#### Candidate A2: `RunTurn` Calls `Send` Then Drains for All Engine Types + +Instead of detecting semantics, always call `Send()` first (blocking), then drain `Output()`. For ACP, `Send()` blocks while streaming updates to `Output()` — deadlock if not drained concurrently. + +**Approach:** Not viable. ACP's `Send()` writes to a channel that must be drained concurrently — sequential Send-then-drain deadlocks. + +**Eliminated immediately.** ACP's blocking Send + concurrent Output is a hard constraint. + +#### Candidate A3: Engine Returns a `TurnRunner` Instead of `Process` + +Replace the `Process` interface with a `TurnRunner` that has a single method: `RunTurn(ctx, message, handler) error`. The engine itself encapsulates send semantics. + +**Approach:** Each engine returns a `TurnRunner` wrapping its process. The runner knows whether to call Send+drain concurrently (ACP, streaming CLI) or Send-then-drain sequentially (spawn-per-turn). + +**Strengths:** +- Eliminates the leak entirely — consumers never learn about send semantics. +- `RunTurn` becomes a method, not a free function — cleaner API. +- No need for capability detection; the runner is fully encapsulated. + +**Weaknesses:** +- Fundamentally changes the core `Engine`/`Process` contract — large blast radius. +- Consumers lose direct access to `Output()`, `Stop()`, `Wait()`, `Err()` for advanced use cases (monitoring, custom drain loops, process management). +- Would need to re-expose `Process` methods on `TurnRunner` or wrap them, duplicating the interface. +- Not backward-compatible — breaks every existing consumer. + +**Fit:** +- Too high a cost for a greenfield-but-already-shipping library with existing consumers (MCP server, Foundry, examples). The library-first mindset means API stability matters even without a v1 tag. + +#### Candidate A4: Process Exposes a Method That Reports Send Semantics + +Add a method to `Process` (e.g., `SendMode() SendMode`) that returns whether Send requires sequential or concurrent drain. `RunTurn` inspects this and branches internally. + +**Approach:** Concrete method on Process interface rather than optional interface via type assertion. + +**Strengths:** +- Explicit, always available — no type assertion needed. +- `RunTurn` can branch cleanly. + +**Weaknesses:** +- Adds a required method to `Process` — every implementation must implement it, including mocks and wrappers. +- Breaks the "capabilities via type assertion" pattern used everywhere else. +- API engines (ADK) must return a meaningful value even though they may not have Send semantics. +- A method that returns a constant is a code smell — it's really a type tag, not behavior. + +**Fit:** +- Poor fit. Violates the established pattern. Adds ceremony to every Process implementation. + +### Self-Critique: Axis A + +| Candidate | Strongest counter-argument | Worst case | Hidden cost | +|-----------|---------------------------|------------|-------------| +| **A1 (optional interface)** | Adds a public interface for an implementation detail that consumers shouldn't need to know about. | A third send-mode appears (neither sequential nor concurrent) and the boolean abstraction breaks. | Tests must cover both RunTurn paths; subtle bugs possible if a process incorrectly identifies its semantics. | +| **A3 (TurnRunner)** | Destroys the composable Process model — consumers wrapping Process for logging/metrics lose their integration point. | Every existing consumer breaks; migration cost outweighs benefit. | Dual interface problem — must re-expose Process control methods, or consumers can't manage lifecycle. | +| **A4 (SendMode method)** | Forces every Process mock and wrapper to implement a method that returns a constant — pure ceremony. | Future engine types with dynamic send semantics (e.g., some turns are concurrent, some sequential) need a mutable method, which violates the "constant" assumption. | Every test helper and middleware wrapper must be updated. | + +**Preferred: A1** — optional interface on Process, consumed only by `RunTurn`. + +### Axis B: Context Fill + +#### Candidate B1: CLI Engine Synthesizes `MessageContextWindow` + +When the CLI engine's `applyContextFill()` populates `ContextUsedTokens` on a mid-turn message, it **also** emits a separate `MessageContextWindow` message with the same data. Consumers filter on one message type for all backends. + +**Approach:** In `scanLines`, after enriching a mid-turn message with context fill, synthesize and emit a `MessageContextWindow` message. `ContextSizeTokens` remains 0 for CLI (unknown). Consumers get a unified signal type but must tolerate `ContextSizeTokens == 0` (absolute-only, no percentage). + +**Strengths:** +- Unified message type — consumers write one `case MessageContextWindow:` handler. +- No changes to the root package API. +- CLI's context fill data is already computed; this just re-packages it. + +**Weaknesses:** +- Duplicates data: `ContextUsedTokens` appears on both the content message AND the synthesized `MessageContextWindow`. Consumers that process all messages may double-count. +- `ContextSizeTokens == 0` on CLI means consumers still can't compute percentage — the "fuel gauge" use case is only partially solved. +- Adds complexity to `scanLines` — must emit two messages per parsed line in some cases. +- Message ordering becomes subtle: does the synthesized `MessageContextWindow` come before or after the content message? + +**Fit:** +- Partial solution. Unifies the type but not the data completeness. Consumers still need backend awareness for percentage computation. + +#### Candidate B2: CLI Engine Captures `ContextSizeTokens` from Init/Model Metadata and Synthesizes `MessageContextWindow` + +Extends B1: the CLI engine (or Claude backend parser) captures `contextWindow` from model metadata (init event, result event, or a new metadata source) and stores it as engine state. Synthesized `MessageContextWindow` messages carry both `ContextSizeTokens` and `ContextUsedTokens`. + +**Approach:** Claude backend parser extracts `contextWindow` from the result event's `modelUsage..contextWindow` field (if it exists). The CLI engine stores this as `contextSize` state. On subsequent mid-turn messages with usage data, the engine emits `MessageContextWindow` with both fields populated. + +**Strengths:** +- Full fuel gauge: consumers get both capacity and fill for all backends. +- Truly unified — one message type, one code path, percentage-capable. + +**Weaknesses:** +- **Depends on unconfirmed wire format.** If Claude CLI doesn't expose `contextWindow` on any event, this approach requires hardcoding model context sizes or fetching them externally — both undesirable. +- Context size from the result event arrives **after** mid-turn messages. First-turn mid-turn messages would lack `ContextSizeTokens` until the first result populates it. This creates a "warm-up" gap. +- Couples the CLI engine to Claude-specific wire format details that may change. +- `contextSize` as engine state complicates the stateless-parser model (parser currently returns `(Message, error)` with no side effects). + +**Fit:** +- Ideal if the wire data exists. Risky if it doesn't. The "warm-up" gap is a real UX issue for first-turn fuel gauges. + +#### Candidate B3: Root Package `ContextFill` Helper That Extracts Fill from Any Message + +Instead of synthesizing new messages, provide a helper function in the root package (or filter package) that extracts context fill information from any message type. Consumers call the helper instead of switching on message type. + +**Approach:** Add `func ContextFill(msg Message) (used, size int, ok bool)` to the root package. It checks `MessageContextWindow` first, then falls back to mid-turn `ContextUsedTokens` on content messages. Returns `ok == false` when no fill data is present. + +**Strengths:** +- No message synthesis — no ordering or duplication concerns. +- Simple consumer API: `if used, size, ok := agentrun.ContextFill(msg); ok { ... }`. +- Works with existing message stream — no engine changes. +- `ok == false` lets consumers skip messages cleanly. + +**Weaknesses:** +- Consumers still iterate all messages — the helper just hides the type switch. Not composable with channel-based `filter.Filter()`. +- Doesn't solve the `ContextSizeTokens` gap for CLI — `size` will be 0 for CLI backends. +- A helper function is a band-aid over the real problem (heterogeneous signals). As more message types carry usage data, the helper grows. +- Doesn't enable `filter.Filter(ctx, ch, MessageContextWindow)` for CLI — consumers still need to process multiple types. + +**Fit:** +- Quick win, minimal blast radius. But doesn't achieve the "unified signal" goal from the issue. + +#### Candidate B4: Synthesize `MessageContextWindow` Only (No Duplication on Content Messages) + +CLI engine synthesizes `MessageContextWindow` when context fill data is available, and **removes** `ContextUsedTokens` from the content message. Context fill lives exclusively on `MessageContextWindow` messages, regardless of backend. + +**Approach:** `applyContextFill()` no longer sets `ContextUsedTokens` on mid-turn content messages. Instead, a separate `MessageContextWindow` is emitted. `ContextUsedTokens` on `MessageResult` remains (peak fill). `ContextSizeTokens` is 0 for CLI until a source is confirmed. + +**Strengths:** +- No duplication — context fill has one canonical location per turn interval. +- Clean consumer pattern: filter on `MessageContextWindow` for real-time fill, read `MessageResult` for final summary. +- Content messages stay focused on content. + +**Weaknesses:** +- **Breaking change.** Consumers of CLI mid-turn `ContextUsedTokens` (from PR #39, just merged) lose that data unless they also listen to `MessageContextWindow`. PR #39 was a deliberate design choice; reversing it one PR later signals instability. +- `ContextUsedTokens` on `MessageResult` (peak fill) is a different concept than `MessageContextWindow` (instantaneous fill). Having one on result and one on context_window could confuse consumers. +- If `ContextSizeTokens` is 0, the synthesized message is "used X of unknown" — marginally more useful than the current content-message approach. + +**Fit:** +- Cleaner architecture but high churn cost. The recent PR #39 investment would be partially undone. + +### Self-Critique: Axis B + +| Candidate | Strongest counter-argument | Worst case | Hidden cost | +|-----------|---------------------------|------------|-------------| +| **B1 (synthesize, keep on content)** | Duplicated data on two messages per API call — consumers that sum usage across messages will double-count context fill. | A consumer unknowingly processes both the content message and the synthesized context_window, reporting fill twice. | Message ordering contract needs documentation; output channel buffer pressure increases. | +| **B2 (synthesize with size)** | Depends on unconfirmed wire format data. If contextWindow isn't available mid-turn, the first turn has no capacity until result arrives. | Claude CLI drops the contextWindow field in an update — silent regression. | State management in CLI engine for contextSize; parser loses its stateless property. | +| **B3 (helper function)** | Band-aid — doesn't unify the signal, just hides the switch statement. Not composable with filter package patterns. | Helper grows with every new message type that carries usage data, becoming a maintenance burden. | Consumers must call the helper on every message — can't opt into a filtered channel. | +| **B4 (synthesize, remove from content)** | Breaking change on top of PR #39 (just merged). Signals API instability to early adopters. | Early consumers relying on mid-turn ContextUsedTokens on content messages break silently (field becomes 0). | Documentation/CLAUDE.md updates to reverse PR #39 semantics; test changes across CLI backends. | + +**Preferred: B1 + B3 hybrid** — Synthesize `MessageContextWindow` in CLI engine for channel-based filtering (B1), AND provide a `ContextFill` helper for callback-based consumers (B3). Keep `ContextUsedTokens` on content messages (no breaking change). Accept the duplication with clear documentation. + +### TurnSummary (Orthogonal) + +#### Candidate T1: `TurnSummary` Type + `CollectTurn` Helper + +A `TurnSummary` struct in the root package that collects text, thinking, tool calls, usage, stop reason, and denials from a message stream. A `CollectTurn` function or method iterates messages and populates it. + +**Approach:** `CollectTurn(ctx, proc, message) (TurnSummary, error)` wraps `RunTurn` and collects messages into a summary. Or: `TurnSummary` is a standalone accumulator that consumers feed messages into via `Add(msg)`. + +**Strengths:** Eliminates message-iteration boilerplate. Both MCP server and Foundry benefit. + +**Weaknesses:** Locks in a summary schema — if new message types or metadata fields arrive, the summary must grow. Consumers with custom aggregation needs bypass it anyway. + +#### Candidate T2: Leave Summarization to Consumers + +Status quo. Consumers iterate messages with their own logic. + +**Strengths:** Maximum flexibility. No schema to maintain. + +**Weaknesses:** Repeated boilerplate across every consumer. Bug risk in reimplemented iteration logic. + +**Recommendation:** T1, but deferred. TurnSummary has value but is separable from the context-fill and turn-execution changes. Address it in a follow-up issue to keep blast radius manageable. + +--- + +## 4.3 Decisions + +### ADR-1: RunTurn Handles All Engine Types via Optional Interface + +> In the context of **consumers branching on `spawnPerTurn` to choose between `RunTurn` and `drainSpawnPerTurn`**, facing **three distinct send-drain orderings across CLI streaming, CLI spawn-per-turn, and ACP engines**, we decided **to add an optional interface on Process (discoverable by RunTurn via type assertion) that declares send-then-drain ordering, allowing RunTurn to handle all engine types internally**, and neglected **replacing Process with TurnRunner (A3, too destructive to existing API) and adding a required SendMode method to Process (A4, violates capability-via-type-assertion pattern)**, to achieve **elimination of `spawnPerTurn` as a consumer-visible concept**, accepting **one new public interface in the root package and increased internal complexity in RunTurn**. + +Confidence: **high** — the pattern (optional interface + type assertion) is established in the codebase (Resumer, Streamer, InputFormatter in `engine/cli/interfaces.go`). The two RunTurn code paths (concurrent vs sequential) are well-understood from existing `RunTurn` and `drainSpawnPerTurn` implementations. + +Rejected: **A3 (TurnRunner)** — breaks every consumer by replacing the Process interface. Consumers that wrap Process for logging/metrics lose their integration point. Cost far exceeds benefit. **A4 (SendMode method)** — forces every Process implementation and mock to implement a method returning a constant. Violates the type-assertion pattern used consistently across CLI backends. + +Grounding: `drainSpawnPerTurn` (`cmd/agentrun-mcp/tools.go:393-407`) and `RunTurn` (`runturn.go:21-28`) contain the two code paths that would be unified. The `spawnPerTurn` boolean appears in 5+ consumer call sites (`tools.go:371,479,605`; `interactive/main.go:110,115`). + +### ADR-2: CLI Engine Synthesizes `MessageContextWindow` for Unified Context Fill + +> In the context of **CLI backends piggybacking context fill on content messages while ACP emits dedicated `MessageContextWindow`**, facing **consumers needing different filter/switch logic per backend to monitor context fill**, we decided **to have the CLI engine synthesize `MessageContextWindow` messages whenever `applyContextFill` populates `ContextUsedTokens`, while preserving `ContextUsedTokens` on content messages for backward compatibility**, and neglected **removing `ContextUsedTokens` from content messages (B4, breaking change after PR #39) and helper-function-only approach (B3, not composable with filter package)**, to achieve **a unified `MessageContextWindow` signal that all backends emit, enabling `filter.Filter(ctx, ch, MessageContextWindow)` for fuel-gauge consumers**, accepting **data duplication (context fill on both content message and synthesized context_window) which must be documented clearly**. + +Confidence: **medium** — the synthesis is straightforward mechanically, but the duplication creates a documentation burden and a risk of consumer confusion. The `ContextSizeTokens == 0` gap for CLI means the fuel gauge is absolute-only (no percentage) until a `contextWindow` source is confirmed. + +Assumption: **A-B2-1**: Claude CLI's wire format includes `contextWindow` on result events (referenced in the issue). If confirmed, `ContextSizeTokens` can be populated from the first result event onward, enabling percentage-based gauges. If not confirmed, CLI's `MessageContextWindow` carries `ContextUsedTokens` only (absolute fill). + +Rejected: **B4 (synthesize, remove from content)** — breaking change one PR after #39. Signals API instability. Early consumers relying on mid-turn `ContextUsedTokens` break. **B3 (helper only)** — doesn't compose with `filter.Filter()` channel patterns; consumers still iterate all message types. + +Grounding: `applyContextFill` (`engine/cli/process.go:468-489`) already computes per-call fill and populates `ContextUsedTokens` on mid-turn messages. The synthesis point is after this enrichment, before the message is sent to the output channel. `parseUsageUpdate` (`engine/acp/update.go:284-328`) is the ACP equivalent — same message type, same fields. + +### ADR-3: Provide `ContextFill` Helper for Callback-Based Consumers + +> In the context of **some consumers using `RunTurn` with callback handlers (not channel-based filters)**, facing **callback handlers still needing to switch on message type to extract context fill**, we decided **to add a `ContextFill(msg Message) (used, size int, ok bool)` helper function in the root package**, and neglected **putting it in the filter package (wrong abstraction level — filter operates on channels, this operates on single messages)**, to achieve **a single-call extraction point that works regardless of whether the consumer uses channels or callbacks**, accepting **a minor API surface increase (one exported function)**. + +Confidence: **high** — simple function, no state, easy to test. Independent of ADR-2. + +Rejected: filter package placement — `filter.ContextFill` would be importing root types and returning root values, but filter's abstraction is channel middleware, not per-message extraction. A per-message helper belongs in root. + +### ADR-4: Defer TurnSummary to Follow-Up Issue + +> In the context of **consumers reimplementing message iteration to extract turn results**, facing **the desire to address all four leaks in one change**, we decided **to defer TurnSummary to a separate issue**, and neglected **bundling it with context fill and RunTurn changes**, to achieve **manageable blast radius per change and independent review/testing**, accepting **continued iteration boilerplate in consumers until the follow-up lands**. + +Confidence: **high** — TurnSummary is orthogonal. Bundling it increases review surface and merge risk. + +--- + +## 4.4 Component Specification + +### Component 1: Sequential Send Interface (Root Package) + +A new optional interface in the root package that `Process` implementations may satisfy. The interface signals that `Send()` must complete before `Output()` is drained for the new turn's messages. The interface name should convey "send completes before drain begins" semantics without exposing implementation details like "spawn" or "subprocess." + +The interface has no methods beyond a marker — or alternatively, a single method that `RunTurn` calls instead of the default concurrent pattern. The preferred approach is a marker interface (no methods) because the actual send/drain logic is already implemented by `Process.Send()` and `Process.Output()` — `RunTurn` just needs to know the ordering. + +If a process satisfies this interface, `RunTurn` calls `Send()` first (blocking), checks for errors, then drains `Output()` until `MessageResult` or channel close. If a process does not satisfy this interface, `RunTurn` uses the current concurrent pattern (Send in goroutine, drain in main goroutine). + +### Component 2: Enhanced `RunTurn` (Root Package) + +`RunTurn` gains an internal type assertion on the process. Two paths: + +**Path 1 (default, concurrent):** Current behavior. Send in goroutine, drain in calling goroutine. Used by ACP and CLI streaming backends. + +**Path 2 (sequential):** Send first (blocking, with context), then drain Output() until MessageResult or channel close. Used by spawn-per-turn backends. Error from Send short-circuits — no drain attempted. + +The handler callback contract is unchanged: called for each message including MessageResult. Error from handler stops the drain. Context cancellation stops both paths. + +The `drainOutput` internal function can be reused for path 2 by passing a nil `sendCh` (Send already completed). + +### Component 3: Synthesized `MessageContextWindow` in CLI Engine + +After `applyContextFill()` enriches a mid-turn message with `ContextUsedTokens > 0`, the CLI engine emits an additional `MessageContextWindow` message to the output channel. This happens in `scanLines` after the enriched content message is emitted. + +The synthesized message carries: +- `Type: MessageContextWindow` +- `Usage.ContextUsedTokens`: same value as the content message's `ContextUsedTokens` +- `Usage.ContextSizeTokens`: 0 initially (CLI lacks capacity data), populated from cached model metadata if ADR-2's assumption A-B2-1 is confirmed +- `Timestamp`: same as the triggering content message + +The synthesized message is emitted **after** the content message (content-first ordering). This matches ACP's behavior where `usage_update` notifications arrive between content updates. + +No synthesis on `MessageResult` — result already carries peak `ContextUsedTokens` via `applyContextFill`, and `MessageContextWindow` is a mid-turn signal. + +### Component 4: `ContextFill` Helper (Root Package) + +A pure function that extracts context fill information from any `Message`. Logic: + +1. If `msg.Usage` is nil, return `(0, 0, false)`. +2. If `msg.Usage.ContextUsedTokens > 0` or `msg.Usage.ContextSizeTokens > 0`, return `(used, size, true)`. +3. Otherwise return `(0, 0, false)`. + +This works for both `MessageContextWindow` (ACP and synthesized CLI) and mid-turn content messages with `ContextUsedTokens`. The `ok` return value lets callers skip messages without context fill data. + +### Data Flow + +``` +CLI Engine: + Parser → ParseLine → Message (content) + Engine → applyContextFill → enriches ContextUsedTokens on content message + Engine → emit content message to Output channel + Engine → synthesize MessageContextWindow → emit to Output channel + [consumer receives both; filters on MessageContextWindow for fuel gauge] + +ACP Engine: + Protocol → usage_update notification + Engine → parseUsageUpdate → MessageContextWindow + Engine → emit to Output channel + [consumer receives MessageContextWindow; same filter as CLI] + +Consumer (channel-based): + filter.Filter(ctx, proc.Output(), agentrun.MessageContextWindow) + → receives unified context fill signal from any backend + +Consumer (callback-based): + handler := func(msg Message) error { + if used, size, ok := agentrun.ContextFill(msg); ok { + updateFuelGauge(used, size) + } + // ... handle other message types + } + RunTurn(ctx, proc, prompt, handler) +``` + +--- + +## 4.5 Dependency and Blast-Radius Map + +### Direct Changes + +| File/Component | Change | +|----------------|--------| +| Root package (`agentrun`) | New interface (sequential send marker), `ContextFill` helper function | +| `runturn.go` | Type assertion + sequential path in `RunTurn` | +| `engine/cli/process.go` | CLI process satisfies sequential-send interface (spawn-per-turn backends); synthesize `MessageContextWindow` in scanLines after `applyContextFill` | +| `engine/cli/engine.go` | Possibly: detect spawn-per-turn at Start time and store on process struct for interface satisfaction | + +### Indirect Impact + +| Component | Impact | +|-----------|--------| +| `cmd/agentrun-mcp/` | Can remove `spawnPerTurn` boolean, `drainSpawnPerTurn()`, `sessionEntry.spawnPerTurn`. `makeEngine` returns `(Engine, error)`. All call sites use `RunTurn` uniformly. | +| `examples/interactive/` | Can remove `spawnPerTurn` branching — single `RunTurn` call for all backends. | +| `filter/` | No changes needed. Consumers use existing `filter.Filter(ctx, ch, MessageContextWindow)`. | +| `engine/acp/` | No changes — already emits `MessageContextWindow` natively. | +| `engine/cli/claude/`, `codex/`, `opencode/` | No parser changes. Synthesis happens at engine level, not parser level. | +| `enginetest/clitest/` | May need new compliance test for `MessageContextWindow` synthesis. | + +### Risk Zones + +- **`scanLines` in `engine/cli/process.go`**: This is the hottest path in the CLI engine — every stdout line flows through it. Adding message synthesis increases complexity. Must not block the output channel (check buffer capacity before synthesis, or always emit synchronously). +- **Output channel buffer pressure**: Synthesizing an extra message per mid-turn usage event doubles the context-fill message volume on the output channel. With `OutputBuffer` default of 64, this should be fine, but high-frequency usage events could cause back-pressure. +- **RunTurn sequential path**: The new path must handle context cancellation correctly during the blocking `Send()` call. If `Send` blocks indefinitely and context is cancelled, the function must return `ctx.Err()` promptly — same contract as the concurrent path. + +--- + +## 4.6 Implementation Instructions (Handoff Contract) + +### What to Build + +1. **Sequential-send interface in root package.** A new exported interface that Process implementations can satisfy to signal "Send must complete before Output is drained." Marker interface (no methods required) or a single-method interface — implementation decides. The interface name should be descriptive (e.g., `SequentialSender`) without leaking subprocess terminology. + +2. **RunTurn enhancement.** `RunTurn` type-asserts the process against the new interface. When satisfied, calls `Send(ctx, message)` synchronously, then drains Output using the existing `drainOutput` helper with a nil `sendCh`. When not satisfied, current concurrent behavior is preserved. + +3. **CLI process satisfies the interface.** When the CLI engine's process has spawn-per-turn semantics (Resumer without Streamer), the process satisfies the sequential-send interface. The condition is already computed at Start time in `resolveCapabilities`. + +4. **Synthesize `MessageContextWindow` in CLI engine.** After `applyContextFill` enriches a mid-turn message with `ContextUsedTokens > 0`, emit an additional `MessageContextWindow` message with the same `ContextUsedTokens` and `ContextSizeTokens == 0` (or cached value if available). Emit after the content message. Do not synthesize on `MessageResult` or `MessageInit`. + +5. **`ContextFill` helper function in root package.** `func ContextFill(msg Message) (used, size int, ok bool)`. Returns context fill data from any message type. `ok` is false when no fill data is present. + +### In Scope + +- The five items above. +- Tests for RunTurn's sequential path (using a mock process satisfying the new interface). +- Tests for MessageContextWindow synthesis in CLI engine. +- Tests for ContextFill helper. +- Updating MCP server and interactive example to remove `spawnPerTurn` branching (proof that the abstraction works). +- Updating `MessageContextWindow` godoc to note it is now emitted by CLI engines as well. + +### Out of Scope + +- **TurnSummary** — deferred per ADR-4. Create a follow-up issue. +- **`ContextSizeTokens` for CLI** — depends on confirming Claude CLI wire format. If confirmed, a follow-up captures it. The design accommodates this as a future enhancement (cached `contextSize` state in CLI engine). +- **Removing `ContextUsedTokens` from content messages** — explicitly not done. PR #39 semantics preserved for backward compatibility. +- **Changes to backend parsers** — synthesis happens at engine level. Parsers remain stateless. +- **Changes to ACP engine** — already emits `MessageContextWindow` natively. + +### Affected Files and Components + +| File | Change Type | +|------|-------------| +| Root package: new file or existing file | New interface + ContextFill function | +| `runturn.go` | Modified: sequential path | +| `engine/cli/process.go` | Modified: interface satisfaction + MessageContextWindow synthesis | +| `engine/cli/engine.go` | Possibly modified: pass capability info for interface satisfaction | +| `cmd/agentrun-mcp/engine.go` | Modified: remove spawnPerTurn from makeEngine return | +| `cmd/agentrun-mcp/tools.go` | Modified: remove drainSpawnPerTurn, simplify all call sites | +| `cmd/agentrun-mcp/session.go` | Modified: remove spawnPerTurn from sessionEntry | +| `examples/interactive/main.go` | Modified: remove spawnPerTurn branching | +| Test files for all above | New/modified | + +### Acceptance Criteria + +1. `RunTurn` works correctly for all three send modes (streaming CLI, spawn-per-turn CLI, ACP) without consumers branching on engine type. +2. `makeEngine` (or equivalent) no longer returns a boolean. Consumers do not need to know or track `spawnPerTurn`. +3. CLI backends that report per-call usage mid-turn emit `MessageContextWindow` messages that can be filtered with `filter.Filter(ctx, ch, MessageContextWindow)`. +4. `ContextFill(msg)` returns correct values for `MessageContextWindow` (both ACP and CLI-synthesized), mid-turn content messages with usage, and returns `ok == false` for messages without context fill data. +5. MCP server's `doRunTurn`, `doSessionStart`, and `doSessionSend` use a single code path (no spawnPerTurn branching). +6. Interactive example's `run()` function uses a single code path for first turn and subsequent turns. +7. All existing tests pass. `make qa` is green. +8. No breaking changes to the root package's existing public API (existing `ContextUsedTokens` on content messages is preserved). + +--- + +## 4.7 Verification Criteria + +- **RunTurn sequential correctness**: A test using a mock process with sequential-send semantics verifies that Send completes before Output is drained. Messages emitted by Output after Send are received by the handler. Context cancellation during Send returns `ctx.Err()`. +- **RunTurn concurrent correctness**: Existing RunTurn tests continue passing (regression). +- **MessageContextWindow synthesis**: A CLI engine test verifies that when a backend emits mid-turn messages with usage data, `MessageContextWindow` messages appear in the output channel interleaved after the content messages. The synthesized messages carry the correct `ContextUsedTokens` value. +- **No synthesis on result**: `MessageResult` does not trigger a synthesized `MessageContextWindow`. +- **ContextFill helper**: Returns `(used, size, true)` for MessageContextWindow with non-zero fields, `(used, 0, true)` for CLI content messages with ContextUsedTokens, and `(0, 0, false)` for messages without usage or with zero context fill. +- **MCP server simplification**: The `doSessionSend` function has no `if spawnPerTurn` branch. `sessionEntry` has no `spawnPerTurn` field. +- **Channel-based filtering**: `filter.Filter(ctx, ch, MessageContextWindow)` receives context fill signals from both CLI and ACP backends in an integration-style test. +- **Buffer pressure**: Under sustained mid-turn usage events, the output channel does not deadlock due to synthesized messages (verified by test with small buffer size). + +--- + +## 4.8 Assumptions + +**A1: Claude CLI's `contextWindow` wire field.** The issue references `modelUsage..contextWindow` on result events. This has not been confirmed in the codebase (grep found zero matches). If this field exists, `ContextSizeTokens` can be populated for CLI backends. If not, CLI's `MessageContextWindow` carries absolute fill only. +*Invalidated if:* Claude CLI never exposes context window capacity in any event type. In that case, percentage-based fuel gauges remain ACP-only, and CLI consumers use absolute token counts. + +**A2: RunTurn's concurrent pattern works for all non-spawn-per-turn backends.** All future engine types (API-based) will tolerate `Send()` being called in a goroutine while `Output()` is drained concurrently. +*Invalidated if:* A future engine requires Send-then-drain ordering but isn't a spawn-per-turn subprocess (e.g., an API engine with request-then-poll semantics). The sequential-send interface would accommodate this, but the naming might be misleading. + +**A3: Output channel buffer is sufficient for synthesized messages.** The default `OutputBuffer` of 64 absorbs the doubled message rate from context fill synthesis without back-pressure. +*Invalidated if:* A backend emits usage data on every streaming token (not just per API call), causing hundreds of synthesized messages per turn. Currently, Claude CLI emits usage per `assistant` event (one per API call, typically 1-5 per turn), so this is safe. + +**A4: Spawn-per-turn semantics are detectable at Start time.** The `resolveCapabilities` function in `engine/cli/engine.go` already determines whether a backend is streaming or spawn-per-turn. This determination is stable for the process lifetime. +*Invalidated if:* A backend dynamically switches between streaming and spawn-per-turn modes within a session. No current or planned backend does this. + +--- + +## 4.9 Metadata + +`2026-03-09 | design | unify-context-fill-turn-abstraction` + +Issue: dmora/agentrun#40 — Unify real-time context fill signal across CLI and ACP backends + +Related: dmora/agentrun#39 (ContextUsedTokens on mid-turn messages — predecessor, merged) + +--- + +## Appendix: Implementation Sequencing Suggestion + +While implementation ordering is out of scope for this design, the following sequencing minimizes risk: + +1. **Phase 1**: Sequential-send interface + RunTurn enhancement (pure addition, no breaking changes) +2. **Phase 2**: CLI process satisfies interface + MCP server/example cleanup (proves the abstraction) +3. **Phase 3**: MessageContextWindow synthesis in CLI engine (independent of Phase 1-2) +4. **Phase 4**: ContextFill helper (independent of all above) + +Phases 1-2 and 3-4 can proceed in parallel. diff --git a/docs/designs/2026-03-12-issue-40-remaining-gaps.md b/docs/designs/2026-03-12-issue-40-remaining-gaps.md new file mode 100644 index 0000000..5d12979 --- /dev/null +++ b/docs/designs/2026-03-12-issue-40-remaining-gaps.md @@ -0,0 +1,462 @@ +# Design: Issue #40 — Remaining Gaps After Partial Implementation + +## 4.1 Context + +### Problem Statement + +Issue dmora/agentrun#40 identified four abstraction leaks exposed by the MCP diagnostic server (the first non-trivial consumer). A prior design document (`docs/designs/2026-03-09-unify-context-fill-and-turn-abstraction.md`) proposed solutions. Since then, significant implementation has landed. This document reconciles the current codebase state with the original four leaks and designs solutions for what remains. + +### Leak Status Matrix + +| # | Leak | Status | Evidence | +|---|------|--------|----------| +| 1 | `spawnPerTurn` branching duplicated in consumers | **Resolved** | `SequentialSender` interface (`process.go:68-71`), `RunTurn` type-asserts on it (`runturn.go:29`), CLI wraps spawn-per-turn processes (`engine/cli/process.go:81-87`). MCP server uses `RunTurn` uniformly. | +| 2 | `makeEngine` returns a boolean consumers must track | **Resolved** | `makeEngine` returns `(Engine, error)` only (`cmd/agentrun-mcp/engine.go:24`). `sessionEntry` has no `spawnPerTurn` field (`session.go:27-36`). Interactive example uses `RunTurn` with no branching (`examples/interactive/main.go:188-194`). | +| 3 | No unified turn summary | **Open** | MCP server collects messages into `[]agentrun.Message` and returns raw arrays (`tools.go:367-371`). No `TurnSummary` type in root package. | +| 4 | Context fill requires filtering 3+ message types | **Partially resolved** | CLI engine synthesizes `MessageContextWindow` via `synthesizeContextWindow` (`process.go:387-404`). `ContextFill` helper exists (`contextfill.go:1-17`). **Gap**: `ContextSizeTokens` is always 0 for CLI — no percentage-based fuel gauge. | + +### Remaining Work + +Two gaps remain open: + +**Gap A: TurnSummary** — Every consumer reimplements message-iteration logic to extract text, usage, stop reason, denials, and errors from a turn's message stream. The MCP server (`tools.go:327-340`, `tools.go:367-386`) and any Foundry integration will duplicate this. + +**Gap B: ContextSizeTokens for CLI** — CLI backends emit `MessageContextWindow` with `ContextUsedTokens` but `ContextSizeTokens == 0`. Consumers cannot compute fill percentage. ACP provides both fields natively. The issue hypothesizes that Claude CLI's result event includes `modelUsage..contextWindow`, but this was **not confirmed** in the parser (`engine/cli/claude/parse.go:299-331` — no `contextWindow` extraction). + +### Known (from codebase) + +- `ContextFill(msg)` in `contextfill.go:9-17` returns `(used, size, ok)` — works for any message type. +- `synthesizeContextWindow` in `process.go:387-404` creates `MessageContextWindow` from mid-turn content messages with fill data. Excludes `MessageResult`, `MessageInit`, `MessageError`. +- `emitWithSynthesis` in `process.go:365-382` emits content message then synthesized `MessageContextWindow` in order. +- `applyContextFill` in `process.go:518-539` tracks `maxCallFill` across a turn. Uses `callContextFill` which sums `InputTokens + CacheReadTokens + CacheWriteTokens`. +- `collectTerminalState` in MCP `tools.go:327-340` is the closest thing to a turn summary — but it's MCP-specific and only captures messages, duration, stderr, and exit code. +- `runTurnOutput` in `tools.go:272-278` carries `Messages []agentrun.Message` — raw, unsummarized. +- The `filter` package (`filter/filter.go`) provides `Filter`, `Completed`, `ResultOnly`, `IsDelta`. No context-fill-specific filter (but `Filter(ctx, ch, MessageContextWindow)` already works post-synthesis). +- Claude CLI parser `extractTokenUsage` (`engine/cli/claude/parse.go`) parses `usage.input_tokens`, `cache_creation_input_tokens`, `cache_read_input_tokens`, `output_tokens`, `thinking_tokens`, and `costUSD` from `result` events. No evidence of `contextWindow` or `context_window` field parsing. + +### Assumed + +- **A1**: Claude CLI's wire format may include context window capacity in some event type (issue references `modelUsage..contextWindow` on result events). Unconfirmed — grep across the Claude backend found zero matches. This is the **critical unknown** for Gap B. +- **A2**: Future API engines (ADK) will provide their own context window reporting and won't rely on CLI synthesis. +- **A3**: TurnSummary is needed by at least 2 consumers (MCP server, Foundry) to justify a root-package type. + +### Missing + +- **M1**: Claude CLI wire format sample showing `contextWindow` field (or confirmation it doesn't exist). +- **M2**: Whether Foundry currently reimplements turn message iteration (and in how many places). +- **M3**: Whether consumers need turn-level peak context fill in the summary, or just per-message signals. + +### Context Manifest + +Files examined: +- `process.go` (lines 53-71) — SequentialSender interface +- `runturn.go` (lines 1-91) — RunTurn, drainOutput +- `contextfill.go` (lines 1-17) — ContextFill helper +- `message.go` (lines 1-348) — Message, Usage, MessageType constants +- `engine/cli/process.go` (lines 81-87, 300-539) — sequentialProcess, scanLines, emitWithSynthesis, synthesizeContextWindow, applyContextFill +- `engine/cli/engine.go` (lines 1-178) — Engine.Start, resolveCapabilities +- `engine/acp/update.go` (known from prior analysis) — parseUsageUpdate +- `cmd/agentrun-mcp/tools.go` (lines 1-730) — all tool handlers, doRunTurn, collectTerminalState +- `cmd/agentrun-mcp/engine.go` (lines 1-41) — makeEngine (no boolean return) +- `cmd/agentrun-mcp/session.go` (lines 1-207) — sessionEntry (no spawnPerTurn field) +- `examples/interactive/main.go` (lines 1-221) — no spawnPerTurn branching, uses RunTurn +- `filter/filter.go` (lines 1-88) — Filter, Completed, ResultOnly, IsDelta +- `engine/cli/claude/parse.go` (known from prior analysis) — extractTokenUsage, no contextWindow +- `docs/designs/2026-03-09-unify-context-fill-and-turn-abstraction.md` — prior design document + +--- + +## 4.2 Solution Pool + +### Gap A: TurnSummary + +#### Candidate A1: `TurnSummary` Type + `CollectTurn` Function in Root Package + +A struct that aggregates turn-level data: accumulated text, thinking text, tool calls, total usage, stop reason, denials, errors. A `CollectTurn(ctx, proc, message) (TurnSummary, error)` function wraps `RunTurn` internally and populates the summary. + +**Approach:** `CollectTurn` calls `RunTurn` with an internal handler that dispatches messages into a `TurnSummary` accumulator. Returns the completed summary. Consumers also get a `TurnCollector` (stateful accumulator) for those who need the raw handler callback (e.g., to also stream deltas to a UI). + +**Strengths:** +- Eliminates repeated iteration boilerplate. +- Type-safe access to turn results (stop reason, denials, usage) without manual message filtering. +- Both MCP server and Foundry can adopt immediately. +- `TurnCollector` variant lets streaming consumers accumulate summaries while also forwarding messages. + +**Weaknesses:** +- Locks in a summary schema — new message fields require `TurnSummary` updates. +- Consumers with custom aggregation (e.g., per-tool-call cost attribution) bypass it. +- Increased root package API surface. +- Text accumulation: should it concatenate all `MessageText` content? What about multi-text turns (Claude CLI emits multiple text blocks in tool-use loops)? + +**Fit:** +Good. The MCP server's `collectTerminalState` + raw messages pattern maps directly. The root package already has `RunTurn` and `ContextFill` as consumer helpers. + +#### Candidate A2: `TurnSummary` as a Standalone Accumulator (No `CollectTurn` Wrapper) + +A struct with an `Add(msg Message)` method that consumers call from their existing `RunTurn` handler. No wrapping of `RunTurn` itself. + +**Approach:** Consumers create a `TurnSummary`, pass `summary.Add` as the handler (or call it within their handler), and read fields after the turn completes. + +**Strengths:** +- Composable — works with any message consumption pattern (RunTurn callback, channel iteration, etc.). +- Minimal API surface: one type, one method. +- Consumer retains full control of the handler. + +**Weaknesses:** +- Consumer still writes `RunTurn(ctx, proc, msg, summary.Add)` or equivalent — slight boilerplate remains. +- `Add` must handle all message types correctly, including idempotency if called twice with the same message (e.g., consumer logging + accumulating). +- No "just give me the answer" convenience — always requires manual wiring. + +**Fit:** +Good for library-first philosophy (composable, no hidden magic). Slightly more ceremony than A1 but more flexible. + +#### Candidate A3: Leave Summarization to Consumers (Status Quo) + +No library type. Consumers continue iterating messages with their own logic. + +**Strengths:** Maximum flexibility, no schema maintenance. + +**Weaknesses:** Repeated boilerplate. Bug risk in reimplemented iteration logic (e.g., forgetting to check `msg.Usage` nil guard). Every consumer derives the same 5 fields. + +**Fit:** Acceptable for a library with 1-2 consumers. But with MCP server + Foundry + examples, the duplication is real. + +### Self-Critique: Gap A + +| Candidate | Strongest counter-argument | Worst case | Hidden cost | +|-----------|---------------------------|------------|-------------| +| **A1 (CollectTurn wrapper)** | Hides the message stream — consumers who want both streaming UI and summary must either use TurnCollector (increased API surface) or re-derive the summary from messages anyway. | New message fields added to `Message` but not to `TurnSummary` — summary silently loses data until someone notices. | Schema maintenance: every new metadata field on Message requires a `TurnSummary` update. The summary and the message diverge. | +| **A2 (standalone accumulator)** | `summary.Add` is a one-liner, but consumers still need to know it exists and wire it in. Marginal improvement over manual iteration for simple cases. | Consumer calls `Add` in wrong order or misses messages — accumulator returns partial summary without error. | Thread safety: if multiple goroutines call `Add` (unlikely but possible with custom drain loops), the accumulator needs a mutex or "single-writer only" contract. | +| **A3 (status quo)** | Three consumers already duplicate this logic. Each will independently discover edge cases (nil Usage, missing StopReason, empty Denials). | A consumer mishandles nil `Usage` and panics in production. | Time cost: every new consumer author reads Message godoc and writes iteration logic from scratch. | + +**Preferred: A2 (standalone accumulator)** — composable, minimal API surface, no hidden wrapping. With an optional `CollectTurn` convenience added only if the two-line wiring proves too verbose in practice. + +### Gap B: ContextSizeTokens for CLI + +#### Candidate B1: Capture `contextWindow` from Claude CLI Wire Format (If Available) + +The Claude backend parser extracts a `contextWindow` field from result events (or init events) and stores it on the message. The CLI engine caches this value and populates `ContextSizeTokens` on synthesized `MessageContextWindow` messages. + +**Approach:** Add extraction logic to `extractTokenUsage` in `engine/cli/claude/parse.go`. Store the model's context window size on `Usage.ContextSizeTokens` on the `MessageResult`. The CLI engine caches this value (`contextSize` field on `process`) and uses it for subsequent `MessageContextWindow` synthesis. + +**Strengths:** +- Full fuel gauge: consumers get `ContextSizeTokens` + `ContextUsedTokens` for percentage computation. +- Unified consumer experience across CLI and ACP. + +**Weaknesses:** +- **Depends on unconfirmed wire format.** If the field doesn't exist, this approach is impossible. +- First-turn mid-turn messages lack `ContextSizeTokens` until the first result populates the cache. This creates a "warm-up" gap where the denominator is 0. +- Couples the CLI engine to Claude-specific wire format details. +- Adds state to the CLI process (cached `contextSize`), complicating the stateless-parser model. + +**Fit:** Ideal if wire data exists. Blocked until confirmed. + +#### Candidate B2: Hardcode Model Context Sizes in Claude Backend + +Maintain a map of known Claude model IDs to their context window sizes. Populate `ContextSizeTokens` from this map based on `InitMeta.Model`. + +**Approach:** `claude.ModelContextSize` map. On `MessageInit` (which carries `InitMeta.Model`), the engine looks up the context size and caches it for `MessageContextWindow` synthesis. + +**Strengths:** +- Available immediately (no wire format dependency). +- First-turn `MessageContextWindow` messages carry `ContextSizeTokens` from the start. + +**Weaknesses:** +- **Maintenance burden.** New Claude model releases require library updates. Stale map → wrong percentages. +- Anthropic has released models with varying context sizes (100K, 200K). A wrong size is worse than no size. +- Violates the "backends parse wire data" principle — this is static config masquerading as dynamic data. +- Other CLI backends (Codex, OpenCode) still lack `ContextSizeTokens`. + +**Fit:** Fragile. A library should not maintain a model registry. + +#### Candidate B3: Consumer-Supplied Context Size via Session Option + +Add `OptionContextSize` to the root package. Consumers set it when they know the model's context size (e.g., from an API call, config file, or prior turn's result). The CLI engine uses this value for `MessageContextWindow` synthesis. + +**Approach:** Consumers who need percentage-based fuel gauges provide `session.Options[agentrun.OptionContextSize] = "200000"`. The CLI engine reads this at Start time and populates `ContextSizeTokens` on synthesized messages. If not set, `ContextSizeTokens` remains 0 (absolute-only). + +**Strengths:** +- No wire format dependency. +- Consumer controls the value — can use any source (API metadata, config, hardcoded). +- Works for all CLI backends (not just Claude). +- Library stays out of the model-registry business. + +**Weaknesses:** +- Consumer must know the context size out-of-band. Many won't. +- `ContextSizeTokens` varies by model — if the model changes mid-session (unlikely but possible), the cached value is stale. +- Adds an option constant to the root package for a niche use case. + +**Fit:** Pragmatic escape hatch. Doesn't solve the common case (consumer doesn't know context size) but enables power users. + +#### Candidate B4: Accept CLI Limitation; Document as Backend Difference + +CLI's `MessageContextWindow` carries absolute `ContextUsedTokens` only. Consumers who need percentage use ACP (which provides both fields). Document this as a known backend capability difference. + +**Approach:** No code changes. Update godoc to clearly state that `ContextSizeTokens == 0` on CLI means "capacity unknown — use absolute value only." Consumers check `ContextSizeTokens > 0` before computing percentage. + +**Strengths:** +- Zero complexity. No maintenance burden. +- Honest — if the wire data doesn't exist, don't fabricate it. +- Consumers already need to handle `ContextSizeTokens == 0` (per `Usage.ContextSizeTokens` godoc). + +**Weaknesses:** +- CLI consumers can't build percentage-based fuel gauges. The core issue's motivating use case is only half-solved for CLI. +- "Just use ACP" isn't always an option — Claude Code CLI is the most common backend. + +**Fit:** Acceptable as a near-term position while waiting for wire format confirmation. Unacceptable as a permanent answer if the wire data can be obtained. + +### Self-Critique: Gap B + +| Candidate | Strongest counter-argument | Worst case | Hidden cost | +|-----------|---------------------------|------------|-------------| +| **B1 (wire capture)** | Blocked on unconfirmed wire data. May never be possible. | Anthropic changes the field location or semantics in a Claude CLI update — silent data regression. | State management (cached contextSize on process struct). Parser loses stateless property for one field. | +| **B2 (hardcoded map)** | A library maintaining a model registry is a maintenance anti-pattern. First wrong value shipped to consumers poisons trust. | New model released with 1M context window. Library shows 200K. Consumer thinks they have 80% remaining when they really have 20%. | Every model release cycle requires a library patch. Semantic versioning pressure to add models without breaking changes. | +| **B3 (consumer option)** | Shifts the burden to every consumer. Most won't know the value. The option exists but solves the problem for ~10% of use cases. | Consumer hardcodes 200K for a model that actually has 128K. Fill gauge shows 60% when the real fill is 93%. Agent runs out of context unexpectedly. | Option validation (must be positive int). Documentation explaining where consumers can find context sizes. | +| **B4 (accept limitation)** | The issue's motivating example is a fuel gauge. "No percentage for CLI" is a non-answer for the primary use case. | Consumer builds a fuel gauge, deploys it, discovers it only works for ACP backends — has to rearchitect after shipping. | None, but opportunity cost: the issue stays partially unresolved indefinitely. | + +**Preferred: B4 (accept limitation) as immediate position + B1 (wire capture) as follow-up when confirmed.** + +The rationale: fabricating `ContextSizeTokens` from a hardcoded map (B2) or consumer input (B3) creates false precision that's worse than no value. The honest answer is "CLI doesn't report this." If the wire data is confirmed (B1), it should be captured — but that's a separate, gated task. + +--- + +## 4.3 Decisions + +### ADR-1: Provide `TurnSummary` as Standalone Accumulator + +> In the context of **multiple consumers (MCP server, Foundry, future orchestrators) reimplementing message-iteration logic to extract turn results**, facing **the desire for a reusable summary without hiding the raw message stream**, we decided **to add a `TurnSummary` struct with an `Add(Message)` method in the root package**, and neglected **a `CollectTurn` wrapper around RunTurn (A1, hides message stream from consumers who want both streaming and summary) and status quo (A3, continued duplication and nil-guard bugs)**, to achieve **a composable, opt-in accumulator that consumers wire into any message consumption pattern**, accepting **marginal wiring boilerplate (`summary.Add` in handler) and a schema that must grow with new Message metadata fields**. + +Confidence: **medium** — the pattern is sound, but the exact field set needs validation with both MCP server and Foundry to ensure the summary captures what both consumers actually need. The schema risk (growing with new fields) is real but manageable if fields are added conservatively. + +Rejected: **A1 (CollectTurn)** — wrapping RunTurn hides the message stream. Consumers who want both live streaming (delta forwarding) and post-turn summary can't use it without the TurnCollector variant, which doubles the API surface. **A3 (status quo)** — three+ consumers reimplementing the same nil-guarded iteration is a bug farm. + +Grounding: MCP server's `collectTerminalState` (`tools.go:327-340`) already derives duration, exit code, and error from process state. `doRunTurn` (`tools.go:367-386`) collects messages into a slice but doesn't extract text, usage, or stop reason. Foundry's integration (per memory: "~10 lines to add backend in `internal/agent/agentrun.go:buildEngine`") will need the same extraction. + +### ADR-2: Accept CLI ContextSizeTokens Limitation; Gate Wire Capture on Confirmation + +> In the context of **CLI backends emitting `MessageContextWindow` with `ContextUsedTokens` but `ContextSizeTokens == 0`**, facing **the inability to compute fill percentage without knowing context window capacity**, we decided **to document this as a known backend capability difference and defer `ContextSizeTokens` capture until the Claude CLI wire format is confirmed to include a `contextWindow` field**, and neglected **hardcoding model context sizes (B2, maintenance anti-pattern, false precision) and consumer-supplied option (B3, shifts burden, solves <10% of cases)**, to achieve **honest reporting (absolute fill when capacity is unknown) without false precision**, accepting **that CLI consumers cannot build percentage-based fuel gauges until the wire format is confirmed or a future solution emerges**. + +Confidence: **medium** — this is the right near-term position, but it leaves the motivating use case (fuel gauge percentage) unsolved for CLI. If the wire format is confirmed, B1 should be implemented immediately as an amendment. + +Rejected: **B2 (hardcoded map)** — a library maintaining a model context-size registry is a maintenance anti-pattern. First wrong value shipped to consumers poisons trust in the fuel gauge. **B3 (consumer option)** — shifts the burden. Most consumers don't know model context sizes. False precision from wrong values is worse than no value. + +Grounding: Claude backend parser (`engine/cli/claude/parse.go:299-331`) parses 6 token/cost fields from result events. No `contextWindow` field is extracted. Grep across the `claude` backend found zero matches for "contextWindow" or "context_window". + +### ADR-3: Add Convenience `CollectTurn` Only If Accumulator Wiring Proves Verbose + +> In the context of **the `TurnSummary` accumulator requiring consumers to wire `summary.Add` into their handler**, facing **potential feedback that two lines of wiring is too much ceremony for the common case**, we decided **to ship the accumulator first (ADR-1) and add `CollectTurn` only if consumer feedback indicates the wiring is a pain point**, and neglected **shipping both simultaneously (increases API surface before validating need)**, to achieve **minimal initial API surface with a clear upgrade path**, accepting **a brief period where consumers write `summary.Add` manually**. + +Confidence: **high** — YAGNI until proven otherwise. Adding `CollectTurn` later is backward-compatible and risk-free. + +--- + +## 4.4 Component Specification + +### Component 1: `TurnSummary` Type (Root Package) + +A struct that accumulates turn-level data from individual messages. Designed for single-goroutine use (not concurrent-safe — callers protect with their own synchronization if needed, matching the typical `RunTurn` handler pattern which is single-goroutine). + +Fields the summary should expose: +- **Text**: accumulated assistant text (`MessageText` content, concatenated in order). +- **Thinking**: accumulated thinking text (`MessageThinking` content, concatenated in order). +- **ToolCalls**: slice of `ToolCall` from `MessageToolUse` messages. +- **Usage**: the `*Usage` from the `MessageResult` (turn-level totals). Nil if no result received. +- **StopReason**: from `MessageResult`. +- **Denials**: from `MessageResult`. +- **IsError**: from `MessageResult`. +- **Errors**: slice of error messages (`MessageError` content) encountered during the turn. +- **Result**: boolean indicating whether a `MessageResult` was received. +- **Messages**: the raw `[]Message` for consumers who need full access. Accumulated in order. + +The `Add(Message)` method dispatches on `msg.Type`: +- `MessageText`: appends `msg.Content` to Text. +- `MessageThinking`: appends `msg.Content` to Thinking. +- `MessageToolUse`: appends `msg.Tool` to ToolCalls. +- `MessageError`: appends `msg.Content` to Errors. +- `MessageResult`: captures Usage, StopReason, Denials, IsError. Sets Result to true. +- All types: appends to Messages. +- Delta types (`IsDelta`): ignored for accumulation (deltas are partial; complete messages follow). + +Design tension: should `Add` return an error? No — accumulation is infallible. A consumer whose handler needs to return errors should call `summary.Add` inside their handler and return errors separately. + +### Data Flow + +``` +Consumer with RunTurn + TurnSummary: + var summary agentrun.TurnSummary + err := agentrun.RunTurn(ctx, proc, msg, func(m agentrun.Message) error { + summary.Add(m) + // optional: also forward deltas to UI + return nil + }) + // summary.Text, summary.Usage, summary.StopReason, etc. are populated + +Consumer with channel iteration: + var summary agentrun.TurnSummary + for msg := range proc.Output() { + summary.Add(msg) + if msg.Type == agentrun.MessageResult { + break + } + } +``` + +--- + +## 4.5 Dependency and Blast-Radius Map + +### Direct Changes + +| File/Component | Change | +|----------------|--------| +| Root package (new file, e.g., `turnsummary.go`) | `TurnSummary` type + `Add` method | +| Root package test file | Tests for `TurnSummary.Add` across all message types | + +### Indirect Impact + +| Component | Impact | +|-----------|--------| +| `cmd/agentrun-mcp/tools.go` | Can replace `[]agentrun.Message` accumulation + `collectTerminalState` with `TurnSummary`. `runTurnOutput` can derive fields from summary. | +| `examples/interactive/` | Optional adoption — the interactive example streams deltas and may not need a summary. | +| Foundry (external) | Can adopt `TurnSummary` when integrating agentrun. | + +### Risk Zones + +- **TurnSummary schema growth**: Every new metadata field on `Message` (e.g., future `ReasoningEffort` on result) needs a `TurnSummary` field. This is the primary maintenance cost. +- **Text concatenation semantics**: Multi-text turns (Claude tool-use loops emit multiple `MessageText` per turn) produce concatenated text. Consumers wanting per-block text need `Messages` instead of `Text`. This must be documented. +- **No breaking changes**: `TurnSummary` is purely additive. No existing API is modified. + +--- + +## 4.6 Implementation Instructions (Handoff Contract) + +### What to Build + +1. **`TurnSummary` type in root package.** A struct with exported fields for text, thinking, tool calls, usage, stop reason, denials, errors, and raw messages. An `Add(Message)` method that dispatches by message type. Single-goroutine use only (no internal mutex). + +2. **Tests for `TurnSummary`.** Cover all message types including: normal turn (text + result), thinking turn, tool-use turn, error turn, turn with denials, delta-only messages (should be ignored for accumulation), multi-text turns, and the degenerate case (no result received). + +### In Scope + +- `TurnSummary` type and `Add` method. +- Comprehensive tests. +- Godoc explaining usage patterns (RunTurn callback, channel iteration). +- Documenting text concatenation behavior for multi-text turns. + +### Out of Scope + +- **`CollectTurn` convenience function** — deferred per ADR-3. Add later if warranted by feedback. +- **MCP server migration to `TurnSummary`** — consumers adopt at their own pace. Not required for this change. +- **`ContextSizeTokens` for CLI** — deferred per ADR-2. Gated on wire format confirmation (see Questions). +- **Changes to synthesized `MessageContextWindow`** — already working correctly. +- **Changes to `ContextFill` helper** — already working correctly. + +### Affected Files and Components + +| File | Change Type | +|------|-------------| +| Root package: `turnsummary.go` (new) | New type + method | +| Root package: `turnsummary_test.go` (new) | Tests | + +### Acceptance Criteria + +1. `TurnSummary.Add` correctly accumulates text from `MessageText`, thinking from `MessageThinking`, tool calls from `MessageToolUse`, errors from `MessageError`, and result metadata from `MessageResult`. +2. Delta message types are ignored for field accumulation (not appended to Text/Thinking). They ARE still appended to `Messages`. +3. `TurnSummary.Usage` is nil before `MessageResult` arrives and non-nil after (when the result carries usage). +4. Multi-text turns concatenate correctly (e.g., two `MessageText` messages → Text contains both, separated by content boundaries). +5. `TurnSummary` is usable with both `RunTurn` callback and direct channel iteration patterns. +6. `make qa` is green. +7. No changes to existing public API. + +--- + +## 4.7 Verification Criteria + +- **Normal turn**: `TurnSummary` after a text + result turn has non-empty `Text`, non-nil `Usage`, populated `StopReason`, `Result == true`, and 2 entries in `Messages`. +- **Thinking turn**: `Thinking` is populated from `MessageThinking`. `Text` is populated from `MessageText`. Both coexist. +- **Tool-use turn**: `ToolCalls` contains entries with correct `Name` and `Input`. Multiple tool calls accumulate. +- **Error turn**: `Errors` contains error messages. `Result` may be true or false depending on whether the turn completed. +- **Denial turn**: `Denials` is populated from `MessageResult.Denials`. +- **Delta messages**: `Text` does not contain delta fragments. `Messages` does contain delta messages (for consumers who need the full stream). +- **No result**: `Result == false`, `Usage == nil`, `StopReason` is empty. +- **IsError result**: `IsError == true` when result carries `is_error: true`. +- **Idempotency**: Calling `Add` with the same message twice does not corrupt state (it double-adds, which is the caller's bug, but it shouldn't panic or produce inconsistent types). + +--- + +## 4.8 Assumptions + +**A1: Claude CLI `contextWindow` wire field.** +The issue references `modelUsage..contextWindow` on result events. Not confirmed in the parser or any captured wire samples. This is the blocker for percentage-based fuel gauges on CLI. +*Invalidated if:* The field is confirmed to exist → implement B1 (wire capture) as an amendment. +*Also invalidated if:* Confirmed the field does NOT exist in any Claude CLI event → B4 (accept limitation) becomes the permanent position. Consider B3 (consumer option) as a fallback for power users. + +**A2: TurnSummary field set is sufficient for MCP server and Foundry.** +The proposed fields (Text, Thinking, ToolCalls, Usage, StopReason, Denials, IsError, Errors, Result, Messages) cover what `collectTerminalState` and `doRunTurn` derive today. Foundry's needs are assumed to be similar. +*Invalidated if:* Foundry needs fields not in this set (e.g., per-tool-call latency, intermediate context fill snapshots). In that case, extend `TurnSummary` or Foundry uses `Messages` for custom derivation. + +**A3: Single-goroutine `Add` is sufficient.** +`RunTurn`'s handler is called from a single goroutine (`drainOutput`). Channel iteration is also single-goroutine. No known consumer calls a handler from multiple goroutines. +*Invalidated if:* A consumer with parallel message processing needs concurrent `Add`. In that case, add an internal mutex (simple, backward-compatible). + +**A4: Text concatenation without separator is acceptable.** +`MessageText` content typically ends with natural boundaries. Concatenating without a separator matches the "stream reassembly" pattern. If this proves wrong, a newline separator can be added. +*Invalidated if:* Consumers report garbled text from multi-text turns. Add `\n` separator. + +--- + +## 4.9 Metadata + +`2026-03-12 | design | issue-40-remaining-gaps` + +Issue: dmora/agentrun#40 — Unify real-time context fill signal across CLI and ACP backends +Predecessor: `docs/designs/2026-03-09-unify-context-fill-and-turn-abstraction.md` + +Status of original ADRs: +- **ADR-1 (SequentialSender)**: Implemented. `SequentialSender` interface in `process.go:68-71`. +- **ADR-2 (Synthesize MessageContextWindow)**: Implemented. `synthesizeContextWindow` in `engine/cli/process.go:387-404`. +- **ADR-3 (ContextFill helper)**: Implemented. `contextfill.go:1-17`. +- **ADR-4 (Defer TurnSummary)**: This document picks up TurnSummary as the next deliverable. + +--- + +## Clarification Questions for Operator Validation + +### Critical (blocks design decisions) + +**Q1: Does Claude CLI's wire format include a `contextWindow` field on any event type?** +The issue references `modelUsage..contextWindow` on result events. The parser doesn't extract it, and grep found no evidence of it in the codebase. If this field exists, we can implement percentage-based fuel gauges for CLI (B1). If not, CLI is permanently limited to absolute fill values. **Can you capture a raw Claude CLI result event JSON and check for a `contextWindow` or `context_window` field?** + +**Q2: Is the `TurnSummary` scope as described sufficient for Foundry's needs?** +The proposed fields are: Text, Thinking, ToolCalls, Usage, StopReason, Denials, IsError, Errors, Result, Messages. Does Foundry need anything beyond this (e.g., per-tool-call timing, intermediate context fill snapshots, turn duration)? + +### Important (affects design nuances) + +**Q3: Text concatenation strategy for multi-text turns.** +When Claude does a tool-use loop, it emits multiple `MessageText` messages in one turn. Should `TurnSummary.Text` concatenate them with no separator, a newline separator, or store them as `[]string` (breaking the "single Text field" simplicity)? The issue doesn't specify. Current proposal: concatenate with no separator (matches stream reassembly). Newline separator is an easy alternative. + +**Q4: Should `TurnSummary` track context fill peak?** +The MCP server currently doesn't derive peak context fill from messages. But a fuel gauge consumer might want "peak fill this turn" as a summary field. Should `TurnSummary` track the max `ContextUsedTokens` seen across all messages in the turn? This adds one field and one comparison in `Add`. + +**Q5: Should `TurnSummary` be in the root package or a new `turn` package?** +Root package keeps it discoverable and co-located with `RunTurn`. A separate package reduces root API surface but adds import friction. Current proposal: root package (matches `ContextFill` placement). + +### Nice to Know (informs priority) + +**Q6: How many call sites in Foundry currently reimplement turn message iteration?** +This validates assumption A3 (≥2 consumers justify a root-package type). If Foundry only has 1 call site, the urgency is lower. + +**Q7: Is TurnSummary higher priority than the remaining ContextSizeTokens gap?** +Both are open. TurnSummary is unblocked; ContextSizeTokens is blocked on Q1. Should TurnSummary proceed immediately, or wait until both gaps can be addressed together? + +--- + +## Appendix: Implementation Sequencing + +Since leaks #1, #2, and #4 (partial) are already resolved, the remaining work is: + +1. **Phase 1 (unblocked)**: `TurnSummary` type + `Add` method + tests. Root package addition. +2. **Phase 2 (gated on Q1)**: If Claude CLI `contextWindow` confirmed → wire capture in Claude parser + CLI engine caching + `ContextSizeTokens` on synthesized `MessageContextWindow`. +3. **Phase 3 (gated on feedback)**: Optional `CollectTurn` convenience wrapper if consumer feedback warrants it. +4. **Phase 4 (optional)**: MCP server migration to `TurnSummary` (consumer-side cleanup, not a library change). + +Phase 1 can proceed immediately. Phase 2 is independently blocked on wire format confirmation. diff --git a/engine/acp/engine_test.go b/engine/acp/engine_test.go index b3f3959..b3c8316 100644 --- a/engine/acp/engine_test.go +++ b/engine/acp/engine_test.go @@ -1385,3 +1385,34 @@ func TestWithStderrWriter_ACP_Nil_NoOp(t *testing.T) { _ = proc.Stop(context.Background()) // No panic = success. } + +func TestEngine_SendBlocks(t *testing.T) { + proc, ctx := startProc(t) + + // Drain init. + <-proc.Output() + + bs, ok := proc.(agentrun.BlockSender) + if !ok { + t.Fatal("expected process to satisfy BlockSender") + } + + blocks := []agentrun.ContentBlock{ + agentrun.TextBlock("describe this"), + agentrun.ImageBase64Block("image/png", "SGVsbG8="), + } + if err := bs.SendBlocks(ctx, blocks...); err != nil { + t.Fatalf("SendBlocks: %v", err) + } + + msgs := collectUntilResult(proc.Output()) + if len(msgs) == 0 { + t.Fatal("no messages received after SendBlocks") + } + + // Verify we got the standard mock response (text + result) + deltaText := concatContent(msgs, agentrun.MessageTextDelta) + if deltaText != mockTextContent { + t.Errorf("text deltas = %q, want %q", deltaText, mockTextContent) + } +} diff --git a/engine/acp/process.go b/engine/acp/process.go index ae842a4..48aab4d 100644 --- a/engine/acp/process.go +++ b/engine/acp/process.go @@ -66,6 +66,7 @@ type process struct { } var _ agentrun.Process = (*process)(nil) +var _ agentrun.BlockSender = (*process)(nil) // newProcess creates a process shell. The Conn and ReadLoop are wired up // by Engine.Start after construction. @@ -91,6 +92,13 @@ func (p *process) Output() <-chan agentrun.Message { // Blocks until the turn completes (RPC response received) or ctx expires. // The caller must drain Output() concurrently — see updateQueueSize. func (p *process) Send(ctx context.Context, message string) error { + return p.SendBlocks(ctx, agentrun.TextBlock(message)) +} + +// SendBlocks transmits structured content blocks to the active session. +// Blocks until the turn completes (RPC response received) or ctx expires. +// The caller must drain Output() concurrently. +func (p *process) SendBlocks(ctx context.Context, blocks ...agentrun.ContentBlock) error { if p.stopping.Load() { return agentrun.ErrTerminated } @@ -100,6 +108,11 @@ func (p *process) Send(ctx context.Context, message string) error { default: } + // Validate blocks. + if err := agentrun.ValidateBlocks(blocks); err != nil { + return fmt.Errorf("acp: validate blocks: %w", err) + } + p.turnMu.Lock() defer p.turnMu.Unlock() @@ -109,8 +122,14 @@ func (p *process) Send(ctx context.Context, message string) error { } // --- Fence: wait for previous turn's RPC goroutine to exit --- - // Conn.Call returns immediately on ctx cancel (conn.go:110-112), - // so this wait is fast. Ensures clean RPC state before new turn. + if err := p.waitPreviousTurn(ctx); err != nil { + return err + } + return p.executeSendBlocks(ctx, blocks) +} + +// waitPreviousTurn blocks until the previous turn's RPC goroutine exits. +func (p *process) waitPreviousTurn(ctx context.Context) error { if p.rpcDone != nil { select { case <-p.rpcDone: @@ -120,7 +139,11 @@ func (p *process) Send(ctx context.Context, message string) error { return ctx.Err() } } + return nil +} +// executeSendBlocks executes the prompt RPC after locks and fences are resolved. +func (p *process) executeSendBlocks(ctx context.Context, blocks []agentrun.ContentBlock) error { // --- Create per-turn collectors --- td := &turnDenials{} ta := &turnAccumulator{} @@ -136,10 +159,20 @@ func (p *process) Send(ctx context.Context, message string) error { p.turnAccum.Store(nil) }() + // Translate agentrun.ContentBlock to acp.contentBlock + acpBlocks := make([]contentBlock, len(blocks)) + for i, b := range blocks { + acpBlocks[i] = contentBlock{ + Type: b.Type, + Text: b.Text, + Source: b.Source, + } + } + // Send session/prompt request. params := promptParams{ SessionID: p.sessionID, - Prompt: []contentBlock{{Type: "text", Text: message}}, + Prompt: acpBlocks, } var result promptResult diff --git a/engine/acp/protocol.go b/engine/acp/protocol.go index 10827c0..0987f93 100644 --- a/engine/acp/protocol.go +++ b/engine/acp/protocol.go @@ -152,10 +152,11 @@ type configOptionChoice struct { // --- Prompt --- -// contentBlock is a single content element in a prompt (MVP: text-only). +// contentBlock is a single content element in a prompt. type contentBlock struct { - Type string `json:"type"` - Text string `json:"text"` + Type string `json:"type"` + Text string `json:"text,omitempty"` + Source json.RawMessage `json:"source,omitempty"` } // promptParams sends a user message to the session. diff --git a/engine/cli/agy/agy.go b/engine/cli/agy/agy.go new file mode 100644 index 0000000..905b2cc --- /dev/null +++ b/engine/cli/agy/agy.go @@ -0,0 +1,180 @@ +package agy + +import ( + "errors" + "os" + "regexp" + "strings" + "sync/atomic" + + "github.com/dmora/agentrun" + "github.com/dmora/agentrun/engine/cli" + "github.com/dmora/agentrun/engine/cli/internal/jsonutil" + "github.com/dmora/agentrun/engine/cli/internal/optutil" +) + +// Session option keys specific to the agy CLI backend. +const ( + OptionDangerouslySkipPermissions = "agy.dangerously_skip_permissions" + OptionSandbox = "agy.sandbox" +) + +const defaultBinary = "agy" + +// validConversationID matches the UUID format used by agy conversation IDs. +var validConversationID = regexp.MustCompile( + `Created conversation ([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})`, +) + +// shellWrapper is the sh -c script that runs agy (via $0) and emits a JSON +// MessageResult sentinel on clean exit. Using "$0" instead of embedding the +// binary path in the script prevents shell injection via WithBinary. +const shellWrapper = `"$0" "$@"; _E=$?; [ $_E -eq 0 ] && printf '{"type":"result","stop_reason":"end_turn"}\n'; exit $_E` + +// Backend is an Antigravity CLI backend for agentrun. +// It implements cli.Spawner, cli.Parser, and cli.Resumer for a spawn-per-turn model. +type Backend struct { + binary string + logFile string // path to the temporary log file for capturing the session ID + resumeID atomic.Pointer[string] +} + +// Compile-time interface satisfaction checks. +var ( + _ cli.Backend = (*Backend)(nil) + _ cli.Spawner = (*Backend)(nil) + _ cli.Parser = (*Backend)(nil) + _ cli.Resumer = (*Backend)(nil) +) + +// Option configures a Backend at construction time. +type Option func(*Backend) + +// WithBinary overrides the Antigravity CLI binary path. +// Empty values are ignored; the default is "agy". +func WithBinary(path string) Option { + return func(b *Backend) { + if path != "" { + b.binary = path + } + } +} + +// New creates an agy CLI backend with the given options. +func New(opts ...Option) *Backend { + b := &Backend{binary: defaultBinary} + for _, opt := range opts { + opt(b) + } + return b +} + +// buildWrapperArgs wraps agyArgs + prompt in a sh -c invocation that emits +// the MessageResult sentinel on success. The binary is passed as argv[0] ($0) +// to avoid shell injection via binary path metacharacters. +func (b *Backend) buildWrapperArgs(agyArgs []string, prompt string) (string, []string) { + args := make([]string, 0, len(agyArgs)+5) + // argv[0] = b.binary (becomes $0 in the script); remaining args become $@. + args = append(args, "-c", shellWrapper, b.binary) + args = append(args, agyArgs...) + args = append(args, "--print", prompt) + return "sh", args +} + +// SpawnArgs builds exec.Cmd arguments for a new agy session. +func (b *Backend) SpawnArgs(session agentrun.Session) (string, []string) { + // Create a temp log file so agy records the new conversation ID. + // If creation fails we omit --log-file; ResumeArgs falls back to OptionResumeID. + if f, err := os.CreateTemp("", "agy-*.log"); err == nil { + f.Close() + b.logFile = f.Name() + } + + var agyArgs []string + if b.logFile != "" { + agyArgs = append(agyArgs, "--log-file", b.logFile) + } + agyArgs = appendSessionArgs(agyArgs, session) + + prompt := session.Prompt + if jsonutil.ContainsNull(prompt) { + prompt = "" + } + + return b.buildWrapperArgs(agyArgs, prompt) +} + +// ResumeArgs builds exec.Cmd arguments to resume an existing agy session. +func (b *Backend) ResumeArgs(session agentrun.Session, initialPrompt string) (string, []string, error) { + if jsonutil.ContainsNull(initialPrompt) { + return "", nil, errors.New("agy: initial prompt contains null bytes") + } + if err := optutil.ValidateModeHITL("agy", session.Options); err != nil { + return "", nil, err + } + + // Determine the conversation UUID to resume. + var uuid string + if captured := b.resumeID.Load(); captured != nil { + uuid = *captured + } + + if uuid == "" && b.logFile != "" { + // First resume: read the log file written by SpawnArgs to get the UUID. + if data, err := os.ReadFile(b.logFile); err == nil { + if m := validConversationID.FindSubmatch(data); len(m) == 2 { + uuid = string(m[1]) + b.resumeID.Store(&uuid) + } + } + _ = os.Remove(b.logFile) + b.logFile = "" + } + + // Fallback to explicitly-provided resume ID. + if uuid == "" { + uuid = session.Options[agentrun.OptionResumeID] + } + + if uuid == "" { + return "", nil, errors.New("agy: no conversation ID available (not captured from log and OptionResumeID not set)") + } + + agyArgs := []string{"--conversation", uuid} + agyArgs = appendSessionArgs(agyArgs, session) + + binary, args := b.buildWrapperArgs(agyArgs, initialPrompt) + return binary, args, nil +} + +func appendSessionArgs(args []string, session agentrun.Session) []string { + if session.Model != "" && !jsonutil.ContainsNull(session.Model) && !strings.HasPrefix(session.Model, "-") { + args = append(args, "--model", session.Model) + } + + args = optutil.AppendAddDirs(args, session.Options, "--add-dir") + args = appendPermissionArgs(args, session.Options) + + if sandbox, _, _ := agentrun.ParseBoolOption(session.Options, OptionSandbox); sandbox { + args = append(args, "--sandbox") + } + + return args +} + +// appendPermissionArgs applies the HITL/permission flag following the +// "root set → backend-specific option ignored" precedence rule. +func appendPermissionArgs(args []string, opts map[string]string) []string { + if hitl := opts[agentrun.OptionHITL]; hitl != "" { + // Root OptionHITL governs; backend-specific option is ignored. + if agentrun.HITL(hitl) == agentrun.HITLOff { + args = append(args, "--dangerously-skip-permissions") + } + return args + } + // Root not set: use backend-specific option. + if skip, _, _ := agentrun.ParseBoolOption(opts, OptionDangerouslySkipPermissions); skip { + args = append(args, "--dangerously-skip-permissions") + } + return args +} diff --git a/engine/cli/agy/agy_test.go b/engine/cli/agy/agy_test.go new file mode 100644 index 0000000..df372d7 --- /dev/null +++ b/engine/cli/agy/agy_test.go @@ -0,0 +1,123 @@ +package agy + +import ( + "os" + "path/filepath" + "strings" + "testing" + + "github.com/dmora/agentrun" +) + +func TestBackend_SpawnArgs(t *testing.T) { + b := New() + session := agentrun.Session{ + Prompt: "hello world", + Model: "gemini-test", + Options: map[string]string{ + OptionDangerouslySkipPermissions: "true", + }, + } + + bin, args := b.SpawnArgs(session) + if bin != "sh" { + t.Errorf("SpawnArgs binary = %q, want sh", bin) + } + + // args[2] is the binary (argv[0]/$0 inside the script), not a literal "sh". + if len(args) < 3 || args[0] != "-c" || args[2] != "agy" { + t.Errorf("SpawnArgs wrapper shell signature mismatch: %q", args) + } + + wrapperScript := args[1] + if !strings.Contains(wrapperScript, `"$0" "$@"`) { + t.Errorf("Wrapper script missing injection-safe invocation: %s", wrapperScript) + } + if !strings.Contains(wrapperScript, `{"type":"result","stop_reason":"end_turn"}`) { + t.Errorf("Wrapper script missing MessageResult sentinel: %s", wrapperScript) + } + + // The remaining args are passed to agy + agyArgs := args[3:] + + // Check log file injection + if len(agyArgs) < 2 || agyArgs[0] != "--log-file" { + t.Errorf("Missing --log-file injection: %q", agyArgs) + } + if b.logFile == "" { + t.Error("Backend.logFile not set by SpawnArgs") + } + + // Check model + foundModel := false + for i, arg := range agyArgs { + if arg == "--model" && i+1 < len(agyArgs) && agyArgs[i+1] == "gemini-test" { + foundModel = true + } + } + if !foundModel { + t.Errorf("Missing --model flag: %q", agyArgs) + } + + // Check prompt and skip perms + if agyArgs[len(agyArgs)-2] != "--print" || agyArgs[len(agyArgs)-1] != "hello world" { + t.Errorf("Prompt not properly appended: %q", agyArgs) + } + + foundSkip := false + for _, arg := range agyArgs { + if arg == "--dangerously-skip-permissions" { + foundSkip = true + } + } + if !foundSkip { + t.Errorf("Missing skip permissions flag: %q", agyArgs) + } +} + +func TestBackend_ResumeArgs(t *testing.T) { + b := New() + + // Mock a log file + tmpDir := t.TempDir() + logPath := filepath.Join(tmpDir, "test.log") + logContent := "server.go:753] Created conversation d8e79181-5db2-4ea9-88e2-eea15ddab587\n" + if err := os.WriteFile(logPath, []byte(logContent), 0600); err != nil { + t.Fatal(err) + } + + b.logFile = logPath + + session := agentrun.Session{ + Options: map[string]string{}, + } + + bin, args, err := b.ResumeArgs(session, "turn 2") + if err != nil { + t.Fatalf("ResumeArgs failed: %v", err) + } + + if bin != "sh" { + t.Errorf("ResumeArgs binary = %q, want sh", bin) + } + + agyArgs := args[3:] + if len(agyArgs) < 2 || agyArgs[0] != "--conversation" || agyArgs[1] != "d8e79181-5db2-4ea9-88e2-eea15ddab587" { + t.Errorf("ResumeArgs did not properly parse/inject conversation ID: %q", agyArgs) + } + + // Verify log file was deleted + if _, err := os.Stat(logPath); !os.IsNotExist(err) { + t.Errorf("Log file was not deleted after ResumeArgs") + } + + // Second resume should use atomic pointer + _, args2, err := b.ResumeArgs(session, "turn 3") + if err != nil { + t.Fatalf("Second ResumeArgs failed: %v", err) + } + agyArgs2 := args2[3:] + if len(agyArgs2) < 2 || agyArgs2[0] != "--conversation" || agyArgs2[1] != "d8e79181-5db2-4ea9-88e2-eea15ddab587" { + t.Errorf("Second ResumeArgs did not reuse conversation ID: %q", agyArgs2) + } +} diff --git a/engine/cli/agy/compliance_test.go b/engine/cli/agy/compliance_test.go new file mode 100644 index 0000000..fdb5156 --- /dev/null +++ b/engine/cli/agy/compliance_test.go @@ -0,0 +1,29 @@ +package agy_test + +import ( + "testing" + + "github.com/dmora/agentrun/engine/cli" + "github.com/dmora/agentrun/engine/cli/agy" + "github.com/dmora/agentrun/enginetest/clitest" +) + +func TestCompliance(t *testing.T) { + factory := func() cli.Backend { + return agy.New() + } + + t.Run("Spawner", func(t *testing.T) { + clitest.RunSpawnerTests(t, func() cli.Spawner { return factory() }) + }) + + // Note: We deliberately do NOT run clitest.RunParserTests here. + // clitest.RunParserTests assumes that the backend expects JSON and asserts + // that passing "not json" returns an error. The agy CLI backend is + // plain-text based and treats any non-JSON line as agentrun.MessageText. + // We verify its parser logic extensively in parse_test.go instead. + + t.Run("Resumer", func(t *testing.T) { + clitest.RunResumerTests(t, func() cli.Resumer { return factory().(cli.Resumer) }) + }) +} diff --git a/engine/cli/agy/doc.go b/engine/cli/agy/doc.go new file mode 100644 index 0000000..45ff421 --- /dev/null +++ b/engine/cli/agy/doc.go @@ -0,0 +1,2 @@ +// Package agy provides an agentrun.Engine backend for the Antigravity CLI. +package agy diff --git a/engine/cli/agy/parse.go b/engine/cli/agy/parse.go new file mode 100644 index 0000000..10efe65 --- /dev/null +++ b/engine/cli/agy/parse.go @@ -0,0 +1,34 @@ +package agy + +import ( + "strings" + "time" + + "github.com/dmora/agentrun" + "github.com/dmora/agentrun/engine/cli" +) + +// ParseLine implements cli.Parser for the agy backend. +// It maps plain text lines to agentrun.MessageText, and detects the +// synthesized MessageResult sentinel emitted by the shell wrapper. +func (b *Backend) ParseLine(line string) (agentrun.Message, error) { + trimmed := strings.TrimSpace(line) + if trimmed == "" { + return agentrun.Message{}, cli.ErrSkipLine + } + + // Detect the exact sentinel emitted by the shell wrapper. + if trimmed == `{"type":"result","stop_reason":"end_turn"}` { + return agentrun.Message{ + Type: agentrun.MessageResult, + StopReason: agentrun.StopEndTurn, + Timestamp: time.Now(), + }, nil + } + + return agentrun.Message{ + Type: agentrun.MessageText, + Content: line, + Timestamp: time.Now(), + }, nil +} diff --git a/engine/cli/agy/parse_test.go b/engine/cli/agy/parse_test.go new file mode 100644 index 0000000..2dfc479 --- /dev/null +++ b/engine/cli/agy/parse_test.go @@ -0,0 +1,87 @@ +package agy + +import ( + "errors" + "testing" + + "github.com/dmora/agentrun" + "github.com/dmora/agentrun/engine/cli" +) + +func TestParseLine(t *testing.T) { + b := New() + + tests := []struct { + name string + line string + want agentrun.Message + wantErr error + }{ + { + name: "empty line", + line: " \t ", + wantErr: cli.ErrSkipLine, + }, + { + name: "result sentinel", + line: `{"type":"result","stop_reason":"end_turn"}`, + want: agentrun.Message{ + Type: agentrun.MessageResult, + StopReason: agentrun.StopEndTurn, + }, + }, + { + name: "text output", + line: "Here is the code you requested:", + want: agentrun.Message{ + Type: agentrun.MessageText, + Content: "Here is the code you requested:", + }, + }, + { + name: "json-like text", + line: `{"type":"not_result"}`, + want: agentrun.Message{ + Type: agentrun.MessageText, + Content: `{"type":"not_result"}`, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got, err := b.ParseLine(tc.line) + if tc.wantErr != nil { + if !errors.Is(err, tc.wantErr) { + t.Fatalf("ParseLine(%q) err = %v, want %v", tc.line, err, tc.wantErr) + } + return + } + if err != nil { + t.Fatalf("ParseLine(%q) unexpected error: %v", tc.line, err) + } + if got.Type != tc.want.Type { + t.Errorf("Type = %v, want %v", got.Type, tc.want.Type) + } + if got.Content != tc.want.Content { + t.Errorf("Content = %v, want %v", got.Content, tc.want.Content) + } + if got.StopReason != tc.want.StopReason { + t.Errorf("StopReason = %v, want %v", got.StopReason, tc.want.StopReason) + } + }) + } +} + +func FuzzParseLine(f *testing.F) { + f.Add("") + f.Add(`{"type":"result","stop_reason":"end_turn"}`) + f.Add("Some random text output") + f.Add("\x00\x00\x00") + + b := New() + f.Fuzz(func(_ *testing.T, line string) { + // ParseLine should never panic + _, _ = b.ParseLine(line) + }) +} diff --git a/engine/cli/claude/args_test.go b/engine/cli/claude/args_test.go index a3d2d25..6a25165 100644 --- a/engine/cli/claude/args_test.go +++ b/engine/cli/claude/args_test.go @@ -8,6 +8,8 @@ import ( "github.com/dmora/agentrun" ) +const roleUser = "user" + // --- SpawnArgs tests --- func TestSpawnArgs_Base(t *testing.T) { @@ -759,14 +761,14 @@ func TestFormatInput(t *testing.T) { if err := json.Unmarshal(data[:len(data)-1], &parsed); err != nil { t.Fatalf("invalid JSON: %v", err) } - if parsed["type"] != "user" { + if parsed["type"] != roleUser { t.Errorf("type = %v, want user", parsed["type"]) } msg, ok := parsed["message"].(map[string]any) if !ok { t.Fatal("missing message field") } - if msg["role"] != "user" { + if msg["role"] != roleUser { t.Errorf("role = %v, want user", msg["role"]) } if msg["content"] != testPrompt { @@ -817,6 +819,88 @@ func TestFormatInput_Empty(t *testing.T) { } } +// --- FormatInputBlocks tests --- + +func TestFormatInputBlocks(t *testing.T) { + b := New() + blocks := []agentrun.ContentBlock{ + agentrun.TextBlock("describe this"), + agentrun.ImageBase64Block("image/png", "SGVsbG8="), + } + data, err := b.FormatInputBlocks(blocks) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if data[len(data)-1] != '\n' { + t.Error("output should end with newline") + } + + var parsed map[string]any + if err := json.Unmarshal(data[:len(data)-1], &parsed); err != nil { + t.Fatalf("invalid JSON: %v", err) + } + if parsed["type"] != roleUser { + t.Errorf("type = %v, want user", parsed["type"]) + } + msg, ok := parsed["message"].(map[string]any) + if !ok { + t.Fatal("missing message field") + } + if msg["role"] != roleUser { + t.Errorf("role = %v, want user", msg["role"]) + } + + // content should be a list of blocks + content, ok := msg["content"].([]any) + if !ok { + t.Fatalf("content is not an array: %T", msg["content"]) + } + if len(content) != 2 { + t.Fatalf("expected 2 content blocks, got %d", len(content)) + } + + b1, ok := content[0].(map[string]any) + if !ok { + t.Fatal("content[0] is not a map") + } + if b1["type"] != "text" || b1["text"] != "describe this" { + t.Errorf("unexpected content[0]: %v", b1) + } + + b2, ok := content[1].(map[string]any) + if !ok { + t.Fatal("content[1] is not a map") + } + if b2["type"] != "image" { + t.Errorf("unexpected content[1] type: %v", b2["type"]) + } + source, ok := b2["source"].(map[string]any) + if !ok { + t.Fatal("missing or invalid source field on content[1]") + } + if source["type"] != "base64" || source["media_type"] != "image/png" || source["data"] != "SGVsbG8=" { + t.Errorf("unexpected source: %v", source) + } +} + +func TestFormatInputBlocks_ValidationAndNullBytes(t *testing.T) { + b := New() + + // Empty blocks list + _, err := b.FormatInputBlocks(nil) + if err == nil { + t.Error("expected error for empty blocks list") + } + + // Null bytes in text block + _, err = b.FormatInputBlocks([]agentrun.ContentBlock{ + agentrun.TextBlock("hello\x00world"), + }) + if err == nil { + t.Error("expected error for text block with null bytes") + } +} + // --- Mode and HITL tests (SpawnArgs) --- func TestSpawnArgs_ModeAndHITL(t *testing.T) { diff --git a/engine/cli/claude/claude.go b/engine/cli/claude/claude.go index c75e8fe..4536241 100644 --- a/engine/cli/claude/claude.go +++ b/engine/cli/claude/claude.go @@ -88,6 +88,7 @@ var ( _ cli.Resumer = (*Backend)(nil) _ cli.Streamer = (*Backend)(nil) _ cli.InputFormatter = (*Backend)(nil) + _ cli.BlockFormatter = (*Backend)(nil) ) // Option configures a Backend at construction time. @@ -215,6 +216,31 @@ func (b *Backend) FormatInput(message string) ([]byte, error) { return append(data, '\n'), nil } +// FormatInputBlocks encodes user content blocks for delivery to a Claude stdin pipe. +// Validates blocks using agentrun.ValidateBlocks and checks for null bytes. +func (b *Backend) FormatInputBlocks(blocks []agentrun.ContentBlock) ([]byte, error) { + if err := agentrun.ValidateBlocks(blocks); err != nil { + return nil, fmt.Errorf("claude: %w", err) + } + for _, blk := range blocks { + if blk.Type == "text" && jsonutil.ContainsNull(blk.Text) { + return nil, errors.New("claude: text block contains null bytes") + } + } + stdinMsg := map[string]any{ + "type": "user", + "message": map[string]any{ + "role": "user", + "content": blocks, + }, + } + data, err := json.Marshal(stdinMsg) + if err != nil { + return nil, fmt.Errorf("claude: marshal stdin: %w", err) + } + return append(data, '\n'), nil +} + // baseArgs returns the common CLI flags for all command modes. func baseArgs() []string { return []string{ diff --git a/engine/cli/engine_test.go b/engine/cli/engine_test.go index a23e36b..34c6bdd 100644 --- a/engine/cli/engine_test.go +++ b/engine/cli/engine_test.go @@ -648,6 +648,112 @@ func TestSend_Stdin(t *testing.T) { _ = p.Stop(ctx) } +type testBlockStreamerBackend struct { + testStreamerBackend + formatBlocksFn func([]agentrun.ContentBlock) ([]byte, error) +} + +func (b *testBlockStreamerBackend) FormatInputBlocks(blocks []agentrun.ContentBlock) ([]byte, error) { + return b.formatBlocksFn(blocks) +} + +func TestSendBlocks_BlockFormatter(t *testing.T) { + b := &testBlockStreamerBackend{ + testStreamerBackend: testStreamerBackend{ + testBackend: testBackend{ + spawnFn: func(_ agentrun.Session) (string, []string) { + return binCat, nil + }, + parseFn: textParser, + }, + streamFn: func(_ agentrun.Session) (string, []string) { + return binCat, nil + }, + formatFn: func(msg string) ([]byte, error) { + return []byte(msg + "\n"), nil + }, + }, + formatBlocksFn: func(blocks []agentrun.ContentBlock) ([]byte, error) { + var sb strings.Builder + for _, blk := range blocks { + sb.WriteString(blk.Type + ":" + blk.Text + ";") + } + return []byte(sb.String() + "\n"), nil + }, + } + eng := cli.NewEngine(b) + p, err := eng.Start(testCtx(t), agentrun.Session{CWD: tempDir(t)}) + if err != nil { + t.Fatalf("Start: %v", err) + } + + bs, ok := p.(agentrun.BlockSender) + if !ok { + t.Fatal("expected process to satisfy BlockSender") + } + + err = bs.SendBlocks(testCtx(t), agentrun.TextBlock("hello"), agentrun.TextBlock("world")) + if err != nil { + t.Fatalf("SendBlocks: %v", err) + } + + msg := <-p.Output() + if msg.Content != "text:hello;text:world;" { + t.Fatalf("expected 'text:hello;text:world;', got %q", msg.Content) + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + _ = p.Stop(ctx) +} + +func TestSendBlocks_Fallback(t *testing.T) { + // Only implements InputFormatter, not BlockFormatter + b := &testStreamerBackend{ + testBackend: testBackend{ + spawnFn: func(_ agentrun.Session) (string, []string) { + return binCat, nil + }, + parseFn: textParser, + }, + streamFn: func(_ agentrun.Session) (string, []string) { + return binCat, nil + }, + formatFn: func(msg string) ([]byte, error) { + return []byte(msg + "\n"), nil + }, + } + eng := cli.NewEngine(b) + p, err := eng.Start(testCtx(t), agentrun.Session{CWD: tempDir(t)}) + if err != nil { + t.Fatalf("Start: %v", err) + } + + bs, ok := p.(agentrun.BlockSender) + if !ok { + t.Fatal("expected process to satisfy BlockSender") + } + + // Should degrade by extracting text and using FormatInput + err = bs.SendBlocks(testCtx(t), agentrun.TextBlock("hello"), agentrun.TextBlock("world")) + if err != nil { + t.Fatalf("SendBlocks: %v", err) + } + + msg1 := <-p.Output() + if msg1.Content != "hello" { + t.Fatalf("expected 'hello', got %q", msg1.Content) + } + msg2 := <-p.Output() + if msg2.Content != "world" { + t.Fatalf("expected 'world', got %q", msg2.Content) + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + _ = p.Stop(ctx) +} + func TestSend_Resume(t *testing.T) { b := &testResumerBackend{ testBackend: testBackend{ diff --git a/engine/cli/firstturn_test.go b/engine/cli/firstturn_test.go new file mode 100644 index 0000000..f438618 --- /dev/null +++ b/engine/cli/firstturn_test.go @@ -0,0 +1,112 @@ +//go:build !windows + +package cli_test + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/dmora/agentrun" + "github.com/dmora/agentrun/engine/cli" +) + +// TestRunFirstTurn_SpawnPerTurnDrainsWithoutResume is the regression test for +// the run_turn/session_start double-prompt bug: for a spawn-per-turn backend, +// the first turn is initiated by Start (prompt baked into the spawn), so +// RunFirstTurn must drain it WITHOUT calling Send/ResumeArgs. +// +// ResumeArgs here always errors, mimicking a resume-only backend whose session +// ID has not been captured yet (Codex thread.started, OpenCode step_start, agy +// log file). Before the fix, the harness called RunTurn → Send → ResumeArgs and +// failed with exactly this error. RunFirstTurn must not trigger it. +func TestRunFirstTurn_SpawnPerTurnDrainsWithoutResume(t *testing.T) { + resumeCalled := false + b := &testResumerBackend{ + testBackend: testBackend{ + spawnFn: func(s agentrun.Session) (string, []string) { + return binPrintf, []string{"%s\\n__RESULT__\\n", s.Prompt} + }, + parseFn: resultParser, + }, + resumeFn: func(_ agentrun.Session, _ string) (string, []string, error) { + resumeCalled = true + return "", nil, errors.New("no session ID captured yet") + }, + } + const prompt = "hello" + eng := cli.NewEngine(b) + p, err := eng.Start(testCtx(t), agentrun.Session{CWD: tempDir(t), Prompt: prompt}) + if err != nil { + t.Fatalf("Start: %v", err) + } + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + _ = p.Stop(ctx) + }() + + var msgs []agentrun.Message + err = agentrun.RunFirstTurn(testCtx(t), p, prompt, func(m agentrun.Message) error { + msgs = append(msgs, m) + return nil + }) + if err != nil { + t.Fatalf("RunFirstTurn: %v (resume must not be called for the first turn)", err) + } + if resumeCalled { + t.Error("ResumeArgs was called for the first turn; RunFirstTurn should drain only") + } + if len(msgs) != 2 || msgs[0].Content != prompt || msgs[1].Type != agentrun.MessageResult { + t.Fatalf("first turn messages = %+v; want [text:hello, result]", msgs) + } +} + +// TestRunFirstTurn_SpawnPerTurnThenResume verifies the full lifecycle: the +// first turn is drained (no resume), and the SECOND turn resumes normally. +func TestRunFirstTurn_SpawnPerTurnThenResume(t *testing.T) { + b := &testResumerBackend{ + testBackend: testBackend{ + spawnFn: func(s agentrun.Session) (string, []string) { + return binPrintf, []string{"%s\\n__RESULT__\\n", s.Prompt} + }, + parseFn: resultParser, + }, + resumeFn: func(_ agentrun.Session, prompt string) (string, []string, error) { + return binPrintf, []string{"%s\\n__RESULT__\\n", prompt}, nil + }, + } + eng := cli.NewEngine(b) + p, err := eng.Start(testCtx(t), agentrun.Session{CWD: tempDir(t), Prompt: "turn1"}) + if err != nil { + t.Fatalf("Start: %v", err) + } + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + _ = p.Stop(ctx) + }() + + var first []agentrun.Message + if err := agentrun.RunFirstTurn(testCtx(t), p, "turn1", func(m agentrun.Message) error { + first = append(first, m) + return nil + }); err != nil { + t.Fatalf("RunFirstTurn: %v", err) + } + if len(first) != 2 || first[0].Content != "turn1" { + t.Fatalf("first turn = %+v; want [turn1, result]", first) + } + + var second []agentrun.Message + if err := agentrun.RunTurn(testCtx(t), p, "turn2", func(m agentrun.Message) error { + second = append(second, m) + return nil + }); err != nil { + t.Fatalf("RunTurn turn2: %v", err) + } + if len(second) != 2 || second[0].Content != "turn2" { + t.Fatalf("second turn = %+v; want [turn2, result]", second) + } +} diff --git a/engine/cli/interfaces.go b/engine/cli/interfaces.go index 9d94f6d..5efc6d3 100644 --- a/engine/cli/interfaces.go +++ b/engine/cli/interfaces.go @@ -87,3 +87,17 @@ type Backend interface { type InputFormatter interface { FormatInput(message string) ([]byte, error) } + +// BlockFormatter encodes structured content blocks for delivery to a +// subprocess stdin pipe. BlockFormatter is optional — the CLIEngine +// discovers it via type assertion: +// +// if f, ok := backend.(BlockFormatter); ok { +// data, err := f.FormatInputBlocks(blocks) +// } +// +// A backend can implement Streamer, InputFormatter, BlockFormatter, or any +// combination of them. +type BlockFormatter interface { + FormatInputBlocks(blocks []agentrun.ContentBlock) ([]byte, error) +} diff --git a/engine/cli/process.go b/engine/cli/process.go index fc92ef5..be139a6 100644 --- a/engine/cli/process.go +++ b/engine/cli/process.go @@ -77,6 +77,7 @@ type process struct { } var _ agentrun.Process = (*process)(nil) +var _ agentrun.BlockSender = (*process)(nil) // sequentialProcess wraps a CLI process to satisfy agentrun.SequentialSender. // Used for spawn-per-turn backends (Resumer without Streamer). @@ -85,6 +86,7 @@ type sequentialProcess struct{ *process } func (s *sequentialProcess) SequentialSend() {} var _ agentrun.SequentialSender = (*sequentialProcess)(nil) +var _ agentrun.BlockSender = (*sequentialProcess)(nil) // newProcess creates and starts a process with its initial readLoop. func newProcess( @@ -129,24 +131,32 @@ func (p *process) Output() <-chan agentrun.Message { // Send transmits a user message to the subprocess. func (p *process) Send(ctx context.Context, message string) error { + return p.SendBlocks(ctx, agentrun.TextBlock(message)) +} + +// SendBlocks transmits structured content blocks to the subprocess. +func (p *process) SendBlocks(ctx context.Context, blocks ...agentrun.ContentBlock) error { if p.stopping.Load() { return agentrun.ErrTerminated } + // Validate blocks. + if err := agentrun.ValidateBlocks(blocks); err != nil { + return fmt.Errorf("cli: validate blocks: %w", err) + } + // Check if the session has ended. select { case <-p.done: // For Resumer backends, a clean subprocess exit (termErr == nil) is // the normal end of a turn, not the end of the session. Restart by // spawning a new subprocess with ResumeArgs. - // - // termErr must be read under mu because resumeAfterCleanExit resets - // it under the same lock — concurrent Send() calls would race otherwise. p.mu.Lock() cleanExit := p.caps.resumer != nil && p.termErr == nil p.mu.Unlock() if cleanExit { - return p.resumeAfterCleanExit(ctx, message) + text := agentrun.TextFromBlocks(blocks) + return p.resumeAfterCleanExit(ctx, text) } return agentrun.ErrTerminated default: @@ -154,12 +164,22 @@ func (p *process) Send(ctx context.Context, message string) error { // Path 1: stdin pipe (Streamer mode). if p.stdin != nil { - return p.sendStdin(message) + if bf, ok := p.backend.(BlockFormatter); ok { + data, err := bf.FormatInputBlocks(blocks) + if err != nil { + return fmt.Errorf("cli: format input blocks: %w", err) + } + return p.sendStdinData(data) + } + // Fallback: standard InputFormatter with text degradation. + text := agentrun.TextFromBlocks(blocks) + return p.sendStdin(text) } // Path 2: Resumer (subprocess replacement while running). if p.caps.resumer != nil { - return p.replaceSubprocess(ctx, message) + text := agentrun.TextFromBlocks(blocks) + return p.replaceSubprocess(ctx, text) } // Defensive guard — Start() validates send capability; unreachable. @@ -176,6 +196,11 @@ func (p *process) sendStdin(message string) error { if err != nil { return fmt.Errorf("cli: format input: %w", err) } + return p.sendStdinData(data) +} + +// sendStdinData writes pre-formatted data to the subprocess stdin pipe. +func (p *process) sendStdinData(data []byte) error { p.mu.Lock() stdin := p.stdin p.mu.Unlock() @@ -246,14 +271,31 @@ func (p *process) Err() error { } } -// finish sets the terminal error and closes output+done channels. -// Called exactly once via sync.Once. +// finalizeTurn finishes the process unless a concurrent Send has claimed +// subprocess replacement. The check and the close happen under p.mu so they +// are atomic with respect to a Send deciding which resume path to take. +func (p *process) finalizeTurn(err error) { + p.mu.Lock() + defer p.mu.Unlock() + if p.replacing { + return + } + p.finishLocked(err) +} + +// finishLocked sets the terminal error and closes output+done channels. +// Called exactly once via sync.Once. Caller MUST hold p.mu. +// +// Closing under p.mu is what makes the turn boundary race-free: a concurrent +// Send decides between reusing the channels (replaceSubprocess) and allocating +// fresh ones (resumeAfterCleanExit) under the same lock, so it can never reuse +// a channel this function just closed. // // Close order matters: done must close before output so that Err() // returns the terminal error immediately after a consumer's range // over Output() exits. Closing output first creates a race — // the consumer goroutine can call Err() before done is closed. -func (p *process) finish(err error) { +func (p *process) finishLocked(err error) { p.finishOnce.Do(func() { p.termErr = err close(p.done) @@ -292,13 +334,7 @@ func (p *process) readLoop(ctx context.Context, stdout io.ReadCloser) { waitErr = agentrun.ErrTerminated } - p.mu.Lock() - replacing := p.replacing - p.mu.Unlock() - - if !replacing { - p.finish(waitErr) - } + p.finalizeTurn(waitErr) // Always signal cmdDone so Stop/replaceSubprocess can proceed. p.cmdDone <- struct{}{} @@ -570,7 +606,8 @@ func applyStopReasonCarryForward(msg *agentrun.Message, last agentrun.StopReason return last } -// replaceSubprocess performs the Resumer subprocess-replacement pattern. +// replaceSubprocess performs the Resumer subprocess-replacement pattern: the +// running subprocess is interrupted and a fresh one is spawned to resume. func (p *process) replaceSubprocess(ctx context.Context, message string) error { binary, args, err := p.caps.resumer.ResumeArgs(p.session, message) if err != nil { @@ -581,8 +618,29 @@ func (p *process) replaceSubprocess(ctx context.Context, message string) error { return fmt.Errorf("%w: %s: %w", agentrun.ErrUnavailable, binary, err) } - // Signal old process to terminate. + // Claim replacement under p.mu, re-checking the terminal state. The + // readLoop's exit defer (finalizeTurn) makes its finish decision under the + // same lock, so exactly one of two things happens here: + // - done still open: we set replacing, so the readLoop skips finish and + // the channels are preserved for the replacement subprocess. + // - done already closed: the subprocess finalized between our caller's + // check and now, closing the channels — so we must allocate fresh ones + // via spawnCleanResume rather than reuse the closed channels. This is + // the fix for the turn-boundary race. p.mu.Lock() + select { + case <-p.done: + // Mirror the outer SendBlocks guard: only resume on a clean exit. A turn + // that finalized with an error must surface as terminated, not be + // silently resumed (which would also discard the terminal error). + termErr := p.termErr + p.mu.Unlock() + if termErr != nil { + return agentrun.ErrTerminated + } + return p.spawnCleanResume(resolvedBinary, args) + default: + } p.replacing = true oldCancel := p.cancelRead if p.stdin != nil { @@ -613,8 +671,8 @@ func (p *process) replaceSubprocess(ctx context.Context, message string) error { func (p *process) failReplacement(err error) { p.mu.Lock() p.replacing = false + p.finishLocked(err) p.mu.Unlock() - p.finish(err) select { case p.cmdDone <- struct{}{}: default: @@ -643,7 +701,14 @@ func (p *process) resumeAfterCleanExit(ctx context.Context, message string) erro if err != nil { return fmt.Errorf("%w: %s: %w", agentrun.ErrUnavailable, binary, err) } + return p.spawnCleanResume(resolvedBinary, args) +} +// spawnCleanResume spawns a resume subprocess after the previous one has exited, +// allocating fresh output/done channels (and resetting finishOnce) so the new +// readLoop can finalize the next turn. Used by the normal clean-exit path and by +// replaceSubprocess when the previous subprocess finalized concurrently. +func (p *process) spawnCleanResume(resolvedBinary string, args []string) error { cmd, stdin, stdout, err := spawnCmd(resolvedBinary, args, p.session.CWD, p.caps.streamer != nil, p.env, p.opts.StderrWriter) if err != nil { return fmt.Errorf("cli: resume: %w", err) diff --git a/engine/cli/race_test.go b/engine/cli/race_test.go new file mode 100644 index 0000000..110836e --- /dev/null +++ b/engine/cli/race_test.go @@ -0,0 +1,131 @@ +//go:build !windows + +package cli_test + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + "github.com/dmora/agentrun" + "github.com/dmora/agentrun/engine/cli" +) + +// drainToResult reads from the process output until MessageResult, returning +// without waiting for the channel to close. This mirrors what agentrun.RunTurn +// does: it returns as soon as the turn completes, then the caller sends the +// next turn. It is precisely this "send right after the result, before the +// subprocess's readLoop closes the channel" timing that exposes the +// turn-boundary race. +func drainToResult(ctx context.Context, p agentrun.Process) error { + for { + select { + case msg, ok := <-p.Output(): + if !ok { + return fmt.Errorf("output channel closed without a result: %w", p.Err()) + } + if msg.Type == agentrun.MessageResult { + return nil + } + case <-ctx.Done(): + return ctx.Err() + } + } +} + +// TestResumeRace_SendImmediatelyAfterResult reproduces the spawn-per-turn +// turn-boundary race: Send is called right after MessageResult arrives, while +// the previous subprocess's readLoop is still running its exit defer. If Send's +// terminal-state check and the readLoop's finish() are not synchronized, the +// resume reuses an output channel that finish() just closed — dropping the +// resumed turn (closed channel, no result) or tripping the race detector. +// +// Run with -race. Many iterations because the bad interleaving is timing +// dependent; with printf's near-instant turns it reproduces readily. +func TestResumeRace_SendImmediatelyAfterResult(t *testing.T) { + b := &testResumerBackend{ + testBackend: testBackend{ + spawnFn: func(s agentrun.Session) (string, []string) { + return binPrintf, []string{"%s\\n__RESULT__\\n", s.Prompt} + }, + parseFn: resultParser, + }, + resumeFn: func(_ agentrun.Session, prompt string) (string, []string, error) { + return binPrintf, []string{"%s\\n__RESULT__\\n", prompt}, nil + }, + } + eng := cli.NewEngine(b) + p, err := eng.Start(testCtx(t), agentrun.Session{CWD: tempDir(t), Prompt: "turn0"}) + if err != nil { + t.Fatalf("Start: %v", err) + } + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + _ = p.Stop(ctx) + }() + + ctx := testCtx(t) + const turns = 300 + for i := 0; i < turns; i++ { + // Drain the current turn up to its result (channel stays open), then + // immediately resume — the exact RunTurn timing that triggers the race. + if err := drainToResult(ctx, p); err != nil { + t.Fatalf("turn %d: %v", i, err) + } + if err := p.Send(ctx, fmt.Sprintf("turn%d", i+1)); err != nil { + t.Fatalf("turn %d send: %v", i, err) + } + } +} + +// TestReplaceFallback_CrashedTurnNotSilentlyResumed covers the regression in the +// replaceSubprocess done-closed fallback: when a turn finalizes with an error +// WHILE a Send is mid-flight (after the outer SendBlocks check, before the inner +// locked re-check), the fallback must surface ErrTerminated — not silently +// resume the crashed session (which would also swallow the exit error). +// +// The interleaving is made deterministic: turn 1 emits its result, then stays +// alive briefly before exiting non-zero (so the outer check sees done open and +// routes to replaceSubprocess); resumeFn then blocks long enough for turn 1 to +// finalize with the exit error, so the inner locked re-check observes done +// closed with a non-nil termErr — exactly the fallback path. +func TestReplaceFallback_CrashedTurnNotSilentlyResumed(t *testing.T) { + b := &testResumerBackend{ + testBackend: testBackend{ + spawnFn: func(_ agentrun.Session) (string, []string) { + // Emit result, linger 150ms (done stays open past the outer check), + // then crash. + return binBash, []string{"-c", `printf 'x\n__RESULT__\n'; sleep 0.15; exit 1`} + }, + parseFn: resultParser, + }, + resumeFn: func(_ agentrun.Session, _ string) (string, []string, error) { + // Block past turn 1's crash so the inner re-check sees done closed. + time.Sleep(400 * time.Millisecond) + return binPrintf, []string{"resumed\\n__RESULT__\\n"}, nil + }, + } + eng := cli.NewEngine(b) + p, err := eng.Start(testCtx(t), agentrun.Session{CWD: tempDir(t), Prompt: "turn1"}) + if err != nil { + t.Fatalf("Start: %v", err) + } + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + _ = p.Stop(ctx) + }() + + if err := drainToResult(testCtx(t), p); err != nil { + t.Fatalf("drain turn1: %v", err) + } + // Send mid-flight: outer check sees done open -> replaceSubprocess; resumeFn + // blocks while turn 1 crashes; inner re-check must yield ErrTerminated. + err = p.Send(testCtx(t), "turn2") + if !errors.Is(err, agentrun.ErrTerminated) { + t.Fatalf("Send after crashed turn = %v; want ErrTerminated (no silent resume)", err) + } +} diff --git a/examples/interactive/main.go b/examples/interactive/main.go index b3bb00d..7bc2c53 100644 --- a/examples/interactive/main.go +++ b/examples/interactive/main.go @@ -104,8 +104,9 @@ func run(backendName, binaryName, argsStr, resumeID string) error { _ = proc.Stop(stopCtx) }() - // First turn — RunTurn handles all engine types. - if err := executeTurn(ctx, proc, firstPrompt); err != nil { + // First turn — RunFirstTurn handles all engine types (drains the + // Start-initiated turn for spawn-per-turn backends; sends for streaming/ACP). + if err := executeTurn(ctx, proc, firstPrompt, true); err != nil { return err } @@ -174,7 +175,7 @@ func repl(ctx context.Context, proc agentrun.Process, scanner *bufio.Scanner) er if line == "exit" || line == "quit" { break } - if err := executeTurn(ctx, proc, line); err != nil { + if err := executeTurn(ctx, proc, line, false); err != nil { return err } } @@ -183,14 +184,21 @@ func repl(ctx context.Context, proc agentrun.Process, scanner *bufio.Scanner) er return nil } -// executeTurn runs one conversation turn via RunTurn, printing messages -// with delta-aware formatting. -func executeTurn(ctx context.Context, proc agentrun.Process, message string) error { +// executeTurn runs one conversation turn, printing messages with delta-aware +// formatting. When first is true it uses [agentrun.RunFirstTurn], which drains +// the turn that Start already initiated for spawn-per-turn backends (Codex, +// OpenCode) instead of sending a redundant turn; subsequent turns use +// [agentrun.RunTurn]. +func executeTurn(ctx context.Context, proc agentrun.Process, message string, first bool) error { var sawDelta bool - return agentrun.RunTurn(ctx, proc, message, func(msg agentrun.Message) error { + handler := func(msg agentrun.Message) error { sawDelta = handleStreamingMessage(msg, sawDelta) return nil - }) + } + if first { + return agentrun.RunFirstTurn(ctx, proc, message, handler) + } + return agentrun.RunTurn(ctx, proc, message, handler) } // handleStreamingMessage prints a message with delta-aware formatting. diff --git a/runfirstturn_test.go b/runfirstturn_test.go new file mode 100644 index 0000000..0a47e2b --- /dev/null +++ b/runfirstturn_test.go @@ -0,0 +1,89 @@ +package agentrun + +import ( + "context" + "errors" + "sync/atomic" + "testing" + "time" +) + +// TestRunFirstTurn_SequentialDrainsWithoutSend verifies that for a +// spawn-per-turn process (SequentialSender), RunFirstTurn drains the turn +// that Start already initiated and does NOT call Send (which would spawn a +// redundant — and for resume-only backends, broken — second turn). +func TestRunFirstTurn_SequentialDrainsWithoutSend(t *testing.T) { + proc := newSequentialMockProcess() + var sendCalls atomic.Int32 + proc.sendFn = func(_ context.Context, _ string) error { + sendCalls.Add(1) + return nil + } + + // The first turn's messages are already flowing (Start spawned it). + proc.output <- Message{Type: MessageText, Content: "turn 1 output"} + proc.output <- Message{Type: MessageResult} + + var got []Message + err := RunFirstTurn(context.Background(), proc, "the prompt", func(m Message) error { + got = append(got, m) + return nil + }) + if err != nil { + t.Fatalf("RunFirstTurn: %v", err) + } + if n := sendCalls.Load(); n != 0 { + t.Errorf("Send called %d times; want 0 (Start already initiated turn 1)", n) + } + if len(got) != 2 || got[0].Content != "turn 1 output" || got[1].Type != MessageResult { + t.Errorf("drained messages = %+v; want [text, result]", got) + } +} + +// TestRunFirstTurn_NonSequentialSends verifies that for a streaming/persistent +// process (not a SequentialSender), RunFirstTurn sends the prompt to initiate +// the first turn, then drains. +func TestRunFirstTurn_NonSequentialSends(t *testing.T) { + proc := newMockProcess() + var sentMessage atomic.Value + proc.sendFn = func(_ context.Context, message string) error { + sentMessage.Store(message) + // Emit the turn result once the prompt has been sent. + proc.output <- Message{Type: MessageResult} + return nil + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + var got []Message + err := RunFirstTurn(ctx, proc, "hello agent", func(m Message) error { + got = append(got, m) + return nil + }) + if err != nil { + t.Fatalf("RunFirstTurn: %v", err) + } + if v, _ := sentMessage.Load().(string); v != "hello agent" { + t.Errorf("Send received %q; want %q", v, "hello agent") + } + if len(got) != 1 || got[0].Type != MessageResult { + t.Errorf("drained messages = %+v; want [result]", got) + } +} + +// TestRunFirstTurn_SequentialChannelCloseNoResult verifies RunFirstTurn returns +// the process terminal error if the first turn's channel closes without a +// MessageResult (e.g., subprocess died before producing output). +func TestRunFirstTurn_SequentialChannelCloseNoResult(t *testing.T) { + proc := newSequentialMockProcess() + proc.termErr = ErrNoResult + proc.close() // channel closed immediately, no result + + err := RunFirstTurn(context.Background(), proc, "prompt", func(Message) error { + return nil + }) + if !errors.Is(err, ErrNoResult) { + t.Errorf("RunFirstTurn err = %v; want ErrNoResult", err) + } +} diff --git a/runturn.go b/runturn.go index 49d6175..5c296b7 100644 --- a/runturn.go +++ b/runturn.go @@ -42,6 +42,32 @@ func RunTurn(ctx context.Context, proc Process, message string, handler func(Mes return drainOutput(ctx, proc, sendCh, handler) } +// RunFirstTurn runs the first turn of a freshly started session, hiding the +// difference between the two backend execution models so callers don't have to +// branch on backend type: +// +// - Spawn-per-turn CLI backends (which satisfy [SequentialSender]) initiate +// the first turn during Start — the prompt is baked into the spawn command. +// RunFirstTurn drains that turn's output without sending. The prompt argument +// is ignored for these backends (it was already supplied via Session.Prompt +// at Start); pass it anyway for uniformity. +// +// - Streaming and persistent backends (e.g. Claude streaming, ACP) do not run +// a turn at Start. RunFirstTurn sends the prompt to initiate the first turn, +// then drains — identical to [RunTurn]. +// +// Use [RunTurn] for every subsequent turn. Like RunTurn, the handler is called +// for each message including MessageResult, and the caller should provide a +// context with a deadline or timeout. +func RunFirstTurn(ctx context.Context, proc Process, prompt string, handler func(Message) error) error { + if _, ok := proc.(SequentialSender); ok { + // Start already initiated the first turn (prompt baked into the spawn); + // drain it without sending to avoid spawning a redundant second turn. + return drainOutput(ctx, proc, nil, handler) + } + return RunTurn(ctx, proc, prompt, handler) +} + // drainOutput reads from proc.Output() until MessageResult, channel close, // or context cancellation. Checks sendCh for Send errors. func drainOutput(ctx context.Context, proc Process, sendCh <-chan error, handler func(Message) error) error {