From 71f864e2e10e90e4319835c4d1d3bc7debdbd80d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Ruiz=20Garc=C3=ADa?= Date: Wed, 24 Jun 2026 13:04:12 +0200 Subject: [PATCH 1/3] fix: recover from opencode stalls and control-plane timeouts --- .../resume-server-death-resilience-plan.md | 624 ++++++++++++++++++ README.md | 53 ++ tests/test_codecome_runner.py | 54 +- tests/test_harness_recovery_restart.py | 142 ++++ tests/test_runner_resume_health.py | 207 ++++++ tests/test_session.py | 43 ++ tests/test_sse_busy_resilience.py | 454 +++++++++++++ tools/codecome/harness.py | 138 +++- tools/codecome/phase_1.py | 226 ++++--- tools/codecome/runner.py | 144 +++- tools/codecome/session.py | 121 +++- tools/codecome/status.py | 29 + tools/events/base.py | 29 + tools/events/phase_loop.py | 163 ++++- tools/events/sse_client.py | 143 +++- tools/opencode/serve.py | 53 +- tools/phases/completion.py | 15 + 17 files changed, 2466 insertions(+), 172 deletions(-) create mode 100644 .project/resume-server-death-resilience-plan.md create mode 100644 tests/test_harness_recovery_restart.py create mode 100644 tests/test_runner_resume_health.py create mode 100644 tests/test_sse_busy_resilience.py create mode 100644 tools/codecome/status.py diff --git a/.project/resume-server-death-resilience-plan.md b/.project/resume-server-death-resilience-plan.md new file mode 100644 index 00000000..7af95fc3 --- /dev/null +++ b/.project/resume-server-death-resilience-plan.md @@ -0,0 +1,624 @@ +# Resume Resilience: Fast-Fail on Server Death, Exit-Code Logging, and Server Restart Recovery + +Date: 2026-06-12 +Status: Implemented +Follow-up issue: https://github.com/pruiz/CodeCome/issues/56 +Target: `tools/codecome/runner.py`, `tools/opencode/serve.py`, `tools/codecome/phase_1.py`, `tools/codecome/harness.py` + +--- + +## 1. Problem + +During a `make phase-1` run (PID 34685, 2026-06-10), Phase 1b failed with a 120-second +`ResumeSessionNotReady` timeout. Investigation of the serve log and transcript files +confirmed the sequence: + +1. Phase 1a (`ses_1515e017...`) completed cleanly — session went `idle`. +2. Phase 1b (`ses_1515a691...`) was created, ran sandbox validation (T1–T6 passed), + and was mid-response when **the opencode server process crashed**. +3. The serve log (`tmp/opencode-serve-34685-1781046310.log`, 69 lines) ends abruptly + with the Phase 1b session still `busy`. No `idle` transition, no error entry, + no exit code — the process simply died. +4. The auto-resume retry called `_wait_for_resume_idle()`, which polls + `get_session_status()` in a loop. Every call threw an HTTP exception + (server dead), returning `None`. This was recorded as `blocked_unknown` + (not `blocked_busy`) — 22 consecutive failures over 120 seconds. +5. The retry gave up with `ResumeSessionNotReady`. The user saw "exit code 2" + with a transcript reference, but **no information about the server crash**. + +Three cooperating weaknesses caused this wasteful 120-second failure: + +### Weakness 1: `_wait_for_resume_idle` cannot distinguish "server dead" from "session busy" + +`tools/codecome/runner.py:71-103` — The function polls `get_session_status()` in a loop +for up to 120s. It does not differentiate between: + +| Status | Meaning | Wait makes sense? | +|--------|---------|-------------------| +| `"busy"` | Server alive, session processing | Yes | +| `None` | Server unreachable / dead | **No** — will never resolve | + +Both paths get the same treatment: sleep, poll, repeat until timeout. When the server +is dead, the harness burns the full 120s doing nothing useful. + +### Weakness 2: Server exit code is never recorded + +`tools/opencode/serve.py` — `ServerRunner.start()` spawns opencode as a subprocess +(`start_new_session=True`), redirects stdout/stderr to a log file, but **never monitors +the child process for exit**. When the server crashes, nothing writes the exit code, +signal number, or crash reason to the log. The log just stops, leaving no diagnostic +trail. + +### Weakness 3: No server restart between retry attempts + +`tools/codecome/phase_1.py:408-598` (`_run_subphase` retry loop) and +`tools/codecome/harness.py:168-346` (`run_phase_mode` retry loop) — Both attempt retries +against the same server process. If the server died during the first attempt, every +retry will hit a dead server. Neither loop checks `runner.info.proc.poll()` or attempts +a server restart. + +For Phase 1 (subphases 1a/1b/1c), the server is started once and reused across +all subphases. A server crash during 1b also makes 1c unreachable — the user gets +multiple opaque failures with no indication of the root cause. + +--- + +## 2. Design + +### 2.1 Fix 1 — Fast-fail on server unavailability (`runner.py`) + +Add a consecutive-`None` counter to `_wait_for_resume_idle`. After N consecutive +`None` returns from `get_session_status()`, raise immediately with a diagnostic +message instead of polling for the full timeout. + +```python +# Pseudocode — not to be executed +CONSECUTIVE_NONE_MAX = int(os.environ.get("CODECOME_RESUME_SERVER_UNAVAILABLE_THRESHOLD", "3")) + +def _wait_for_resume_idle(...): + consecutive_none = 0 + while True: + status = get_session_status(...) + if status == "idle": + return + + if status is None: + consecutive_none += 1 + if consecutive_none >= CONSECUTIVE_NONE_MAX: + raise ResumeSessionNotReady( + f"server at {base_url} appears to be unreachable or dead " + f"after {consecutive_none} consecutive failed status checks; " + f"session {session_id} cannot be resumed" + ) + else: + consecutive_none = 0 + + event_type = "codecome.resume.blocked_busy" if status == "busy" else "codecome.resume.blocked_unknown" + _record_codecome_event(transcript, event_type, ...) + + if time.monotonic() >= deadline: + raise ResumeSessionNotReady(...) + time.sleep(max(poll_s, 0.1)) +``` + +The `blocked_unknown` event is still recorded (for transcript traceability), but the +wait cuts short once we have evidence the server is gone. The threshold is env-var +configurable so users can tune for slow/flaky networks. + +**Rationale**: This is the minimum-change, highest-impact fix. It turns a guaranteed +120-second waste into a ~15-second fast-failure (3 polls × 5s HTTP timeout). The +existing timeout path (for genuinely `busy` sessions) is untouched. + +### 2.2 Fix 2 — Capture server exit code in serve log (`serve.py`) + +Add a daemon thread to `ServerRunner` that monitors the child process and appends +an exit record to the serve log when the process terminates. + +```python +# Pseudocode — not to be executed +import threading + +def _monitor_child(proc: subprocess.Popen, log_path: Path) -> None: + exit_code = proc.wait() + with open(log_path, "a") as f: + if exit_code < 0: + f.write(f"opencode serve killed by signal {-exit_code} (exit code {exit_code})\n") + else: + f.write(f"opencode serve exited with code {exit_code}\n") + +# In ServerRunner.start(), after successful health check: +threading.Thread( + target=_monitor_child, args=(proc, log_path), + name=f"serve-monitor-{proc.pid}", daemon=True +).start() +``` + +The thread is daemon=True so it doesn't block process shutdown. It calls `proc.wait()` +which blocks until the child exits, then writes one line. The `ServerInfo` dataclass +already stores `proc` and `log_path`, so no new fields are needed. + +**Rationale**: With this, future server crashes will leave a trace like +`opencode serve killed by signal 9 (exit code -9)` (OOM kill) or +`opencode serve exited with code 1` (process error). Without it, all we see is +a log that stops mid-line with no explanation. + +### 2.3 Fix 3 — Detect server death and restart in retry loops + +Modify `_run_single_attempt` to accept an optional `ServerRunner` reference and +return a distinct finish reason when the server is unreachable (not just when the +session isn't ready). Then modify the calling retry loops to restart the server +and create a fresh session. + +**Step 3a — Return distinct reason for server death** (`runner.py`) + +Currently, `ResumeSessionNotReady` maps to `resume_not_ready` indiscriminately. +Introduce a new synthetic finish reason `server_unreachable` for the fast-fail +path from Fix 1 (when consecutive `None` threshold is hit). The existing timeout +path (session stays busy for 120s) keeps `resume_not_ready`. + +**Step 3b — Server-death propagation from `run_phase_1`** (`phase_1.py`) + +`_run_subphase` does NOT restart the server. When it detects `server_unreachable`, +it returns `RunStatus.SERVER_UNREACHABLE`. `run_phase_1()` converts this into a +`Phase1Outcome(status=RunStatus.SERVER_UNREACHABLE, failed_subphase="1a"|"1b"|"1c")`. + +`run_phase_1()` does **not** start, stop, or restart `opencode serve`. It only +reports which subphase failed so the harness can restart and re-enter at the +same subphase. + +Resume status polling distinguishes two cases. + +**Authoritative liveness signal: the child process handle, not HTTP probes.** + +A later incident (PID 57813, 2026-06-12, port `:61694`) proved that HTTP probes +are an unreliable death signal. During a long busy turn, opencode 1.17.4's HTTP +control plane — including `/session/status` AND `/global/health` — blocks and +times out while the process is perfectly alive and the session is still `busy`. +Three failed polls (~32s, dominated by 5s socket timeouts) wrongly declared the +server dead; the harness then SIGTERM'd a live, busy server and exhausted the +restart budget. + +Fix: + +- `_wait_for_resume_idle()` accepts an optional `liveness_check: Callable[[], bool]`. +- Phase callers build it via `make_liveness_check(runner)` in `opencode/serve.py`, + which consults `ServerInfo.proc.poll()` (the OS child handle). +- When `/session/status` returns `None`: + - `liveness_check()` is `True` (process alive): record + `codecome.resume.status_unavailable_process_alive`, do not count toward death, + keep waiting until the idle timeout (then `resume_not_ready`). + - `liveness_check()` is `False` (process exited): raise + `ResumeSessionServerUnreachable` immediately (`processExited=True`). + - No `liveness_check` provided (e.g. tests, non-phase callers): fall back to the + `/global/health` probe + `CODECOME_RESUME_SERVER_UNAVAILABLE_THRESHOLD`. +- Resume probe socket timeouts were lowered to `CODECOME_RESUME_PROBE_TIMEOUT` + (default 2s) so a poll cycle approximates `CODECOME_RESUME_IDLE_POLL` instead of + ~11s, letting the 120s idle window cover many polls. +- Chat mode is unaffected: it never calls `_run_single_attempt`/`_wait_for_resume_idle`. + +**Step 3c — Server restart in `run_phase_mode`** (`harness.py`) + +The harness owns the `ServerRunner` for all phases. For Phase 1 it calls +`run_phase_1(..., start_at="1a")`. If the outcome is `SERVER_UNREACHABLE`, the +harness restarts the server with `runner.restart(...)`, then re-enters +`run_phase_1(..., start_at=outcome.failed_subphase)`. This avoids rerunning +completed subphases. + +For phases 2-6, the existing harness retry loop also handles `server_unreachable` +with `runner.restart(...)`. + +**Rationale**: Without server restart, a crash during any subphase/phase means +the entire run fails irrecoverably. With restart, the harness can self-heal and +give the user a completed phase rather than a cryptic exit code 2. + +### 2.4 Design principle: `_run_subphase` never touches the server + +`_run_subphase` receives a `runner: ServerRunner` and `base_url: str` from its +caller so it can use the current server password and endpoint. It does **not** +own the server lifecycle. When it detects that the server died +(`server_unreachable`), it returns `RunStatus.SERVER_UNREACHABLE`. + +`run_phase_1()` also does not own server lifecycle. It returns a structured +`Phase1Outcome` to the harness. Only the harness calls `runner.restart(...)`. + +### 2.5 `ServerRunner.restart()` method + +Added to `tools/opencode/serve.py` — a single method that encapsulates the +stop/start sequence, so callers don't duplicate the launch logic: + +```python +def restart(self, **kwargs: Any) -> ServerInfo: + self.stop() + return self.start(**kwargs) +``` + +### 2.6 Scope / non-goals + +- **Not fixing** why the opencode server crashed — that's an opencode bug + (likely OOM, segfault, or provider disconnect handling). This plan focuses + on resilience. +- **Not adding** a general-purpose server health-check heartbeat — the existing + health URL is sufficient; we just need to use it. +- **Not backporting** to `ServerRunner._kill` or the signal handling — those + are working correctly for normal shutdown. + +--- + +## 3. Implementation order + +| Step | File | Complexity | Description | +|------|------|-----------|-------------| +| Fix 1 — fast-fail | `runner.py:71-103` | Low | Consecutive-`None` counter in `_wait_for_resume_idle` | +| Fix 2 — exit-code logging | `serve.py:126-218` | Low | Daemon monitor thread, `_start_exit_monitor` | +| `restart()` method | `serve.py:239-247` | Low | Single stop/start encapsulation on `ServerRunner` | +| Fix 3a — `server_unreachable` reason | `runner.py:26-28, 215-226` | Low | `ResumeSessionServerUnreachable` subclass + distinct finish reason | +| Fix 3a.1 — health-confirm server death | `session.py`, `runner.py` | Low | Status probe failures only consume restart budget when `/global/health` also fails | +| Status enum | `status.py` | Low | Explicit `RunStatus` values replace magic return code 3 | +| Fix 3 — Phase 1 propagation | `phase_1.py` | Medium | `Phase1Outcome(status, failed_subphase)`; no server lifecycle in phase code | +| Fix 3 — server restart in harness | `harness.py` | Medium | `runner.restart()` called from harness retry loop; Phase 1 re-enters at failed subphase | +| Resume opener for `server_unreachable` | `completion.py:429-475` | Low | Context-specific resume prompt opener | + +Recommended order: 1 → 2 → 3a → 3b → 3c. + +Fixes 1 and 2 can be done independently. Fix 3a is a prerequisite for 3b/3c. + +--- + +## 4. Testing + +### 4.1 Unit tests for Fix 1 + +- Mock `get_session_status` to return `None` repeatedly → assert + `ResumeSessionNotReady` raised after `CONSECUTIVE_NONE_MAX` polls. +- Mock to return `"busy"` for 120s → assert timeout path still works. +- Mock to return alternating `None`/`"busy"` → assert counter resets on non-`None`. + +### 4.2 Integration test for Fix 3 + +- Kill the opencode server mid-phase (SIGKILL) → assert harness detects + `server_unreachable`, restarts server, and retries with a fresh session. + +### 4.3 Regression test for Fix 2 + +- Start server, verify normal exit produces `exited with code 0` in serve log. +- Start server, kill with SIGTERM, verify `killed by signal 15` in serve log. + +--- + +## 5. Open questions + +1. For Fix 1, is `CONSECUTIVE_NONE_MAX=3` a reasonable default? Each poll involves + a 5-second HTTP timeout on the status endpoint, so 3 consecutive failures ≈ 15s + before fast-failing. Could be 5 for flaky networks. +2. For Fix 3, should the server-restart budget be capped (e.g., 2 restarts)? + Without a cap, a persistently-crashing server could loop forever. + +--- + +## 6. Fix 4 — Do not abandon a still-busy turn (the real root cause) + +Date: 2026-06-15 +Status: Implemented +Verified against: opencode v1.17.7 (issue reproduced identically on 1.17.7). + +### 6.1 What actually happened + +Across PIDs 57813, 73953, and 11183, the failure was **never** a dead server. +The serve logs show the Phase 1b session staying `busy` continuously until the +harness itself SIGTERM'd it. The sequence: + +1. A long model turn runs. opencode emits `session.status:{type:"busy"}` on the + SSE `/event` stream throughout. +2. The first attempt's `PhaseEventLoop` stops **before** the terminal + `session.status:{type:"idle"}` — the SSE stream dropped and the + `SseClient` reconnect budget (`max_reconnects=10`) / heartbeat timeout + exhausted while the turn was still genuinely running. +3. The loop returned a non-terminal `RunResult` with `last_finish_reason="tool-calls"` + (a `_FINISH_MID_TURN` reason — really just a step boundary), so CodeCome + misread a still-running turn as a "mid-turn cutoff" and triggered auto-resume. +4. The resume pre-flight (`_wait_for_resume_idle`) then polled the REST + `GET /session/status` endpoint, which is workspace-`forward`ed and unreliable + while the target session is in a long busy turn — returning nothing for the + full 120s → `ResumeSessionNotReady` (exit code 2). + +### 6.2 Source confirmation (opencode v1.17.7) + +- `packages/opencode/src/server/routes/.../handlers/session.ts`: `status` handler + returns `Object.fromEntries(statusSvc.list())`. +- `packages/opencode/src/session/status.ts`: on `idle`, the session is + **deleted** from the status map (`data.delete(sessionID)`); `busy`/`retry` are + stored. So idle = absent, busy = `{type:"busy"}`. +- `packages/opencode/src/server/shared/workspace-routing.ts`: `/session/status` + is `action:"forward"`. The REST status call is therefore subject to + forwarding/blocking under load — which is why our polls returned nothing while + the SSE stream still reported `busy`. + +Conclusion: **the SSE `/event` stream is the authoritative busy/idle source.** +The REST `/session/status` poll in the resume pre-flight is redundant and flaky. + +### 6.3 The fix + +`SseClient` gains an optional `should_continue: Callable[[], bool]`. +`PhaseEventLoop` supplies `_should_keep_consuming()`, which is True while: + +- the last SSE-observed `session.status` for the session is `busy`, AND +- the server process is alive (`liveness_check`, built from + `ServerInfo.proc.poll()` via `make_liveness_check`). + +When `should_continue()` is True, `SseClient` keeps reconnecting past +`max_reconnects` (resetting the counter to keep backoff bounded) instead of +raising `SSE reconnect budget exhausted`. A long model turn is therefore never +abandoned: the loop keeps consuming until it observes the genuine terminal +`idle` (or the process dies / true terminal finish reason). + +Consequence: a still-busy turn is no longer misclassified as a mid-turn cutoff, +so the auto-resume + `_wait_for_resume_idle` path fires only on genuine +terminal-but-incomplete states. The flaky REST status poll is no longer on the +hot path for long turns. + +Wiring: `liveness_check` is threaded +`harness/phase_1 → _run_single_attempt → _consume_events → PhaseEventLoop → +SseClient.should_continue`. Chat mode is unaffected (it never calls +`_run_single_attempt`/`PhaseEventLoop` with these hooks). + +### 6.4 Tests + +`tests/test_sse_busy_resilience.py`: +- `SseClient` honors the budget without `should_continue`. +- `SseClient` keeps consuming past the budget while `should_continue` is True, + and stops when it flips to False (or raises). +- `PhaseEventLoop._should_keep_consuming()` truth table: not-busy → False; + busy+alive → True; busy+dead → False; busy+no-liveness → True; idle clears + busy; `session.idle` event clears busy. + +--- + +## 7. Fix 5 — Bounded busy-wait (stall watchdog) + restart/retry + +Date: 2026-06-15 +Status: Implemented +Verified against: opencode v1.17.7 (live hang reproduced, PID 58436). + +### 7.1 What happened + +Fix 4 (keep consuming while busy+alive) removed false server kills but +introduced an **unbounded** wait: a genuinely hung model turn (provider stalled +mid-generation) keeps the session `busy` with the process alive forever, so the +SSE loop never returns. + +Live evidence (PID 58436): the model ran two bash validation tool calls that +completed, the step ended with finish reason `tool-calls` (a step boundary, not +a turn end), it began the next assistant message, and then the provider produced +no further tokens. CodeCome's transcript froze; the serve log kept emitting +`busy` for ~11 min, then went silent entirely while the process stayed alive. + +### 7.2 The fix + +A no-progress watchdog in `PhaseEventLoop`: + +- `_note_progress(event)` updates `_last_progress_at` on **every** SSE event + except `server.heartbeat` and `server.connected` (pure connection lifecycle). +- `_stalled()` is True once `now - _last_progress_at >= CODECOME_BUSY_STALL_TIMEOUT` + (default **180s**; `0` disables). +- `_should_keep_consuming()` returns False (and sets `_session_stalled`) once + stalled, even while busy+alive — so the SSE client stops at the next + reconnect/heartbeat checkpoint instead of waiting forever. +- The stalled run surfaces `RunResult.session_stalled=True` and + `last_finish_reason="session_stalled"`. + +Propagation and recovery: + +- `runner.py` records `codecome.session.stalled` and returns the stalled result. +- `phase_1._run_subphase` maps it to new status `RunStatus.SESSION_STALLED`; + `run_phase_1` returns `Phase1Outcome(SESSION_STALLED, failed_subphase)`. +- `phases/completion.py` adds a `session_stalled` resume opener (no PROMPT_EXTRA). +- `harness.py` treats `session_stalled` exactly like `server_unreachable`: + restart the server and retry, **sharing the same `CODECOME_MAX_SERVER_RESTARTS` + budget** (default 2), with a clear `[Auto-Recovery] ... stalled ...` message. + +### 7.3 De-duplication + +The restart+retry logic was duplicated across the Phase 1 branch and the two +Phase 2-6 recovery blocks. Consolidated: + +- `_RECOVERABLE_RESTART_REASONS` maps `server_unreachable` / `session_stalled` + to their user-facing wording. +- `_restart_server(runner, *, log_level)` centralizes the single + `runner.restart(...)` invocation. +- Phases 2-6 now use one recovery block driven by `_recovery_reason`; Phase 1 + uses the shared map + helper. + +### 7.4 Tests + +- `tests/test_sse_busy_resilience.py`: stall stops consuming even when busy+alive; + non-heartbeat events reset the timer; heartbeat/connected do not; timeout=0 + disables; `RunResult.session_stalled` default. +- `tests/test_runner_resume_health.py`: `_run_subphase` maps a stalled run to + `SESSION_STALLED`. +- `tests/test_harness_recovery_restart.py`: both reasons restart+retry then + succeed; shared budget exhaustion yields a non-zero terminal status. + +--- + +## 8. Fix 6 — Reachable stall watchdog on a silent-but-open SSE stream + +Date: 2026-06-15 +Status: Implemented +Verified against: opencode v1.17.7 (live hang reproduced, PID 74577). + +### 8.1 Root cause (confirmed in opencode v1.17.7 source) + +`packages/opencode/src/server/routes/instance/httpapi/handlers/event.ts` builds +the `/event` SSE response by merging the event output with +`Stream.tick("10 seconds")` heartbeats, scheduled on the Effect runtime. +`global.ts` does the same for `/global/health`. When a model turn (or a wedged +`@explore` subagent) hangs synchronously, the Effect runtime scheduler blocks and +**no heartbeats fire on either stream**. The live hang (PID 74577) had **0 +`server.heartbeat` events** in the serve log; the HTTP connection stayed open +(`X-Accel-Buffering: no`, no FIN) but silent. + +CodeCome's stall watchdog (Fix 5) was logically correct but **unreachable** here: +- `_should_keep_consuming()` (which holds the 180s no-progress check) is only + evaluated at SSE reconnect/budget checkpoints. +- The heartbeat-timeout check lived in `_on_event`, which only runs when an event + arrives. +- The blocking line read (`for byte_line in resp`) did not surface a wall-clock + timeout on a silent-but-open socket, so neither check ever ran. + +### 8.2 The fix (read tick; no background threads → chat-safe) + +`SseClient._open_stream` now: +- Sets an explicit per-read socket timeout via `_set_stream_timeout(resp, tick)` + with `tick = CODECOME_SSE_READ_TICK` (default 10s). +- Reads with manual `resp.readline()` and converts `socket.timeout`/`TimeoutError` + into an inactivity **tick** (`yield None`) instead of blocking forever; EOF + becomes `SSE stream closed by server`. + +`SseClient.events()` handles a `None` tick by evaluating `should_continue()` +(when provided): if it returns False (e.g. stall watchdog tripped), it stops the +stream cleanly. With `should_continue=None` (chat mode) ticks are ignored and the +loop keeps reading — chat behavior is unchanged, and chat consumers never see a +`None` (ticks are absorbed inside `events()`). + +This makes Fix 5's 180s no-progress watchdog reachable on a silent-but-open +stream: a stall is detected within ~one tick (~10s) of crossing 180s (worst case +~190s). + +### 8.3 Heartbeat-loss as a secondary (faster) signal + +Heartbeat detection is now **independent of heartbeat arrival** and no longer +false-positives when a server emits none: +- `_on_event` records heartbeat arrival but does not raise on stale heartbeats; + a late non-heartbeat event is valid progress and must not be discarded. +- `SseClient.seconds_since_heartbeat()` exposes the gap (or None if never seen). +- `PhaseEventLoop._stalled()` keeps an optional secondary trigger: if heartbeats + were seen and then stopped for `CODECOME_HEARTBEAT_STALL_TIMEOUT` while busy, + flag a stall early. This is disabled by default because live Phase 1a runs + showed opencode can pause heartbeats during valid long turns. + +### 8.4 New env knobs + +- `CODECOME_SSE_READ_TICK` (default 10s) — inactivity read tick / socket timeout. +- `CODECOME_HEARTBEAT_STALL_TIMEOUT` (default 0s / disabled) — optional + secondary heartbeat-gap stall. +- (existing) `CODECOME_BUSY_STALL_TIMEOUT` (default 180s) — primary no-progress. + +### 8.5 Tests + +`tests/test_sse_busy_resilience.py`: +- `None` tick + `should_continue()==False` stops the stream; `==True` keeps + reading; `should_continue=None` (chat) ignores ticks and still delivers events. +- Heartbeat-loss triggers a stall; never-seen heartbeats do not trigger via the +- Heartbeat-loss can trigger a stall only when explicitly enabled; never-seen + heartbeats do not trigger via the heartbeat path; a recent heartbeat does not + trigger. +- `_on_event` does not discard late events after a heartbeat gap; + `seconds_since_heartbeat()` reflects the gap after a heartbeat. + +--- + +## 9. Fix 7 — Avoid false stalls when SSE misses terminal idle + +Date: 2026-06-16 +Status: Implemented +Verified against: opencode v1.17.7 (Phase 1a restart-budget exhaustion). + +### 9.1 What happened + +Live Phase 1a attempts produced valid model/tool progress and then CodeCome +classified the turn as `session_stalled` after exhausting the restart budget. +The transcript ended near a pending write/tool boundary plus heartbeat/reconnect +events, but the matching serve log later showed the main Phase 1a session reached +`idle` before CodeCome killed the server. + +### 9.2 Root cause + +Two independent false-positive paths combined: + +- `SseClient._on_event()` treated a stale heartbeat as fatal on any subsequent + non-heartbeat event, so a valid late event (including a terminal idle) could be + dropped instead of delivered to `PhaseEventLoop`. +- `PhaseEventLoop` cached `_session_busy=True` from the last SSE status and did + not verify `/session/status` before turning the no-progress timeout into a + hard `session_stalled` result. opencode's status endpoint only returns busy + sessions; absence from that map means idle. + +### 9.3 The fix + +- `SseClient._on_event()` no longer raises on stale heartbeats; heartbeat gaps + are telemetry, while silence is handled by read ticks plus the phase watchdog. +- `CODECOME_HEARTBEAT_STALL_TIMEOUT` defaults to `0` (disabled) and is documented + as optional/diagnostic. +- Before marking a busy turn stalled, `PhaseEventLoop` probes `/session/status` + with the existing short probe timeout. If the target session is idle, it clears + `_session_busy`, records `_session_idle_via_status`, and exits without setting + `session_stalled`. +- When idle is observed via status probe, `PhaseEventLoop` performs a final + `_sync_session_messages()` before returning so a missed terminal `step_finish` + can still be recovered from persisted session messages. +- `runner.py` now records `codecome.session.stalled` on the normal result path; + the previous marker was unreachable after an unconditional return. + +### 9.4 Tests + +`tests/test_sse_busy_resilience.py` now covers: + +- stale heartbeat followed by a valid late event does not raise; +- status-probe idle prevents a false stall; +- status-probe busy still yields a true stall; +- heartbeat-gap stalls are disabled by default; +- final sync after status-idle recovery can recover a terminal `step_finish`. + +--- + +## 10. Fix 8 — Treat control-plane timeouts as recoverable server unresponsiveness + +Date: 2026-06-16 +Status: Implemented +Verified against: opencode v1.17.7 (Phase 1c `POST /session` timeout after CodeQL). + +### 10.1 What happened + +After Phase 1b completed and CodeQL ran successfully, Phase 1c failed before it +could create a session: + +- transcript contained only `codecome.attempt.started` and + `codecome.attempt.failed` with `TimeoutError: timed out`; +- there was no `codecome.session.ready`, so failure happened inside + `create_session()` (`POST /session`, hard-coded 10s timeout); +- the matching serve log showed the previous Phase 1b session reached `idle` + around the same moment, so the server process was alive but the HTTP control + plane was temporarily unresponsive/backpressured. + +### 10.2 Root cause + +`create_session()` did not retry transient failures and `_run_single_attempt()` +treated a `POST /session` timeout as a generic fatal `RunStatus.ERROR`. That +bypassed the Phase 1 harness recovery path, which only restarts/retries on +structured `server_unreachable` or `session_stalled` outcomes. + +Prompt submission had a related gap: `send_prompt_to_session()` retried transient +failures, but if retries were exhausted it raised a generic `RuntimeError`, also +classified as fatal instead of recoverable infrastructure unresponsiveness. + +### 10.3 The fix + +- `codecome.session.OpenCodeRequestError` now carries `retriable` and `operation` + metadata for opencode HTTP failures. +- `create_session()` retries transient failures (`TimeoutError`, `URLError`, + `OSError`, HTTP 5xx, HTTP 429) with the existing backoff schedule. +- Exhausted transient `create_session()` and `send_prompt_to_session()` failures + are mapped by `_run_single_attempt()` to `RunStatus.INCOMPLETE` with + `RunResult.last_finish_reason="server_unreachable"`. +- Phase 1 and phase harness wording now says the server is unreachable or + unresponsive, not only dead. +- Non-retriable failures (for example empty session IDs or HTTP 4xx) remain fatal. + +### 10.4 Tests + +- `tests/test_session.py`: create-session timeouts retry and eventually succeed; + exhausted create-session and prompt-send timeouts raise retriable + `OpenCodeRequestError`. +- `tests/test_codecome_runner.py`: retriable prompt-send and create-session + failures return a recoverable `server_unreachable` result rather than invoking + the fatal-error path. diff --git a/README.md b/README.md index 84089728..4f0b56e6 100644 --- a/README.md +++ b/README.md @@ -501,6 +501,59 @@ CodeCome ships reusable phase prompts under `prompts/`: CODECOME_MODEL= # pin model per phase, e.g. anthropic/claude-opus-4-7 CODECOME_MODEL_VARIANT= # pin model variant, e.g. high, max +### Resilience and recovery environment variables + +CodeCome consumes the opencode SSE event stream and owns the `opencode serve` +lifecycle. These knobs tune how it detects a dead, unresponsive, or hung +server/session and how it recovers. Defaults are sensible; override only if you +hit edge cases. + + # --- Stall detection (hung model turn while the session stays "busy") --- + CODECOME_BUSY_STALL_TIMEOUT=180 # seconds with no meaningful SSE event + # (heartbeats/connected don't count) + # before a busy turn is treated as + # stalled. 0 disables. + CODECOME_SSE_READ_TICK=10 # SSE socket read tick (seconds). Forces + # the reader to wake on a silent-but- + # open stream so the stall watchdog + # can run. Lower = faster detection. + CODECOME_HEARTBEAT_STALL_TIMEOUT=0 # optional diagnostic signal: if + # server.heartbeat was flowing and + # then stops for this long while busy, + # flag a stall early. Disabled by + # default because opencode can pause + # heartbeats during valid long turns. + + # --- Server restart / retry budget (shared by death + stall recovery) --- + CODECOME_MAX_SERVER_RESTARTS=2 # how many times CodeCome restarts + # opencode serve and retries the + # failed phase/subphase before giving + # up (covers server death or + # unresponsiveness and session stalls). + CODECOME_MAX_FATAL_RETRIES=2 # retries for transient infrastructure + # errors (timeouts, connection blips). + CODECOME_MAX_ITERATION_RETRIES= # auto-resume budget for genuine mid-turn + # model/provider cutoffs (default 1 for + # Phase 1 subphases, 3 for phases 2-6). + + # --- Resume readiness (waiting for an existing session to go idle) --- + CODECOME_RESUME_IDLE_TIMEOUT=120 # max seconds to wait for a resumed + # session to report idle. + CODECOME_RESUME_IDLE_POLL=1 # poll interval (seconds) while waiting. + CODECOME_RESUME_PROBE_TIMEOUT=2 # per-probe HTTP timeout for + # /session/status and /global/health. + CODECOME_RESUME_SERVER_UNAVAILABLE_THRESHOLD=3 + # consecutive failed status+health probes + # (no process-liveness signal) before + # declaring the server unreachable. + +Recovery behavior: when a server death or a session stall is detected, CodeCome +restarts `opencode serve` and retries the affected phase (Phase 1 re-enters at the +failed subphase `1a`/`1b`/`1c`); both conditions draw from the single +`CODECOME_MAX_SERVER_RESTARTS` budget. A long but healthy model turn is never +abandoned — CodeCome keeps consuming the stream as long as the session is `busy` +and the server process is alive, up to the stall timeout. + ### Model resolution and thinking display The wrapper resolves the effective model in this order: diff --git a/tests/test_codecome_runner.py b/tests/test_codecome_runner.py index c5c2d369..a487c9b0 100644 --- a/tests/test_codecome_runner.py +++ b/tests/test_codecome_runner.py @@ -162,7 +162,11 @@ def test_run_single_attempt_records_prompt_timeout(mock_args, mock_console, monk monkeypatch.setattr(runner, "_consume_events", lambda *a, **kw: RunResult()) def fake_send(*_a, **_kw): - raise TimeoutError("timed out") + raise runner.OpenCodeRequestError( + "Failed to send prompt: timed out", + retriable=True, + operation="send_prompt", + ) monkeypatch.setattr(runner, "send_prompt_to_session", fake_send) @@ -172,23 +176,55 @@ def fake_send(*_a, **_kw): fake_transcript.write_event.side_effect = events.append monkeypatch.setattr(Transcript, "for_phase", classmethod(lambda cls, p, f: fake_transcript)) - fatal_errors = [] code, session_id, _res, _path = runner._run_single_attempt( mock_args, mock_console, "do work", "model", "var", "http://base", "auth", "dir", lambda *a: None, - emit_fatal_error_fn=lambda _console, _title, msg: fatal_errors.append(msg), + emit_fatal_error_fn=lambda *_a: pytest.fail("retriable prompt timeout should not be fatal"), ) - assert code == 1 - assert session_id == "" - assert fatal_errors == ["timed out"] + assert code == 2 + assert session_id == "new_session" event_types = [event["type"] for event in events] assert "codecome.prompt.send_started" in event_types assert "codecome.prompt.send_failed" in event_types - assert "codecome.attempt.failed" in event_types + assert "codecome.attempt.incomplete" in event_types + assert "codecome.attempt.failed" not in event_types failed = next(event for event in events if event["type"] == "codecome.prompt.send_failed") - assert failed["properties"]["errorType"] == "TimeoutError" - assert failed["properties"]["message"] == "timed out" + assert failed["properties"]["errorType"] == "OpenCodeRequestError" + assert failed["properties"]["message"] == "Failed to send prompt: timed out" + + +def test_run_single_attempt_create_session_timeout_is_recoverable(mock_args, mock_console, monkeypatch): + def fake_create(*_a, **_kw): + raise runner.OpenCodeRequestError( + "Failed to create session: timed out", + retriable=True, + operation="create_session", + ) + + monkeypatch.setattr(runner, "create_session", fake_create) + monkeypatch.setattr(runner, "_consume_events", lambda *a, **kw: pytest.fail("should not consume events")) + monkeypatch.setattr(runner, "send_prompt_to_session", lambda *a, **kw: pytest.fail("should not send prompt")) + + events = [] + fake_transcript = MagicMock(spec=Transcript) + fake_transcript.path = Path("fake.jsonl") + fake_transcript.write_event.side_effect = events.append + monkeypatch.setattr(Transcript, "for_phase", classmethod(lambda cls, p, f: fake_transcript)) + + code, session_id, res, _path = runner._run_single_attempt( + mock_args, mock_console, "do work", "model", "var", + "http://base", "auth", "dir", lambda *a: None, + emit_fatal_error_fn=lambda *_a: pytest.fail("retriable create timeout should not be fatal"), + ) + + assert code == 2 + assert session_id == "" + assert res.last_finish_reason == "server_unreachable" + event_types = [event["type"] for event in events] + assert "codecome.session.create_failed" in event_types + assert "codecome.attempt.incomplete" in event_types + assert "codecome.session.ready" not in event_types def test_existing_session_busy_guard_blocks_resume_prompt(mock_args, mock_console, monkeypatch): diff --git a/tests/test_harness_recovery_restart.py b/tests/test_harness_recovery_restart.py new file mode 100644 index 00000000..f96fcd14 --- /dev/null +++ b/tests/test_harness_recovery_restart.py @@ -0,0 +1,142 @@ +from __future__ import annotations + +import argparse +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "tools")) + +from events.phase_loop import RunResult + + +class _FakeRuntimeConfig: + model = "op/test" + variant = None + model_source = "stub" + variant_source = "stub" + thinking_on = False + thinking_source = "stub" + + +class _FakeServerInfo: + base_url = "http://localhost" + password = "fake" + pid = 1 + + +class _FakeServerRunner: + def __init__(self): + self.start_calls = 0 + self.restart_calls = 0 + + def start(self, **_kw): + self.start_calls += 1 + return _FakeServerInfo() + + def restart(self, **_kw): + self.restart_calls += 1 + return _FakeServerInfo() + + def stop(self): + pass + + @property + def info(self): + return _FakeServerInfo() + + +def _args(phase: str = "2") -> argparse.Namespace: + return argparse.Namespace( + phase=phase, label="test", agent="auditor", + prompt_file="prompts/phase-2.md", finding=None, chat=False, + show_model=False, debug=False, color="never", log_level="WARN", + read_display_lines=None, write_content_lines=None, + write_diff_limit=None, edit_diff_lines=None, + ) + + +def _server_unreachable_result() -> RunResult: + return RunResult(last_finish_reason="server_unreachable", last_session_id="ses_x") + + +def _stalled_result() -> RunResult: + return RunResult( + last_finish_reason="session_stalled", + any_step_finish_seen=True, + step_finish_count=2, + session_stalled=True, + last_session_id="ses_x", + ) + + +def _terminal_ok_result() -> RunResult: + return RunResult(last_finish_reason="stop", any_step_finish_seen=True, step_finish_count=1) + + +def _wire(monkeypatch, fake_runner, attempts): + from codecome import harness as harness_mod + from codecome import runner as runner_mod + + it = iter(attempts) + monkeypatch.setattr(harness_mod, "ServerRunner", lambda: fake_runner) + monkeypatch.setattr(harness_mod, "load_prompt", lambda *_a, **_kw: "prompt") + monkeypatch.setattr(harness_mod, "resolve_runtime_config", lambda _agent: _FakeRuntimeConfig()) + monkeypatch.setattr(harness_mod, "configure_rendering", lambda *_a, **_kw: None) + monkeypatch.setattr(harness_mod, "check_phase_graceful_completion", lambda *_a, **_kw: (True, [])) + monkeypatch.setattr(runner_mod, "_run_single_attempt", lambda *_a, **_kw: next(it)) + import subprocess + from unittest.mock import MagicMock + monkeypatch.setattr(subprocess, "run", lambda *_a, **_kw: MagicMock(returncode=0)) + return harness_mod + + +def _t(harness_mod): + return harness_mod.ROOT / "tmp" / "fake.jsonl" + + +def test_server_unreachable_triggers_restart_then_succeeds(monkeypatch): + fake = _FakeServerRunner() + from codecome import harness as harness_mod + transcript = _t(harness_mod) + harness_mod = _wire(monkeypatch, fake, [ + (2, "ses_x", _server_unreachable_result(), transcript), + (0, "ses_x", _terminal_ok_result(), transcript), + ]) + monkeypatch.setattr(harness_mod, "run_frontmatter_validation", lambda *_a, **_kw: (0, ""), raising=False) + + rc = harness_mod.run_phase_mode(_args()) + assert rc == 0 + assert fake.restart_calls == 1 + + +def test_session_stalled_triggers_restart_then_succeeds(monkeypatch): + fake = _FakeServerRunner() + from codecome import harness as harness_mod + transcript = _t(harness_mod) + harness_mod = _wire(monkeypatch, fake, [ + (0, "ses_x", _stalled_result(), transcript), + (0, "ses_x", _terminal_ok_result(), transcript), + ]) + monkeypatch.setattr(harness_mod, "run_frontmatter_validation", lambda *_a, **_kw: (0, ""), raising=False) + + rc = harness_mod.run_phase_mode(_args()) + assert rc == 0 + assert fake.restart_calls == 1 + + +def test_recovery_shares_budget_and_exhausts(monkeypatch): + fake = _FakeServerRunner() + from codecome import harness as harness_mod + transcript = _t(harness_mod) + monkeypatch.setenv("CODECOME_MAX_SERVER_RESTARTS", "2") + # One stall + two unreachable = 3 recoverable conditions, budget is 2. + harness_mod = _wire(monkeypatch, fake, [ + (0, "ses_x", _stalled_result(), transcript), + (2, "ses_x", _server_unreachable_result(), transcript), + (2, "ses_x", _server_unreachable_result(), transcript), + ]) + + rc = harness_mod.run_phase_mode(_args()) + # Budget exhausted after 2 restarts → non-zero terminal status. + assert rc != 0 + assert fake.restart_calls == 2 diff --git a/tests/test_runner_resume_health.py b/tests/test_runner_resume_health.py new file mode 100644 index 00000000..1c5f0bb5 --- /dev/null +++ b/tests/test_runner_resume_health.py @@ -0,0 +1,207 @@ +from __future__ import annotations + +import json +import sys +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "tools")) + + +def _transcript(tmp_path: Path): + from codecome.transcript import Transcript + + path = tmp_path / "resume.jsonl" + return Transcript(path, path.open("x", encoding="utf-8", buffering=1)) + + +def _events(path: Path) -> list[dict]: + return [json.loads(line) for line in path.read_text(encoding="utf-8").splitlines()] + + +def test_resume_status_none_with_healthy_server_waits_until_timeout(monkeypatch, tmp_path): + from codecome import runner as runner_mod + + transcript = _transcript(tmp_path) + monkeypatch.setenv("CODECOME_RESUME_IDLE_TIMEOUT", "0") + monkeypatch.setattr(runner_mod, "get_session_status", lambda *_a, **_kw: None) + monkeypatch.setattr(runner_mod, "is_server_healthy", lambda *_a, **_kw: True) + + with pytest.raises(runner_mod.ResumeSessionNotReady): + runner_mod._wait_for_resume_idle( + "http://127.0.0.1:1", "ses_test", "pw", "/workspace", transcript + ) + transcript.close() + + event_types = [event["type"] for event in _events(transcript.path)] + assert "codecome.resume.status_unavailable_server_healthy" in event_types + assert "codecome.resume.server_unreachable" not in event_types + + +def test_resume_status_none_with_unhealthy_server_raises_unreachable(monkeypatch, tmp_path): + from codecome import runner as runner_mod + + transcript = _transcript(tmp_path) + monkeypatch.setenv("CODECOME_RESUME_IDLE_TIMEOUT", "120") + monkeypatch.setenv("CODECOME_RESUME_SERVER_UNAVAILABLE_THRESHOLD", "2") + monkeypatch.setattr(runner_mod, "get_session_status", lambda *_a, **_kw: None) + monkeypatch.setattr(runner_mod, "is_server_healthy", lambda *_a, **_kw: False) + monkeypatch.setattr(runner_mod.time, "sleep", lambda *_a, **_kw: None) + + with pytest.raises(runner_mod.ResumeSessionServerUnreachable): + runner_mod._wait_for_resume_idle( + "http://127.0.0.1:1", "ses_test", "pw", "/workspace", transcript + ) + transcript.close() + + event_types = [event["type"] for event in _events(transcript.path)] + assert "codecome.resume.server_unreachable" in event_types + + +def test_resume_status_none_with_live_process_does_not_kill_server(monkeypatch, tmp_path): + """A busy/blocked control plane on a live process must NOT be server death. + + Regression for the case where /session/status and /global/health both block + during a long busy turn while the opencode process is still alive. With a + liveness_check reporting alive, we keep waiting and eventually time out as + resume_not_ready instead of declaring the server unreachable. + """ + from codecome import runner as runner_mod + + transcript = _transcript(tmp_path) + monkeypatch.setenv("CODECOME_RESUME_IDLE_TIMEOUT", "0") + # Health endpoint also blocked/unhealthy under load, but the process is alive. + monkeypatch.setattr(runner_mod, "get_session_status", lambda *_a, **_kw: None) + monkeypatch.setattr(runner_mod, "is_server_healthy", lambda *_a, **_kw: False) + + with pytest.raises(runner_mod.ResumeSessionNotReady) as excinfo: + runner_mod._wait_for_resume_idle( + "http://127.0.0.1:1", "ses_test", "pw", "/workspace", transcript, + liveness_check=lambda: True, + ) + transcript.close() + + assert not isinstance(excinfo.value, runner_mod.ResumeSessionServerUnreachable) + event_types = [event["type"] for event in _events(transcript.path)] + assert "codecome.resume.status_unavailable_process_alive" in event_types + assert "codecome.resume.server_unreachable" not in event_types + + +def test_resume_status_none_with_dead_process_raises_unreachable(monkeypatch, tmp_path): + """When liveness_check reports the process exited, declare it unreachable.""" + from codecome import runner as runner_mod + + transcript = _transcript(tmp_path) + monkeypatch.setenv("CODECOME_RESUME_IDLE_TIMEOUT", "120") + monkeypatch.setattr(runner_mod, "get_session_status", lambda *_a, **_kw: None) + monkeypatch.setattr(runner_mod.time, "sleep", lambda *_a, **_kw: None) + + with pytest.raises(runner_mod.ResumeSessionServerUnreachable): + runner_mod._wait_for_resume_idle( + "http://127.0.0.1:1", "ses_test", "pw", "/workspace", transcript, + liveness_check=lambda: False, + ) + transcript.close() + + events = _events(transcript.path) + unreachable = [e for e in events if e["type"] == "codecome.resume.server_unreachable"] + assert unreachable + assert unreachable[0]["properties"].get("processExited") is True + + +class _FakeProc: + def __init__(self, returncode): + self._returncode = returncode + + def poll(self): + return self._returncode + + +class _FakeInfo: + def __init__(self, returncode): + self.proc = _FakeProc(returncode) + + +class _FakeRunner: + def __init__(self, info): + self.info = info + + +def test_make_liveness_check_uses_process_handle(): + from opencode.serve import make_liveness_check + + # poll() is None → process still running → alive + assert make_liveness_check(_FakeRunner(_FakeInfo(None)))() is True + # poll() returns an exit code → process exited → not alive + assert make_liveness_check(_FakeRunner(_FakeInfo(-15)))() is False + # no server info yet → not alive + assert make_liveness_check(_FakeRunner(None))() is False + + +# --------------------------------------------------------------------------- +# Stall propagation: _run_subphase maps a stalled run to SESSION_STALLED +# --------------------------------------------------------------------------- + +import argparse # noqa: E402 + + +class _FakeRuntimeConfig: + model = "op/test" + variant = None + model_source = "stub" + variant_source = "stub" + thinking_on = False + thinking_source = "stub" + + +def _stall_args(phase: str = "1b") -> argparse.Namespace: + return argparse.Namespace( + phase=phase, label="test", agent="recon", + prompt_file="prompts/phase-1b-sandbox.md", finding=None, + chat=False, show_model=False, debug=False, color="never", log_level="WARN", + read_display_lines=None, write_content_lines=None, + write_diff_limit=None, edit_diff_lines=None, + ) + + +def test_run_subphase_maps_stalled_runresult_to_session_stalled(monkeypatch): + from codecome import phase_1 as p1 + from codecome.status import RunStatus + from events.phase_loop import RunResult + + transcript = p1.ROOT / "tmp" / "fake.jsonl" + stalled = RunResult( + last_finish_reason="session_stalled", + any_step_finish_seen=True, + step_finish_count=3, + session_stalled=True, + last_session_id="ses_test", + ) + + runner = MagicMock() + runner.info = _FakeInfo(None) + runner.info.password = "pw" + + monkeypatch.setattr(p1, "load_prompt", lambda *_a, **_kw: "prompt") + monkeypatch.setattr(p1, "resolve_runtime_config", lambda _agent: _FakeRuntimeConfig()) + monkeypatch.setattr(p1, "configure_rendering", lambda *_a, **_kw: None) + monkeypatch.setattr( + p1, "_run_single_attempt", + lambda *_a, **_kw: (RunStatus.OK, "ses_test", stalled, transcript), + ) + + rc = p1._run_subphase( + args=_stall_args("1b"), + console=None, + rendering_ctx=None, + runner=runner, + base_url="http://localhost", + phase_id="1b", + label="test", + agent="recon", + prompt_file="prompts/phase-1b-sandbox.md", + ) + + assert rc == RunStatus.SESSION_STALLED diff --git a/tests/test_session.py b/tests/test_session.py index 6a3f6f12..1b29ebf7 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -101,6 +101,35 @@ def test_create_session_empty_id_raises(self, mock_urlopen): with pytest.raises(RuntimeError, match="empty session ID"): module.create_session("http://localhost:8080", "1", "recon", None, None, None) + @patch("time.sleep") + @patch("urllib.request.urlopen") + def test_create_session_retries_on_timeout(self, mock_urlopen, mock_sleep): + module = _load_session_module() + mock_resp = MagicMock() + mock_resp.read.return_value = json.dumps({"id": "sess-after-retry"}).encode("utf-8") + mock_urlopen.side_effect = [TimeoutError("timed out"), mock_resp] + + sid = module.create_session("http://localhost:8080", "1", "recon", None, None, None) + + assert sid == "sess-after-retry" + assert mock_urlopen.call_count == 2 + assert mock_sleep.call_count == 1 + + @patch("time.sleep") + @patch("urllib.request.urlopen") + def test_create_session_exhausted_timeout_is_retriable_request_error(self, mock_urlopen, mock_sleep): + module = _load_session_module() + mock_urlopen.side_effect = TimeoutError("timed out") + + with pytest.raises(module.OpenCodeRequestError) as excinfo: + module.create_session("http://localhost:8080", "1", "recon", None, None, None) + + assert excinfo.value.retriable is True + assert excinfo.value.operation == "create_session" + assert "Failed to create session: timed out" in str(excinfo.value) + assert mock_urlopen.call_count == 3 + assert mock_sleep.call_count == 2 + class TestCreateChatSession: @patch("urllib.request.urlopen") @@ -210,6 +239,20 @@ def test_send_prompt_retries_exhausted_raises(self, mock_urlopen, mock_sleep): assert mock_urlopen.call_count == 3 # default max retries + @patch("time.sleep") + @patch("urllib.request.urlopen") + def test_send_prompt_exhausted_timeout_is_retriable_request_error(self, mock_urlopen, mock_sleep): + module = _load_session_module() + mock_urlopen.side_effect = TimeoutError("timed out") + + with pytest.raises(module.OpenCodeRequestError) as excinfo: + module.send_prompt_to_session( + "http://localhost:8080", "sess-1", "hello", "recon", None, None, None, None + ) + + assert excinfo.value.retriable is True + assert excinfo.value.operation == "send_prompt" + @patch("time.sleep") @patch("urllib.request.urlopen") def test_send_prompt_no_retry_on_4xx(self, mock_urlopen, mock_sleep): diff --git a/tests/test_sse_busy_resilience.py b/tests/test_sse_busy_resilience.py new file mode 100644 index 00000000..0e44c51c --- /dev/null +++ b/tests/test_sse_busy_resilience.py @@ -0,0 +1,454 @@ +from __future__ import annotations + +import sys +from pathlib import Path + +import pytest + +sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "tools")) + +from events.sse_client import SseClient, SseClientError +from events.phase_loop import PhaseEventLoop + + +# --------------------------------------------------------------------------- +# SseClient: should_continue keeps the stream alive past the reconnect budget +# --------------------------------------------------------------------------- + +def _drop_after(events_per_stream, total_streams): + """Build a fake _open_stream that yields a stream then raises a drop.""" + streams = iter([list(events_per_stream) for _ in range(total_streams)]) + + def fake_open_stream(): + try: + stream = next(streams) + except StopIteration: + raise SseClientError("no more streams") + for event in stream: + yield event + raise SseClientError("drop") + + return fake_open_stream + + +def test_reconnect_budget_exhausted_without_should_continue(): + client = SseClient("http://localhost:8080", reconnect=True, max_reconnects=2) + client._wait_backoff = lambda: None + client._open_stream = _drop_after([{"type": "server.heartbeat"}], total_streams=10) + + with pytest.raises(SseClientError): + list(client.events()) + # Budget was honored: stopped at max_reconnects. + assert client._reconnect_count == client.max_reconnects + + +def test_should_continue_keeps_consuming_past_budget(): + """While should_continue() is True, the budget never terminates the stream.""" + client = SseClient("http://localhost:8080", reconnect=True, max_reconnects=2) + client._wait_backoff = lambda: None + + # Allow plenty of reconnects, then flip should_continue to False so it stops. + calls = {"n": 0} + + def should_continue(): + calls["n"] += 1 + return calls["n"] < 6 # keep going for the first few budget checks + + client.should_continue = should_continue + client._open_stream = _drop_after([{"type": "server.heartbeat"}], total_streams=50) + + with pytest.raises(SseClientError): + list(client.events()) + + # We must have reconnected well past the nominal budget of 2. + assert calls["n"] >= 6 + + +def test_should_continue_exception_is_treated_as_stop(): + client = SseClient("http://localhost:8080", reconnect=True, max_reconnects=1) + client._wait_backoff = lambda: None + + def should_continue(): + raise RuntimeError("boom") + + client.should_continue = should_continue + client._open_stream = _drop_after([{"type": "server.heartbeat"}], total_streams=10) + + with pytest.raises(SseClientError): + list(client.events()) + + +# --------------------------------------------------------------------------- +# PhaseEventLoop: _should_keep_consuming combines busy-state + liveness +# --------------------------------------------------------------------------- + +def _loop(liveness_check=None): + return PhaseEventLoop( + "http://localhost:8080", "sess-1", None, "1", "recon", + liveness_check=liveness_check, + ) + + +def test_keep_consuming_false_when_not_busy(): + loop = _loop(liveness_check=lambda: True) + assert loop._session_busy is False + assert loop._should_keep_consuming() is False + + +def test_keep_consuming_true_when_busy_and_alive(): + loop = _loop(liveness_check=lambda: True) + loop._track_session_busy( + {"type": "session.status", "properties": {"status": {"type": "busy"}}} + ) + assert loop._session_busy is True + assert loop._should_keep_consuming() is True + + +def test_keep_consuming_false_when_busy_but_process_dead(): + loop = _loop(liveness_check=lambda: False) + loop._track_session_busy( + {"type": "session.status", "properties": {"status": {"type": "busy"}}} + ) + assert loop._should_keep_consuming() is False + + +def test_keep_consuming_true_when_busy_and_no_liveness_signal(): + loop = _loop(liveness_check=None) + loop._track_session_busy( + {"type": "session.status", "properties": {"status": {"type": "busy"}}} + ) + assert loop._should_keep_consuming() is True + + +def test_idle_status_clears_busy(): + loop = _loop(liveness_check=lambda: True) + loop._track_session_busy( + {"type": "session.status", "properties": {"status": {"type": "busy"}}} + ) + assert loop._session_busy is True + loop._track_session_busy( + {"type": "session.status", "properties": {"status": {"type": "idle"}}} + ) + assert loop._session_busy is False + assert loop._should_keep_consuming() is False + + +def test_session_idle_event_clears_busy(): + loop = _loop(liveness_check=lambda: True) + loop._track_session_busy( + {"type": "session.status", "properties": {"status": {"type": "busy"}}} + ) + loop._track_session_busy({"type": "session.idle", "properties": {"sessionID": "sess-1"}}) + assert loop._session_busy is False + + +# --------------------------------------------------------------------------- +# Stall watchdog: bounded busy-wait +# --------------------------------------------------------------------------- + +def _busy(loop): + loop._track_session_busy( + {"type": "session.status", "properties": {"status": {"type": "busy"}}} + ) + + +def test_stall_detected_stops_consuming_even_when_busy_and_alive(monkeypatch): + monkeypatch.setenv("CODECOME_BUSY_STALL_TIMEOUT", "180") + loop = _loop(liveness_check=lambda: True) + _busy(loop) + loop._fetch_session_status = lambda timeout=2.0: "busy" + assert loop._should_keep_consuming() is True + + # Advance the monotonic clock past the stall window with no progress. + base = loop._last_progress_at + monkeypatch.setattr("events.phase_loop.time.monotonic", lambda: base + 181.0) + + assert loop._should_keep_consuming() is False + assert loop._session_stalled is True + + +def test_status_idle_prevents_false_stall(monkeypatch): + monkeypatch.setenv("CODECOME_BUSY_STALL_TIMEOUT", "180") + loop = _loop(liveness_check=lambda: True) + _busy(loop) + loop._fetch_session_status = lambda timeout=2.0: "idle" + + base = loop._last_progress_at + monkeypatch.setattr("events.phase_loop.time.monotonic", lambda: base + 181.0) + + assert loop._should_keep_consuming() is False + assert loop._session_stalled is False + assert loop._session_busy is False + assert loop._session_idle_via_status is True + + +def test_status_busy_still_allows_stall(monkeypatch): + monkeypatch.setenv("CODECOME_BUSY_STALL_TIMEOUT", "180") + loop = _loop(liveness_check=lambda: True) + _busy(loop) + loop._fetch_session_status = lambda timeout=2.0: "busy" + + base = loop._last_progress_at + monkeypatch.setattr("events.phase_loop.time.monotonic", lambda: base + 181.0) + + assert loop._should_keep_consuming() is False + assert loop._session_stalled is True + + +def test_non_heartbeat_event_resets_stall_timer(monkeypatch): + loop = _loop(liveness_check=lambda: True) + _busy(loop) + base = loop._last_progress_at + + # Far in the future, but a meaningful event arrives → timer resets. + monkeypatch.setattr("events.phase_loop.time.monotonic", lambda: base + 500.0) + loop._note_progress({"type": "message.part.updated", "properties": {}}) + assert loop._stalled() is False + + +def test_heartbeat_and_connected_do_not_reset_stall_timer(monkeypatch): + loop = _loop(liveness_check=lambda: True) + _busy(loop) + base = loop._last_progress_at + + monkeypatch.setattr("events.phase_loop.time.monotonic", lambda: base + 500.0) + loop._note_progress({"type": "server.heartbeat", "properties": {}}) + loop._note_progress({"type": "server.connected", "properties": {}}) + # Neither counts as progress → still stalled. + assert loop._stalled() is True + + +def test_stall_timeout_zero_disables_watchdog(monkeypatch): + monkeypatch.setenv("CODECOME_BUSY_STALL_TIMEOUT", "0") + loop = _loop(liveness_check=lambda: True) + _busy(loop) + base = loop._last_progress_at + monkeypatch.setattr("events.phase_loop.time.monotonic", lambda: base + 10_000.0) + assert loop._stalled() is False + assert loop._should_keep_consuming() is True + + +def test_runresult_carries_session_stalled_default(): + from events.phase_loop import RunResult + + assert RunResult().session_stalled is False + assert RunResult(session_stalled=True).session_stalled is True + + +# --------------------------------------------------------------------------- +# Read-tick: silent-but-open stream must not block forever +# --------------------------------------------------------------------------- + +def test_tick_with_should_continue_false_stops_stream(monkeypatch): + """A None tick + should_continue()==False makes events() stop cleanly.""" + client = SseClient("http://localhost:8080", reconnect=True, max_reconnects=10) + client.should_continue = lambda: False # e.g. stall watchdog tripped + + # Open stream yields one event, then nothing but inactivity ticks. + def fake_open_stream(): + yield {"type": "session.status", "properties": {"status": {"type": "busy"}}} + while True: + yield None # inactivity ticks + + client._open_stream = fake_open_stream + out = list(client.events()) + # The busy event is delivered; the stream stops at the first tick. + assert [e["type"] for e in out] == ["session.status"] + assert client._stopped is True + + +def test_tick_with_should_continue_true_keeps_reading(monkeypatch): + """A None tick + should_continue()==True keeps consuming (no error/stop).""" + calls = {"ticks": 0} + + def should_continue(): + calls["ticks"] += 1 + return calls["ticks"] < 3 # keep going for 2 ticks, then stop + + client = SseClient("http://localhost:8080", reconnect=True, max_reconnects=10) + client.should_continue = should_continue + + def fake_open_stream(): + while True: + yield None + + client._open_stream = fake_open_stream + out = list(client.events()) + assert out == [] # only ticks, no events + assert calls["ticks"] >= 3 + assert client._stopped is True + + +def test_tick_with_no_should_continue_keeps_reading_then_event(monkeypatch): + """Chat mode (should_continue=None): ticks are ignored, events still flow.""" + client = SseClient("http://localhost:8080", reconnect=False, max_reconnects=1) + assert client.should_continue is None + + def fake_open_stream(): + yield None # tick (ignored in chat mode) + yield {"type": "session.status", "properties": {"status": {"type": "idle"}}} + # Stream ends (server closed) → with reconnect=False this raises out. + raise SseClientError("closed") + + client._open_stream = fake_open_stream + out = [] + with pytest.raises(SseClientError): + for e in client.events(): + out.append(e) + assert [e["type"] for e in out] == ["session.status"] + + +# --------------------------------------------------------------------------- +# Heartbeat-loss secondary stall signal +# --------------------------------------------------------------------------- + +class _FakeClientHB: + def __init__(self, since_hb): + self._since = since_hb + + def seconds_since_heartbeat(self): + return self._since + + +def test_heartbeat_loss_triggers_stall(monkeypatch): + loop = _loop(liveness_check=lambda: True) + _busy(loop) + # Primary no-progress timer is fresh, but heartbeats were seen and stopped. + monkeypatch.setenv("CODECOME_HEARTBEAT_STALL_TIMEOUT", "45") + loop._heartbeat_stall_timeout_s = 45.0 + loop._client = _FakeClientHB(since_hb=60.0) + assert loop._stalled() is True + + +def test_heartbeat_never_seen_does_not_trigger_via_heartbeat_path(monkeypatch): + loop = _loop(liveness_check=lambda: True) + _busy(loop) + loop._heartbeat_stall_timeout_s = 45.0 + loop._client = _FakeClientHB(since_hb=None) # no heartbeats ever seen + # Primary timer fresh, heartbeat path inactive → not stalled. + assert loop._stalled() is False + + +def test_heartbeat_recent_does_not_trigger(monkeypatch): + loop = _loop(liveness_check=lambda: True) + _busy(loop) + loop._heartbeat_stall_timeout_s = 45.0 + loop._client = _FakeClientHB(since_hb=12.0) # heartbeat 12s ago, healthy + assert loop._stalled() is False + + +def test_heartbeat_stall_disabled_by_default(monkeypatch): + monkeypatch.delenv("CODECOME_HEARTBEAT_STALL_TIMEOUT", raising=False) + loop = _loop(liveness_check=lambda: True) + _busy(loop) + loop._client = _FakeClientHB(since_hb=10_000.0) + assert loop._heartbeat_stall_timeout_s == 0.0 + assert loop._stalled() is False + + +# --------------------------------------------------------------------------- +# SseClient heartbeat bookkeeping +# --------------------------------------------------------------------------- + +def test_on_event_no_false_heartbeat_timeout_when_none_seen(): + """Without any heartbeat, _on_event must not raise a heartbeat timeout.""" + import events.sse_client as sse_mod + + client = SseClient("http://localhost:8080") + # Pretend a long time has passed but no heartbeat was ever observed. + client._last_heartbeat = 0.0 + client._heartbeats_seen = False + # Should not raise (silence handled by the read-tick path instead). + client._on_event({"type": "session.status", "properties": {"status": {"type": "busy"}}}) + assert client.seconds_since_heartbeat() is None + + +def test_on_event_stale_heartbeat_does_not_drop_late_event(monkeypatch): + import events.sse_client as sse_mod + + client = SseClient("http://localhost:8080") + monkeypatch.setattr(sse_mod.time, "time", lambda: 1000.0) + client._on_event({"type": "server.heartbeat", "properties": {}}) + monkeypatch.setattr(sse_mod.time, "time", lambda: 1100.0) + + # A non-heartbeat event after a long heartbeat gap is still valid progress; + # it must not be discarded by raising SseClientError. + client._on_event({"type": "session.status", "properties": {"status": {"type": "idle"}}}) + assert client.seconds_since_heartbeat() == 100.0 + + +def test_seconds_since_heartbeat_after_heartbeat(monkeypatch): + import events.sse_client as sse_mod + + client = SseClient("http://localhost:8080") + monkeypatch.setattr(sse_mod.time, "time", lambda: 1000.0) + client._on_event({"type": "server.heartbeat", "properties": {}}) + assert client._heartbeats_seen is True + monkeypatch.setattr(sse_mod.time, "time", lambda: 1005.0) + assert client.seconds_since_heartbeat() == 5.0 + + +def test_run_status_idle_final_sync_recovers_terminal_finish(monkeypatch): + monkeypatch.setenv("CODECOME_BUSY_STALL_TIMEOUT", "1") + + now = {"value": 0.0} + monkeypatch.setattr("events.phase_loop.time.monotonic", lambda: now["value"]) + + class FakeSseClient: + def __init__(self, *args, should_continue=None, **kwargs): + self.should_continue = should_continue + + def events(self): + yield { + "type": "session.status", + "properties": { + "sessionID": "sess-1", + "status": {"type": "busy"}, + }, + } + now["value"] = 2.0 + if self.should_continue is not None: + self.should_continue() + return + + def seconds_since_heartbeat(self): + return None + + def stop(self): + pass + + monkeypatch.setattr("events.phase_loop.SseClient", FakeSseClient) + + loop = _loop(liveness_check=lambda: True) + loop._fetch_session_status = lambda timeout=2.0: "idle" + loop._fetch_session_messages = lambda: [ + { + "info": { + "id": "msg-1", + "role": "assistant", + "sessionID": "sess-1", + "tokens": {"input": 1}, + }, + "parts": [ + { + "id": "finish-1", + "type": "step-finish", + "reason": "stop", + "tokens": {"total": 1}, + } + ], + } + ] + + rendered = [] + recorded = [] + result = loop.run( + lambda console, phase, label, event: rendered.append(event), + record_raw_event_fn=recorded.append, + ) + + assert result.session_stalled is False + assert result.any_step_finish_seen is True + assert result.last_finish_reason == "stop" + assert any(event.get("type") == "step_finish" for event in rendered) + assert recorded[-1]["properties"]["status"]["type"] == "idle" diff --git a/tools/codecome/harness.py b/tools/codecome/harness.py index 7e420e1e..c90d5f55 100644 --- a/tools/codecome/harness.py +++ b/tools/codecome/harness.py @@ -20,13 +20,14 @@ from pathlib import Path from typing import Any, Optional -from opencode.serve import ServerRunner, ServerRunnerError +from opencode.serve import ServerRunner, ServerRunnerError, make_liveness_check from codecome.console import build_console, _emit_fatal_error from rendering.dispatch import _get_rendering_ctx, configure_rendering, render_event from rendering.output import get_output, T from rendering.events import _FINISH_TERMINAL_OK, _FINISH_MID_TURN, _FINISH_BUDGET, _FINISH_FAILURE from codecome.config import ROOT, resolve_color_mode, load_prompt, resolve_runtime_config +from codecome.status import RunStatus from phases.completion import ( check_phase_graceful_completion, build_phase_resume_prompt, build_frontmatter_resume_prompt, @@ -93,6 +94,28 @@ def _write_phase3_noop_summary() -> Path: return path +# Recovery conditions that are handled by restarting the opencode server and +# retrying. Maps the finish reason to its user-facing recovery wording. +_RECOVERABLE_RESTART_REASONS: dict[str, str] = { + "server_unreachable": ( + "The opencode server is unreachable or unresponsive." + ), + "session_stalled": ( + "The model turn produced no activity for an extended period (stalled)." + ), +} + + +def _restart_server(runner: ServerRunner, *, log_level: str) -> Any: + """Restart the opencode server, returning the new ServerInfo. + + Centralizes the single stop/start invocation so the Phase 1 and Phase 2-6 + recovery paths never duplicate the restart call. Raises ServerRunnerError on + failure (the caller decides how to surface it). + """ + return runner.restart(hostname="127.0.0.1", log_level=log_level) + + def run_phase_mode(args: argparse.Namespace) -> int: """Run a single phase with auto-retry/resume. @@ -143,7 +166,48 @@ def _p1_forward_signal(signum: int, _frame: Any) -> None: _p1_prev_sigterm = signal.signal(signal.SIGTERM, _p1_forward_signal) try: from codecome.phase_1 import run_phase_1 as _run_phase_1 - return _run_phase_1(args, console, _rendering_ctx, _p1_runner, _p1_server_info.base_url) + _p1_base_url = _p1_server_info.base_url + _p1_start_at = "1a" + _p1_restart_count = 0 + _p1_max_server_restarts = int(os.environ.get("CODECOME_MAX_SERVER_RESTARTS", "2")) + + while True: + outcome = _run_phase_1( + args, + console, + _rendering_ctx, + _p1_runner, + _p1_base_url, + start_at=_p1_start_at, + ) + if outcome.status not in (RunStatus.SERVER_UNREACHABLE, RunStatus.SESSION_STALLED): + return int(outcome.status) + + if _p1_restart_count >= _p1_max_server_restarts: + get_output(console).error( + f"Server restart budget exhausted ({_p1_restart_count}/{_p1_max_server_restarts}). " + "Cannot continue Phase 1." + ) + return int(RunStatus.ERROR) + + _p1_restart_count += 1 + _p1_start_at = outcome.failed_subphase or _p1_start_at + _p1_reason = ( + "session_stalled" + if outcome.status == RunStatus.SESSION_STALLED + else "server_unreachable" + ) + get_output(console).warn( + f"\n[Auto-Recovery] {_RECOVERABLE_RESTART_REASONS[_p1_reason]} " + f"CodeCome will restart the opencode server and retry from Phase {_p1_start_at} " + f"(server restart {_p1_restart_count}/{_p1_max_server_restarts})." + ) + try: + _p1_server_info = _restart_server(_p1_runner, log_level=args.log_level) + except ServerRunnerError as exc: + _emit_fatal_error(console, "Server Restart Error", str(exc)) + return int(RunStatus.ERROR) + _p1_base_url = _p1_server_info.base_url finally: signal.signal(signal.SIGINT, _p1_prev_sigint) signal.signal(signal.SIGTERM, _p1_prev_sigterm) @@ -204,6 +268,8 @@ def _p1_forward_signal(signum: int, _frame: Any) -> None: finish_warning: Optional[str] = None phase_failures: list[str] = [] phase_ok: bool = False # defensive default; assigned in D.2 or D.3 before use in D.3b + server_restart_count = 0 + max_server_restarts = int(os.environ.get("CODECOME_MAX_SERVER_RESTARTS", "2")) os.environ["_CODECOME_INSIDE_HARNESS"] = "1" @@ -232,6 +298,7 @@ def _forward_signal(signum: int, _frame: Any) -> None: from codecome.runner import _run_single_attempt from rendering.events import _reset_subagent_state + liveness_check = make_liveness_check(runner) try: while True: attempt_number += 1 @@ -245,10 +312,11 @@ def _forward_signal(signum: int, _frame: Any) -> None: server_info.password, str(ROOT), render_event_fn=render_event, emit_fatal_error_fn=_emit_fatal_error, - existing_session_id=last_session_id or None + existing_session_id=last_session_id or None, + liveness_check=liveness_check, ) - if returncode == 2 and run_result.last_finish_reason == "resume_not_ready": + if returncode == RunStatus.INCOMPLETE and run_result.last_finish_reason == "resume_not_ready": last_session_id = session_id or last_session_id last_finish_reason = run_result.last_finish_reason finish_warning = ( @@ -257,7 +325,43 @@ def _forward_signal(signum: int, _frame: Any) -> None: ) break - if returncode != 0: + _recovery_reason = None + if returncode == RunStatus.INCOMPLETE and run_result.last_finish_reason == "server_unreachable": + _recovery_reason = "server_unreachable" + elif getattr(run_result, "session_stalled", False) or run_result.last_finish_reason == "session_stalled": + _recovery_reason = "session_stalled" + + if _recovery_reason is not None: + if server_restart_count < max_server_restarts: + server_restart_count += 1 + out.warn( + f"\n[Auto-Recovery] {_RECOVERABLE_RESTART_REASONS[_recovery_reason]} " + f"CodeCome will restart the opencode server and retry with a fresh session " + f"(server restart {server_restart_count}/{max_server_restarts})." + ) + try: + server_info = _restart_server(runner, log_level=args.log_level) + except ServerRunnerError as exc: + _emit_fatal_error(console, "Server Restart Error", str(exc)) + return int(RunStatus.ERROR) + base_url = server_info.base_url + last_session_id = "" + prompt = build_phase_resume_prompt( + str(args.phase), args.finding, + _recovery_reason, step_finish_count, + failure_details=phase_failures if phase_failures else None, + ) + continue + last_session_id = session_id or last_session_id + last_finish_reason = _recovery_reason + finish_warning = ( + f"CodeCome attempted {server_restart_count} server restart(s) after a " + f"'{_recovery_reason}' condition, but the restart budget is exhausted. " + "The phase cannot continue." + ) + break + + if returncode != RunStatus.OK: # Infrastructure/transient failure (timeout, connection error, etc.) # Retry with a separate budget so infra blips don't consume the # "model needs more turns" iteration retry budget. @@ -342,28 +446,28 @@ def _forward_signal(signum: int, _frame: Any) -> None: finish_warning = None last_finish_reason = "graceful_forgiveness" else: - returncode = 2 + returncode = RunStatus.INCOMPLETE else: # _FINISH_BUDGET: fail only if artifacts are incomplete if not phase_ok: - returncode = 2 + returncode = RunStatus.INCOMPLETE else: finish_warning = None else: - returncode = 2 + returncode = RunStatus.INCOMPLETE - if returncode == 0: + if returncode == RunStatus.OK: if last_finish_reason in _FINISH_TERMINAL_OK: phase_ok, phase_failures = check_phase_graceful_completion( str(args.phase), args.finding, RUN_START_TIME) if not phase_ok: - returncode = 2 + returncode = RunStatus.INCOMPLETE finish_warning = ( f"Phase {args.phase} reported terminal finish reason '{last_finish_reason}', " "but required durable artifacts were not produced. Treating as incomplete." ) - if returncode == 0: + if returncode == RunStatus.OK: from findings.checks_entry import run_frontmatter_validation validation_rc, validation_output = run_frontmatter_validation() @@ -381,14 +485,14 @@ def _forward_signal(signum: int, _frame: Any) -> None: prompt = build_frontmatter_resume_prompt(args.phase, args.finding, validation_output) continue else: - returncode = 2 + returncode = RunStatus.INCOMPLETE finish_warning = ( "The model output failed local frontmatter validation, and CodeCome could not determine a " "session ID to resume for repair. Treating the phase as incomplete so the validator output " "can be reported back with the saved transcript." ) else: - returncode = 2 + returncode = RunStatus.INCOMPLETE finish_warning = ( f"The model output still fails local frontmatter validation after {max_frontmatter_retries} " "auto-repair attempts. Treating the phase as incomplete so the validation errors can be reported back." @@ -397,10 +501,10 @@ def _forward_signal(signum: int, _frame: Any) -> None: out.error(msg) print(validation_output) break - if returncode == 0: + if returncode == RunStatus.OK: break - if returncode == 2 and ( + if returncode == RunStatus.INCOMPLETE and ( last_finish_reason in _FINISH_MID_TURN or last_finish_reason in _FINISH_BUDGET or (last_finish_reason in _FINISH_TERMINAL_OK and not phase_ok) @@ -442,14 +546,14 @@ def _forward_signal(signum: int, _frame: Any) -> None: signal.signal(signal.SIGTERM, previous_sigterm) runner.stop() - if returncode == 0: + if returncode == RunStatus.OK: out.separator(tone=T.SUCCESS) out.success(f"Phase {args.phase} completed successfully", symbol=True) out.detail( f" finish reason: {last_finish_reason!r} " f"transcript: {transcript_path.relative_to(ROOT)}" ) - elif returncode == 130: + elif returncode == RunStatus.INTERRUPTED: out.separator(tone=T.WARNING) out.warn(f"Phase {args.phase} interrupted") else: diff --git a/tools/codecome/phase_1.py b/tools/codecome/phase_1.py index bb953231..4d4fec16 100644 --- a/tools/codecome/phase_1.py +++ b/tools/codecome/phase_1.py @@ -17,11 +17,12 @@ from pathlib import Path from typing import Any -from opencode.serve import ServerRunner, ServerRunnerError +from opencode.serve import ServerRunner, make_liveness_check from codecome.console import build_console, _emit_fatal_error from codecome.config import ROOT, resolve_color_mode, load_prompt, resolve_runtime_config from codecome.runner import _run_single_attempt +from codecome.status import RunStatus, normalize_status from phases.phase_1_gates import ( check_phase_1a, check_phase_1b, @@ -50,6 +51,16 @@ class _SubphaseOutcome: returncode: int session_id: str transcript_path: Path + + +@dataclass(frozen=True) +class Phase1Outcome: + status: RunStatus + failed_subphase: str | None = None + + +def _phase1_outcome(status: int | RunStatus, failed_subphase: str | None = None) -> Phase1Outcome: + return Phase1Outcome(status=normalize_status(status), failed_subphase=failed_subphase) # --------------------------------------------------------------------------- # CodeQL analysis (between 1a gate and 1b) # --------------------------------------------------------------------------- @@ -353,7 +364,7 @@ def _run_subphase( existing_session_id: str | None = None, initial_prompt: str | None = None, return_outcome: bool = False, -) -> int | _SubphaseOutcome: +) -> int | RunStatus | _SubphaseOutcome: """Run a single subphase agent session with retry/resume.""" prompt_path = ROOT / prompt_file prompt = initial_prompt if initial_prompt is not None else load_prompt(prompt_path, finding, phase=phase_id) @@ -405,6 +416,7 @@ def _run_subphase( subphase_start_time = time.time() password = runner.info.password if runner.info else "" + liveness_check = make_liveness_check(runner) # --- Retry loop (mirrors harness.run_phase_mode) --- while True: @@ -423,9 +435,10 @@ def _run_subphase( transcript_phase=phase_id, phase_override=phase_id, label_override=label, + liveness_check=liveness_check, ) - if returncode == 2 and run_result.last_finish_reason == "resume_not_ready": + if returncode == RunStatus.INCOMPLETE and run_result.last_finish_reason == "resume_not_ready": last_session_id = session_id or last_session_id last_finish_reason = run_result.last_finish_reason finish_warning = ( @@ -434,7 +447,26 @@ def _run_subphase( ) break - if returncode != 0: + if returncode == RunStatus.INCOMPLETE and run_result.last_finish_reason == "server_unreachable": + returncode = RunStatus.SERVER_UNREACHABLE + finish_warning = ( + "CodeCome detected that the opencode server is unreachable or unresponsive. " + "The phase harness will attempt to restart the server and retry " + "the subphase." + ) + break + + if (returncode == RunStatus.OK and getattr(run_result, "session_stalled", False)) or \ + run_result.last_finish_reason == "session_stalled": + returncode = RunStatus.SESSION_STALLED + finish_warning = ( + "CodeCome detected that the model turn produced no activity for an " + "extended period (stalled). The phase harness will attempt to restart " + "the server and retry the subphase." + ) + break + + if returncode != RunStatus.OK: break last_session_id = session_id @@ -501,17 +533,17 @@ def _run_subphase( finish_warning = None last_finish_reason = "graceful_forgiveness" else: - returncode = 2 + returncode = RunStatus.INCOMPLETE else: # _FINISH_BUDGET: fail only if artifacts are incomplete if not phase_ok: - returncode = 2 + returncode = RunStatus.INCOMPLETE else: finish_warning = None else: - returncode = 2 + returncode = RunStatus.INCOMPLETE - if returncode == 0: + if returncode == RunStatus.OK: from findings.checks_entry import run_frontmatter_validation validation_rc, validation_output = run_frontmatter_validation() @@ -529,14 +561,14 @@ def _run_subphase( prompt = build_frontmatter_resume_prompt(phase_id, finding, validation_output) continue else: - returncode = 2 + returncode = RunStatus.INCOMPLETE finish_warning = ( "The model output failed local frontmatter validation, and CodeCome could not determine a " "session ID to resume for repair. Treating the subphase as incomplete so the validator output " "can be reported back with the saved transcript." ) else: - returncode = 2 + returncode = RunStatus.INCOMPLETE finish_warning = ( f"The model output still fails local frontmatter validation after {max_frontmatter_retries} " "auto-repair attempts. Treating the subphase as incomplete so the validation errors can be reported back." @@ -566,14 +598,14 @@ def _run_subphase( ) continue else: - returncode = 2 + returncode = RunStatus.INCOMPLETE finish_warning = ( "The model output failed Phase 1b artifact validation, and CodeCome " "could not determine a session ID to resume for repair. Treating the " "subphase as incomplete so the validation output can be reported back." ) else: - returncode = 2 + returncode = RunStatus.INCOMPLETE finish_warning = ( f"Phase 1b artifact validation still fails after {max_artifact_retries} " "auto-repair attempts. Treating the subphase as incomplete so the " @@ -587,11 +619,10 @@ def _run_subphase( break - if returncode == 2 and ( + if returncode == RunStatus.INCOMPLETE and ( last_finish_reason in _FINISH_MID_TURN or last_finish_reason in _FINISH_BUDGET ): - import os max_iteration_retries = int(os.environ.get("CODECOME_MAX_ITERATION_RETRIES", "1")) if iteration_retry_count < max_iteration_retries: iteration_retry_count += 1 @@ -625,14 +656,14 @@ def _run_subphase( # --- end retry loop --- # Report subphase outcome - if returncode == 0: + if returncode == RunStatus.OK: out.separator(tone=T.SUCCESS) out.success(f"Phase {phase_id} completed successfully", symbol=True) out.detail( f" finish reason: {last_finish_reason!r} " f"transcript: {transcript_path.relative_to(ROOT) if transcript_path.name else 'N/A'}" ) - elif returncode == 130: + elif returncode == RunStatus.INTERRUPTED: out.separator(tone=T.WARNING) out.warn(f"Phase {phase_id} interrupted") else: @@ -657,86 +688,99 @@ def run_phase_1( rendering_ctx: Any, runner: ServerRunner, base_url: str, -) -> int: + *, + start_at: str = "1a", +) -> Phase1Outcome: """Orchestrate Phase 1 subphases 1a → 1b → 1c with gates.""" out = get_output(console) + + if start_at not in {"1a", "1b", "1c"}: + out.error(f"Unsupported Phase 1 start point: {start_at}") + return _phase1_outcome(RunStatus.ERROR) + # ---- Phase 1a: Target Profile ---- - findings_snapshot_1a = count_findings_snapshot() - phase_1a_session_id: str | None = None - phase_1a_prompt: str | None = None - phase_1a_artifact_retries = 0 - while True: - outcome = _run_subphase( + if start_at == "1a": + findings_snapshot_1a = count_findings_snapshot() + phase_1a_session_id: str | None = None + phase_1a_prompt: str | None = None + phase_1a_artifact_retries = 0 + while True: + outcome = _run_subphase( + args=args, + console=console, + rendering_ctx=rendering_ctx, + runner=runner, + base_url=base_url, + phase_id="1a", + label="Target Profile", + agent="recon", + prompt_file="prompts/phase-1a-profile.md", + existing_session_id=phase_1a_session_id, + initial_prompt=phase_1a_prompt, + return_outcome=True, + ) + if outcome.returncode in (RunStatus.SERVER_UNREACHABLE, RunStatus.SESSION_STALLED): + return _phase1_outcome(outcome.returncode, "1a") + if outcome.returncode != RunStatus.OK: + return _phase1_outcome(outcome.returncode) + + gate_rc = check_phase_1a(console, findings_snapshot=findings_snapshot_1a) + if gate_rc == RunStatus.OK: + break + + max_artifact_retries = 2 + if phase_1a_artifact_retries >= max_artifact_retries or not outcome.session_id: + return _phase1_outcome(gate_rc) + + phase_1a_artifact_retries += 1 + out.warn( + "\n[Auto-Correction] Phase 1a artifacts failed Gate 1a validation. " + "CodeCome will resume the same session and ask for a minimal CodeQL plan repair " + f"(retry {phase_1a_artifact_retries}/{max_artifact_retries})." + ) + phase_1a_session_id = outcome.session_id + phase_1a_prompt = build_artifact_repair_resume_prompt( + "1a", None, _phase_1a_codeql_plan_repair_output() + ) + + # ---- Phase 1b: Sandbox Bootstrap ---- + if start_at in {"1a", "1b"}: + rc = _run_subphase( args=args, console=console, rendering_ctx=rendering_ctx, runner=runner, base_url=base_url, - phase_id="1a", - label="Target Profile", + phase_id="1b", + label="Sandbox Bootstrap", agent="recon", - prompt_file="prompts/phase-1a-profile.md", - existing_session_id=phase_1a_session_id, - initial_prompt=phase_1a_prompt, - return_outcome=True, + prompt_file="prompts/phase-1b-sandbox.md", ) - if outcome.returncode != 0: - return outcome.returncode - - gate_rc = check_phase_1a(console, findings_snapshot=findings_snapshot_1a) - if gate_rc == 0: - break - - max_artifact_retries = 2 - if phase_1a_artifact_retries >= max_artifact_retries or not outcome.session_id: - return gate_rc - - phase_1a_artifact_retries += 1 - out.warn( - "\n[Auto-Correction] Phase 1a artifacts failed Gate 1a validation. " - "CodeCome will resume the same session and ask for a minimal CodeQL plan repair " - f"(retry {phase_1a_artifact_retries}/{max_artifact_retries})." - ) - phase_1a_session_id = outcome.session_id - phase_1a_prompt = build_artifact_repair_resume_prompt( - "1a", None, _phase_1a_codeql_plan_repair_output() - ) - - # ---- Phase 1b: Sandbox Bootstrap ---- - rc = _run_subphase( - args=args, - console=console, - rendering_ctx=rendering_ctx, - runner=runner, - base_url=base_url, - phase_id="1b", - label="Sandbox Bootstrap", - agent="recon", - prompt_file="prompts/phase-1b-sandbox.md", - ) - if rc != 0: - return rc - - gate_rc = check_phase_1b(console) - if gate_rc != 0: - return gate_rc - - # ---- CodeQL analysis (post-sandbox) ---- - import subprocess - has_sandbox = (ROOT / "sandbox").exists() - if has_sandbox: - out.info("Starting sandbox for CodeQL execution...") - subprocess.run(["make", "sandbox-up"], cwd=str(ROOT), check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - - try: - _run_codeql(console) - rc = _check_codeql_artifacts(console) - if rc != 0: - return rc - finally: + if rc in (RunStatus.SERVER_UNREACHABLE, RunStatus.SESSION_STALLED): + return _phase1_outcome(rc, "1b") + if rc != RunStatus.OK: + return _phase1_outcome(rc) + + gate_rc = check_phase_1b(console) + if gate_rc != RunStatus.OK: + return _phase1_outcome(gate_rc) + + # ---- CodeQL analysis (post-sandbox) ---- + import subprocess + has_sandbox = (ROOT / "sandbox").exists() if has_sandbox: - out.info("Stopping sandbox after CodeQL execution...") - subprocess.run(["make", "sandbox-down"], cwd=str(ROOT), check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + out.info("Starting sandbox for CodeQL execution...") + subprocess.run(["make", "sandbox-up"], cwd=str(ROOT), check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + + try: + _run_codeql(console) + rc = _check_codeql_artifacts(console) + if rc != RunStatus.OK: + return _phase1_outcome(rc) + finally: + if has_sandbox: + out.info("Stopping sandbox after CodeQL execution...") + subprocess.run(["make", "sandbox-down"], cwd=str(ROOT), check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) # Snapshot findings immediately before 1c so the warning scope matches 1c. findings_snapshot = count_findings_snapshot() @@ -753,15 +797,17 @@ def run_phase_1( agent="recon", prompt_file="prompts/phase-1c-recon.md", ) - if rc != 0: - return rc + if rc in (RunStatus.SERVER_UNREACHABLE, RunStatus.SESSION_STALLED): + return _phase1_outcome(rc, "1c") + if rc != RunStatus.OK: + return _phase1_outcome(rc) gate_rc = check_phase_1c(console, findings_snapshot=findings_snapshot) - if gate_rc != 0: - return gate_rc + if gate_rc != RunStatus.OK: + return _phase1_outcome(gate_rc) # ---- Phase 1 complete ---- out.separator(tone=T.SUCCESS) out.success("Phase 1 complete — all subphases passed.", symbol=True) - return 0 + return _phase1_outcome(RunStatus.OK) diff --git a/tools/codecome/runner.py b/tools/codecome/runner.py index 751c84be..66414bd7 100644 --- a/tools/codecome/runner.py +++ b/tools/codecome/runner.py @@ -18,7 +18,14 @@ import _colors as C from events.phase_loop import PhaseEventLoop, RunResult from codecome.config import ROOT -from codecome.session import create_session, get_session_status, send_prompt_to_session +from codecome.session import ( + OpenCodeRequestError, + create_session, + get_session_status, + is_server_healthy, + send_prompt_to_session, +) +from codecome.status import RunStatus from codecome.transcript import Transcript from codecome.recording import EventRecorder @@ -27,6 +34,10 @@ class ResumeSessionNotReady(RuntimeError): """Raised when an existing session is not ready for a resume prompt.""" +class ResumeSessionServerUnreachable(ResumeSessionNotReady): + """Raised when the server appears to be dead/unreachable during resume wait.""" + + def _consume_events( base_url: str, session_id: str, @@ -39,6 +50,7 @@ def _consume_events( workspace_dir: str | None, render_event_fn: Callable[..., None], event_loop_box: dict[str, Any] | None = None, + liveness_check: Callable[[], bool] | None = None, ) -> RunResult: event_loop = PhaseEventLoop( base_url=base_url, @@ -48,6 +60,7 @@ def _consume_events( label=label, auth_token=auth_token, workspace_dir=workspace_dir, + liveness_check=liveness_check, ) if event_loop_box is not None: event_loop_box["loop"] = event_loop @@ -74,17 +87,75 @@ def _wait_for_resume_idle( auth_token: str | None, workspace_dir: str | None, transcript: Transcript, + liveness_check: Callable[[], bool] | None = None, ) -> None: timeout_s = float(os.environ.get("CODECOME_RESUME_IDLE_TIMEOUT", "120")) poll_s = float(os.environ.get("CODECOME_RESUME_IDLE_POLL", "1")) + probe_timeout_s = float(os.environ.get("CODECOME_RESUME_PROBE_TIMEOUT", "2")) deadline = time.monotonic() + max(timeout_s, 0.0) + server_unavailable_threshold = int( + os.environ.get("CODECOME_RESUME_SERVER_UNAVAILABLE_THRESHOLD", "3") + ) + consecutive_unavailable = 0 + while True: - status = get_session_status(base_url, session_id, auth_token, workspace_dir) + status = get_session_status( + base_url, session_id, auth_token, workspace_dir, timeout=probe_timeout_s + ) if status == "idle": _record_codecome_event(transcript, "codecome.resume.status_ready", sessionID=session_id, status=status) return + if status is None: + # The session-status endpoint is unavailable. This is NOT proof the + # server is dead: under a long busy turn opencode's HTTP control + # plane (including /global/health) can block and time out while the + # process is perfectly alive. The authoritative death signal is the + # child process handle, supplied via liveness_check. + if liveness_check is not None: + if liveness_check(): + consecutive_unavailable = 0 + _record_codecome_event( + transcript, + "codecome.resume.status_unavailable_process_alive", + sessionID=session_id, + ) + else: + _record_codecome_event( + transcript, + "codecome.resume.server_unreachable", + sessionID=session_id, + processExited=True, + ) + raise ResumeSessionServerUnreachable( + f"server at {base_url} process has exited; " + f"session {session_id} cannot be resumed" + ) + elif is_server_healthy(base_url, auth_token, workspace_dir, timeout=probe_timeout_s): + consecutive_unavailable = 0 + _record_codecome_event( + transcript, + "codecome.resume.status_unavailable_server_healthy", + sessionID=session_id, + ) + else: + consecutive_unavailable += 1 + if consecutive_unavailable >= server_unavailable_threshold: + _record_codecome_event( + transcript, + "codecome.resume.server_unreachable", + sessionID=session_id, + consecutiveNone=consecutive_unavailable, + ) + raise ResumeSessionServerUnreachable( + f"server at {base_url} appears to be unreachable or dead " + f"after {consecutive_unavailable} consecutive failed status and health checks; " + f"session {session_id} cannot be resumed" + ) + else: + consecutive_unavailable = 0 + event_type = "codecome.resume.blocked_busy" if status == "busy" else "codecome.resume.blocked_unknown" _record_codecome_event(transcript, event_type, sessionID=session_id, status=status) if time.monotonic() >= deadline: @@ -118,6 +189,7 @@ def _run_single_attempt( transcript_phase: str | None = None, phase_override: str | None = None, label_override: str | None = None, + liveness_check: Callable[[], bool] | None = None, ) -> tuple[int, str, RunResult, Path]: transcript: Transcript @@ -142,9 +214,35 @@ def _run_single_attempt( ) if existing_session_id: session_id = existing_session_id - _wait_for_resume_idle(base_url, session_id, auth_token, workspace_dir, transcript) + _wait_for_resume_idle( + base_url, session_id, auth_token, workspace_dir, transcript, + liveness_check=liveness_check, + ) else: - session_id = create_session(base_url, str(args.phase), args.agent, model, auth_token, workspace_dir) + try: + session_id = create_session(base_url, str(args.phase), args.agent, model, auth_token, workspace_dir) + except OpenCodeRequestError as exc: + _record_codecome_event( + transcript, + "codecome.session.create_failed", + errorType=type(exc).__name__, + message=str(exc), + retriable=exc.retriable, + operation=exc.operation, + ) + if exc.retriable: + _record_codecome_event( + transcript, + "codecome.attempt.incomplete", + errorType=type(exc).__name__, + message=str(exc), + existingSession=False, + ) + return RunStatus.INCOMPLETE, "", RunResult( + last_finish_reason="server_unreachable", + last_session_id="", + ), transcript.path + raise _record_codecome_event( transcript, @@ -168,6 +266,7 @@ def _consume() -> None: auth_token, workspace_dir, render_event_fn=render_event_fn, event_loop_box=event_loop_box, + liveness_check=liveness_check, ) except Exception as exc: # noqa: BLE001 consume_error_box["error"] = exc @@ -195,6 +294,18 @@ def _consume() -> None: consumer.join(timeout=5.0) if consumer.is_alive(): _record_codecome_event(transcript, "codecome.event_loop.stop_timeout", sessionID=session_id) + if isinstance(exc, OpenCodeRequestError) and exc.retriable: + _record_codecome_event( + transcript, + "codecome.attempt.incomplete", + errorType=type(exc).__name__, + message=str(exc), + existingSession=bool(existing_session_id), + ) + return RunStatus.INCOMPLETE, session_id, RunResult( + last_finish_reason="server_unreachable", + last_session_id=session_id, + ), transcript.path raise _record_codecome_event(transcript, "codecome.prompt.send_completed", sessionID=session_id) consumer.join() @@ -212,6 +323,25 @@ def _consume() -> None: run_result = run_result_box.get("result") if not isinstance(run_result, RunResult): raise RuntimeError("Event loop ended without a RunResult") + if getattr(run_result, "session_stalled", False): + _record_codecome_event( + transcript, + "codecome.session.stalled", + sessionID=session_id, + existingSession=bool(existing_session_id), + ) + except ResumeSessionServerUnreachable as exc: + _record_codecome_event( + transcript, + "codecome.attempt.incomplete", + errorType=type(exc).__name__, + message=str(exc), + existingSession=bool(existing_session_id), + ) + return RunStatus.INCOMPLETE, existing_session_id or "", RunResult( + last_finish_reason="server_unreachable", + last_session_id=existing_session_id, + ), transcript.path except ResumeSessionNotReady as exc: _record_codecome_event( transcript, @@ -220,7 +350,7 @@ def _consume() -> None: message=str(exc), existingSession=bool(existing_session_id), ) - return 2, existing_session_id or "", RunResult( + return RunStatus.INCOMPLETE, existing_session_id or "", RunResult( last_finish_reason="resume_not_ready", last_session_id=existing_session_id, ), transcript.path @@ -239,8 +369,8 @@ def _consume() -> None: console.print(f"Fatal error: {exc}") except Exception: print(C.error(f"Fatal error: {exc}"), file=sys.stderr) - return 1, existing_session_id or "", RunResult(), transcript.path + return RunStatus.ERROR, existing_session_id or "", RunResult(), transcript.path finally: transcript.close() - return 0, session_id, run_result, transcript.path + return RunStatus.OK, session_id, run_result, transcript.path diff --git a/tools/codecome/session.py b/tools/codecome/session.py index f65bca39..531837d1 100644 --- a/tools/codecome/session.py +++ b/tools/codecome/session.py @@ -19,6 +19,20 @@ # Retry config for transient failures on prompt submission. _PROMPT_MAX_RETRIES = 3 _PROMPT_BACKOFF_SCHEDULE = (5.0, 15.0, 30.0) # seconds between retries +_SESSION_CREATE_MAX_RETRIES = 3 + + +class OpenCodeRequestError(RuntimeError): + """Raised when an opencode HTTP operation fails. + + ``retriable`` distinguishes transient control-plane failures (timeouts, + 5xx, 429, connection errors) from hard request/schema errors. + """ + + def __init__(self, message: str, *, retriable: bool, operation: str) -> None: + super().__init__(message) + self.retriable = retriable + self.operation = operation def _is_retriable_error(exc: Exception) -> bool: @@ -43,6 +57,18 @@ def _get_headers(auth_token: str | None, workspace_dir: str | None) -> dict[str, return headers +def _describe_http_error(prefix: str, exc: urllib.error.HTTPError) -> str: + body = "" + try: + body = exc.read().decode("utf-8", errors="replace").strip() + except Exception: + body = "" + detail = f"{prefix}: HTTP {exc.code}" + if body: + detail = f"{detail}: {body}" + return detail + + def send_prompt_to_session( base_url: str, session_id: str, @@ -100,16 +126,16 @@ def send_prompt_to_session( # All retries exhausted or non-retriable error if isinstance(last_exc, urllib.error.HTTPError): - body = "" - try: - body = last_exc.read().decode("utf-8", errors="replace").strip() - except Exception: - body = "" - detail = f"Failed to send prompt: HTTP {last_exc.code}" - if body: - detail = f"{detail}: {body}" - raise RuntimeError(detail) from last_exc - raise RuntimeError(f"Failed to send prompt: {last_exc}") from last_exc + raise OpenCodeRequestError( + _describe_http_error("Failed to send prompt", last_exc), + retriable=_is_retriable_error(last_exc), + operation="send_prompt", + ) from last_exc + raise OpenCodeRequestError( + f"Failed to send prompt: {last_exc}", + retriable=bool(last_exc and _is_retriable_error(last_exc)), + operation="send_prompt", + ) from last_exc def get_session_status( @@ -117,6 +143,7 @@ def get_session_status( session_id: str, auth_token: str | None, workspace_dir: str | None, + timeout: float = 5.0, ) -> str | None: """Best-effort lookup of an opencode session status type.""" req = urllib.request.Request( @@ -125,7 +152,7 @@ def get_session_status( method="GET", ) try: - with urllib.request.urlopen(req, timeout=5.0) as resp: + with urllib.request.urlopen(req, timeout=timeout) as resp: data = json.loads(resp.read().decode("utf-8")) except Exception: return None @@ -145,6 +172,27 @@ def get_session_status( return None +def is_server_healthy( + base_url: str, + auth_token: str | None, + workspace_dir: str | None, + timeout: float = 5.0, +) -> bool: + """Best-effort opencode health probe using the same auth context.""" + req = urllib.request.Request( + f"{base_url}/global/health", + headers=_get_headers(auth_token, workspace_dir), + method="GET", + ) + try: + with urllib.request.urlopen(req, timeout=timeout) as resp: + data = json.loads(resp.read().decode("utf-8")) + except Exception: + return False + + return isinstance(data, dict) and data.get("healthy") is True + + def create_session( base_url: str, phase: str, @@ -164,17 +212,50 @@ def create_session( payload["model"] = {"providerID": parts[0], "id": parts[1]} else: payload["model"] = {"id": model} - req = urllib.request.Request( - f"{base_url}/session", - data=json.dumps(payload).encode("utf-8"), - headers=_get_headers(auth_token, workspace_dir), - method="POST", - ) - resp = urllib.request.urlopen(req, timeout=10.0) - data = json.loads(resp.read().decode("utf-8")) + data_bytes = json.dumps(payload).encode("utf-8") + url = f"{base_url}/session" + last_exc: Exception | None = None + for attempt in range(_SESSION_CREATE_MAX_RETRIES): + req = urllib.request.Request( + url, + data=data_bytes, + headers=_get_headers(auth_token, workspace_dir), + method="POST", + ) + try: + resp = urllib.request.urlopen(req, timeout=10.0) + data = json.loads(resp.read().decode("utf-8")) + break + except Exception as exc: + last_exc = exc + if not _is_retriable_error(exc) or attempt == _SESSION_CREATE_MAX_RETRIES - 1: + if isinstance(exc, urllib.error.HTTPError): + raise OpenCodeRequestError( + _describe_http_error("Failed to create session", exc), + retriable=_is_retriable_error(exc), + operation="create_session", + ) from exc + raise OpenCodeRequestError( + f"Failed to create session: {exc}", + retriable=_is_retriable_error(exc), + operation="create_session", + ) from exc + backoff = _PROMPT_BACKOFF_SCHEDULE[min(attempt, len(_PROMPT_BACKOFF_SCHEDULE) - 1)] + time.sleep(backoff) + else: # pragma: no cover - loop always returns or raises + raise OpenCodeRequestError( + f"Failed to create session: {last_exc}", + retriable=True, + operation="create_session", + ) from last_exc + sid = str(data.get("id", "")) if not sid: - raise RuntimeError("Server returned empty session ID") + raise OpenCodeRequestError( + "Server returned empty session ID", + retriable=False, + operation="create_session", + ) return sid diff --git a/tools/codecome/status.py b/tools/codecome/status.py new file mode 100644 index 00000000..29ffead5 --- /dev/null +++ b/tools/codecome/status.py @@ -0,0 +1,29 @@ +# Copyright (C) 2025-2026 Pablo Ruiz García +# SPDX-License-Identifier: GPL-3.0-or-later OR AGPL-3.0-or-later + +"""Shared run status values for phase harnesses.""" + +from __future__ import annotations + +from enum import IntEnum + + +class RunStatus(IntEnum): + """Process-compatible status codes used by phase orchestration.""" + + OK = 0 + ERROR = 1 + INCOMPLETE = 2 + SERVER_UNREACHABLE = 3 + SESSION_STALLED = 4 + INTERRUPTED = 130 + + +def normalize_status(value: int | RunStatus) -> RunStatus: + """Return a known RunStatus, mapping unknown failures to ERROR.""" + if isinstance(value, RunStatus): + return value + try: + return RunStatus(value) + except ValueError: + return RunStatus.ERROR diff --git a/tools/events/base.py b/tools/events/base.py index 1c4ee123..7a898254 100644 --- a/tools/events/base.py +++ b/tools/events/base.py @@ -138,6 +138,35 @@ def _fetch_session_messages(self) -> list[Any]: return messages + def _fetch_session_status(self, timeout: float = 2.0) -> str | None: + """Best-effort lookup of this session's current opencode status. + + opencode's /session/status endpoint only returns non-idle sessions; if + this session is absent from the response map, treat that as idle. + """ + try: + req = urllib.request.Request( + f"{self.base_url}/session/status", + headers=self._get_headers(), + method="GET", + ) + with urllib.request.urlopen(req, timeout=timeout) as resp: + data = json.loads(resp.read().decode("utf-8")) + except Exception: # noqa: BLE001 + return None + + if not isinstance(data, dict): + return None + status = data.get(self.session_id) + if status is None: + return "idle" + if isinstance(status, dict): + status_type = status.get("type") + return status_type if isinstance(status_type, str) else None + if isinstance(status, str): + return status + return None + def _messages_to_events(self, messages: list[Any]) -> list[dict[str, Any]]: events: list[dict[str, Any]] = [] diff --git a/tools/events/phase_loop.py b/tools/events/phase_loop.py index aba10abb..f8b8df6c 100644 --- a/tools/events/phase_loop.py +++ b/tools/events/phase_loop.py @@ -12,6 +12,8 @@ from __future__ import annotations import dataclasses +import os +import time from typing import Any, Callable from events.sse_client import SseClient, SseClientError @@ -29,6 +31,7 @@ class RunResult: last_finish_tokens: dict[str, Any] = dataclasses.field(default_factory=dict) last_permission_error: str | None = None last_session_id: str | None = None + session_stalled: bool = False class PhaseEventLoop(BaseEventLoop): @@ -44,6 +47,7 @@ def __init__( *, auth_token: str | None = None, workspace_dir: str | None = None, + liveness_check: Callable[[], bool] | None = None, ) -> None: super().__init__(base_url, session_id, console, auth_token=auth_token, workspace_dir=workspace_dir) @@ -51,6 +55,33 @@ def __init__( self.label = label self._pending_recovery_sync = False self._idle_event_to_sync_and_emit: dict[str, Any] | None = None + # liveness_check reports whether the server process is still alive. + self._liveness_check = liveness_check + # Tracks the last session status observed on the SSE stream so we can + # decide whether a stream drop should be tolerated (still busy) or is a + # genuine end-of-turn. + self._session_busy = False + # No-progress watchdog: when a busy turn produces no meaningful SSE + # events for this long, treat the turn as stalled (hung model/provider) + # so the harness can restart the server and retry instead of waiting + # forever. server.heartbeat / server.connected do not count as progress. + self._stall_timeout_s = float(os.environ.get("CODECOME_BUSY_STALL_TIMEOUT", "180")) + # Optional diagnostic signal: opencode may pause server.heartbeat during + # valid long turns, so heartbeat-gap stalls are disabled by default. When + # explicitly enabled, they can flag runtime blockage sooner than the + # primary no-progress timeout. + self._heartbeat_stall_timeout_s = float( + os.environ.get("CODECOME_HEARTBEAT_STALL_TIMEOUT", "0") + ) + self._last_progress_at = time.monotonic() + self._session_stalled = False + self._session_idle_via_status = False + try: + self._status_probe_timeout_s = max( + 0.1, float(os.environ.get("CODECOME_RESUME_PROBE_TIMEOUT", "2")) + ) + except (TypeError, ValueError): + self._status_probe_timeout_s = 2.0 # ------------------------------------------------------------------ # Public API @@ -74,9 +105,23 @@ def run( reconnect=True, max_reconnects=10, on_reconnect=self.trigger_recovery_sync, + should_continue=self._should_keep_consuming, ) try: + def emit_finalized(finalized_events: list[dict[str, Any]]) -> None: + nonlocal _any_step_finish_seen, _step_finish_count + nonlocal _last_finish_reason, _last_finish_tokens + for fe in finalized_events: + sig = (fe.get("type", ""), fe.get("part", {}).get("id", "")) + if sig[1] and sig in self._emitted_signatures: + continue + self._emitted_signatures.add(sig) + _any_step_finish_seen, _step_finish_count, _last_finish_reason, _last_finish_tokens = self._update_result( + fe, _any_step_finish_seen, _step_finish_count, _last_finish_reason, _last_finish_tokens + ) + emit_event(render_fn, self.console, self.phase, self.label, fe) + for event in self._client.events(): if self._stopped: break @@ -84,6 +129,8 @@ def run( if not self._belongs_to_session(event): continue + self._note_progress(event) + if record_raw_event_fn is not None: record_raw_event_fn(event) @@ -97,6 +144,8 @@ def run( _last_permission_error = perm_err continue + self._track_session_busy(event) + _is_idle = self._is_session_idle(event) if _is_idle and self._idle_event_to_sync_and_emit is None: self._idle_event_to_sync_and_emit = event @@ -116,15 +165,7 @@ def run( ) ] - for fe in finalized_events: - sig = (fe.get("type", ""), fe.get("part", {}).get("id", "")) - if sig[1] and sig in self._emitted_signatures: - continue - self._emitted_signatures.add(sig) - _any_step_finish_seen, _step_finish_count, _last_finish_reason, _last_finish_tokens = self._update_result( - fe, _any_step_finish_seen, _step_finish_count, _last_finish_reason, _last_finish_tokens - ) - emit_event(render_fn, self.console, self.phase, self.label, fe) + emit_finalized(finalized_events) if self._is_session_idle(event): self._idle_event_to_sync_and_emit = None @@ -144,6 +185,27 @@ def run( except SseClientError: pass + if self._session_idle_via_status: + emit_finalized(self._sync_session_messages()) + idle_event = { + "type": "session.status", + "properties": { + "sessionID": self.session_id, + "status": {"type": "idle"}, + }, + } + if record_raw_event_fn is not None: + record_raw_event_fn(idle_event) + emit_event(render_fn, self.console, self.phase, self.label, idle_event) + return self._build_result( + _any_step_finish_seen, _step_finish_count, + _last_finish_reason, _last_finish_tokens, + _last_permission_error, self.session_id, + ) + + if self._session_stalled and _last_finish_reason not in ("stop", "idle"): + _last_finish_reason = "session_stalled" + return self._build_result( any_step_finish_seen=_any_step_finish_seen, step_finish_count=_step_finish_count, @@ -156,12 +218,92 @@ def run( def trigger_recovery_sync(self) -> None: self._pending_recovery_sync = True + def _track_session_busy(self, event: dict[str, Any]) -> None: + """Record the latest session busy/idle state seen on the SSE stream.""" + if event.get("type") == "session.status": + status_type = event.get("properties", {}).get("status", {}).get("type") + if status_type == "busy": + self._session_busy = True + elif status_type == "idle": + self._session_busy = False + elif event.get("type") == "session.idle": + self._session_busy = False + + # Events that are pure connection lifecycle / keepalive and do NOT count as + # turn progress for the stall watchdog. + _NON_PROGRESS_EVENTS = frozenset({"server.heartbeat", "server.connected"}) + + def _note_progress(self, event: dict[str, Any]) -> None: + """Reset the no-progress watchdog on any meaningful SSE event.""" + if event.get("type") in self._NON_PROGRESS_EVENTS: + return + self._last_progress_at = time.monotonic() + + def _stalled(self) -> bool: + """True once a busy turn looks hung. + + Primary signal: no meaningful (non-heartbeat) SSE event for + CODECOME_BUSY_STALL_TIMEOUT seconds. + + Secondary signal: heartbeats were observed on this connection and then + stopped for CODECOME_HEARTBEAT_STALL_TIMEOUT seconds. Because opencode + schedules heartbeats on the same runtime that runs the model turn, a + heartbeat gap is an early indicator the runtime has blocked. + """ + if self._stall_timeout_s > 0: + if (time.monotonic() - self._last_progress_at) >= self._stall_timeout_s: + return True + + if self._heartbeat_stall_timeout_s > 0 and self._client is not None: + since_hb = self._client.seconds_since_heartbeat() + if since_hb is not None and since_hb >= self._heartbeat_stall_timeout_s: + return True + + return False + + def _should_keep_consuming(self) -> bool: + """Whether the SSE client should keep reconnecting past its budget. + + A long model turn must never be abandoned: while the session is still + busy AND the server process is alive, keep consuming the stream so we + observe the genuine terminal idle instead of misreading a transient + stream drop as a mid-turn cutoff. + + However, a busy session that produces no meaningful events for + CODECOME_BUSY_STALL_TIMEOUT seconds is treated as a hung turn: stop + consuming so the harness can restart the server and retry. + """ + if not self._session_busy: + return False + if self._stalled(): + if self._status_probe_reports_idle(): + return False + self._session_stalled = True + return False + if self._liveness_check is None: + # No liveness signal available: be conservative and keep consuming + # while the session is busy (the prior behavior gave up too early). + return True + try: + return bool(self._liveness_check()) + except Exception: # noqa: BLE001 + return False + + def _status_probe_reports_idle(self) -> bool: + """Confirm whether a stale busy SSE state actually became idle.""" + status = self._fetch_session_status(timeout=self._status_probe_timeout_s) + if status == "idle": + self._session_busy = False + self._session_idle_via_status = True + return True + return False + # ------------------------------------------------------------------ # Phase-specific helpers # ------------------------------------------------------------------ - @staticmethod def _build_result( + self, any_step_finish_seen: bool, step_finish_count: int, last_finish_reason: str | None, @@ -176,6 +318,7 @@ def _build_result( last_finish_tokens=last_finish_tokens, last_permission_error=last_permission_error, last_session_id=last_session_id, + session_stalled=self._session_stalled, ) def _should_sync_session_messages(self, event: dict[str, Any]) -> bool: diff --git a/tools/events/sse_client.py b/tools/events/sse_client.py index defce1ea..6fd024b1 100644 --- a/tools/events/sse_client.py +++ b/tools/events/sse_client.py @@ -12,6 +12,8 @@ from __future__ import annotations import json +import os +import socket import time import urllib.request from typing import Any, Callable, Iterator @@ -22,13 +24,25 @@ _BACKOFF_MAX_S = 30.0 _BACKOFF_MULTIPLIER = 2.0 -# If no heartbeat for this long, treat as dead. -_HEARTBEAT_TIMEOUT_S = 15.0 - -# Read timeout for the SSE connection. +# Connect timeout for establishing the SSE connection. _SSE_READ_TIMEOUT_S = 30.0 +def _sse_read_tick() -> float: + """Wall-clock cadence (seconds) at which a silent-but-open stream wakes. + + opencode schedules its ``server.heartbeat`` on the Effect runtime; when a + model turn hangs, the runtime blocks and NO bytes flow on the open SSE + connection. A plain blocking line read would wait forever. We set this as an + explicit socket timeout so the read loop regains control every tick and can + re-evaluate whether to keep consuming (e.g. the stall watchdog). + """ + try: + return max(1.0, float(os.environ.get("CODECOME_SSE_READ_TICK", "10"))) + except (TypeError, ValueError): + return 10.0 + + import base64 def _build_sse_request(base_url: str, auth_token: str | None = None, workspace_dir: str | None = None) -> urllib.request.Request: @@ -67,6 +81,7 @@ def __init__( reconnect: bool = True, max_reconnects: int = 10, on_reconnect: Callable[[], None] | None = None, + should_continue: Callable[[], bool] | None = None, ) -> None: self.base_url = base_url.rstrip("/") self.auth_token = auth_token @@ -74,10 +89,16 @@ def __init__( self.reconnect = reconnect self.max_reconnects = max_reconnects self.on_reconnect = on_reconnect + # Optional predicate: when it returns True, the stream is kept alive + # even past the normal reconnect budget / heartbeat timeout. This lets a + # caller force continued consumption while the session is still busy and + # the server process is alive (a long model turn must not be abandoned). + self.should_continue = should_continue self._started = False self._stopped = False self._last_heartbeat = 0.0 + self._heartbeats_seen = False self._reconnect_count = 0 self._first_connection_done = False @@ -98,6 +119,18 @@ def events(self) -> Iterator[dict]: for event in self._open_stream(): if self._stopped: return + if event is None: + # Inactivity tick: the connection is open but no bytes + # arrived within the read tick. This is the ONLY chance + # to regain control on a silent-but-open stream (the + # opencode runtime can block and stop emitting events, + # including heartbeats). Let the caller decide whether to + # stop (e.g. stall watchdog) without treating it as an + # error or a reconnect. + if self.should_continue is not None and not self._should_continue_safe(): + self.stop() + return + continue self._on_event(event) if notify_reconnect and self.on_reconnect: self.on_reconnect() @@ -107,7 +140,7 @@ def events(self) -> Iterator[dict]: except SseClientError: if not self.reconnect or self._stopped: raise - if self._reconnect_count >= self.max_reconnects: + if self._reconnect_budget_exhausted(): raise SseClientError( f"SSE reconnect budget exhausted ({self.max_reconnects} attempts)" ) @@ -117,7 +150,7 @@ def events(self) -> Iterator[dict]: # Unexpected error during stream consumption. if not self.reconnect or self._stopped: raise SseClientError(f"SSE stream error: {exc}") from exc - if self._reconnect_count >= self.max_reconnects: + if self._reconnect_budget_exhausted(): raise SseClientError( f"SSE reconnect budget exhausted ({self.max_reconnects} attempts)" ) from exc @@ -132,21 +165,58 @@ def stop(self) -> None: # Internal # ------------------------------------------------------------------ + def _should_continue_safe(self) -> bool: + """Evaluate the should_continue predicate, defaulting to stop on error.""" + if self.should_continue is None: + return True + try: + return bool(self.should_continue()) + except Exception: # noqa: BLE001 + return False + + def _reconnect_budget_exhausted(self) -> bool: + """Return True only when we should stop retrying. + + Normally the budget is ``max_reconnects``. But if ``should_continue`` + reports the session is still working (busy + process alive), we keep + reconnecting indefinitely so a long model turn is never abandoned. + """ + if self._reconnect_count < self.max_reconnects: + return False + if self.should_continue is not None: + if self._should_continue_safe(): + # Keep going: reset the counter so backoff stays bounded. + self._reconnect_count = 0 + return False + return True + def _on_event(self, event: dict) -> None: """ Book-keeping on every consumed event. """ if event.get("type") == "server.heartbeat": self._last_heartbeat = time.time() + self._heartbeats_seen = True self._reconnect_count = 0 # Reset on successful read. - # Heartbeat timeout check. - elapsed = time.time() - self._last_heartbeat - if elapsed > _HEARTBEAT_TIMEOUT_S: - raise SseClientError( - f"No server heartbeat for {elapsed:.1f}s (timeout {_HEARTBEAT_TIMEOUT_S}s)" - ) + # A late non-heartbeat event is still useful progress. Do not raise here + # for stale heartbeats: opencode may pause server.heartbeat while a valid + # model/tool turn is still running, and raising would discard the very + # event that proves the stream is alive. Silence is handled by the + # read-tick + PhaseEventLoop stall watchdog instead. + + def seconds_since_heartbeat(self) -> float | None: + """Seconds since the last heartbeat, or None if none seen yet.""" + if not self._heartbeats_seen: + return None + return time.time() - self._last_heartbeat - def _open_stream(self) -> Iterator[dict]: - """ Open the SSE connection and yield parsed events. """ + def _open_stream(self) -> Iterator[dict | None]: + """ Open the SSE connection and yield parsed events. + + Yields ``None`` as an inactivity *tick* whenever no bytes arrive within + the read tick window, so the caller can re-evaluate liveness on a + silent-but-open connection (opencode can block its runtime and stop + emitting events entirely, including heartbeats). + """ req = _build_sse_request(self.base_url, self.auth_token, self.workspace_dir) try: resp = urllib.request.urlopen(req, timeout=_SSE_READ_TIMEOUT_S) @@ -155,12 +225,30 @@ def _open_stream(self) -> Iterator[dict]: except urllib.error.URLError as exc: raise SseClientError(f"Connection failed: {exc.reason}") from exc - # Read SSE lines. - buffer = [] + # Force a bounded per-read socket timeout so a silent stream surfaces as + # periodic socket.timeout ticks instead of blocking forever. + tick = _sse_read_tick() + self._set_stream_timeout(resp, tick) + + # Read SSE lines manually so we can convert read timeouts into ticks. + buffer: list[str] = [] try: - for byte_line in resp: + while True: if self._stopped: return + try: + byte_line = resp.readline() + except (socket.timeout, TimeoutError): + # No data within the tick window → emit an inactivity tick. + yield None + continue + except (urllib.error.URLError, ConnectionError, OSError) as exc: + raise SseClientError(f"SSE read error: {exc}") from exc + + if byte_line == b"": + # EOF: server closed the connection. + raise SseClientError("SSE stream closed by server") + line = byte_line.decode("utf-8", errors="replace").rstrip("\r\n") if not line: # Empty line → flush buffer. @@ -172,7 +260,26 @@ def _open_stream(self) -> Iterator[dict]: continue buffer.append(line) finally: - resp.close() + try: + resp.close() + except Exception: # noqa: BLE001 + pass + + @staticmethod + def _set_stream_timeout(resp: Any, timeout: float) -> None: + """Best-effort: set a per-read socket timeout on the response stream.""" + fp = getattr(resp, "fp", None) + sock = None + if fp is not None: + raw = getattr(fp, "raw", None) + sock = getattr(raw, "_sock", None) or getattr(fp, "_sock", None) + if sock is None: + sock = getattr(resp, "_sock", None) + if sock is not None and hasattr(sock, "settimeout"): + try: + sock.settimeout(timeout) + except Exception: # noqa: BLE001 + pass @staticmethod def _parse_buffer(lines: list[str]) -> dict | None: diff --git a/tools/opencode/serve.py b/tools/opencode/serve.py index f8eada19..ab50eee4 100644 --- a/tools/opencode/serve.py +++ b/tools/opencode/serve.py @@ -28,11 +28,12 @@ import socket import subprocess import sys +import threading import time import urllib.error import urllib.request from pathlib import Path -from typing import Any, Optional +from typing import Any, Callable, Optional ROOT = Path(__file__).resolve().parents[2] @@ -123,6 +124,24 @@ def _patch_json(base_url: str, path: str, payload: dict) -> dict: return json.loads(body) +def make_liveness_check(runner: "ServerRunner") -> "Callable[[], bool]": + """Return a callable reporting whether *runner*'s server process is alive. + + The returned check consults the OS child-process handle, which is the + authoritative liveness signal. HTTP probes (/session/status, /global/health) + can block and time out while the process is perfectly alive during a long + busy turn, so they must not be used to declare server death. + """ + + def _alive() -> bool: + info = runner.info + if info is None: + return False + return info.proc.poll() is None + + return _alive + + class ServerRunner: """ Spawn and manage a local opencode serve process. """ @@ -215,6 +234,7 @@ def start( log_path=log_path, password=password, ) + self._start_exit_monitor(proc, log_path) return self._info # If we reach here, this attempt failed. Kill and retry. @@ -243,6 +263,14 @@ def stop(self) -> None: self._kill(info.proc) self._info = None + def restart(self, **kwargs: Any) -> ServerInfo: + """Stop the current server and start a new one. + + Forwards *kwargs* to ``start()``. Returns the new ``ServerInfo``. + """ + self.stop() + return self.start(**kwargs) + @property def info(self) -> Optional[ServerInfo]: return self._info @@ -271,6 +299,29 @@ def _kill(proc: subprocess.Popen[Any]) -> None: except ProcessLookupError: pass + @staticmethod + def _start_exit_monitor(proc: subprocess.Popen[Any], log_path: Path) -> None: + """Spawn a daemon thread that logs the server process exit code.""" + def _monitor_child() -> None: + exit_code = proc.wait() + line = ( + f"opencode serve killed by signal {-exit_code} (exit code {exit_code})\n" + if exit_code < 0 + else f"opencode serve exited with code {exit_code}\n" + ) + try: + with open(log_path, "a") as f: + f.write(line) + except OSError: + pass + + t = threading.Thread( + target=_monitor_child, + name=f"serve-monitor-{proc.pid}", + daemon=True, + ) + t.start() + # ------------------------------------------------------------------ # Convenience CLI (not the primary entry point) diff --git a/tools/phases/completion.py b/tools/phases/completion.py index b0c4b4b9..1e31d3b1 100644 --- a/tools/phases/completion.py +++ b/tools/phases/completion.py @@ -485,6 +485,10 @@ def _resume_opener_for_reason(reason: str) -> str: treated as incomplete: - ``"infrastructure_error"`` — harness fatal-retry path. + - ``"server_unreachable"`` — the opencode server was unreachable or + unresponsive and was restarted. + - ``"session_stalled"`` — the model turn produced no activity for an + extended period (hung); the server was restarted. - A finish reason from ``rendering.events._FINISH_MID_TURN`` (e.g. ``"tool_use"``, ``"unknown"``) — the model/provider cut off mid-turn. - A finish reason from ``rendering.events._FINISH_BUDGET`` (e.g. @@ -507,6 +511,17 @@ def _resume_opener_for_reason(reason: str) -> str: if reason == "infrastructure_error": return "Your previous attempt failed with an infrastructure error and was retried." + if reason == "server_unreachable": + return ( + "The opencode server that was hosting your previous session died unexpectedly. " + "CodeCome has restarted the server. Your previous session was lost." + ) + if reason == "session_stalled": + return ( + "Your previous session stalled: the model turn produced no activity for an " + "extended period and did not complete. CodeCome has restarted the server. " + "Your previous session was lost." + ) if reason in _FINISH_MID_TURN: return ( f"Your previous run was cut off mid-turn (finish reason '{reason}') " From 0b13245978a7208c74940ccb6487676c7d71d2c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Ruiz=20Garc=C3=ADa?= Date: Tue, 30 Jun 2026 15:41:37 +0200 Subject: [PATCH 2/3] fix: redirect erlang crash dumps to ignored tmp --- templates/sandboxes/erlang-otp/docker-compose.yml | 2 ++ tests/test_sandbox_bootstrap.py | 10 ++++++++++ 2 files changed, 12 insertions(+) diff --git a/templates/sandboxes/erlang-otp/docker-compose.yml b/templates/sandboxes/erlang-otp/docker-compose.yml index 4d659b91..1caa9191 100644 --- a/templates/sandboxes/erlang-otp/docker-compose.yml +++ b/templates/sandboxes/erlang-otp/docker-compose.yml @@ -7,6 +7,8 @@ services: context: . dockerfile: Dockerfile working_dir: /workspace + environment: + ERL_CRASH_DUMP: /workspace/tmp/erl_crash.dump volumes: - ../:/workspace - ../.tools/codeql/current:/opt/codeql:ro diff --git a/tests/test_sandbox_bootstrap.py b/tests/test_sandbox_bootstrap.py index 7425f006..ba8d1897 100644 --- a/tests/test_sandbox_bootstrap.py +++ b/tests/test_sandbox_bootstrap.py @@ -67,6 +67,16 @@ def test_render_provenance_includes_compose_project_name(): assert "`phorge`" in content +def test_erlang_sandbox_redirects_crash_dump_to_ignored_tmp(): + compose = (ROOT / "templates" / "sandboxes" / "erlang-otp" / "docker-compose.yml").read_text( + encoding="utf-8" + ) + gitignore = (ROOT / ".gitignore").read_text(encoding="utf-8") + + assert "ERL_CRASH_DUMP: /workspace/tmp/erl_crash.dump" in compose + assert "tmp/*" in gitignore + + def test_opencode_json_allows_src_and_sandbox_env_reads(): # Read as plain JSON because this file is expected to be strict JSON. with (ROOT / "opencode.json").open("r", encoding="utf-8") as fh: From 7c2b8accf23238a8adc80c94cbf73aa813b486c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Ruiz=20Garc=C3=ADa?= Date: Thu, 2 Jul 2026 18:48:10 +0200 Subject: [PATCH 3/3] fix: address resilience review feedback --- tests/test_codecome_runner.py | 48 ++++++++++++++++++++++ tests/test_harness_recovery_restart.py | 15 +++++++ tests/test_runner_resume_health.py | 1 + tests/test_session.py | 28 ++++++++----- tests/test_sse_busy_resilience.py | 56 ++++++++++++++++++++++++++ tools/codecome/harness.py | 2 + tools/codecome/runner.py | 43 +++++++++++++++++++- tools/codecome/session.py | 4 +- tools/events/phase_loop.py | 22 +++++++--- tools/events/sse_client.py | 24 ++++++++++- 10 files changed, 222 insertions(+), 21 deletions(-) diff --git a/tests/test_codecome_runner.py b/tests/test_codecome_runner.py index a487c9b0..e4c60e84 100644 --- a/tests/test_codecome_runner.py +++ b/tests/test_codecome_runner.py @@ -194,6 +194,54 @@ def fake_send(*_a, **_kw): assert failed["properties"]["message"] == "Failed to send prompt: timed out" +def test_run_single_attempt_prompt_timeout_requires_consumer_exit(mock_args, mock_console, monkeypatch): + monkeypatch.setattr(runner, "create_session", lambda *a, **kw: "new_session") + monkeypatch.setattr(runner, "_consume_events", lambda *a, **kw: RunResult()) + + def fake_send(*_a, **_kw): + raise runner.OpenCodeRequestError( + "Failed to send prompt: timed out", + retriable=True, + operation="send_prompt", + ) + + class AliveThread: + def __init__(self, *args, **kwargs): + pass + + def start(self): + pass + + def join(self, timeout=None): + pass + + def is_alive(self): + return True + + monkeypatch.setattr(runner, "send_prompt_to_session", fake_send) + monkeypatch.setattr(runner.threading, "Thread", AliveThread) + monkeypatch.setenv("CODECOME_SSE_READ_TICK", "invalid") + + events = [] + fake_transcript = MagicMock(spec=Transcript) + fake_transcript.path = Path("fake.jsonl") + fake_transcript.write_event.side_effect = events.append + monkeypatch.setattr(Transcript, "for_phase", classmethod(lambda cls, p, f: fake_transcript)) + + code, session_id, _res, _path = runner._run_single_attempt( + mock_args, mock_console, "do work", "model", "var", + "http://base", "auth", "dir", lambda *a: None, + emit_fatal_error_fn=lambda *_a: None, + ) + + assert code == 1 + assert session_id == "" + event_types = [event["type"] for event in events] + assert "codecome.event_loop.stop_timeout" in event_types + assert "codecome.attempt.failed" in event_types + assert "codecome.attempt.incomplete" not in event_types + + def test_run_single_attempt_create_session_timeout_is_recoverable(mock_args, mock_console, monkeypatch): def fake_create(*_a, **_kw): raise runner.OpenCodeRequestError( diff --git a/tests/test_harness_recovery_restart.py b/tests/test_harness_recovery_restart.py index f96fcd14..c6f868e5 100644 --- a/tests/test_harness_recovery_restart.py +++ b/tests/test_harness_recovery_restart.py @@ -140,3 +140,18 @@ def test_recovery_shares_budget_and_exhausts(monkeypatch): # Budget exhausted after 2 restarts → non-zero terminal status. assert rc != 0 assert fake.restart_calls == 2 + + +def test_session_stalled_budget_exhaustion_is_non_success(monkeypatch): + fake = _FakeServerRunner() + from codecome import harness as harness_mod + transcript = _t(harness_mod) + monkeypatch.setenv("CODECOME_MAX_SERVER_RESTARTS", "0") + harness_mod = _wire(monkeypatch, fake, [ + (0, "ses_x", _stalled_result(), transcript), + ]) + + rc = harness_mod.run_phase_mode(_args()) + + assert rc != 0 + assert fake.restart_calls == 0 diff --git a/tests/test_runner_resume_health.py b/tests/test_runner_resume_health.py index 1c5f0bb5..b208bf59 100644 --- a/tests/test_runner_resume_health.py +++ b/tests/test_runner_resume_health.py @@ -86,6 +86,7 @@ def test_resume_status_none_with_live_process_does_not_kill_server(monkeypatch, assert not isinstance(excinfo.value, runner_mod.ResumeSessionServerUnreachable) event_types = [event["type"] for event in _events(transcript.path)] assert "codecome.resume.status_unavailable_process_alive" in event_types + assert "codecome.resume.blocked_unknown" not in event_types assert "codecome.resume.server_unreachable" not in event_types diff --git a/tests/test_session.py b/tests/test_session.py index 1b29ebf7..7ea26a8b 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -13,6 +13,13 @@ def _load_session_module(): return load_tool_module("codecome_session", "tools/codecome/session.py") +def _mock_json_response(payload: dict): + mock_resp = MagicMock() + mock_resp.read.return_value = json.dumps(payload).encode("utf-8") + mock_resp.__enter__.return_value = mock_resp + return mock_resp + + class TestGetHeaders: def test_no_auth_no_workspace(self): module = _load_session_module() @@ -45,8 +52,7 @@ class TestCreateSession: @patch("urllib.request.urlopen") def test_create_session_without_model(self, mock_urlopen): module = _load_session_module() - mock_resp = MagicMock() - mock_resp.read.return_value = json.dumps({"id": "sess-abc"}).encode("utf-8") + mock_resp = _mock_json_response({"id": "sess-abc"}) mock_urlopen.return_value = mock_resp sid = module.create_session("http://localhost:8080", "1", "recon", None, None, None) @@ -58,12 +64,12 @@ def test_create_session_without_model(self, mock_urlopen): assert payload["title"] == "CodeCome Phase 1" assert payload["agent"] == "recon" assert "model" not in payload + assert mock_resp.__exit__.called @patch("urllib.request.urlopen") def test_create_session_with_provider_model(self, mock_urlopen): module = _load_session_module() - mock_resp = MagicMock() - mock_resp.read.return_value = json.dumps({"id": "sess-xyz"}).encode("utf-8") + mock_resp = _mock_json_response({"id": "sess-xyz"}) mock_urlopen.return_value = mock_resp sid = module.create_session( @@ -78,8 +84,7 @@ def test_create_session_with_provider_model(self, mock_urlopen): @patch("urllib.request.urlopen") def test_create_session_with_bare_model(self, mock_urlopen): module = _load_session_module() - mock_resp = MagicMock() - mock_resp.read.return_value = json.dumps({"id": "sess-bare"}).encode("utf-8") + mock_resp = _mock_json_response({"id": "sess-bare"}) mock_urlopen.return_value = mock_resp sid = module.create_session( @@ -94,19 +99,20 @@ def test_create_session_with_bare_model(self, mock_urlopen): @patch("urllib.request.urlopen") def test_create_session_empty_id_raises(self, mock_urlopen): module = _load_session_module() - mock_resp = MagicMock() - mock_resp.read.return_value = json.dumps({"id": ""}).encode("utf-8") + mock_resp = _mock_json_response({"id": ""}) mock_urlopen.return_value = mock_resp - with pytest.raises(RuntimeError, match="empty session ID"): + with pytest.raises(module.OpenCodeRequestError, match="empty session ID") as excinfo: module.create_session("http://localhost:8080", "1", "recon", None, None, None) + assert excinfo.value.retriable is False + assert excinfo.value.operation == "create_session" + @patch("time.sleep") @patch("urllib.request.urlopen") def test_create_session_retries_on_timeout(self, mock_urlopen, mock_sleep): module = _load_session_module() - mock_resp = MagicMock() - mock_resp.read.return_value = json.dumps({"id": "sess-after-retry"}).encode("utf-8") + mock_resp = _mock_json_response({"id": "sess-after-retry"}) mock_urlopen.side_effect = [TimeoutError("timed out"), mock_resp] sid = module.create_session("http://localhost:8080", "1", "recon", None, None, None) diff --git a/tests/test_sse_busy_resilience.py b/tests/test_sse_busy_resilience.py index 0e44c51c..023b9fbf 100644 --- a/tests/test_sse_busy_resilience.py +++ b/tests/test_sse_busy_resilience.py @@ -110,6 +110,17 @@ def test_keep_consuming_false_when_busy_but_process_dead(): {"type": "session.status", "properties": {"status": {"type": "busy"}}} ) assert loop._should_keep_consuming() is False + assert loop._server_unreachable is True + + +def test_invalid_stall_env_values_fall_back(monkeypatch): + monkeypatch.setenv("CODECOME_BUSY_STALL_TIMEOUT", "180s") + monkeypatch.setenv("CODECOME_HEARTBEAT_STALL_TIMEOUT", "off") + + loop = _loop(liveness_check=lambda: True) + + assert loop._stall_timeout_s == 180.0 + assert loop._heartbeat_stall_timeout_s == 0.0 def test_keep_consuming_true_when_busy_and_no_liveness_signal(): @@ -452,3 +463,48 @@ def stop(self): assert result.last_finish_reason == "stop" assert any(event.get("type") == "step_finish" for event in rendered) assert recorded[-1]["properties"]["status"]["type"] == "idle" + + +def test_run_dead_process_reports_server_unreachable(monkeypatch): + class FakeSseClient: + def __init__(self, *args, should_continue=None, **kwargs): + self.should_continue = should_continue + + def events(self): + yield { + "type": "session.status", + "properties": { + "sessionID": "sess-1", + "status": {"type": "busy"}, + }, + } + if self.should_continue is not None: + self.should_continue() + return + + def seconds_since_heartbeat(self): + return None + + def stop(self): + pass + + monkeypatch.setattr("events.phase_loop.SseClient", FakeSseClient) + + loop = _loop(liveness_check=lambda: False) + result = loop.run(lambda *_a: None) + + assert result.last_finish_reason == "server_unreachable" + assert result.session_stalled is False + + +def test_set_stream_timeout_warns_once_when_socket_missing(monkeypatch, capsys): + import events.sse_client as sse_mod + + monkeypatch.setattr(sse_mod, "_STREAM_TIMEOUT_WARNING_EMITTED", False) + resp = object() + + SseClient._set_stream_timeout(resp, 1.0) + SseClient._set_stream_timeout(resp, 1.0) + + err = capsys.readouterr().err + assert err.count("could not set SSE read timeout") == 1 diff --git a/tools/codecome/harness.py b/tools/codecome/harness.py index c90d5f55..4b4cfa94 100644 --- a/tools/codecome/harness.py +++ b/tools/codecome/harness.py @@ -332,6 +332,8 @@ def _forward_signal(signum: int, _frame: Any) -> None: _recovery_reason = "session_stalled" if _recovery_reason is not None: + if returncode == RunStatus.OK: + returncode = RunStatus.INCOMPLETE if server_restart_count < max_server_restarts: server_restart_count += 1 out.warn( diff --git a/tools/codecome/runner.py b/tools/codecome/runner.py index 66414bd7..18748c57 100644 --- a/tools/codecome/runner.py +++ b/tools/codecome/runner.py @@ -121,6 +121,20 @@ def _wait_for_resume_idle( "codecome.resume.status_unavailable_process_alive", sessionID=session_id, ) + if time.monotonic() >= deadline: + _record_codecome_event( + transcript, + "codecome.resume.timeout", + sessionID=session_id, + status=None, + timeoutSeconds=timeout_s, + ) + raise ResumeSessionNotReady( + f"session {session_id} status endpoint unavailable (process alive) " + f"after {timeout_s:g}s; refusing to send resume prompt" + ) + time.sleep(max(poll_s, 0.1)) + continue else: _record_codecome_event( transcript, @@ -139,6 +153,20 @@ def _wait_for_resume_idle( "codecome.resume.status_unavailable_server_healthy", sessionID=session_id, ) + if time.monotonic() >= deadline: + _record_codecome_event( + transcript, + "codecome.resume.timeout", + sessionID=session_id, + status=None, + timeoutSeconds=timeout_s, + ) + raise ResumeSessionNotReady( + f"session {session_id} status endpoint unavailable (server healthy) " + f"after {timeout_s:g}s; refusing to send resume prompt" + ) + time.sleep(max(poll_s, 0.1)) + continue else: consecutive_unavailable += 1 if consecutive_unavailable >= server_unavailable_threshold: @@ -291,9 +319,14 @@ def _consume() -> None: loop.stop() except Exception: pass - consumer.join(timeout=5.0) + try: + stop_timeout = max(5.0, float(os.environ.get("CODECOME_SSE_READ_TICK", "10")) + 1.0) + except (TypeError, ValueError): + stop_timeout = 11.0 + consumer.join(timeout=stop_timeout) if consumer.is_alive(): _record_codecome_event(transcript, "codecome.event_loop.stop_timeout", sessionID=session_id) + raise RuntimeError("event loop thread did not stop after prompt send failure") from exc if isinstance(exc, OpenCodeRequestError) and exc.retriable: _record_codecome_event( transcript, @@ -330,6 +363,14 @@ def _consume() -> None: sessionID=session_id, existingSession=bool(existing_session_id), ) + if run_result.last_finish_reason == "server_unreachable": + _record_codecome_event( + transcript, + "codecome.session.server_unreachable", + sessionID=session_id, + existingSession=bool(existing_session_id), + ) + return RunStatus.INCOMPLETE, session_id, run_result, transcript.path except ResumeSessionServerUnreachable as exc: _record_codecome_event( transcript, diff --git a/tools/codecome/session.py b/tools/codecome/session.py index 531837d1..20a67bfb 100644 --- a/tools/codecome/session.py +++ b/tools/codecome/session.py @@ -223,8 +223,8 @@ def create_session( method="POST", ) try: - resp = urllib.request.urlopen(req, timeout=10.0) - data = json.loads(resp.read().decode("utf-8")) + with urllib.request.urlopen(req, timeout=10.0) as resp: + data = json.loads(resp.read().decode("utf-8")) break except Exception as exc: last_exc = exc diff --git a/tools/events/phase_loop.py b/tools/events/phase_loop.py index f8b8df6c..9346c910 100644 --- a/tools/events/phase_loop.py +++ b/tools/events/phase_loop.py @@ -65,16 +65,15 @@ def __init__( # events for this long, treat the turn as stalled (hung model/provider) # so the harness can restart the server and retry instead of waiting # forever. server.heartbeat / server.connected do not count as progress. - self._stall_timeout_s = float(os.environ.get("CODECOME_BUSY_STALL_TIMEOUT", "180")) + self._stall_timeout_s = self._float_env("CODECOME_BUSY_STALL_TIMEOUT", 180.0) # Optional diagnostic signal: opencode may pause server.heartbeat during # valid long turns, so heartbeat-gap stalls are disabled by default. When # explicitly enabled, they can flag runtime blockage sooner than the # primary no-progress timeout. - self._heartbeat_stall_timeout_s = float( - os.environ.get("CODECOME_HEARTBEAT_STALL_TIMEOUT", "0") - ) + self._heartbeat_stall_timeout_s = self._float_env("CODECOME_HEARTBEAT_STALL_TIMEOUT", 0.0) self._last_progress_at = time.monotonic() self._session_stalled = False + self._server_unreachable = False self._session_idle_via_status = False try: self._status_probe_timeout_s = max( @@ -205,6 +204,8 @@ def emit_finalized(finalized_events: list[dict[str, Any]]) -> None: if self._session_stalled and _last_finish_reason not in ("stop", "idle"): _last_finish_reason = "session_stalled" + if self._server_unreachable and _last_finish_reason not in ("stop", "idle"): + _last_finish_reason = "server_unreachable" return self._build_result( any_step_finish_seen=_any_step_finish_seen, @@ -218,6 +219,13 @@ def emit_finalized(finalized_events: list[dict[str, Any]]) -> None: def trigger_recovery_sync(self) -> None: self._pending_recovery_sync = True + @staticmethod + def _float_env(name: str, default: float) -> float: + try: + return float(os.environ.get(name, str(default))) + except (TypeError, ValueError): + return default + def _track_session_busy(self, event: dict[str, Any]) -> None: """Record the latest session busy/idle state seen on the SSE stream.""" if event.get("type") == "session.status": @@ -285,8 +293,12 @@ def _should_keep_consuming(self) -> bool: # while the session is busy (the prior behavior gave up too early). return True try: - return bool(self._liveness_check()) + alive = bool(self._liveness_check()) + if not alive: + self._server_unreachable = True + return alive except Exception: # noqa: BLE001 + self._server_unreachable = True return False def _status_probe_reports_idle(self) -> bool: diff --git a/tools/events/sse_client.py b/tools/events/sse_client.py index 6fd024b1..ec1fcf67 100644 --- a/tools/events/sse_client.py +++ b/tools/events/sse_client.py @@ -14,6 +14,7 @@ import json import os import socket +import sys import time import urllib.request from typing import Any, Callable, Iterator @@ -26,6 +27,15 @@ # Connect timeout for establishing the SSE connection. _SSE_READ_TIMEOUT_S = 30.0 +_STREAM_TIMEOUT_WARNING_EMITTED = False + + +def _warn_stream_timeout(message: str) -> None: + global _STREAM_TIMEOUT_WARNING_EMITTED + if _STREAM_TIMEOUT_WARNING_EMITTED: + return + _STREAM_TIMEOUT_WARNING_EMITTED = True + print(message, file=sys.stderr) def _sse_read_tick() -> float: @@ -275,11 +285,21 @@ def _set_stream_timeout(resp: Any, timeout: float) -> None: sock = getattr(raw, "_sock", None) or getattr(fp, "_sock", None) if sock is None: sock = getattr(resp, "_sock", None) + if sock is None: + _warn_stream_timeout( + "warning: could not set SSE read timeout; stream stalls may block indefinitely" + ) + return if sock is not None and hasattr(sock, "settimeout"): try: sock.settimeout(timeout) - except Exception: # noqa: BLE001 - pass + return + except Exception as exc: # noqa: BLE001 + _warn_stream_timeout(f"warning: could not set SSE read timeout: {exc}") + return + _warn_stream_timeout( + "warning: SSE stream object has no settimeout; stream stalls may block indefinitely" + ) @staticmethod def _parse_buffer(lines: list[str]) -> dict | None: