Skip to content

fix: sub-agent stream stall timeout with heartbeat#51

Open
zyma wants to merge 1 commit into
ringier-data:mainfrom
Authrion:fix/subagent-stall-timeout
Open

fix: sub-agent stream stall timeout with heartbeat#51
zyma wants to merge 1 commit into
ringier-data:mainfrom
Authrion:fix/subagent-stall-timeout

Conversation

@zyma
Copy link
Copy Markdown
Contributor

@zyma zyma commented Jun 5, 2026

Problem: The orchestrator consumed a sub-agent's stream with a plain async for. If the sub-agent went silent (stuck LLM call, dead provider, MCP error, DB lock), the loop hung forever with no output and no signal of what was stuck.

How it was fixed: Added a wrapper that puts a per-item stall timeout around the stream. It logs a heartbeat every 30s while waiting (without cancelling the in-flight read), and after a 300s hard cap it closes the stream and emits one ErrorEvent instead of hanging. Both timeouts are env-tunable, and each dispatch gets a unique id threaded through all logs + the error so parallel sub-agent calls can be told apart.

Checklist

  • I have performed a self-review of my code
  • If it is a core feature and/or a bugfix, I have added thorough tests.
  • Does this PR introduce a breaking change? 🚨

Copy link
Copy Markdown
Contributor

@aartaria aartaria left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fix is incompatible with the current main and fails at runtime on every real sub-agent dispatch:

File "agent_common/core/graph_utils.py", line 224, in isolate_parent_stream_context
    var_child_runnable_config.reset(token)
ValueError: <Token var=<ContextVar name='child_runnable_config'> was created in a different Context

Why it crashes

main wraps the sub-agent's agent.astream() call with two context managers (denest_parent_pregel_context, isolate_parent_stream_context in graph_utils.py) that call var.set() on entry and var.reset(token) on exit. Python's contextvars.Token is bound to the exact context object where set() was called — reset() must be called from that same object, not a copy.

Your approach calls asyncio.create_task(anext(iterator)) for each item in the loop. asyncio.create_task() always gives the new task a copy of the current context. When var.set() fires in task copy N and var.reset(token) fires in task copy N+1 (next tick or during generator cleanup), the mismatch raises the ValueError above. This happens on the first real sub-agent call.

main also sets a current_attachments_backend ContextVar (attachments_store.py) at the start of _astream_impl and resets it in the finally block. The same task-copy mismatch breaks the attachment feature mid-stream and during cleanup, corrupting the per-turn attachment context across multiple turns.

parallel ``task`` tool calls), so operators can group log lines by dispatch
and downstream alerting can correlate logs to events.
"""
iterator = stream_iter.__aiter__() if hasattr(stream_iter, "__aiter__") else stream_iter
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
iterator = stream_iter.__aiter__() if hasattr(stream_iter, "__aiter__") else stream_iter
iterator = aiter(stream_iter)

# Wrap the awaitable in a Task so heartbeat ticks can observe it
# without cancelling it. Cancellation of the in-flight __anext__
# only happens once the hard cap is exceeded.
pending = asyncio.ensure_future(iterator.__anext__())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pending = asyncio.ensure_future(iterator.__anext__())
pending = asyncio.ensure_future(anext(iterator))

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants