[codex] Accept ACP user messages during async turns#3376
Conversation
Co-authored-by: openhands <openhands@all-hands.dev>
Co-authored-by: openhands <openhands@all-hands.dev>
Python API breakage checks — ✅ PASSEDResult: ✅ PASSED |
REST API breakage checks (OpenAPI) — ✅ PASSEDResult: ✅ PASSED |
Coverage Report •
|
||||||||||||||||||||||||||||||||||||||||
all-hands-bot
left a comment
There was a problem hiding this comment.
Clean approach to the live-streaming problem: reusing the existing StreamingDeltaEvent / _pub_sub path for ACP deltas is exactly right — transient chunks don't belong in state.events, and the new helper _publish_streaming_delta_from_thread correctly encapsulates the cross-thread scheduling so neither the LLM token path nor the ACP path has to repeat the run_coroutine_threadsafe / suppress boilerplate. Test coverage is solid (wiring, unwiring, and actual delta delivery are all exercised). Two items worth a look before merge are called out below.
This review was generated by an AI agent (OpenHands) on behalf of the repository owner via OpenHands Automation.
dc3d0a3 to
ca04e8c
Compare
Co-authored-by: openhands <openhands@all-hands.dev>
|
Merged |
all-hands-bot
left a comment
There was a problem hiding this comment.
🟡 Acceptable, but I found two ACP concurrency races that should be addressed before merge: one can replay a just-arrived user prompt, and one can let cancelled prompt updates leak into the next turn. Risk: 🟡 medium, since this changes conversation loop/cancellation behavior.
This review was generated by an AI agent (OpenHands) on behalf of the user.
Was this automated review useful? React with 👍 or 👎 to this review to help us measure review quality.
Workflow run: https://github.com/OpenHands/software-agent-sdk/actions/runs/26368622109
Co-authored-by: openhands <openhands@all-hands.dev>
|
Addressed the two ACP async concurrency review items in 6c89026 and verified locally with ruff plus the full EventService test file (83 passed). Ready for another automated review.\n\n_This comment was generated by an AI agent (OpenHands) on behalf of the user._ |
Co-authored-by: openhands <openhands@all-hands.dev>
Co-authored-by: openhands <openhands@all-hands.dev>
all-hands-bot
left a comment
There was a problem hiding this comment.
🟡 Acceptable direction, but I found a few ACP concurrency ordering gaps that should be addressed before merge. Risk: 🟡 medium because this changes async conversation loop/cancellation behavior.
This review was generated by an AI agent (OpenHands) on behalf of the user.
Was this automated review useful? React with 👍 or 👎 to this review to help us measure review quality.
Workflow run: https://github.com/OpenHands/software-agent-sdk/actions/runs/26369467316
Co-authored-by: openhands <openhands@all-hands.dev>
|
Addressed the latest ACP concurrency review in c9bbb68: atomic async finalization, finish-gap queued-message reconciliation, and cancellation drain-before-failure ordering. Verified locally with ruff, pyright, and targeted ACP async tests.\n\n_This comment was generated by an AI agent (OpenHands) on behalf of the user._ |
all-hands-bot
left a comment
There was a problem hiding this comment.
This PR solves a real and difficult problem — allowing ACP conversations to accept user messages mid-turn without blocking the sender — and the implementation reflects careful thought about the concurrency model. After multiple iterative rounds, the critical races (drain ordering, double-cancel via wait_for, deferred restart for wedged sessions, FIFO cursor tracking, suffix-install clearing on forced restart) have all been addressed. The test coverage is substantive, with meaningful regression tests for each path.
Two minor items from prior review rounds remain unresolved (inline comments below), plus one fresh semantic observation. Overall the shape of the solution is sound and the PR is in good shape for merge.
This review was generated by an AI agent (OpenHands) on behalf of the user via OpenHands Automation.
all-hands-bot
left a comment
There was a problem hiding this comment.
✅ QA Report: PASS
The ACP live-message path now accepts a second user message during an in-flight async ACP turn, cancels/restarts the active ACP prompt, and streams ACP text chunks as transient deltas.
Does this PR achieve its stated goal?
Yes. I exercised the real EventService → LocalConversation.arun() → ACPAgent.astep() path with a stdio ACP subprocess. On current main, the second send_message(run=True) stayed blocked behind the first slow ACP prompt, no ACP cancel was sent, and no StreamingDeltaEvents were published. On the PR branch, the second message was persisted within 200ms, session/cancel was sent to the fake ACP server, the replacement prompt was processed, the conversation finished, and ACP text chunks arrived as StreamingDeltaEvents.
| Phase | Result |
|---|---|
| Environment Setup | ✅ uv sync --dev completed; branch rebuilt through uv run after checkout. |
| CI Status | ⏳ PR is open/blocked with 26 successes, 18 skipped checks, and 5 in-progress checks at inspection time (including this QA workflow and Docker manifest jobs). |
| Functional Verification | ✅ Real SDK/server-facing ACP execution path verified before/after; no tests or linters were run. |
Functional Verification
Test 1: ACP user message while an async turn is in-flight + ACP streaming deltas
Step 1 — Establish baseline on origin/main (2316c480) without this PR:
Ran OPENHANDS_SUPPRESS_BANNER=1 uv run python /tmp/qa_acp_live_message.py using a temporary ACP subprocess that streams a text chunk, keeps the first prompt slow, and records received prompts/cancels:
{
"cancel_sent": false,
"fake_acp_log": "...PROMPT 1: first slow request
STREAMED 1: first slow request
PROMPT_TIMEOUT_FINISH 1: first slow request
PROMPT 2: second fast request
STREAMED 2: second fast request...",
"final_status": "ConversationExecutionStatus.FINISHED",
"second_send_completed_within_6s": false,
"second_send_elapsed_sec": 6.202,
"stream_deltas": [],
"user_messages_after_200ms": ["first slow request"]
}This confirms the baseline problem: the second message was not persisted within 200ms, the ACP server did not receive session/cancel, the second send_message(run=True) was still not complete after 6s, and streamed ACP text did not reach subscribers as StreamingDeltaEvents.
Step 2 — Apply the PR's changes:
Checked out codex/acp-live-message-deltas at 80dc346e32f0fb9a2676a5d105dcf2e944314fa2; uv run rebuilt the editable workspace packages.
Step 3 — Re-run with the fix in place:
Ran the same command, same fake ACP subprocess, and same user flow:
{
"cancel_sent": true,
"fake_acp_log": "...PROMPT 1: first slow request
STREAMED 1: first slow request
CANCEL qa-2624ba33
PROMPT_CANCELLED 1: first slow request
...PROMPT 1: second fast request
STREAMED 1: second fast request
PROMPT_FAST_FINISH 1: second fast request...",
"final_status": "ConversationExecutionStatus.FINISHED",
"second_send_completed_within_6s": true,
"second_send_elapsed_sec": 2.294,
"stream_deltas": [
{"content": "stream:first slow request", "reasoning_content": null},
{"content": "stream:second fast request", "reasoning_content": null}
],
"user_messages_after_200ms": ["first slow request", "second fast request"]
}This shows the PR fixes the user-visible behavior: the new message is persisted promptly while the ACP turn is still active, the active ACP prompt is cancelled/restarted, the replacement message is processed, and ACP text chunks stream through transient delta events.
Test 2: Condenser usage-id settings path
The PR description mentions carrying the condenser LLM usage-id fix from #3368. The current merge-base/current main already contains that fix, so there was no broken baseline to reproduce here. I still exercised the settings entry point on the PR branch with OpenHandsAgentSettings(..., condenser=CondenserSettings(enabled=True, max_size=40)) and observed distinct usage IDs:
{'agent_usage_id': 'default', 'condenser_usage_id': 'condenser', 'same_llm_object': False}This confirms the PR branch preserves the expected non-colliding condenser/agent LLM attribution behavior.
Issues Found
None.
This review was created by an AI agent (OpenHands) on behalf of the user.
Co-authored-by: openhands <openhands@all-hands.dev>
|
Addressed the latest review nits in 24ee53f: clarified the teardown-only RuntimeError suppression, documented the ACP prompt scan tradeoff, and made cancelled prompt futures report drained=False so restart protection remains enabled. Local validation: pre-commit on changed files and targeted ACP/EventService suites passed. This PR comment was generated by an AI agent (OpenHands) on behalf of the user. |
all-hands-bot
left a comment
There was a problem hiding this comment.
✅ QA Report: PASS
ACP live-message and streaming behavior worked in functional before/after probes; no QA issues found.
Does this PR achieve its stated goal?
Yes. The PR set out to let ACP conversations accept user messages during async turns, stream ACP assistant text as transient deltas, and send session/cancel on interrupted ACP prompts. I reproduced the old behavior on origin/main where send_message(run=True) stayed blocked behind an in-flight ACP turn and ACP string chunks produced no streaming deltas, then verified the PR branch processes the intervening message immediately, emits a StreamingDeltaEvent, and calls ACP cancel during interruption.
| Phase | Result |
|---|---|
| Environment Setup | ✅ uv run created/used the project environment and affected packages imported successfully. |
| CI Status | ⏳ Latest check snapshot: 20 successful, 0 failing, 8 pending, 4 skipped. |
| Functional Verification | ✅ Before/after SDK and agent-server probes confirm the claimed ACP behavior. |
Functional Verification
Test 1: ACP plain string token chunks stream as transient deltas
Step 1 — Reproduce baseline without the fix:
Ran git switch --detach origin/main && OPENHANDS_SUPPRESS_BANNER=1 uv run python /tmp/qa_acp_streaming.py:
{"streaming_delta_content": null, "streaming_delta_count": 0, "streaming_delta_reasoning": null, "token_callback_count": 0}This confirms the old behavior: an ACP agent did not get a token callback wired, so a provider-style plain string chunk could not publish a StreamingDeltaEvent.
Step 2 — Apply the PR changes:
Checked out 24ee53f6a4dbd0404141d3925e408db0faf7a86e.
Step 3 — Re-run with the fix in place:
Ran OPENHANDS_SUPPRESS_BANNER=1 uv run python /tmp/qa_acp_streaming.py:
{"streaming_delta_content": "ACP live text", "streaming_delta_count": 1, "streaming_delta_reasoning": null, "token_callback_count": 1}This shows the PR wires ACP token streaming and forwards a plain string chunk as one transient streaming delta.
Test 2: New user message during an in-flight ACP turn is accepted and processed
Step 1 — Reproduce baseline without the fix:
Ran git switch --detach origin/main && OPENHANDS_SUPPRESS_BANNER=1 uv run python /tmp/qa_acp_live_message.py:
{"final_status": "ConversationExecutionStatus.PAUSED", "first_step_cancelled": true, "prompts_seen": ["initial request"], "second_prompt_processed": false, "send_completed_within_750ms": false, "send_elapsed_seconds": 0.75, "send_error": "timeout waiting for send_message(run=True)"}This reproduces the reported user-facing problem: while the ACP async turn is in-flight, the follow-up user message cannot complete promptly and never reaches a replacement ACP prompt.
Step 2 — Apply the PR changes:
Checked out 24ee53f6a4dbd0404141d3925e408db0faf7a86e.
Step 3 — Re-run with the fix in place:
Ran OPENHANDS_SUPPRESS_BANNER=1 uv run python /tmp/qa_acp_live_message.py:
{"final_status": "ConversationExecutionStatus.FINISHED", "first_step_cancelled": true, "prompts_seen": ["initial request", "intervening"], "second_prompt_processed": true, "send_completed_within_750ms": true, "send_elapsed_seconds": 0.004, "send_error": null}This confirms the changed behavior: the first ACP turn is interrupted, the new message persists immediately, and the replacement run processes intervening instead of leaving it stuck in sending.
Test 3: Interrupted ACP turn sends session/cancel
Step 1 — Reproduce baseline without the fix:
Ran git switch --detach origin/main && OPENHANDS_SUPPRESS_BANNER=1 uv run python /tmp/qa_acp_cancel.py:
{"cancel_called": false, "cancellation_propagated": true}This shows the old ACP cancellation path did not call the provider's cancel method.
Step 2 — Apply the PR changes:
Checked out 24ee53f6a4dbd0404141d3925e408db0faf7a86e.
Step 3 — Re-run with the fix in place:
Ran OPENHANDS_SUPPRESS_BANNER=1 uv run python /tmp/qa_acp_cancel.py:
{"cancel_called": true, "cancellation_propagated": false}This confirms the PR sends ACP session/cancel when an in-flight async ACP prompt is interrupted.
Additional settings check
On the PR branch, I created an agent from LLMAgentSettings(llm=LLM(..., usage_id="agent_llm")) and observed distinct LLM usage IDs:
[("gpt-4o-mini", "agent_llm"), ("gpt-4o-mini", "condenser")]That confirms the condenser metrics identity is not colliding with the agent LLM in this flow.
Issues Found
None.
This review was created by an AI agent (OpenHands) on behalf of the user.
all-hands-bot
left a comment
There was a problem hiding this comment.
This is a significant, well-structured change. The core design decisions are sound: releasing the state lock around long ACP awaits, using _on_event_with_state_lock to serialize individual event emissions, shielding the portal future so cancel/drain can run after wait_for fires, the generation-based interrupt guard in EventService, and the FIFO message-cursor tracking via ACP_LAST_PROMPT_USER_MESSAGE_ID. Test coverage for the happy paths and several edge cases is solid.
Two items are called out below: one is a blocking-event-loop risk in the drain-timeout recovery path, and one is a minor code-duplication nit.
This review was generated by an AI agent (OpenHands) on behalf of the user via OpenHands Automation.
all-hands-bot
left a comment
There was a problem hiding this comment.
🟡 Acceptable direction, but I found two remaining Stop/Pause races in the ACP restart path. Risk: 🟡 MEDIUM because this changes agent/conversation loop behavior; a human maintainer should decide eval coverage before merge.
This review was generated by an AI agent (OpenHands) on behalf of the user.
Was this automated review useful? React with 👍 or 👎 to this review to help us measure review quality.
Workflow run: https://github.com/OpenHands/software-agent-sdk/actions/runs/26431325528
Co-authored-by: openhands <openhands@all-hands.dev>
|
Addressed the latest Stop/Pause restart races in 8b59878: send_message(run=True) now suppresses automatic restarts if an explicit interrupt lands during the append and passes the expected generation to every auto-run start; ACP arun now preserves PauseEvent requests made during the unlocked ACP step await after committing the completed prompt cursor. I replied to the two non-blocking suggestions and left them as-is to avoid adding complexity outside the cancellation-ordering fix. Local validation: pre-commit on changed files and targeted EventService/conversation tests passed. This PR comment was generated by an AI agent (OpenHands) on behalf of the user. |
all-hands-bot
left a comment
There was a problem hiding this comment.
✅ QA Report: PASS
Verified the PR’s primary ACP async-turn behavior end-to-end with a real EventService instance and a subprocess ACP provider: the new message is persisted promptly, the in-flight ACP turn is cancelled/restarted, ACP string chunks stream as deltas, and the replacement prompt finishes.
Does this PR achieve its stated goal?
Yes. The PR set out to make ACP conversations accept user messages during long async turns, stream ACP assistant text through transient deltas, and cancel/restart an in-flight ACP prompt so the new message is processed instead of staying stuck in sending. On main, the same user flow blocked for >4s, emitted no streaming deltas, did not send session/cancel, and never sent the second message to the ACP provider; on this PR, the second user message was visible in conversation state after 0.2s, send_message(run=True) completed in 2.274s, the fake ACP provider received cancel, the second prompt was sent, streaming deltas arrived for both prompts, and the conversation reached FINISHED.
| Phase | Result |
|---|---|
| Environment Setup | ✅ uv run rebuilt/imported local packages successfully; no tests/linters were run. |
| CI Status | 🟡 Snapshot: 20 successful, 8 pending, 4 skipped, 0 failing checks. |
| Functional Verification | ✅ Before/after harness reproduced the stuck baseline and verified ACP live-message + delta behavior on the PR. |
Functional Verification
Test 1: ACP user message during an in-flight async turn
I used temporary QA harnesses outside the repository (/tmp/qa_fake_acp_agent.py and /tmp/qa_acp_event_service.py). The fake ACP agent runs as a real subprocess over ACP stdio, emits AgentMessageChunk text, blocks the first prompt until session/cancel, then finishes when it receives second task. The client side uses the real EventService, LocalWorkspace, ACPAgent, subscription path, send_message(run=True), and conversation state.
Step 1 — Establish baseline without this PR:
Ran git checkout --detach origin/main && uv run python /tmp/qa_acp_event_service.py:
{
"cancel_count": 0,
"commit": "0c3c3e45",
"final_status_before_close": "ConversationExecutionStatus.PAUSED",
"first_delta_seen": false,
"prompt2_seen": false,
"prompt_starts": [
{"count": 1, "text": "first task"}
],
"second_message_visible_after_0_2s": false,
"second_send_completed_within_4s": false,
"second_send_elapsed_seconds": 4.205,
"streaming_deltas": []
}This confirms the baseline problem: while the first ACP prompt is in flight, the second user message is not persisted quickly, send_message(run=True) remains blocked past 4 seconds, no ACP session/cancel is sent, no replacement prompt reaches the provider, and ACP string chunks do not stream as StreamingDeltaEvents.
Step 2 — Apply the PR's changes:
Checked out codex/acp-live-message-deltas at 8b59878f0fc0fb6799ed5003e38ee5ec67b5cddf and reran the same harness.
Step 3 — Re-run with the fix in place:
Ran uv run python /tmp/qa_acp_event_service.py:
{
"cancel_count": 1,
"commit": "8b59878f",
"final_status_before_close": "ConversationExecutionStatus.FINISHED",
"first_delta_seen": true,
"prompt2_seen": true,
"prompt_starts": [
{"count": 1, "text": "first task"},
{"count": 1, "text": "second task"}
],
"second_message_visible_after_0_2s": true,
"second_send_completed_within_4s": true,
"second_send_elapsed_seconds": 2.274,
"streaming_deltas": [
"delta:1:first task",
"delta:1:second task"
]
}This verifies the changed behavior works through the real server/conversation path: the second message is persisted promptly while the ACP turn is still running, the in-flight prompt is interrupted via ACP cancel/restart, the replacement prompt is processed, ACP plain-string chunks are published as transient streaming deltas, and the conversation finishes.
Test 2: Settings-based condenser LLM usage ID
Ran a small SDK script through the public settings API on the PR branch:
uv run python -c "from openhands.sdk.settings.model import default_agent_settings, create_agent_from_settings; agent=create_agent_from_settings(default_agent_settings()); [print(i, llm.model, getattr(llm, 'usage_id', None)) for i, llm in enumerate(list(agent.get_all_llms()))]"Observed:
0 gpt-5.5 default
1 gpt-5.5 condenser
This confirms settings-created agents expose a distinct condenser LLM usage ID instead of colliding with the default agent LLM usage bucket.
Issues Found
None.
This review was created by an AI agent (OpenHands) on behalf of the user.
all-hands-bot
left a comment
There was a problem hiding this comment.
Overall
Solid implementation across a genuinely tricky surface area. The three-part design — releasing the state lock around long ACP awaits, streaming StreamingDeltaEvent from plain-string token callbacks, and the FIFO message cursor with supersede markers — holds together well. The iterative hardening through this review cycle has paid off: session/cancel now actually fires before re-wiring callbacks, close() now bumps _explicit_interrupt_generation, and the cancelled-prompt stop_reason == "cancelled" check is in place.
On the latest unresolved bot comments (posted ~03:22 UTC today)
Inspecting the current head (8b59878) against the six open bot threads:
close()not bumping generation (line 931 ofevent_service.py): Fixed —close()now sets_closing = True, increments_explicit_interrupt_generation, and clears both rerun flags before the run-task drain.- Drained prompt finalized without
stop_reasoncheck (acp_agent.pyline 1985): Fixed —_prompt_response_was_cancelled()is now called before_finalize_successful_turnin all drain paths. - Callbacks wired while
_cancel_inflight_tool_callsemits (acp_agent.pyline 1998): Fixed —_cancel_inflight_tool_callsnow captureson_event, calls_clear_turn_callbacks()first, then emits synthetic failures through the captured reference. - Generation snapshot race (
event_service.pysend_message): Mitigated — snapshot is taken before the firstawait; every subsequent async boundary (after_mark_running_acp_prompt_superseded, afterinterrupt, insiderun()) re-checks the generation before acting. - Concurrent
pause()between lock-release andastep()start (local_conversation.pyline 1178): Acceptable —LocalConversation.pause()emits aPauseEventunder the state lock while status is RUNNING, sopause_requested_during_acp_stepcatches it afterastep()completes. The ACP turn runs to natural completion before honoring the pause, which is the correct semantics for a remote subprocess. - Cursor committed after
astep()persists (local_conversation.pyline 1223): Both the FINISHED status write and cursor commit happen in the samewith self._state:block atomically from a concurrency perspective; crash-durability is out of scope here.
@neubig — please confirm these are intentionally addressed so those threads can be resolved.
This review was generated by an AI agent (OpenHands) on behalf of the user via OpenHands Automation.
all-hands-bot
left a comment
There was a problem hiding this comment.
✅ QA Report: PASS
Verified the PR’s primary ACP async-turn behavior end-to-end with a real EventService instance and a subprocess ACP provider: the new message is persisted promptly, the in-flight ACP turn is cancelled/restarted, ACP string chunks stream as deltas, and the replacement prompt finishes.
Does this PR achieve its stated goal?
Yes. The PR set out to make ACP conversations accept user messages during long async turns, stream ACP assistant text through transient deltas, and cancel/restart an in-flight ACP prompt so the new message is processed instead of staying stuck in sending. On main, the same user flow blocked for >4s, emitted no streaming deltas, did not send session/cancel, and never sent the second message to the ACP provider; on this PR, the second user message was visible in conversation state after 0.2s, send_message(run=True) completed in 2.274s, the fake ACP provider received cancel, the second prompt was sent, streaming deltas arrived for both prompts, and the conversation reached FINISHED.
| Phase | Result |
|---|---|
| Environment Setup | ✅ uv run rebuilt/imported local packages successfully; no tests/linters were run. |
| CI Status | 🟡 Snapshot: 20 successful, 8 pending, 4 skipped, 0 failing checks. |
| Functional Verification | ✅ Before/after harness reproduced the stuck baseline and verified ACP live-message + delta behavior on the PR. |
Functional Verification
Test 1: ACP user message during an in-flight async turn
I used temporary QA harnesses outside the repository (/tmp/qa_fake_acp_agent.py and /tmp/qa_acp_event_service.py). The fake ACP agent runs as a real subprocess over ACP stdio, emits AgentMessageChunk text, blocks the first prompt until session/cancel, then finishes when it receives second task. The client side uses the real EventService, LocalWorkspace, ACPAgent, subscription path, send_message(run=True), and conversation state.
Step 1 — Establish baseline without this PR:
Ran git checkout --detach origin/main && uv run python /tmp/qa_acp_event_service.py:
{
"cancel_count": 0,
"commit": "0c3c3e45",
"final_status_before_close": "ConversationExecutionStatus.PAUSED",
"first_delta_seen": false,
"prompt2_seen": false,
"prompt_starts": [
{"count": 1, "text": "first task"}
],
"second_message_visible_after_0_2s": false,
"second_send_completed_within_4s": false,
"second_send_elapsed_seconds": 4.205,
"streaming_deltas": []
}This confirms the baseline problem: while the first ACP prompt is in flight, the second user message is not persisted quickly, send_message(run=True) remains blocked past 4 seconds, no ACP session/cancel is sent, no replacement prompt reaches the provider, and ACP string chunks do not stream as StreamingDeltaEvents.
Step 2 — Apply the PR's changes:
Checked out codex/acp-live-message-deltas at 8b59878f0fc0fb6799ed5003e38ee5ec67b5cddf and reran the same harness.
Step 3 — Re-run with the fix in place:
Ran uv run python /tmp/qa_acp_event_service.py:
{
"cancel_count": 1,
"commit": "8b59878f",
"final_status_before_close": "ConversationExecutionStatus.FINISHED",
"first_delta_seen": true,
"prompt2_seen": true,
"prompt_starts": [
{"count": 1, "text": "first task"},
{"count": 1, "text": "second task"}
],
"second_message_visible_after_0_2s": true,
"second_send_completed_within_4s": true,
"second_send_elapsed_seconds": 2.274,
"streaming_deltas": [
"delta:1:first task",
"delta:1:second task"
]
}This verifies the changed behavior works through the real server/conversation path: the second message is persisted promptly while the ACP turn is still running, the in-flight prompt is interrupted via ACP cancel/restart, the replacement prompt is processed, ACP plain-string chunks are published as transient streaming deltas, and the conversation finishes.
Test 2: Settings-based condenser LLM usage ID
Ran a small SDK script through the public settings API on the PR branch:
uv run python -c "from openhands.sdk.settings.model import default_agent_settings, create_agent_from_settings; agent=create_agent_from_settings(default_agent_settings()); [print(i, llm.model, getattr(llm, 'usage_id', None)) for i, llm in enumerate(list(agent.get_all_llms()))]"Observed:
0 gpt-5.5 default
1 gpt-5.5 condenser
This confirms settings-created agents expose a distinct condenser LLM usage ID instead of colliding with the default agent LLM usage bucket.
Issues Found
None.
This review was created by an AI agent (OpenHands) on behalf of the user.
all-hands-bot
left a comment
There was a problem hiding this comment.
🟡 Taste Rating: Acceptable, but I found a few remaining ACP cancellation/persistence edge cases inline. Risk: 🟡 MEDIUM because this changes ACP conversation-loop cancellation/restart behavior; please have a human maintainer decide eval coverage before approval.
This review was generated by an AI agent (OpenHands) on behalf of the user.
Was this automated review useful? React with 👍 or 👎 to this review to help us measure review quality.
Workflow run: https://github.com/OpenHands/software-agent-sdk/actions/runs/26432380231
Co-authored-by: openhands <openhands@all-hands.dev>
|
Addressed the latest ACP cancellation/persistence review in 8fb9d70: cancel-send timeouts now quarantine/restart the ACP session, sync step() consumes the restart guard like astep(), restart failure handling is documented, and ACP cursor repair now covers ERROR resume state after a crash in the finish/cursor gap. I also replied to the prompt-scan/test-scope suggestions and kept them as-is to avoid complexity outside this cancellation fix. Local validation: pre-commit on changed files and targeted ACP/EventService/conversation suites passed. This PR comment was generated by an AI agent (OpenHands) on behalf of the user. |
all-hands-bot
left a comment
There was a problem hiding this comment.
Code Review: Accept ACP user messages during async turns
This is a well-engineered, iteratively hardened PR. After reviewing the current state of the diff (post all resolved threads), the implementation looks solid:
- The FIFO cursor (
ACP_LAST_PROMPT_USER_MESSAGE_ID/ACP_INFLIGHT_PROMPT_USER_MESSAGE_ID) correctly tracks which user message eachastepis responsible for, surviving crashes and cancellations. - The generation-counter guard in
EventService(_explicit_interrupt_generation) cleanly prevents ACP-internal reruns from overwriting explicit user stop intent. - The shielded-drain pattern (
asyncio.shield(asyncio.wrap_future(future))) inastepis the right approach: it lets the portal task quiesce before callbacks are rewired rather than abandoning a live future that could still fire events. - Session-restart safety (
_restart_session_on_next_turn,_restart_session_after_drain_timeout) correctly preserves the ACP session ID so conversation memory survives subprocess restarts. - Test coverage is excellent: FIFO ordering, finish-gap races, iteration-cap edge cases, double-cancel during drain, and suffix-marker clearing on fallback are all exercised.
Two minor observations below — no blocking issues.
This review was generated by an AI agent (OpenHands) on behalf of the user through OpenHands Automation.
all-hands-bot
left a comment
There was a problem hiding this comment.
✅ QA Report: PASS
The ACP EventService paths were exercised with a real fake ACP subprocess, and the PR fixes both live ACP streaming deltas and mid-turn user message handling.
Does this PR achieve its stated goal?
Yes. On main, ACP assistant chunks reached the provider but produced 0 StreamingDeltaEvents, and a second user message sent during an in-flight ACP prompt timed out after 8 seconds without reaching the ACP provider. On PR commit 8fb9d70, the same flows produced two streaming deltas and accepted the second message in ~2.0s, sent session/cancel, restarted/resumed the ACP session, and delivered the corrective prompt to the provider.
| Phase | Result |
|---|---|
| Environment Setup | ✅ make build completed successfully with uv sync --dev |
| CI Status | 🟡 No failing checks observed; core checks were green, with image build and OpenHands QA/review jobs still in progress when checked |
| Functional Verification | ✅ ACP streaming and mid-turn message behavior verified through EventService with an ACP subprocess |
Functional Verification
Test 1: ACP assistant chunks stream as transient deltas
Step 1 — Establish baseline without the fix:
Checked out origin/main (0c3c3e45) and ran OPENHANDS_SUPPRESS_BANNER=1 uv run python /tmp/qa_acp_stream_delta.py:
{
"delta_count": 0,
"deltas": [],
"status": "ConversationExecutionStatus.FINISHED",
"log": [
"START mode=stream",
"INITIALIZE",
"NEW_SESSION",
"PROMPT 1: stream this ACP reply"
]
}The fake ACP provider received and completed the prompt, but subscribers received no StreamingDeltaEvents.
Step 2 — Apply the PR's changes:
Checked out PR commit 8fb9d70af5e58eb85460c95640a149109a860c0d.
Step 3 — Re-run with the fix in place:
Ran OPENHANDS_SUPPRESS_BANNER=1 uv run python /tmp/qa_acp_stream_delta.py:
{
"delta_count": 2,
"deltas": [
"chunk-1:stream this ACP reply",
"done-1"
],
"status": "ConversationExecutionStatus.FINISHED",
"log": [
"START mode=stream",
"INITIALIZE",
"NEW_SESSION",
"PROMPT 1: stream this ACP reply"
]
}This confirms ACP text chunks now publish transient streaming deltas while the turn still finishes normally.
Test 2: User message during in-flight ACP turn is accepted and supersedes the old prompt
Step 1 — Reproduce baseline without the fix:
Checked out origin/main and ran OPENHANDS_SUPPRESS_BANNER=1 uv run python /tmp/qa_acp_midturn_message.py against a fake ACP prompt that blocks until session/cancel:
{
"second_send_returned": false,
"second_send_error": "TimeoutError",
"second_send_elapsed_seconds": 8.008,
"saw_second_prompt": false,
"log": [
"START mode=block",
"INITIALIZE",
"NEW_SESSION",
"PROMPT 1: first long request"
]
}This reproduces the bug: the second user message could not be persisted/processed while the ACP prompt was in flight.
Step 2 — Apply the PR's changes:
Checked out PR commit 8fb9d70af5e58eb85460c95640a149109a860c0d.
Step 3 — Re-run with the fix in place:
Ran OPENHANDS_SUPPRESS_BANNER=1 uv run python /tmp/qa_acp_midturn_message.py:
{
"second_send_returned": true,
"second_send_error": null,
"second_send_elapsed_seconds": 2.011,
"saw_second_prompt": true,
"log": [
"START mode=block",
"INITIALIZE",
"NEW_SESSION",
"PROMPT 1: first long request",
"CANCEL",
"PROMPT 1 OBSERVED_CANCEL",
"START mode=block",
"INITIALIZE",
"LOAD_SESSION",
"PROMPT 1: second corrective request"
]
}This verifies the server-side behavior a user needs: the follow-up message no longer stays stuck, the in-flight ACP turn is cancelled, the ACP session is resumed, and the corrective message is sent to the provider.
Additional sanity check: condenser usage IDs
On the PR commit, creating the default agent through settings produced separate usage IDs: agent LLM usage_id='default' and condenser LLM usage_id='condenser', matching the metrics-collision fix described in the PR.
Issues Found
None.
This review was created by an AI agent (OpenHands) on behalf of the user.
all-hands-bot
left a comment
There was a problem hiding this comment.
🟡 Acceptable direction, but I found a few remaining ACP restart/resume races inline. Risk: 🟡 MEDIUM because this changes ACP conversation-loop cancellation/resume behavior; a human maintainer should decide eval coverage before approval.
This review was generated by an AI agent (OpenHands) on behalf of the user.
Was this automated review useful? React with 👍 or 👎 to this review to help us measure review quality.
Workflow run: https://github.com/OpenHands/software-agent-sdk/actions/runs/26433203300
Co-authored-by: openhands <openhands@all-hands.dev>
Co-authored-by: openhands <openhands@all-hands.dev>
|
Addressed the latest restart/resume review in 0418ccf and da40f36: queued automatic reruns now honor the captured explicit-interrupt generation for IDLE restarts too, the unsafe ERROR-state ACP cursor promotion was removed, the shielded interrupt wait is documented, and arun now registers its task/cancellation token before agent initialization so interrupts during slow ACP setup cancel the run instead of being overwritten. Local validation: pre-commit on changed files and targeted EventService/conversation suites passed. This PR comment was generated by an AI agent (OpenHands) on behalf of the user. |
all-hands-bot
left a comment
There was a problem hiding this comment.
This is a sophisticated piece of async-concurrency engineering. The FIFO cursor approach for ACP message delivery is correct: LocalConversation.arun() owns the cursor, snapshots the exact MessageEvent under the state lock, passes it into astep(), and commits the advance only on clean completion. The session cancel/drain protocol — shield → session/cancel → bounded drain → conditional session restart — correctly addresses the races called out in prior review rounds. The generation counter in EventService elegantly prevents an internal ACP interrupt from overriding a subsequent explicit user stop.
All prior review threads are resolved. Two new observations below.
This review was generated by an AI agent (OpenHands) on behalf of the user through OpenHands Automation.
| """ | ||
| if not self._conversation: | ||
| return (False, False) | ||
| if self._run_task is None: |
There was a problem hiding this comment.
🟡 Suggestion: The guard exits early when _run_task is None, but a task that has already finished (_run_task.done()) can still reach the execution_status check inside the lock. If the task completes between this guard and the with self._conversation._state as state: acquire, the status could briefly remain RUNNING — the inner execution_status != RUNNING check would catch it, but the intent is clearer with an explicit done() guard here:
| if self._run_task is None: | |
| if self._run_task is None or self._run_task.done(): | |
| return (False, False) |
| await self._publish_state_update() | ||
|
|
||
| async def interrupt(self): | ||
| async def interrupt(self, *, internal_acp_rerun: bool = False): |
There was a problem hiding this comment.
🟡 Suggestion: internal_acp_rerun: bool = False is an implementation detail of the ACP supersede path now exposed on the public interrupt() signature. Any external caller (or future test helper) that accidentally passes interrupt(internal_acp_rerun=True) will silently bypass the generation-counter increment and rerun-flag clearing, making generation tracking unsafe. Consider moving the internal bypass to a private helper (e.g., _interrupt_for_acp_rerun()) that send_message calls directly, and keeping the public interrupt() signature clean.
all-hands-bot
left a comment
There was a problem hiding this comment.
✅ QA Report: PASS
Manual QA with a real ACP stdio provider confirms ACP text streaming and live user-message interruption/restart now work end-to-end through EventService.
Does this PR achieve its stated goal?
Yes. The PR set out to stream ACP assistant text and accept/reprocess user messages that arrive during long async ACP turns. On main, the fake ACP provider emitted a chunk but EventService subscribers saw no StreamingDeltaEvent, and a second send_message(run=True) during an in-flight ACP prompt timed out after 6s without sending ACP session/cancel. On this PR, the same flows produced a streaming delta, sent ACP cancel, resumed the session, and processed the intervening user correction.
| Phase | Result |
|---|---|
| Environment Setup | ✅ Dependencies were already bootstrapped with uv sync --dev; uv warned that the external VIRTUAL_ENV was ignored in favor of .venv. |
| CI Status | ✅ Core checks are green; pr-review and qa-changes were still in progress at report time. |
| Functional Verification | ✅ Real ACP stdio provider exercised through the SDK/agent-server path on both main and the PR branch. |
Functional Verification
Used a temporary ACP stdio provider (/tmp/qa_fake_acp_server.py) and drove it through EventService (/tmp/qa_acp_event_service.py) rather than calling internals directly. No test suite, linters, type checkers, or pre-commit hooks were run.
Test 1: ACP plain text chunks stream as transient deltas
Step 1 — Reproduce / establish baseline on main:
Ran OPENHANDS_SUPPRESS_BANNER=1 uv run python /tmp/qa_acp_event_service.py streaming --expect base:
MODE=streaming
EXPECT=base
ACP_LOG_KINDS=['connected', 'initialize', 'new_session', 'prompt', 'chunk']
STREAMING_DELTAS=[]
BASELINE_NO_DELTA_REPRODUCED=True
RESULT=BASELINE_REPRODUCED
This confirms the fake ACP provider emitted an assistant text chunk, but the server-side event stream did not publish any StreamingDeltaEvent on main.
Step 2 — Apply the PR's changes:
Checked out codex/acp-live-message-deltas at da40f3689e477bf5f1bc31f9228ab4b5b966f459.
Step 3 — Re-run with the fix in place:
Ran OPENHANDS_SUPPRESS_BANNER=1 uv run python /tmp/qa_acp_event_service.py streaming --expect pr:
MODE=streaming
EXPECT=pr
ACP_LOG_KINDS=['connected', 'initialize', 'new_session', 'prompt', 'chunk']
STREAMING_DELTAS=['streaming reply for: hello stream']
RESULT=PASS
This confirms ACP plain-string/chunk text is now forwarded to subscribers as a transient streaming delta.
Test 2: User message during an in-flight async ACP turn interrupts and restarts
Step 1 — Reproduce / establish baseline on main:
Ran OPENHANDS_SUPPRESS_BANNER=1 timeout 35s uv run python /tmp/qa_acp_event_service.py live-message --expect base:
MODE=live-message-during-async-turn
EXPECT=base
SECOND_SEND_TIMED_OUT=True
SECOND_SEND_ELAPSED_MS=6006
CANCEL_SENT_TO_ACP=False
ACP_LOG=[{'kind': 'connected'}, {'kind': 'initialize', 'protocol_version': 1}, {'cwd': '/tmp/qa-acp-live-myn92rd8/workspace', 'kind': 'new_session', 'session_id': 'qa-session'}, {'count': 1, 'kind': 'prompt', 'text': 'initial request'}]
BASELINE_BLOCKED_REPRODUCED=True
RESULT=BASELINE_REPRODUCED
This confirms the old behavior: while the first ACP prompt was in flight, the replacement user message remained blocked long enough to time out and ACP never received session/cancel.
Step 2 — Apply the PR's changes:
Checked out codex/acp-live-message-deltas at da40f3689e477bf5f1bc31f9228ab4b5b966f459.
Step 3 — Re-run with the fix in place:
Ran OPENHANDS_SUPPRESS_BANNER=1 uv run python /tmp/qa_acp_event_service.py live-message --expect pr:
MODE=live-message-during-async-turn
EXPECT=pr
SECOND_SEND_TIMED_OUT=False
SECOND_SEND_ELAPSED_MS=2012
CANCEL_SENT_TO_ACP=True
ACP_LOG=[..., {'kind': 'cancel', 'session_id': 'qa-session'}, ..., {'cwd': '/tmp/qa-acp-live-l1vxm91_/workspace', 'kind': 'load_session', 'session_id': 'qa-session'}, {'count': 1, 'kind': 'prompt', 'text': 'intervening correction'}, {'kind': 'chunk', 'text': 'second processed: intervening correction'}]
EVENT_SUMMARY={'streaming_deltas': ['cancel acknowledged for: initial request', 'second processed: intervening correction'], 'finish_messages': [], 'agent_messages': []}
RESULT=PASS
This confirms the PR path sends ACP session/cancel, restarts/resumes the ACP session after the first turn is superseded, and processes the new user message instead of leaving it stuck.
Issues Found
None.
This review was created by an AI agent (OpenHands) on behalf of the user.
|
Can the PR description be updated?
Can we add utils to extract shared ACP/non-ACP arun logic to avoid future drifts?
|
Summary
StreamingDeltaEvents, including ACP providers that invoke token callbacks with plain string chunksastep()awaits so websocket/API user messages can be persisted immediatelysession/cancelwhen an in-flight turn is interruptedRegression Tests
test_acp_string_token_callback_publishes_deltafails on the prior implementation because ACP plain-string token callbacks raised before publishing anyStreamingDeltaEventtest_acp_arun_accepts_user_message_while_step_is_in_flightfails on main becausesend_message()waits behind the in-flight ACP prompttests/sdk/test_settings.py -k condensercovers the copied condenser LLM usage-id behavior from fix(sdk): assign condenser LLM usage id #3368Validation
uv run pytest -q tests/sdk/test_settings.py -k condenseruv run pytest -q tests/agent_server/test_event_streaming.py tests/agent_server/test_event_service.py::TestEventServiceSendMessage tests/sdk/agent/test_acp_agent.py::TestACPAgentAstep tests/sdk/conversation/local/test_conversation_send_message.pyuv run ruff check openhands-sdk/openhands/sdk/settings/model.py tests/sdk/test_settings.py openhands-agent-server/openhands/agent_server/event_service.py tests/agent_server/test_event_streaming.pyAgent Server images for this PR
• GHCR package: https://github.com/OpenHands/agent-sdk/pkgs/container/agent-server
Variants & Base Images
eclipse-temurin:17-jdknikolaik/python-nodejs:python3.13-nodejs22-slimgolang:1.21-bookwormPull (multi-arch manifest)
# Each variant is a multi-arch manifest supporting both amd64 and arm64 docker pull ghcr.io/openhands/agent-server:0418ccf-pythonRun
All tags pushed for this build
About Multi-Architecture Support
0418ccf-python) is a multi-arch manifest supporting both amd64 and arm640418ccf-python-amd64) are also available if needed