Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions openhands-agent-server/openhands/agent_server/event_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -740,8 +740,9 @@ async def run(self):
immediately. When possible, the conversation is driven via its native
``arun()`` coroutine so LLM I/O does not tie up a thread-pool worker.
For conversations that do not expose ``arun()`` (e.g., custom
subclasses), the synchronous ``run()`` is executed in the thread pool as
before.
subclasses) or whose agent only implements sync ``step()`` (no
``astep()`` override), the synchronous ``run()`` is executed
in the thread pool as before.

Raises:
ValueError: If the service is inactive or conversation is already running.
Expand Down Expand Up @@ -773,19 +774,23 @@ async def _run_and_publish():
# loop is free during LLM I/O. Fall back to thread-pool
# execution for backward compatibility.
#
# Both guards are required:
# All guards are required:
# • iscoroutinefunction – filters out non-async objects
# (e.g. MagicMock in tests).
# • override check – BaseConversation defines a default
# ``async def arun()`` that delegates to sync ``run()``,
# so iscoroutinefunction alone is always True for real
# subclasses. We detect an *actual* override to avoid
# running a sync-only subclass on the event loop.
# • conversation override – BaseConversation's default
# ``arun()`` delegates to sync ``run()``, so we require an
# *actual* override to avoid running a sync-only subclass
# on the event loop.
# • agent override – ``LocalConversation`` always overrides
# ``arun()``, but an agent without an ``astep()`` override
# runs sync ``step()`` in a worker thread; route it
# through sync ``run()`` instead.
arun = getattr(conversation, "arun", None)
has_native_arun = (
arun is not None
and asyncio.iscoroutinefunction(arun)
and type(conversation).arun is not BaseConversation.arun
and type(conversation.agent).astep is not AgentBase.astep
Comment thread
VascoSch92 marked this conversation as resolved.
)
if has_native_arun:
await conversation.arun()
Expand Down
72 changes: 71 additions & 1 deletion tests/agent_server/test_event_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
StoredConversation,
)
from openhands.agent_server.pub_sub import Subscriber
from openhands.sdk import LLM, Agent, Conversation, Message
from openhands.sdk import LLM, Agent, AgentBase, Conversation, Message
from openhands.sdk.conversation.fifo_lock import FIFOLock
from openhands.sdk.conversation.state import (
ConversationExecutionStatus,
Expand Down Expand Up @@ -2024,6 +2024,17 @@ def hold_lock_like_run_loop():
)


class _SyncOnlyAgent(AgentBase):
"""Agent that only implements sync step() (no astep override).

Defined at module level (not inside a test) because ``AgentBase`` is a
discriminated-union member and local classes cannot be registered.
"""

def step(self, conversation, on_event, on_token=None):
pass


class TestEventServiceClose:
"""Tests for EventService.close() awaiting conversation teardown."""

Expand Down Expand Up @@ -2255,6 +2266,65 @@ def fork(self, **kwargs):
"run() executed on the event loop thread — expected thread-pool"
)

async def test_run_uses_executor_for_sync_only_agent(self, event_service):
"""EventService.run() must use the thread-pool executor when the
agent only implements sync step() (no astep() override), even if the
conversation overrides arun(). ``LocalConversation`` always overrides
arun(), so the conversation-level guard alone would route sync-only
custom agents through the native async path, running their sync
step() in a worker thread while arun() holds the state lock on the
event-loop thread (B5)."""
from openhands.sdk.conversation.base import BaseConversation

run_called = False
arun_called = False
agent = _SyncOnlyAgent(llm=LLM(model="gpt-4o", usage_id="sync-only"))

# Stand-in conversation that overrides arun() (like LocalConversation)
# but wraps a sync-only agent. Only the dispatch-relevant members are
# implemented.
class AsyncConvSyncAgent:
def __init__(self):
self.agent = agent

async def arun(self):
nonlocal arun_called
arun_called = True

def run(self):
nonlocal run_called
run_called = True

conv = AsyncConvSyncAgent()
event_service._conversation = conv # type: ignore[assignment]

# Sanity: conversation overrides arun() but the agent inherits the
# default astep(), so the native async path must NOT be taken.
assert type(conv).arun is not BaseConversation.arun
assert type(conv.agent).astep is AgentBase.astep

with (
patch.object(
type(event_service),
"_get_execution_status",
new_callable=AsyncMock,
return_value=ConversationExecutionStatus.PAUSED,
),
patch.object(
type(event_service),
"_publish_state_update",
new_callable=AsyncMock,
),
):
await event_service.run()
# Give the background task a moment to execute
await asyncio.sleep(0.3)

assert run_called, "sync run() was never called"
Comment thread
VascoSch92 marked this conversation as resolved.
assert not arun_called, (
"arun() was used for a sync-only agent — expected sync run()"
)


@pytest_asyncio.fixture
async def real_conversation_service(tmp_path):
Expand Down
Loading