fix: sub-agent stream stall timeout with heartbeat#51
Conversation
aartaria
left a comment
There was a problem hiding this comment.
This fix is incompatible with the current main and fails at runtime on every real sub-agent dispatch:
File "agent_common/core/graph_utils.py", line 224, in isolate_parent_stream_context
var_child_runnable_config.reset(token)
ValueError: <Token var=<ContextVar name='child_runnable_config'> was created in a different Context
Why it crashes
main wraps the sub-agent's agent.astream() call with two context managers (denest_parent_pregel_context, isolate_parent_stream_context in graph_utils.py) that call var.set() on entry and var.reset(token) on exit. Python's contextvars.Token is bound to the exact context object where set() was called — reset() must be called from that same object, not a copy.
Your approach calls asyncio.create_task(anext(iterator)) for each item in the loop. asyncio.create_task() always gives the new task a copy of the current context. When var.set() fires in task copy N and var.reset(token) fires in task copy N+1 (next tick or during generator cleanup), the mismatch raises the ValueError above. This happens on the first real sub-agent call.
main also sets a current_attachments_backend ContextVar (attachments_store.py) at the start of _astream_impl and resets it in the finally block. The same task-copy mismatch breaks the attachment feature mid-stream and during cleanup, corrupting the per-turn attachment context across multiple turns.
| parallel ``task`` tool calls), so operators can group log lines by dispatch | ||
| and downstream alerting can correlate logs to events. | ||
| """ | ||
| iterator = stream_iter.__aiter__() if hasattr(stream_iter, "__aiter__") else stream_iter |
There was a problem hiding this comment.
| iterator = stream_iter.__aiter__() if hasattr(stream_iter, "__aiter__") else stream_iter | |
| iterator = aiter(stream_iter) |
| # Wrap the awaitable in a Task so heartbeat ticks can observe it | ||
| # without cancelling it. Cancellation of the in-flight __anext__ | ||
| # only happens once the hard cap is exceeded. | ||
| pending = asyncio.ensure_future(iterator.__anext__()) |
There was a problem hiding this comment.
| pending = asyncio.ensure_future(iterator.__anext__()) | |
| pending = asyncio.ensure_future(anext(iterator)) |
Problem: The orchestrator consumed a sub-agent's stream with a plain async for. If the sub-agent went silent (stuck LLM call, dead provider, MCP error, DB lock), the loop hung forever with no output and no signal of what was stuck.
How it was fixed: Added a wrapper that puts a per-item stall timeout around the stream. It logs a heartbeat every 30s while waiting (without cancelling the in-flight read), and after a 300s hard cap it closes the stream and emits one ErrorEvent instead of hanging. Both timeouts are env-tunable, and each dispatch gets a unique id threaded through all logs + the error so parallel sub-agent calls can be told apart.
Checklist