Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
0a180f1
fix(sdk): assign condenser LLM usage id
openhands-agent May 23, 2026
43032a6
fix(sdk): reset condenser LLM metrics
openhands-agent May 24, 2026
ca04e8c
fix(acp): accept user messages during async turns
neubig May 24, 2026
f50ed72
fix(agent-server): stream ACP text deltas
neubig May 24, 2026
71f8148
fix(acp): interrupt running turn on new user message
neubig May 24, 2026
b9dc93b
fix(agent-server): handle ACP string token deltas
neubig May 24, 2026
b336d05
merge main into PR #3376
openhands-agent May 24, 2026
6c89026
fix: address ACP async turn races (#3376)
openhands-agent May 24, 2026
49583a2
fix: satisfy ACP prompt future typing (#3376)
openhands-agent May 24, 2026
55b6c38
test: update ACP arun prompt snapshot test (#3376)
openhands-agent May 24, 2026
c9bbb68
fix: close ACP async ordering gaps (#3376)
openhands-agent May 24, 2026
ce68215
fix: address ACP cancellation edge cases (#3376)
openhands-agent May 24, 2026
f581a01
fix: address ACP review edge cases (#3376)
openhands-agent May 24, 2026
f1ca28d
Merge branch 'pr-3368-fix-condenser-usage-id' into codex/acp-live-mes…
neubig May 24, 2026
ffeb881
fix: close ACP rerun race windows (#3376)
openhands-agent May 24, 2026
c1c37ca
Merge branch 'main' into codex/acp-live-message-deltas
neubig May 24, 2026
1c390af
fix(acp): reassign agent state for prompt tracking
neubig May 24, 2026
c3749ff
fix(acp): resume session after cancel drain timeout
neubig May 25, 2026
6246d1e
Address ACP async review races
May 25, 2026
682fad9
Clarify ACP queued-message cleanup fixes
May 25, 2026
741399e
Fix ACP resume cursor after cancellation
May 25, 2026
89a68ff
Treat ACP prompt timeout as idle timeout
May 25, 2026
81b4713
Restore hard ACP prompt timeout
May 25, 2026
70e44e2
Fix ACP interrupt cursor races
May 25, 2026
ff3b45b
Format EventService ACP rerun logic
May 25, 2026
af1ac3f
Use reassignment-safe ACP cursor state updates
May 25, 2026
c988567
Fix remaining ACP async race reviews
May 25, 2026
9d7d2be
fix: address ACP async review feedback
openhands-agent May 26, 2026
1deacf2
fix: handle ACP double cancellation
openhands-agent May 26, 2026
91d11a8
fix: close ACP cancellation restart races
openhands-agent May 26, 2026
eb7bed3
fix: harden ACP cleanup interrupts
openhands-agent May 26, 2026
80dc346
fix: guard ACP restart ordering
openhands-agent May 26, 2026
24ee53f
fix: address ACP review nits
openhands-agent May 26, 2026
8b59878
fix: preserve explicit ACP pauses
openhands-agent May 26, 2026
8fb9d70
fix: quarantine ACP cancel timeouts
openhands-agent May 26, 2026
0418ccf
fix: guard ACP queued reruns
openhands-agent May 26, 2026
da40f36
fix: track async runs before initialization
openhands-agent May 26, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 179 additions & 39 deletions openhands-agent-server/openhands/agent_server/event_service.py

Large diffs are not rendered by default.

385 changes: 328 additions & 57 deletions openhands-sdk/openhands/sdk/agent/acp_agent.py

Large diffs are not rendered by default.

303 changes: 294 additions & 9 deletions openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py

Large diffs are not rendered by default.

287 changes: 287 additions & 0 deletions tests/agent_server/test_event_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,13 @@
)
from openhands.agent_server.pub_sub import Subscriber
from openhands.sdk import LLM, Agent, Conversation, Message
from openhands.sdk.agent import ACPAgent
from openhands.sdk.conversation.fifo_lock import FIFOLock
from openhands.sdk.conversation.impl.local_conversation import (
ACP_INFLIGHT_PROMPT_USER_MESSAGE_ID,
ACP_SUPERSEDE_INFLIGHT_PROMPT,
LocalConversation,
)
from openhands.sdk.conversation.state import (
ConversationExecutionStatus,
ConversationState,
Expand Down Expand Up @@ -825,6 +831,287 @@ async def test_send_message_with_run_true_agent_idle(self, event_service):
# Verify run was called since agent was idle
conversation.run.assert_called_once()

@pytest.mark.asyncio
async def test_send_message_with_run_true_interrupts_running_acp_turn(
self, event_service, tmp_path
):
"""A new user message should interrupt an in-flight ACP prompt."""
agent = ACPAgent(acp_command=["echo", "test"])
conversation = LocalConversation(
agent=agent,
workspace=str(tmp_path),
max_iteration_per_run=4,
stuck_detection=False,
)
conversation.send_message("initial request")
event_service._conversation = conversation
event_service._publish_state_update = AsyncMock()

first_step_started = asyncio.Event()
first_step_cancelled = asyncio.Event()
second_step_seen = asyncio.Event()
prompts_seen: list[str] = []

def user_text(event: MessageEvent | None) -> str:
assert event is not None
content = event.llm_message.content[0]
assert isinstance(content, TextContent)
return content.text

async def blocking_astep(
self, # noqa: ARG001
conv: LocalConversation, # noqa: ARG001
on_event, # noqa: ARG001
on_token=None, # noqa: ARG001
prompt_message: MessageEvent | None = None,
) -> None:
prompts_seen.append(user_text(prompt_message))
if len(prompts_seen) == 1:
first_step_started.set()
try:
await asyncio.Event().wait()
except asyncio.CancelledError:
first_step_cancelled.set()
raise

second_step_seen.set()
conv.state.execution_status = ConversationExecutionStatus.FINISHED

with (
patch.object(ACPAgent, "init_state", autospec=True),
patch.object(ACPAgent, "astep", new=blocking_astep),
):
try:
await event_service.run()
await asyncio.wait_for(first_step_started.wait(), timeout=1.0)

await event_service.send_message(
Message(role="user", content=[TextContent(text="intervening")]),
run=True,
)

await asyncio.wait_for(first_step_cancelled.wait(), timeout=1.0)
await asyncio.wait_for(second_step_seen.wait(), timeout=1.0)
finally:
if (
event_service._run_task is not None
and not event_service._run_task.done()
):
conversation.interrupt()
with suppress(asyncio.CancelledError, TimeoutError):
await asyncio.wait_for(event_service._run_task, timeout=1.0)

assert prompts_seen == ["initial request", "intervening"]

@pytest.mark.asyncio
async def test_send_message_with_run_true_does_not_interrupt_current_acp_prompt(
self, event_service, tmp_path
):
"""Do not cancel the ACP prompt if it already advanced to the new message."""
agent = ACPAgent(acp_command=["echo", "test"])
conversation = LocalConversation(
agent=agent,
workspace=str(tmp_path),
max_iteration_per_run=4,
stuck_detection=False,
)
conversation.send_message("initial request")
conversation.state.execution_status = ConversationExecutionStatus.RUNNING
event_service._conversation = conversation
event_service._publish_state_update = AsyncMock()

release_run = asyncio.Event()
event_service._run_task = asyncio.create_task(release_run.wait())
original_send_message = conversation.send_message

def send_and_mark_active_prompt(message):
original_send_message(message)
conversation.state.execution_status = ConversationExecutionStatus.RUNNING
conversation.state.agent_state = {
**conversation.state.agent_state,
ACP_INFLIGHT_PROMPT_USER_MESSAGE_ID: (
conversation.state.last_user_message_id
),
}

conversation.send_message = send_and_mark_active_prompt # type: ignore[method-assign]
conversation.interrupt = MagicMock() # type: ignore[method-assign]

try:
await event_service.send_message(
Message(role="user", content=[TextContent(text="intervening")]),
run=True,
)
finally:
release_run.set()
await event_service._run_task
event_service._run_task = None

conversation.interrupt.assert_not_called()
assert event_service._rerun_requested is False

@pytest.mark.asyncio
async def test_acp_supersede_mark_rechecks_current_prompt(
self, event_service, tmp_path
):
"""Do not attach the supersede marker to a replacement ACP prompt."""
agent = ACPAgent(acp_command=["echo", "test"])
conversation = LocalConversation(
agent=agent,
workspace=str(tmp_path),
max_iteration_per_run=4,
stuck_detection=False,
)
conversation.send_message("initial request")
conversation.send_message("replacement request")
latest_user_message_id = conversation.state.last_user_message_id
assert latest_user_message_id is not None
conversation.state.execution_status = ConversationExecutionStatus.RUNNING
conversation.state.agent_state = {
**conversation.state.agent_state,
ACP_INFLIGHT_PROMPT_USER_MESSAGE_ID: latest_user_message_id,
}
event_service._conversation = conversation
release_run = asyncio.Event()
event_service._run_task = asyncio.create_task(release_run.wait())

try:
(
marked,
active_prompt_has_latest,
) = await event_service._mark_running_acp_prompt_superseded()
finally:
release_run.set()
await event_service._run_task
event_service._run_task = None

assert marked is False
assert active_prompt_has_latest is True
assert ACP_SUPERSEDE_INFLIGHT_PROMPT not in conversation.state.agent_state

@pytest.mark.asyncio
async def test_explicit_interrupt_clears_internal_acp_rerun_request(
self, event_service
):
"""A later explicit stop should win over an earlier internal ACP rerun."""
conversation = MagicMock()
event_service._conversation = conversation
event_service._publish_state_update = AsyncMock()
event_service._rerun_requested = True
event_service._acp_internal_rerun_requested = True

await event_service.interrupt()

conversation.interrupt.assert_called_once()
assert event_service._rerun_requested is False
assert event_service._acp_internal_rerun_requested is False

@pytest.mark.asyncio
async def test_internal_acp_rerun_does_not_override_explicit_interrupt(
self, event_service
):
"""Explicit Stop/Pause should win while an internal ACP interrupt drains."""
conversation = MagicMock()
conversation.send_message = MagicMock()
event_service._conversation = conversation
event_service._mark_running_acp_prompt_superseded = AsyncMock(
return_value=(True, False)
)
event_service.run = AsyncMock()

async def interrupt_and_simulate_user_stop(internal_acp_rerun=False):
assert internal_acp_rerun is True
event_service._explicit_interrupt_generation += 1
event_service._rerun_requested = False
event_service._acp_internal_rerun_requested = False

event_service.interrupt = interrupt_and_simulate_user_stop

await event_service.send_message(Message(role="user", content=[]), run=True)

event_service.run.assert_not_awaited()
assert event_service._rerun_requested is False
assert event_service._acp_internal_rerun_requested is False

@pytest.mark.asyncio
async def test_internal_acp_send_message_restart_rechecks_generation_in_run(
self, event_service, tmp_path
):
"""A late explicit Stop/Pause should prevent direct ACP restart."""
agent = ACPAgent(acp_command=["echo", "test"])
conversation = LocalConversation(
agent=agent,
workspace=str(tmp_path),
max_iteration_per_run=3,
stuck_detection=False,
)
mock_arun = AsyncMock()
event_service._conversation = conversation
event_service._publish_state_update = AsyncMock()
event_service._mark_running_acp_prompt_superseded = AsyncMock(
return_value=(True, False)
)
event_service.interrupt = AsyncMock()

async def status_with_late_explicit_interrupt():
event_service._explicit_interrupt_generation += 1
event_service._rerun_requested = False
event_service._acp_internal_rerun_requested = False
return ConversationExecutionStatus.PAUSED

event_service._get_execution_status = status_with_late_explicit_interrupt

with patch.object(conversation, "arun", mock_arun):
await event_service.send_message(Message(role="user", content=[]), run=True)

event_service.interrupt.assert_awaited_once_with(internal_acp_rerun=True)
mock_arun.assert_not_awaited()
assert event_service._run_task is None
assert event_service._rerun_requested is False
assert event_service._acp_internal_rerun_requested is False

@pytest.mark.asyncio
async def test_internal_acp_rerun_rechecks_explicit_interrupt_before_restart(
self, event_service, tmp_path
):
"""Explicit Stop/Pause should win during final restart status checks."""
agent = ACPAgent(acp_command=["echo", "test"])
conversation = LocalConversation(
agent=agent,
workspace=str(tmp_path),
max_iteration_per_run=3,
stuck_detection=False,
)
mock_arun = AsyncMock()
event_service._conversation = conversation
event_service._publish_state_update = AsyncMock()
event_service._rerun_requested = True
event_service._acp_internal_rerun_requested = True

status_calls = 0

async def status_with_late_explicit_interrupt():
nonlocal status_calls
status_calls += 1
if status_calls == 1:
return ConversationExecutionStatus.IDLE
event_service._explicit_interrupt_generation += 1
event_service._rerun_requested = False
event_service._acp_internal_rerun_requested = False
return ConversationExecutionStatus.PAUSED

event_service._get_execution_status = status_with_late_explicit_interrupt

with patch.object(conversation, "arun", mock_arun):
await event_service.run()
assert event_service._run_task is not None
await asyncio.wait_for(event_service._run_task, timeout=1.0)

mock_arun.assert_awaited_once()
assert status_calls == 2
assert event_service._rerun_requested is False
assert event_service._acp_internal_rerun_requested is False

@pytest.mark.asyncio
async def test_send_message_with_run_true_logs_exception(self, event_service):
"""Test that exceptions from conversation.run() are caught and logged."""
Expand Down
60 changes: 59 additions & 1 deletion tests/agent_server/test_event_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from openhands.agent_server.models import StoredConversation
from openhands.agent_server.pub_sub import Subscriber
from openhands.sdk import Event
from openhands.sdk.agent import Agent
from openhands.sdk.agent import ACPAgent, Agent
from openhands.sdk.event import StreamingDeltaEvent
from openhands.sdk.llm import LLM
from openhands.sdk.workspace import LocalWorkspace
Expand Down Expand Up @@ -218,6 +218,64 @@ async def test_token_callbacks_not_wired_when_stream_disabled(tmp_path):
assert MockConv.call_args.kwargs["token_callbacks"] == []


@pytest.mark.asyncio
async def test_acp_agents_wire_token_callback_without_llm_streaming(tmp_path):
"""ACP AgentMessageChunk text should stream even though ACPAgent has no LLM."""
service = EventService(
stored=StoredConversation(
id=uuid4(),
agent=ACPAgent(acp_command=["echo", "test"]),
workspace=LocalWorkspace(working_dir=str(tmp_path / "workspace")),
),
conversations_dir=tmp_path / "conversations",
)
(tmp_path / "workspace").mkdir(exist_ok=True)

with _mock_local_conversation() as MockConv:
mock_conv = MagicMock()
mock_conv.state = MagicMock(execution_status="idle")
mock_conv._state = MagicMock()
mock_conv._on_event = MagicMock()
MockConv.return_value = mock_conv

await service.start()
assert len(MockConv.call_args.kwargs["token_callbacks"]) == 1


@pytest.mark.asyncio
async def test_acp_string_token_callback_publishes_delta(tmp_path):
"""ACPAgent invokes token callbacks with plain text chunks."""
service = EventService(
stored=StoredConversation(
id=uuid4(),
agent=ACPAgent(acp_command=["echo", "test"]),
workspace=LocalWorkspace(working_dir=str(tmp_path / "workspace")),
),
conversations_dir=tmp_path / "conversations",
)
collector = _CollectorSubscriber()
service._pub_sub.subscribe(collector)
(tmp_path / "workspace").mkdir(exist_ok=True)

with _mock_local_conversation() as MockConv:
mock_conv = MagicMock()
mock_conv.state = MagicMock(execution_status="idle")
mock_conv._state = MagicMock()
mock_conv._on_event = MagicMock()
MockConv.return_value = mock_conv

await service.start()
callback = MockConv.call_args.kwargs["token_callbacks"][0]

callback("ACP live text")
await asyncio.sleep(0.05)

delta_events = [e for e in collector.events if isinstance(e, StreamingDeltaEvent)]
assert len(delta_events) == 1
assert delta_events[0].content == "ACP live text"
assert delta_events[0].reasoning_content is None


@pytest.mark.asyncio
async def test_multiple_chunks_produce_multiple_events(event_service, tmp_path):
collector = _CollectorSubscriber()
Expand Down
Loading
Loading