From 23c3ea96276ad7d5b9c4cf8836b1264782811cc7 Mon Sep 17 00:00:00 2001 From: VascoSch92 Date: Wed, 27 May 2026 16:13:56 +0200 Subject: [PATCH 1/2] fix(agent-server): route sync-only agents through sync run() instead of threaded astep --- .../openhands/agent_server/event_service.py | 21 +++--- tests/agent_server/test_event_service.py | 72 ++++++++++++++++++- 2 files changed, 84 insertions(+), 9 deletions(-) diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index 4ec523f031..c9d6942273 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -740,8 +740,9 @@ async def run(self): immediately. When possible, the conversation is driven via its native ``arun()`` coroutine so LLM I/O does not tie up a thread-pool worker. For conversations that do not expose ``arun()`` (e.g., custom - subclasses), the synchronous ``run()`` is executed in the thread pool as - before. + subclasses) or whose agent only implements sync ``step()`` (no + ``astep()`` override, e.g. ACPAgent), the synchronous ``run()`` is + executed in the thread pool as before. Raises: ValueError: If the service is inactive or conversation is already running. @@ -773,19 +774,23 @@ async def _run_and_publish(): # loop is free during LLM I/O. Fall back to thread-pool # execution for backward compatibility. # - # Both guards are required: + # All guards are required: # • iscoroutinefunction – filters out non-async objects # (e.g. MagicMock in tests). - # • override check – BaseConversation defines a default - # ``async def arun()`` that delegates to sync ``run()``, - # so iscoroutinefunction alone is always True for real - # subclasses. We detect an *actual* override to avoid - # running a sync-only subclass on the event loop. + # • conversation override – BaseConversation's default + # ``arun()`` delegates to sync ``run()``, so we require an + # *actual* override to avoid running a sync-only subclass + # on the event loop. + # • agent override – ``LocalConversation`` always overrides + # ``arun()``, but an agent without an ``astep()`` override + # (e.g. ACPAgent) runs sync ``step()`` in a worker thread; + # route it through sync ``run()`` instead. arun = getattr(conversation, "arun", None) has_native_arun = ( arun is not None and asyncio.iscoroutinefunction(arun) and type(conversation).arun is not BaseConversation.arun + and type(conversation.agent).astep is not AgentBase.astep ) if has_native_arun: await conversation.arun() diff --git a/tests/agent_server/test_event_service.py b/tests/agent_server/test_event_service.py index 7dc415bd8b..af330fbe83 100644 --- a/tests/agent_server/test_event_service.py +++ b/tests/agent_server/test_event_service.py @@ -22,7 +22,7 @@ StoredConversation, ) from openhands.agent_server.pub_sub import Subscriber -from openhands.sdk import LLM, Agent, Conversation, Message +from openhands.sdk import LLM, Agent, AgentBase, Conversation, Message from openhands.sdk.conversation.fifo_lock import FIFOLock from openhands.sdk.conversation.state import ( ConversationExecutionStatus, @@ -2024,6 +2024,17 @@ def hold_lock_like_run_loop(): ) +class _SyncOnlyAgent(AgentBase): + """Agent that only implements sync step() (no astep override). + + Defined at module level (not inside a test) because ``AgentBase`` is a + discriminated-union member and local classes cannot be registered. + """ + + def step(self, conversation, on_event, on_token=None): + pass + + class TestEventServiceClose: """Tests for EventService.close() awaiting conversation teardown.""" @@ -2255,6 +2266,65 @@ def fork(self, **kwargs): "run() executed on the event loop thread — expected thread-pool" ) + async def test_run_uses_executor_for_sync_only_agent(self, event_service): + """EventService.run() must use the thread-pool executor when the + agent only implements sync step() (no astep() override), even if the + conversation overrides arun(). ``LocalConversation`` always overrides + arun(), so the conversation-level guard alone would route ACP/custom + agents through the native async path, running their sync step() in a + worker thread while arun() holds the state lock on the event-loop + thread (B5).""" + from openhands.sdk.conversation.base import BaseConversation + + run_called = False + arun_called = False + agent = _SyncOnlyAgent(llm=LLM(model="gpt-4o", usage_id="sync-only")) + + # Stand-in conversation that overrides arun() (like LocalConversation) + # but wraps a sync-only agent. Only the dispatch-relevant members are + # implemented. + class AsyncConvSyncAgent: + def __init__(self): + self.agent = agent + + async def arun(self): + nonlocal arun_called + arun_called = True + + def run(self): + nonlocal run_called + run_called = True + + conv = AsyncConvSyncAgent() + event_service._conversation = conv # type: ignore[assignment] + + # Sanity: conversation overrides arun() but the agent inherits the + # default astep(), so the native async path must NOT be taken. + assert type(conv).arun is not BaseConversation.arun + assert type(conv.agent).astep is AgentBase.astep + + with ( + patch.object( + type(event_service), + "_get_execution_status", + new_callable=AsyncMock, + return_value=ConversationExecutionStatus.PAUSED, + ), + patch.object( + type(event_service), + "_publish_state_update", + new_callable=AsyncMock, + ), + ): + await event_service.run() + # Give the background task a moment to execute + await asyncio.sleep(0.3) + + assert run_called, "sync run() was never called" + assert not arun_called, ( + "arun() was used for a sync-only agent — expected sync run()" + ) + @pytest_asyncio.fixture async def real_conversation_service(tmp_path): From 7729d456d004011fe9b8da886a642a8787303649 Mon Sep 17 00:00:00 2001 From: openhands Date: Wed, 27 May 2026 15:42:17 +0000 Subject: [PATCH 2/2] fix: remove stale ACPAgent example from sync-only agent comment Co-authored-by: openhands --- .../openhands/agent_server/event_service.py | 8 ++++---- tests/agent_server/test_event_service.py | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index c9d6942273..4c8c674097 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -741,8 +741,8 @@ async def run(self): ``arun()`` coroutine so LLM I/O does not tie up a thread-pool worker. For conversations that do not expose ``arun()`` (e.g., custom subclasses) or whose agent only implements sync ``step()`` (no - ``astep()`` override, e.g. ACPAgent), the synchronous ``run()`` is - executed in the thread pool as before. + ``astep()`` override), the synchronous ``run()`` is executed + in the thread pool as before. Raises: ValueError: If the service is inactive or conversation is already running. @@ -783,8 +783,8 @@ async def _run_and_publish(): # on the event loop. # • agent override – ``LocalConversation`` always overrides # ``arun()``, but an agent without an ``astep()`` override - # (e.g. ACPAgent) runs sync ``step()`` in a worker thread; - # route it through sync ``run()`` instead. + # runs sync ``step()`` in a worker thread; route it + # through sync ``run()`` instead. arun = getattr(conversation, "arun", None) has_native_arun = ( arun is not None diff --git a/tests/agent_server/test_event_service.py b/tests/agent_server/test_event_service.py index af330fbe83..3b432a4147 100644 --- a/tests/agent_server/test_event_service.py +++ b/tests/agent_server/test_event_service.py @@ -2270,10 +2270,10 @@ async def test_run_uses_executor_for_sync_only_agent(self, event_service): """EventService.run() must use the thread-pool executor when the agent only implements sync step() (no astep() override), even if the conversation overrides arun(). ``LocalConversation`` always overrides - arun(), so the conversation-level guard alone would route ACP/custom - agents through the native async path, running their sync step() in a - worker thread while arun() holds the state lock on the event-loop - thread (B5).""" + arun(), so the conversation-level guard alone would route sync-only + custom agents through the native async path, running their sync + step() in a worker thread while arun() holds the state lock on the + event-loop thread (B5).""" from openhands.sdk.conversation.base import BaseConversation run_called = False