From 4b7ff96bd55b7d8fd5281301a5d5dbb961a50735 Mon Sep 17 00:00:00 2001 From: openhands Date: Wed, 27 May 2026 14:13:59 +0000 Subject: [PATCH 1/3] feat(agent-server): add docker runtime mode for per-conversation containers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When Config.conversation_runtime == 'docker', every conversation runs in its own Docker container hosting another agent-server (in local mode). The outer agent-server acts as a thin reverse proxy in front of the per-conversation containers: * POST /api/conversations spawns a fresh container, mints a session key, and forwards the create request. * All other /api/conversations/{cid}/... HTTP routes — including /run, /pause, /events/..., /workspace/..., the trajectory download, secrets, etc. — are forwarded verbatim to the matching container via a catch-all proxy route. * The /sockets/events/{cid} WebSocket is bridged to the inner container with the same session key. * DELETE /api/conversations/{cid} proxies the delete and then stops the container. * GET /api/conversations, /api/conversations/search and /api/conversations/count fan out across the registered containers. Default behavior (conversation_runtime == 'local') is unchanged. No container pre-warming, no pools: each conversation gets a fresh container at first use and an in-memory registry tracks the host port + session key. Implementation lives entirely in docker_runtime/; api.py only learns how to install the routers and create/shut down the container manager, and config.py gains the conversation_runtime, conversation_image and conversation_container_* settings that configure it. 
Co-authored-by: openhands --- .../openhands/agent_server/api.py | 63 ++- .../openhands/agent_server/config.py | 58 ++- .../agent_server/docker_runtime/__init__.py | 26 ++ .../docker_runtime/container_manager.py | 328 +++++++++++++++ .../agent_server/docker_runtime/proxy.py | 232 +++++++++++ .../agent_server/docker_runtime/routers.py | 378 ++++++++++++++++++ tests/agent_server/docker_runtime/__init__.py | 0 .../docker_runtime/test_container_manager.py | 246 ++++++++++++ .../docker_runtime/test_docker_routers.py | 313 +++++++++++++++ 9 files changed, 1629 insertions(+), 15 deletions(-) create mode 100644 openhands-agent-server/openhands/agent_server/docker_runtime/__init__.py create mode 100644 openhands-agent-server/openhands/agent_server/docker_runtime/container_manager.py create mode 100644 openhands-agent-server/openhands/agent_server/docker_runtime/proxy.py create mode 100644 openhands-agent-server/openhands/agent_server/docker_runtime/routers.py create mode 100644 tests/agent_server/docker_runtime/__init__.py create mode 100644 tests/agent_server/docker_runtime/test_container_manager.py create mode 100644 tests/agent_server/docker_runtime/test_docker_routers.py diff --git a/openhands-agent-server/openhands/agent_server/api.py b/openhands-agent-server/openhands/agent_server/api.py index 2040ae84c0..a7e6102dab 100644 --- a/openhands-agent-server/openhands/agent_server/api.py +++ b/openhands-agent-server/openhands/agent_server/api.py @@ -34,6 +34,11 @@ ) from openhands.agent_server.desktop_router import desktop_router from openhands.agent_server.desktop_service import get_desktop_service +from openhands.agent_server.docker_runtime.container_manager import ContainerManager +from openhands.agent_server.docker_runtime.routers import ( + docker_conversation_router, + docker_sockets_router, +) from openhands.agent_server.event_router import event_router from openhands.agent_server.file_router import file_router from openhands.agent_server.git_router import git_router 
@@ -203,9 +208,25 @@ async def start_tool_preload_service(): config.bash_events_retention_seconds, ) + # Docker runtime: install the per-conversation container manager. + # The proxy routers each construct their own short-lived + # ``httpx.AsyncClient`` per request, so there is no shared client + # to plumb in here. The in-process conversation_service stays + # live as well — in docker mode it just isn't routed to. + container_manager: ContainerManager | None = None + if config.conversation_runtime == "docker": + container_manager = ContainerManager(config) + api.state.container_manager = container_manager + logger.info( + "Docker conversation runtime enabled (image=%s)", + config.conversation_image, + ) + try: yield finally: + if container_manager is not None: + await container_manager.shutdown() if retention_task is not None: retention_task.cancel() with suppress(asyncio.CancelledError): @@ -298,19 +319,27 @@ def _add_api_routes(app: FastAPI, config: Config) -> None: dependencies.append(Depends(create_session_api_key_dependency(config))) api_router = APIRouter(prefix="/api", dependencies=dependencies) - api_router.include_router(event_router) - api_router.include_router(conversation_router) - api_router.include_router(conversation_router_acp) - api_router.include_router(tool_router) - api_router.include_router(bash_router) - api_router.include_router(git_router) - api_router.include_router(file_router) - api_router.include_router(vscode_router) - api_router.include_router(desktop_router) - api_router.include_router(skills_router) - api_router.include_router(hooks_router) + + if config.conversation_runtime == "docker": + # Docker mode: conversation/event/workspace traffic is reverse-proxied + # to a per-conversation container. The non-conversation routers + # (settings, profiles, workspaces, auth, ...) still run in-process on + # the outer server. 
+ api_router.include_router(docker_conversation_router) + else: + api_router.include_router(event_router) + api_router.include_router(conversation_router) + api_router.include_router(conversation_router_acp) + api_router.include_router(tool_router) + api_router.include_router(bash_router) + api_router.include_router(git_router) + api_router.include_router(file_router) + api_router.include_router(vscode_router) + api_router.include_router(desktop_router) + api_router.include_router(skills_router) + api_router.include_router(hooks_router) + api_router.include_router(mcp_router) api_router.include_router(llm_router) - api_router.include_router(mcp_router) api_router.include_router(settings_router) api_router.include_router(workspaces_router) api_router.include_router(profiles_router) @@ -331,10 +360,16 @@ def _add_api_routes(app: FastAPI, config: Config) -> None: Depends(create_workspace_session_dependency(config)) ) workspace_api_router = APIRouter(prefix="/api", dependencies=workspace_dependencies) - workspace_api_router.include_router(workspace_router) + if config.conversation_runtime == "local": + # In docker mode workspace static files are served by the generic + # ``/api/conversations/{cid}/workspace/...`` proxy registered above. 
+ workspace_api_router.include_router(workspace_router) app.include_router(workspace_api_router) - app.include_router(sockets_router) + if config.conversation_runtime == "docker": + app.include_router(docker_sockets_router) + else: + app.include_router(sockets_router) def _setup_static_files(app: FastAPI, config: Config) -> None: diff --git a/openhands-agent-server/openhands/agent_server/config.py b/openhands-agent-server/openhands/agent_server/config.py index b76a18f460..5a43ba998f 100644 --- a/openhands-agent-server/openhands/agent_server/config.py +++ b/openhands-agent-server/openhands/agent_server/config.py @@ -1,7 +1,7 @@ import logging import os from pathlib import Path -from typing import ClassVar +from typing import ClassVar, Literal from pydantic import BaseModel, ConfigDict, Field, SecretStr @@ -212,6 +212,62 @@ class Config(BaseModel): "The URL where this agent server instance is available externally" ), ) + + # ---- Docker runtime mode ----------------------------------------------- + # When ``conversation_runtime == "docker"``, every conversation runs in + # its own Docker container hosting another agent-server. This outer + # agent-server then acts as a thin reverse proxy in front of the per- + # conversation containers. See ``docker_runtime/`` for the implementation. + conversation_runtime: Literal["local", "docker"] = Field( + default="local", + description=( + "How to host conversations. ``local`` runs each conversation " + "in-process on this server (the default; current behavior). " + "``docker`` spawns a fresh Docker container running an " + "agent-server image per conversation and proxies " + "conversation-scoped HTTP and WebSocket traffic to it." + ), + ) + conversation_image: str = Field( + default="ghcr.io/openhands/agent-server:latest-python", + description=( + "Container image used to host each conversation when " + "``conversation_runtime == 'docker'``. Ignored otherwise." 
+ ), + ) + conversation_container_network: str | None = Field( + default=None, + description=( + "Optional Docker network to attach per-conversation containers to." + ), + ) + conversation_container_volumes: list[str] = Field( + default_factory=list, + description=( + "Additional ``-v`` volume mounts to apply to every per-conversation " + "container (e.g. ``'/host/cache:/cache'``). Ignored in local mode." + ), + ) + conversation_container_forward_env: list[str] = Field( + default_factory=lambda: ["DEBUG"], + description=( + "Environment variable names to forward from this server's " + "environment into every per-conversation container." + ), + ) + conversation_container_platform: str = Field( + default="linux/amd64", + description="``--platform`` flag passed to ``docker run``.", + ) + conversation_container_startup_timeout: float = Field( + default=120.0, + gt=0.0, + description=( + "Seconds to wait for a freshly-spawned conversation container " + "to pass its /health check before giving up." + ), + ) + model_config: ClassVar[ConfigDict] = {"frozen": True} @property diff --git a/openhands-agent-server/openhands/agent_server/docker_runtime/__init__.py b/openhands-agent-server/openhands/agent_server/docker_runtime/__init__.py new file mode 100644 index 0000000000..f938053c8d --- /dev/null +++ b/openhands-agent-server/openhands/agent_server/docker_runtime/__init__.py @@ -0,0 +1,26 @@ +"""Docker-runtime mode for the agent-server. + +When ``Config.conversation_runtime == "docker"`` the outer agent-server stops +running conversations in-process and instead spawns a Docker container per +conversation. Each container hosts its own agent-server (configured in +``local`` mode), and this outer server acts as a thin reverse proxy in front +of those containers. + +Submodules: + +* :mod:`.container_manager` — spawns / tracks / stops per-conversation + containers. Wraps ``docker run`` via subprocess. 
+* :mod:`.proxy` — low-level HTTP and WebSocket forwarding helpers that + stream bytes between the outer server and the appropriate container. +* :mod:`.routers` — FastAPI routers that replace the in-process + ``conversation_router``/``event_router``/``workspace_router``/``sockets_router`` + when docker mode is active. +""" + +from openhands.agent_server.docker_runtime.container_manager import ( + ContainerManager, + RunningContainer, +) + + +__all__ = ["ContainerManager", "RunningContainer"] diff --git a/openhands-agent-server/openhands/agent_server/docker_runtime/container_manager.py b/openhands-agent-server/openhands/agent_server/docker_runtime/container_manager.py new file mode 100644 index 0000000000..311f3180f0 --- /dev/null +++ b/openhands-agent-server/openhands/agent_server/docker_runtime/container_manager.py @@ -0,0 +1,328 @@ +"""Spawn and track per-conversation Docker containers. + +This is a deliberately small module: it owns the lifecycle of the inner +agent-server containers (one per conversation), the in-memory map from +``conversation_id`` to the container's local URL + session key, and the +``docker`` CLI calls needed to make that work. Everything else (HTTP +proxying, WebSocket bridging, request validation) lives in sibling modules. + +The shape of ``docker run`` invoked here mirrors what +:class:`openhands.workspace.docker.workspace.DockerWorkspace` does in the SDK, +just adapted to live inside the agent-server. +""" + +from __future__ import annotations + +import asyncio +import random +import secrets +import socket +import subprocess +import time +import uuid +from dataclasses import dataclass, field +from urllib.request import urlopen +from uuid import UUID + +from openhands.agent_server.config import Config +from openhands.sdk.logger import get_logger + + +logger = get_logger(__name__) + +# Port range to allocate host ports from for inner-container forwards. +# Mirrors DockerWorkspace's range so the two implementations don't fight. 
+_PORT_MIN = 30000 +_PORT_MAX = 39999 +_PORT_MAX_ATTEMPTS = 50 + + +def _check_port_available(port: int) -> bool: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + sock.bind(("0.0.0.0", port)) + return True + except OSError: + return False + finally: + sock.close() + + +def _find_available_tcp_port() -> int: + rng = random.SystemRandom() + ports = list(range(_PORT_MIN, _PORT_MAX + 1)) + rng.shuffle(ports) + for port in ports[:_PORT_MAX_ATTEMPTS]: + if _check_port_available(port): + return port + raise RuntimeError( + f"No available TCP port found in [{_PORT_MIN},{_PORT_MAX}] after " + f"{_PORT_MAX_ATTEMPTS} attempts" + ) + + +@dataclass +class RunningContainer: + """Bookkeeping for one running per-conversation agent-server container. + + Attributes: + conversation_id: The conversation that owns this container. + container_id: Docker container id (long form) returned by ``docker run``. + host_port: Host port the container's ``:8000`` is mapped to. + session_api_key: The session API key the inner agent-server was + configured with. The outer server injects this on every proxied + request so the inner container cannot be reached without going + through us (assuming the host port is not exposed externally). + """ + + conversation_id: UUID + container_id: str + host_port: int + session_api_key: str + image: str = "" + # Locking primitive so concurrent requests for the same conversation don't + # race during shutdown. + lock: asyncio.Lock = field(default_factory=asyncio.Lock, repr=False) + + @property + def base_url(self) -> str: + return f"http://127.0.0.1:{self.host_port}" + + +class DockerUnavailableError(RuntimeError): + """Raised when ``docker`` is not reachable from this process.""" + + +class ContainerStartupError(RuntimeError): + """Raised when a freshly-started container fails to become healthy.""" + + +class ContainerManager: + """Tracks per-conversation Docker containers. 
+ + The manager is in-memory only — restarting the outer agent-server forgets + every running container. That's intentional for a first cut: durable + container <-> conversation tracking is a follow-up concern. + """ + + def __init__( + self, + config: Config, + *, + run_command=subprocess.run, # injectable for tests + sleep=time.sleep, + ) -> None: + self._config = config + self._containers: dict[UUID, RunningContainer] = {} + self._lock = asyncio.Lock() + # Injected so tests can stub out ``docker`` invocations cleanly. + self._run_command = run_command + self._sleep = sleep + + # -- public API -------------------------------------------------------- + + @property + def config(self) -> Config: + return self._config + + def list(self) -> list[RunningContainer]: + return list(self._containers.values()) + + def get(self, conversation_id: UUID) -> RunningContainer | None: + return self._containers.get(conversation_id) + + async def start(self, conversation_id: UUID) -> RunningContainer: + """Spawn a fresh container for ``conversation_id``. + + Idempotent: if a container is already registered for this conversation + the existing :class:`RunningContainer` is returned and no new + container is started. + """ + async with self._lock: + existing = self._containers.get(conversation_id) + if existing is not None: + return existing + + running = await asyncio.to_thread(self._start_blocking, conversation_id) + self._containers[conversation_id] = running + return running + + async def stop(self, conversation_id: UUID) -> bool: + """Stop and forget the container for ``conversation_id``. + + Returns ``True`` if a container was running, ``False`` otherwise. + """ + async with self._lock: + running = self._containers.pop(conversation_id, None) + if running is None: + return False + await asyncio.to_thread(self._stop_blocking, running) + return True + + async def shutdown(self) -> None: + """Stop every tracked container. 
Best-effort; logs errors and + continues so a single broken container doesn't block the rest.""" + async with self._lock: + containers = list(self._containers.values()) + self._containers.clear() + for running in containers: + try: + await asyncio.to_thread(self._stop_blocking, running) + except Exception: + logger.exception( + "Failed to stop container %s during shutdown", + running.container_id, + ) + + # -- internals --------------------------------------------------------- + + def _start_blocking(self, conversation_id: UUID) -> RunningContainer: + self._ensure_docker_available() + + host_port = _find_available_tcp_port() + session_api_key = secrets.token_urlsafe(32) + container_name = f"oh-conv-{conversation_id.hex}-{uuid.uuid4().hex[:8]}" + image = self._config.conversation_image + + flags: list[str] = [] + for env_name in self._config.conversation_container_forward_env: + value = self._env_for_forward(env_name) + if value is not None: + flags += ["-e", f"{env_name}={value}"] + # Always tell the inner agent-server to require this key. We inject + # it on every proxied request from the outer server. 
+ flags += ["-e", f"OH_SESSION_API_KEYS_0={session_api_key}"] + + for volume in self._config.conversation_container_volumes: + flags += ["-v", volume] + + if self._config.conversation_container_network: + flags += ["--network", self._config.conversation_container_network] + + run_cmd = [ + "docker", + "run", + "-d", + "--platform", + self._config.conversation_container_platform, + "--rm", + "--ulimit", + "nofile=65536:65536", + "--name", + container_name, + "-p", + f"{host_port}:8000", + *flags, + image, + "--host", + "0.0.0.0", + "--port", + "8000", + ] + logger.info( + "Starting conversation container for %s on host port %d", + conversation_id, + host_port, + ) + proc = self._run_command(run_cmd, capture_output=True, text=True, check=False) + if proc.returncode != 0: + raise ContainerStartupError( + f"docker run failed: {proc.stderr.strip() or proc.stdout.strip()}" + ) + container_id = (proc.stdout or "").strip() + if not container_id: + raise ContainerStartupError("docker run returned no container id") + + running = RunningContainer( + conversation_id=conversation_id, + container_id=container_id, + host_port=host_port, + session_api_key=session_api_key, + image=image, + ) + + try: + self._wait_for_health( + running, timeout=self._config.conversation_container_startup_timeout + ) + except Exception: + # Don't leave a stuck container behind. 
+ self._stop_blocking(running) + raise + + logger.info( + "Conversation container ready: id=%s port=%d cid=%s", + container_id[:12], + host_port, + conversation_id, + ) + return running + + def _stop_blocking(self, running: RunningContainer) -> None: + logger.info("Stopping conversation container %s", running.container_id[:12]) + self._run_command( + ["docker", "stop", running.container_id], + capture_output=True, + text=True, + check=False, + ) + + def _ensure_docker_available(self) -> None: + proc = self._run_command( + ["docker", "version"], capture_output=True, text=True, check=False + ) + if proc.returncode != 0: + raise DockerUnavailableError( + "Docker is not available; cannot start conversation containers" + ) + + def _wait_for_health(self, running: RunningContainer, *, timeout: float) -> None: + deadline = time.monotonic() + timeout + health_url = f"{running.base_url}/health" + while time.monotonic() < deadline: + try: + with urlopen(health_url, timeout=1.0) as resp: + if 200 <= getattr(resp, "status", 200) < 300: + return + except Exception: + pass + # Bail out early if the container has already died: avoids + # ticking down the entire timeout when ``docker run`` accepted + # the command but the process inside exited immediately. + inspect = self._run_command( + [ + "docker", + "inspect", + "-f", + "{{.State.Running}}", + running.container_id, + ], + capture_output=True, + text=True, + check=False, + ) + if (inspect.stdout or "").strip() != "true": + logs = self._run_command( + ["docker", "logs", running.container_id], + capture_output=True, + text=True, + check=False, + ) + raise ContainerStartupError( + "Container stopped during startup. Logs:\n" + f"{(logs.stdout or '')}\n{(logs.stderr or '')}" + ) + self._sleep(1) + raise ContainerStartupError( + f"Container {running.container_id[:12]} did not become healthy " + f"within {timeout}s" + ) + + def _env_for_forward(self, name: str) -> str | None: + """Look up an env var value to forward into a container. 
+ + Pulled out so tests can override without monkeypatching ``os.environ``. + """ + import os + + return os.environ.get(name) diff --git a/openhands-agent-server/openhands/agent_server/docker_runtime/proxy.py b/openhands-agent-server/openhands/agent_server/docker_runtime/proxy.py new file mode 100644 index 0000000000..9e0d4fc07b --- /dev/null +++ b/openhands-agent-server/openhands/agent_server/docker_runtime/proxy.py @@ -0,0 +1,232 @@ +"""HTTP and WebSocket reverse-proxy helpers used in docker runtime mode. + +Both helpers are deliberately dumb: they stream bytes between the outer +agent-server and an inner per-conversation container, without inspecting +request bodies or response shapes. The auth header for the inner container +is injected here (see ``X-Session-API-Key``) so callers don't have to know. + +These helpers are *only* used by routes in +:mod:`openhands.agent_server.docker_runtime.routers`; nothing outside the +docker runtime needs to know about them. +""" + +from __future__ import annotations + +import asyncio +from collections.abc import AsyncIterator + +import httpx +import websockets +from fastapi import HTTPException, status +from starlette.requests import Request +from starlette.responses import StreamingResponse +from starlette.websockets import WebSocket, WebSocketDisconnect + +from openhands.agent_server.docker_runtime.container_manager import RunningContainer +from openhands.sdk.logger import get_logger + + +logger = get_logger(__name__) + +# Hop-by-hop headers (RFC 7230) — must not be forwarded by a proxy. +_HOP_BY_HOP_HEADERS = frozenset( + { + "connection", + "keep-alive", + "proxy-authenticate", + "proxy-authorization", + "te", + "trailer", + "transfer-encoding", + "upgrade", + # ``host`` and ``content-length`` are recomputed by httpx; forwarding + # the original values causes spurious 400s when bodies are re-chunked. + "host", + "content-length", + } +) + +# Stream chunk size for request/response bodies. 
64 KiB is the same default +# httpx uses internally; we pin it so behavior is stable across versions. +_CHUNK_SIZE = 64 * 1024 + + +def _filter_headers(headers) -> dict[str, str]: + return {k: v for k, v in headers.items() if k.lower() not in _HOP_BY_HOP_HEADERS} + + +async def proxy_http( + request: Request, + running: RunningContainer, + *, + upstream_path: str, + timeout: float | None = None, +) -> StreamingResponse: + """Forward ``request`` to the per-conversation container. + + Args: + request: Incoming Starlette request on the outer agent-server. + running: Bookkeeping for the target container. + upstream_path: Path (including any query string) on the inner + agent-server to forward to. Typically the same path the outer + server received, since the inner agent-server exposes the same + API surface. + timeout: Per-request timeout in seconds. ``None`` (the default) means + no read timeout — conversation event streams can be long-lived. + + Returns: + A :class:`starlette.responses.StreamingResponse` that streams the + inner container's response body back to the original caller. + + Notes: + A fresh :class:`httpx.AsyncClient` is created per request. We avoid a + long-lived pool because the outer server can serve many concurrent + conversations and each one talks to a different upstream port — and + because making the client per-request keeps the lifespan/teardown + story trivial. If profiling later shows per-request client setup is a + bottleneck we can revisit. + """ + url = running.base_url + upstream_path + headers = _filter_headers(request.headers) + # Inject the per-container session API key so the inner server accepts + # us. We deliberately replace any X-Session-API-Key the *client* sent — + # the outer server has already validated the user's key by the time + # this helper runs (via FastAPI's session_api_key dependency). 
+ headers["X-Session-API-Key"] = running.session_api_key + + async def _request_body() -> AsyncIterator[bytes]: + async for chunk in request.stream(): + if chunk: + yield chunk + + client = httpx.AsyncClient( + timeout=httpx.Timeout(connect=10.0, read=timeout, write=30.0, pool=10.0) + ) + req = client.build_request( + request.method, + url, + headers=headers, + params=None, # query string is already part of upstream_path + content=_request_body(), + ) + + try: + upstream = await client.send(req, stream=True) + except (httpx.ConnectError, httpx.ReadError) as exc: + await client.aclose() + logger.warning( + "Upstream connection error to %s: %s", running.container_id[:12], exc + ) + raise HTTPException( + status_code=status.HTTP_502_BAD_GATEWAY, + detail=f"Conversation container unreachable: {exc}", + ) from exc + + async def _response_body() -> AsyncIterator[bytes]: + try: + async for chunk in upstream.aiter_raw(chunk_size=_CHUNK_SIZE): + yield chunk + finally: + await upstream.aclose() + await client.aclose() + + return StreamingResponse( + _response_body(), + status_code=upstream.status_code, + headers=_filter_headers(upstream.headers), + media_type=upstream.headers.get("content-type"), + ) + + +async def bridge_websocket( + client_ws: WebSocket, + running: RunningContainer, + *, + upstream_path: str, +) -> None: + """Bridge a WebSocket session between the browser and an inner container. + + The bridge speaks both text and binary frames. Auth for the inner server + is injected via the ``X-Session-API-Key`` header on the connect handshake + (the agent-server's WebSocket auth also accepts that header — see + :mod:`openhands.agent_server.sockets`). + + Closure semantics: when either side closes (or errors), we close the + other side and return. We do not attempt to reconnect. 
+ """ + upstream_url = ( + running.base_url.replace("http://", "ws://").replace("https://", "wss://") + + upstream_path + ) + + await client_ws.accept() + + extra_headers = {"X-Session-API-Key": running.session_api_key} + + try: + async with websockets.connect( + upstream_url, additional_headers=extra_headers + ) as upstream_ws: + await _bridge_websocket_loop(client_ws, upstream_ws) + except websockets.exceptions.InvalidStatus as exc: + logger.warning( + "Upstream WebSocket rejected (%s) to %s", exc, running.container_id[:12] + ) + # 1011 == "internal error"; closest match for an upstream HTTP failure + # since browsers can't see HTTP status codes from a failed upgrade. + await client_ws.close(code=1011) + except (OSError, websockets.exceptions.WebSocketException) as exc: + logger.warning( + "Upstream WebSocket connect failed to %s: %s", + running.container_id[:12], + exc, + ) + await client_ws.close(code=1011) + + +async def _bridge_websocket_loop(client_ws: WebSocket, upstream_ws) -> None: + async def _client_to_upstream() -> None: + try: + while True: + message = await client_ws.receive() + if message.get("type") == "websocket.disconnect": + return + if "bytes" in message and message["bytes"] is not None: + await upstream_ws.send(message["bytes"]) + elif "text" in message and message["text"] is not None: + await upstream_ws.send(message["text"]) + except WebSocketDisconnect: + return + + async def _upstream_to_client() -> None: + try: + async for message in upstream_ws: + if isinstance(message, (bytes, bytearray)): + await client_ws.send_bytes(bytes(message)) + else: + await client_ws.send_text(message) + except websockets.exceptions.ConnectionClosed: + return + + task_a = asyncio.create_task(_client_to_upstream()) + task_b = asyncio.create_task(_upstream_to_client()) + done, pending = await asyncio.wait( + {task_a, task_b}, return_when=asyncio.FIRST_COMPLETED + ) + for task in pending: + task.cancel() + for task in pending: + try: + await task + except 
(asyncio.CancelledError, Exception): + pass + # Whichever side closed first dictates the close. Ensure the other side + # also closes cleanly so neither leaks file descriptors. + try: + await upstream_ws.close() + except Exception: + pass + try: + await client_ws.close() + except Exception: + pass diff --git a/openhands-agent-server/openhands/agent_server/docker_runtime/routers.py b/openhands-agent-server/openhands/agent_server/docker_runtime/routers.py new file mode 100644 index 0000000000..ee60efc024 --- /dev/null +++ b/openhands-agent-server/openhands/agent_server/docker_runtime/routers.py @@ -0,0 +1,378 @@ +"""FastAPI routers used when ``Config.conversation_runtime == "docker"``. + +These replace ``conversation_router``, ``event_router``, ``workspace_router`` +and the conversation half of ``sockets_router`` from the local-mode app. +Settings, profiles, workspaces, auth, the cloud proxy, the static frontend +and ``/server_info`` all continue to be served by the outer server unchanged +— they're not conversation-scoped. + +The flow on each request is: + +1. Extract the ``conversation_id`` from the path (or, for ``POST + /api/conversations``, generate one and remember it). +2. Look up — or, on creation, spawn — the matching Docker container in the + :class:`ContainerManager`. +3. Forward the request body and headers via + :func:`openhands.agent_server.docker_runtime.proxy.proxy_http` (or, for + WebSockets, :func:`bridge_websocket`). 
+""" + +from __future__ import annotations + +import asyncio +import json +from typing import Annotated +from uuid import UUID, uuid4 + +import httpx +from fastapi import ( + APIRouter, + HTTPException, + Query, + Request, + WebSocket, + status, +) +from starlette.responses import JSONResponse, StreamingResponse + +from openhands.agent_server.docker_runtime.container_manager import ( + ContainerManager, + ContainerStartupError, + DockerUnavailableError, + RunningContainer, +) +from openhands.agent_server.docker_runtime.proxy import ( + bridge_websocket, + proxy_http, +) +from openhands.sdk.logger import get_logger + + +logger = get_logger(__name__) + + +def get_container_manager(request: Request) -> ContainerManager: + manager = getattr(request.app.state, "container_manager", None) + if manager is None: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail="Container manager is not available", + ) + return manager + + +def _build_upstream_path(request: Request, path: str) -> str: + """Reconstruct the inner-container path from the outer request. + + The inner agent-server exposes the same API surface, so we forward the + same path verbatim. Only difference: the outer path is rooted at + ``/api/conversations/...`` and so is the inner one, so we just pass it + through. 
+ """ + query = request.url.query + return f"{path}?{query}" if query else path + + +def _container_or_404( + manager: ContainerManager, conversation_id: UUID +) -> RunningContainer: + running = manager.get(conversation_id) + if running is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Conversation not found: {conversation_id}", + ) + return running + + +# --------------------------------------------------------------------------- +# HTTP: /api/conversations +# --------------------------------------------------------------------------- + +docker_conversation_router = APIRouter( + prefix="/conversations", tags=["Docker Conversations"] +) + + +@docker_conversation_router.post("") +async def docker_start_conversation( + request: Request, + include_skills: Annotated[bool, Query()] = False, +) -> JSONResponse: + """Spawn a fresh per-conversation container, then forward the request. + + The container is registered against the *resolved* conversation id (either + the one the client supplied or a fresh UUID4 minted here). The body is + rewritten to: + + * pin ``conversation_id`` so the inner agent-server agrees on the id, + * rewrite ``workspace.working_dir`` to ``/workspace`` — the inner + container's filesystem is the canonical one, not the outer host's. + """ + manager = get_container_manager(request) + + try: + body_bytes = await request.body() + body = json.loads(body_bytes) if body_bytes else {} + except json.JSONDecodeError as exc: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Invalid JSON body: {exc}", + ) from exc + + raw_cid = body.get("conversation_id") + try: + conversation_id = UUID(raw_cid) if raw_cid else uuid4() + except (TypeError, ValueError) as exc: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Invalid conversation_id: {raw_cid!r}", + ) from exc + + body["conversation_id"] = str(conversation_id) + + # Inside the container, the working dir is always /workspace. 
Whatever + # the caller passed in points to a host path we can't reach from the + # outer server's vantage point. + workspace = body.get("workspace") or {} + workspace["working_dir"] = "/workspace" + body["workspace"] = workspace + + try: + running = await manager.start(conversation_id) + except DockerUnavailableError as exc: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=str(exc) + ) from exc + except ContainerStartupError as exc: + raise HTTPException( + status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc) + ) from exc + + upstream_path = ( + f"/api/conversations?include_skills={'true' if include_skills else 'false'}" + ) + headers = { + "content-type": request.headers.get("content-type", "application/json"), + "X-Session-API-Key": running.session_api_key, + "accept": request.headers.get("accept", "application/json"), + } + + try: + async with httpx.AsyncClient(timeout=60.0) as client: + response = await client.post( + running.base_url + upstream_path, + headers=headers, + content=json.dumps(body).encode("utf-8"), + ) + except httpx.HTTPError as exc: + # If we managed to start the container but the very first request + # failed, that's a startup race. Treat as 502 and tear down so we + # don't leak a half-initialized container. + logger.warning( + "Initial request to fresh container %s failed: %s", + running.container_id[:12], + exc, + ) + await manager.stop(conversation_id) + raise HTTPException( + status_code=status.HTTP_502_BAD_GATEWAY, + detail=f"Conversation container could not accept the request: {exc}", + ) from exc + + if response.status_code >= 400: + # The inner server rejected the create. Don't leave the container + # behind in that case — it'd be orphaned, since no client will know + # to send DELETE. 
+ await manager.stop(conversation_id) + + return JSONResponse( + content=response.json() if response.content else None, + status_code=response.status_code, + ) + + +@docker_conversation_router.delete("/{conversation_id}") +async def docker_delete_conversation( + conversation_id: UUID, + request: Request, +) -> JSONResponse: + manager = get_container_manager(request) + running = manager.get(conversation_id) + if running is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Conversation not found: {conversation_id}", + ) + + # Best-effort: ask the inner server to delete its own state first, then + # always tear the container down so we don't leak it even if the inner + # delete failed. + delete_status = 200 + delete_body: bytes = b"" + try: + async with httpx.AsyncClient(timeout=30.0) as client: + upstream = await client.delete( + f"{running.base_url}/api/conversations/{conversation_id}", + headers={"X-Session-API-Key": running.session_api_key}, + ) + delete_status = upstream.status_code + delete_body = upstream.content + except httpx.HTTPError as exc: + logger.warning("Inner DELETE failed for %s: %s", conversation_id, exc) + finally: + await manager.stop(conversation_id) + + return JSONResponse( + content=json.loads(delete_body) if delete_body else None, + status_code=delete_status, + ) + + +_FANOUT_LIST_PATHS = {"", "/", "/search", "/count"} + + +@docker_conversation_router.get("/search") +async def docker_search_conversations(request: Request) -> JSONResponse: + return await _fanout_get_aggregate(request, "/api/conversations/search") + + +@docker_conversation_router.get("/count") +async def docker_count_conversations(request: Request) -> JSONResponse: + return await _fanout_count(request) + + +@docker_conversation_router.get("") +async def docker_list_conversations(request: Request) -> JSONResponse: + return await _fanout_get_aggregate(request, "/api/conversations") + + +@docker_conversation_router.api_route( + "/{conversation_id}", + 
methods=["GET", "PATCH"], +) +async def docker_proxy_conversation_root( + conversation_id: UUID, request: Request +) -> StreamingResponse: + manager = get_container_manager(request) + running = _container_or_404(manager, conversation_id) + return await proxy_http( + request, + running, + upstream_path=_build_upstream_path( + request, f"/api/conversations/{conversation_id}" + ), + ) + + +@docker_conversation_router.api_route( + "/{conversation_id}/{tail:path}", + methods=["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS", "HEAD"], +) +async def docker_proxy_conversation_subpath( + conversation_id: UUID, tail: str, request: Request +) -> StreamingResponse: + """Catch-all that proxies every conversation-scoped HTTP route. + + Covers ``/run``, ``/pause``, ``/interrupt``, ``/secrets``, + ``/confirmation_policy``, ``/switch_profile``, ``/switch_llm``, + ``/condense``, ``/fork``, ``/agent_final_response``, all of + ``/events/...`` (from ``event_router``), and all of ``/workspace/...`` + (from ``workspace_router``). + """ + manager = get_container_manager(request) + running = _container_or_404(manager, conversation_id) + upstream_path = _build_upstream_path( + request, f"/api/conversations/{conversation_id}/{tail}" + ) + return await proxy_http(request, running, upstream_path=upstream_path) + + +# --------------------------------------------------------------------------- +# HTTP fan-out helpers (list / search / count) +# --------------------------------------------------------------------------- + + +async def _fanout_get_aggregate(request: Request, path: str) -> JSONResponse: + """Aggregate ``items`` lists from every container. + + Each inner agent-server has at most one conversation, so the inner + response always looks like ``{"items": [...], "next_page_id": null}``. + We concatenate the items in deterministic order (by container start + time, which is the registry insertion order). 
+ + Pagination is intentionally not implemented in this first cut: ``GET + /api/conversations`` returns at most ``len(containers)`` rows, which + is bounded by however many conversations the operator allows. + """ + manager = get_container_manager(request) + query = request.url.query + upstream_path = f"{path}?{query}" if query else path + + async def _fetch(running: RunningContainer): + try: + async with httpx.AsyncClient(timeout=15.0) as client: + resp = await client.get( + running.base_url + upstream_path, + headers={"X-Session-API-Key": running.session_api_key}, + ) + except httpx.HTTPError as exc: + logger.warning( + "Fan-out GET %s failed for %s: %s", + upstream_path, + running.container_id[:12], + exc, + ) + return None + if resp.status_code != 200: + return None + try: + return resp.json() + except json.JSONDecodeError: + return None + + results = await asyncio.gather( + *[_fetch(c) for c in manager.list()], return_exceptions=False + ) + + aggregated_items: list = [] + for result in results: + if not result: + continue + items = result.get("items") + if isinstance(items, list): + aggregated_items.extend(items) + + return JSONResponse( + content={"items": aggregated_items, "next_page_id": None}, + status_code=200, + ) + + +async def _fanout_count(request: Request) -> JSONResponse: + manager = get_container_manager(request) + return JSONResponse(content={"count": len(manager.list())}, status_code=200) + + +# --------------------------------------------------------------------------- +# WebSockets: /sockets/events/{cid} +# --------------------------------------------------------------------------- + +docker_sockets_router = APIRouter(prefix="/sockets", tags=["Docker WebSockets"]) + + +@docker_sockets_router.websocket("/events/{conversation_id}") +async def docker_events_websocket(websocket: WebSocket, conversation_id: UUID) -> None: + manager = getattr(websocket.app.state, "container_manager", None) + if manager is None: + await websocket.close(code=1011) + 
return + running = manager.get(conversation_id) + if running is None: + # 1008 == policy violation; closest standard code for "no such conv". + await websocket.close(code=1008) + return + upstream_path = f"/sockets/events/{conversation_id}" + if websocket.url.query: + upstream_path = f"{upstream_path}?{websocket.url.query}" + await bridge_websocket(websocket, running, upstream_path=upstream_path) diff --git a/tests/agent_server/docker_runtime/__init__.py b/tests/agent_server/docker_runtime/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/agent_server/docker_runtime/test_container_manager.py b/tests/agent_server/docker_runtime/test_container_manager.py new file mode 100644 index 0000000000..3304048a6e --- /dev/null +++ b/tests/agent_server/docker_runtime/test_container_manager.py @@ -0,0 +1,246 @@ +"""Tests for :mod:`openhands.agent_server.docker_runtime.container_manager`. + +We never actually run Docker here — instead we inject a fake ``run_command`` +that records the arguments and returns whatever the test wants. Health +checks talk to a real :class:`http.server.HTTPServer` bound to localhost, +so the production code path (``urlopen``) is exercised end-to-end. +""" + +from __future__ import annotations + +import contextlib +import threading +from dataclasses import dataclass +from http.server import BaseHTTPRequestHandler, HTTPServer +from uuid import uuid4 + +import pytest + +from openhands.agent_server.config import Config +from openhands.agent_server.docker_runtime.container_manager import ( + ContainerManager, + ContainerStartupError, + DockerUnavailableError, +) + + +@dataclass +class FakeCompleted: + returncode: int + stdout: str = "" + stderr: str = "" + + +class _RecordingRun: + """Stand-in for ``subprocess.run`` that just records every call. + + Returns ``responses`` in order. Each response is matched against the + first token of the argv list (``docker``, then the verb) so tests stay + readable. 
+ """ + + def __init__(self, responses: dict[str, list[FakeCompleted]]) -> None: + self.responses = {k: list(v) for k, v in responses.items()} + self.calls: list[list[str]] = [] + + def __call__(self, argv, **kwargs): + self.calls.append(list(argv)) + # Key off the docker subcommand (run, stop, inspect, ...). + verb = argv[1] if len(argv) > 1 else "" + queue = self.responses.get(verb) + if not queue: + return FakeCompleted(returncode=0) + return queue.pop(0) + + +def _docker_config(**overrides) -> Config: + base = { + "conversation_runtime": "docker", + "conversation_image": "ghcr.io/openhands/agent-server:test", + "conversation_container_startup_timeout": 5.0, + } + base.update(overrides) + return Config(**base) + + +@contextlib.contextmanager +def _healthy_server(): + """Run a tiny HTTP server that responds 200 to /health.""" + + class _Handler(BaseHTTPRequestHandler): + def do_GET(self): # noqa: N802 - http.server API + if self.path == "/health": + self.send_response(200) + self.end_headers() + self.wfile.write(b"ok") + else: + self.send_response(404) + self.end_headers() + + def log_message(self, *args, **kwargs): # silence noisy stderr + pass + + server = HTTPServer(("127.0.0.1", 0), _Handler) + thread = threading.Thread(target=server.serve_forever, daemon=True) + thread.start() + try: + yield server.server_address[1] + finally: + server.shutdown() + server.server_close() + + +async def test_start_invokes_docker_run_and_waits_for_health(monkeypatch): + with _healthy_server() as port: + # Force the port allocator to hand back the healthy server's port + # so the manager's health check actually succeeds. 
+ monkeypatch.setattr( + "openhands.agent_server.docker_runtime.container_manager" + "._find_available_tcp_port", + lambda: port, + ) + + run = _RecordingRun( + { + "version": [FakeCompleted(returncode=0, stdout="Docker version")], + "run": [FakeCompleted(returncode=0, stdout="abcdef1234567890\n")], + } + ) + manager = ContainerManager( + _docker_config(), run_command=run, sleep=lambda _s: None + ) + + cid = uuid4() + running = await manager.start(cid) + + assert running.conversation_id == cid + assert running.container_id == "abcdef1234567890" + assert running.host_port == port + assert running.session_api_key # auto-minted + + # docker run was invoked with the expected port mapping and image. + run_argv = next(call for call in run.calls if call[1] == "run") + assert "-p" in run_argv + port_arg_index = run_argv.index("-p") + 1 + assert run_argv[port_arg_index] == f"{port}:8000" + assert "ghcr.io/openhands/agent-server:test" in run_argv + # Session key got injected via env so the inner server requires it. + env_args = [run_argv[i + 1] for i, arg in enumerate(run_argv) if arg == "-e"] + assert any( + e.startswith("OH_SESSION_API_KEYS_0=") and running.session_api_key in e + for e in env_args + ) + + +async def test_start_is_idempotent_per_conversation_id(monkeypatch): + with _healthy_server() as port: + monkeypatch.setattr( + "openhands.agent_server.docker_runtime.container_manager" + "._find_available_tcp_port", + lambda: port, + ) + run = _RecordingRun( + { + "version": [FakeCompleted(returncode=0, stdout="ok")], + "run": [FakeCompleted(returncode=0, stdout="container-1\n")], + } + ) + manager = ContainerManager( + _docker_config(), run_command=run, sleep=lambda _s: None + ) + cid = uuid4() + first = await manager.start(cid) + second = await manager.start(cid) + assert first is second + # Only one ``docker run`` was issued. 
+ assert sum(1 for c in run.calls if c[1] == "run") == 1 + + +async def test_start_raises_when_docker_is_missing(): + run = _RecordingRun( + {"version": [FakeCompleted(returncode=1, stderr="docker: not found")]} + ) + manager = ContainerManager(_docker_config(), run_command=run, sleep=lambda _s: None) + with pytest.raises(DockerUnavailableError): + await manager.start(uuid4()) + + +async def test_start_cleans_up_on_container_exit(monkeypatch): + """If the container dies during startup we must call ``docker stop``.""" + # We deliberately do not start a healthy server, so /health never + # responds. Instead we make ``docker inspect`` report the container is + # not running, which trips the early-exit branch. + monkeypatch.setattr( + "openhands.agent_server.docker_runtime.container_manager" + "._find_available_tcp_port", + lambda: 39999, # unused port - urlopen will refuse + ) + run = _RecordingRun( + { + "version": [FakeCompleted(returncode=0)], + "run": [FakeCompleted(returncode=0, stdout="dead-container\n")], + "inspect": [FakeCompleted(returncode=0, stdout="false\n")], + "logs": [FakeCompleted(returncode=0, stdout="boom")], + "stop": [FakeCompleted(returncode=0)], + } + ) + manager = ContainerManager(_docker_config(), run_command=run, sleep=lambda _s: None) + with pytest.raises(ContainerStartupError): + await manager.start(uuid4()) + # The teardown must have stopped the orphaned container. 
+ assert any(call[1] == "stop" for call in run.calls) + + +async def test_stop_removes_from_registry(monkeypatch): + with _healthy_server() as port: + monkeypatch.setattr( + "openhands.agent_server.docker_runtime.container_manager" + "._find_available_tcp_port", + lambda: port, + ) + run = _RecordingRun( + { + "version": [FakeCompleted(returncode=0)], + "run": [FakeCompleted(returncode=0, stdout="some-container\n")], + "stop": [FakeCompleted(returncode=0)], + } + ) + manager = ContainerManager( + _docker_config(), run_command=run, sleep=lambda _s: None + ) + cid = uuid4() + await manager.start(cid) + assert manager.get(cid) is not None + stopped = await manager.stop(cid) + assert stopped is True + assert manager.get(cid) is None + assert await manager.stop(cid) is False + + +async def test_shutdown_stops_all_tracked_containers(monkeypatch): + with _healthy_server() as port: + monkeypatch.setattr( + "openhands.agent_server.docker_runtime.container_manager" + "._find_available_tcp_port", + lambda: port, + ) + run = _RecordingRun( + { + "version": [FakeCompleted(returncode=0)] * 4, + "run": [ + FakeCompleted(returncode=0, stdout="c1\n"), + FakeCompleted(returncode=0, stdout="c2\n"), + ], + "stop": [FakeCompleted(returncode=0), FakeCompleted(returncode=0)], + } + ) + manager = ContainerManager( + _docker_config(), run_command=run, sleep=lambda _s: None + ) + await manager.start(uuid4()) + await manager.start(uuid4()) + assert len(manager.list()) == 2 + await manager.shutdown() + assert manager.list() == [] + # We issued two ``docker stop`` calls. + assert sum(1 for c in run.calls if c[1] == "stop") == 2 diff --git a/tests/agent_server/docker_runtime/test_docker_routers.py b/tests/agent_server/docker_runtime/test_docker_routers.py new file mode 100644 index 0000000000..1461810680 --- /dev/null +++ b/tests/agent_server/docker_runtime/test_docker_routers.py @@ -0,0 +1,313 @@ +"""End-to-end tests for the docker-runtime FastAPI routers. 
+ +The "inner" container is replaced by a real FastAPI app running on an +ephemeral localhost port, plumbed in via a stub :class:`ContainerManager`. +We exercise the public HTTP and WebSocket surface of the outer agent-server +the same way a real client would (via :class:`TestClient`), so any +shape-of-the-wire bug in the proxy layer would show up here. +""" + +from __future__ import annotations + +import threading +from contextlib import contextmanager +from uuid import UUID, uuid4 + +import pytest +import uvicorn +from fastapi import APIRouter, FastAPI, Header, WebSocket +from fastapi.testclient import TestClient + +from openhands.agent_server.api import create_app +from openhands.agent_server.config import Config +from openhands.agent_server.docker_runtime.container_manager import RunningContainer + + +# --------------------------------------------------------------------------- +# Fake inner agent-server (FastAPI) bound to a real port +# --------------------------------------------------------------------------- + + +def _build_inner_app(session_key: str) -> FastAPI: + """A minimal FastAPI app shaped like the per-conversation agent-server.""" + app = FastAPI() + + def _check(authorization: str | None) -> bool: + return authorization == session_key + + api = APIRouter(prefix="/api") + + @api.post("/conversations") + async def create_conversation( + payload: dict, + x_session_api_key: str = Header(default=""), + ): + if not _check(x_session_api_key): + return {"detail": "unauthorized"}, 401 + return {"id": payload.get("conversation_id"), "echoed": payload} + + @api.delete("/conversations/{cid}") + async def delete_conversation( + cid: str, x_session_api_key: str = Header(default="") + ): + if not _check(x_session_api_key): + return {"detail": "unauthorized"}, 401 + return {"deleted": cid} + + @api.get("/conversations/{cid}/run") + async def get_run(cid: str, x_session_api_key: str = Header(default="")): + if not _check(x_session_api_key): + return {"detail": 
"unauthorized"}, 401 + return {"cid": cid, "status": "running"} + + @api.get("/conversations/{cid}/workspace/{file_path:path}") + async def serve_workspace( + cid: str, + file_path: str, + x_session_api_key: str = Header(default=""), + ): + if not _check(x_session_api_key): + return {"detail": "unauthorized"}, 401 + return {"file": file_path, "cid": cid} + + @api.get("/conversations") + async def list_conversations(x_session_api_key: str = Header(default="")): + if not _check(x_session_api_key): + return {"detail": "unauthorized"}, 401 + return {"items": [{"id": "inner-1"}], "next_page_id": None} + + @api.get("/conversations/search") + async def search_conversations(x_session_api_key: str = Header(default="")): + return {"items": [{"id": "inner-1"}], "next_page_id": None} + + app.include_router(api) + + @app.websocket("/sockets/events/{cid}") + async def events_ws(websocket: WebSocket, cid: str): + # Auth check via header on the upgrade request. + if websocket.headers.get("x-session-api-key") != session_key: + await websocket.close(code=1008) + return + await websocket.accept() + # Echo back whatever the client sends, plus a server-initiated frame. + await websocket.send_text(f"hello {cid}") + try: + while True: + msg = await websocket.receive_text() + await websocket.send_text(f"echo:{msg}") + except Exception: + pass + + return app + + +@contextmanager +def _run_inner_app(session_key: str): + """Run the fake inner app on a real localhost port.""" + app = _build_inner_app(session_key) + config = uvicorn.Config(app, host="127.0.0.1", port=0, log_level="warning") + server = uvicorn.Server(config) + thread = threading.Thread(target=server.run, daemon=True) + thread.start() + # uvicorn assigns the real port lazily; wait until it's bound. + import time + + deadline = time.time() + 10 + port: int | None = None + while time.time() < deadline: + if server.started and server.servers: + # ``servers[0].sockets[0].getsockname()`` gives us the bound port. 
+ sockets = list(server.servers)[0].sockets + if sockets: + port = sockets[0].getsockname()[1] + break + time.sleep(0.05) + if port is None: + raise RuntimeError("inner app failed to bind") + try: + yield port + finally: + server.should_exit = True + thread.join(timeout=5) + + +# --------------------------------------------------------------------------- +# Stub ContainerManager wired into the outer app +# --------------------------------------------------------------------------- + + +class _StubContainerManager: + """A stand-in ContainerManager that points every conversation at a + pre-existing real HTTP server (the fake inner app).""" + + def __init__(self, port: int, session_key: str) -> None: + self._port = port + self._session_key = session_key + self._containers: dict[UUID, RunningContainer] = {} + + def _make(self, cid: UUID) -> RunningContainer: + return RunningContainer( + conversation_id=cid, + container_id=f"fake-{cid.hex[:8]}", + host_port=self._port, + session_api_key=self._session_key, + image="fake", + ) + + def get(self, cid: UUID) -> RunningContainer | None: + return self._containers.get(cid) + + def list(self): + return list(self._containers.values()) + + async def start(self, cid: UUID) -> RunningContainer: + if cid not in self._containers: + self._containers[cid] = self._make(cid) + return self._containers[cid] + + async def stop(self, cid: UUID) -> bool: + return self._containers.pop(cid, None) is not None + + async def shutdown(self) -> None: + self._containers.clear() + + +@pytest.fixture +def docker_app(): + """Spin up the docker-mode outer FastAPI app + a fake inner server. + + We deliberately do NOT enter the lifespan context (no ``with TestClient(...) + as client``): the lifespan starts a tmux/vscode/desktop service that we + don't want to drag into these tests. Instead we set ``container_manager`` + and ``proxy_client`` directly on ``app.state``, which is what the lifespan + would do in docker mode. 
+ """ + session_key = "inner-secret" + with _run_inner_app(session_key) as port: + app = create_app(Config(conversation_runtime="docker")) + app.state.container_manager = _StubContainerManager(port, session_key) + client = TestClient(app) + try: + yield client, app + finally: + client.close() + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +def test_post_conversations_spawns_and_forwards(docker_app): + client, app = docker_app + body = { + "workspace": {"working_dir": "/host/will-be-rewritten"}, + "agent": {"kind": "Agent"}, + } + resp = client.post("/api/conversations", json=body) + assert resp.status_code == 200 + payload = resp.json() + + # Inner app sees a freshly-minted conversation_id and the rewritten + # workspace path. We don't care what the id is, only that it's + # consistent. + inner_payload = payload["echoed"] + assert inner_payload["workspace"]["working_dir"] == "/workspace" + cid = UUID(inner_payload["conversation_id"]) + assert app.state.container_manager.get(cid) is not None + + +def test_subpath_proxied_to_inner_server(docker_app): + client, app = docker_app + create = client.post( + "/api/conversations", + json={"workspace": {"working_dir": "/x"}, "agent": {}}, + ) + cid = UUID(create.json()["echoed"]["conversation_id"]) + + # Generic catch-all routes + run = client.get(f"/api/conversations/{cid}/run") + assert run.status_code == 200 + assert run.json() == {"cid": str(cid), "status": "running"} + + workspace = client.get(f"/api/conversations/{cid}/workspace/foo/bar.txt") + assert workspace.status_code == 200 + assert workspace.json() == {"file": "foo/bar.txt", "cid": str(cid)} + + +def test_subpath_returns_404_when_no_container(docker_app): + client, _ = docker_app + cid = uuid4() + resp = client.get(f"/api/conversations/{cid}/run") + assert resp.status_code == 404 + + +def 
test_delete_proxies_then_stops_container(docker_app): + client, app = docker_app + create = client.post( + "/api/conversations", + json={"workspace": {"working_dir": "/x"}, "agent": {}}, + ) + cid = UUID(create.json()["echoed"]["conversation_id"]) + assert app.state.container_manager.get(cid) is not None + + delete = client.delete(f"/api/conversations/{cid}") + assert delete.status_code == 200 + assert delete.json() == {"deleted": str(cid)} + # Container has been deregistered. + assert app.state.container_manager.get(cid) is None + + +def test_list_aggregates_across_containers(docker_app): + client, _ = docker_app + # Spawn two conversations + client.post("/api/conversations", json={"workspace": {}, "agent": {}}) + client.post("/api/conversations", json={"workspace": {}, "agent": {}}) + + resp = client.get("/api/conversations") + assert resp.status_code == 200 + payload = resp.json() + # Each inner server returns one item -> we should see two. + assert len(payload["items"]) == 2 + + +def test_count_uses_local_registry(docker_app): + client, _ = docker_app + assert client.get("/api/conversations/count").json() == {"count": 0} + client.post("/api/conversations", json={"workspace": {}, "agent": {}}) + assert client.get("/api/conversations/count").json() == {"count": 1} + + +def test_websocket_bridges_to_inner_server(docker_app): + client, _ = docker_app + create = client.post( + "/api/conversations", + json={"workspace": {"working_dir": "/x"}, "agent": {}}, + ) + cid = UUID(create.json()["echoed"]["conversation_id"]) + + with client.websocket_connect(f"/sockets/events/{cid}") as ws: + greeting = ws.receive_text() + assert greeting == f"hello {cid}" + ws.send_text("ping") + assert ws.receive_text() == "echo:ping" + + +def test_websocket_closes_when_conversation_unknown(docker_app): + client, _ = docker_app + cid = uuid4() + with pytest.raises(Exception): + with client.websocket_connect(f"/sockets/events/{cid}"): + pass + + +def test_local_mode_routes_are_unchanged(): + 
"""Sanity check: enabling docker mode must not have leaked into local mode.""" + app = create_app(Config(conversation_runtime="local")) + paths = {r.path for r in app.routes if hasattr(r, "path")} + # Local conversation router exposes the canonical POST /api/conversations + # plus the lifecycle endpoints. Docker mode catch-all lives at + # /api/conversations/{conversation_id}/{tail:path}; that path must NOT + # appear in local mode. + assert "/api/conversations" in paths + assert "/api/conversations/{conversation_id}/{tail:path}" not in paths From e7ec1a7d8c987a9acb886436ea06db324e1998dc Mon Sep 17 00:00:00 2001 From: openhands Date: Wed, 27 May 2026 15:13:35 +0000 Subject: [PATCH 2/3] fix(agent-server,docker_runtime): address PR review feedback Six fixes for the per-conversation docker runtime, driven by reviewer findings on PR #3403: 1. Bind inner container ports to loopback only (-p 127.0.0.1:HOST:8000) so the per-conversation agent-servers can only be reached through the outer server's authenticated proxy. (R3311480573) 2. Authenticate the WebSocket bridge against the OUTER server's session keys before opening the upstream connection. Reuses the existing sockets.py helper (header / query / first-message auth), and the bridge no longer calls accept() a second time. (R3311480598) 3. Preserve the local GET /api/conversations?ids=... contract: route is batch-get-by-id, requires the ids query param, returns list[ConversationInfo | None]. Looks each id up in the registry and fetches from its container (None for missing). (R3311480555, R3311480542) 4. Preserve the local /api/conversations/count contract: returns a bare JSON integer (not {"count": N}), honors ?status= by forwarding the query to each inner container and summing their integers. (R3311480576, R3311480571) 5. ContainerManager.start() now returns (running, is_new). 
The POST route only tears down the container on inner 4xx / connection error when is_new=True, so a retried create against an existing conversation can no longer kill the live container. (R3311480570) 6. Workspace static-file routes mount under the workspace-cookie auth group in docker mode via a new docker_workspace_router. The workspace router is now registered before the header-only api_router so the more specific path wins; browser iframe/img embeds with the oh_workspace_session_key cookie continue to work. (R3311480585) Tests: * test_container_manager: assert loopback port binding; updated for the (running, is_new) return tuple, plus an explicit is_new=False assert on the idempotent second start. * test_docker_routers: new tests for batch-get-by-ids (incl. 422 on missing ids, null slots for unknown ids), bare-int /count contract, WS rejects wrong key, WS rejects missing first-message auth, WS accepts with valid outer key, POST retry preserves existing container on inner 4xx, fresh-create cleans up on inner 4xx, workspace route registered before the catch-all. Fake inner app reordered so /search and /count aren't shadowed by /{cid}. 22 docker_runtime tests pass; 144 tests in api / conversation / workspace / docker_runtime all green. 
Co-authored-by: openhands --- .../openhands/agent_server/api.py | 18 +- .../docker_runtime/container_manager.py | 19 +- .../agent_server/docker_runtime/proxy.py | 7 +- .../agent_server/docker_runtime/routers.py | 278 +++++++++++++---- .../docker_runtime/test_container_manager.py | 15 +- .../docker_runtime/test_docker_routers.py | 293 +++++++++++++++++- 6 files changed, 541 insertions(+), 89 deletions(-) diff --git a/openhands-agent-server/openhands/agent_server/api.py b/openhands-agent-server/openhands/agent_server/api.py index a7e6102dab..1bc995df72 100644 --- a/openhands-agent-server/openhands/agent_server/api.py +++ b/openhands-agent-server/openhands/agent_server/api.py @@ -38,6 +38,7 @@ from openhands.agent_server.docker_runtime.routers import ( docker_conversation_router, docker_sockets_router, + docker_workspace_router, ) from openhands.agent_server.event_router import event_router from openhands.agent_server.file_router import file_router @@ -347,7 +348,6 @@ def _add_api_routes(app: FastAPI, config: Config) -> None: # /api/auth/* mints workspace cookies and requires the header to bootstrap, # so it lives under the header-only auth group. api_router.include_router(auth_router) - app.include_router(api_router) # Workspace static-file routes get their own auth group that accepts # EITHER the X-Session-API-Key header OR the workspace session cookie. @@ -360,11 +360,21 @@ def _add_api_routes(app: FastAPI, config: Config) -> None: Depends(create_workspace_session_dependency(config)) ) workspace_api_router = APIRouter(prefix="/api", dependencies=workspace_dependencies) - if config.conversation_runtime == "local": - # In docker mode workspace static files are served by the generic - # ``/api/conversations/{cid}/workspace/...`` proxy registered above. + if config.conversation_runtime == "docker": + # Proxy workspace static files via the per-conversation container, + # but under the workspace-cookie auth group so iframe/img embeds work. 
+ workspace_api_router.include_router(docker_workspace_router) + else: workspace_api_router.include_router(workspace_router) + + # Order matters: the workspace router is more specific + # (``/conversations/{cid}/workspace/...``) than the docker catch-all + # (``/conversations/{cid}/{tail:path}``). Starlette matches in + # registration order, so we MUST include the cookie-auth workspace + # router before the header-auth api_router; otherwise the catch-all + # would shadow workspace requests and demand the header. app.include_router(workspace_api_router) + app.include_router(api_router) if config.conversation_runtime == "docker": app.include_router(docker_sockets_router) diff --git a/openhands-agent-server/openhands/agent_server/docker_runtime/container_manager.py b/openhands-agent-server/openhands/agent_server/docker_runtime/container_manager.py index 311f3180f0..265c3e7f23 100644 --- a/openhands-agent-server/openhands/agent_server/docker_runtime/container_manager.py +++ b/openhands-agent-server/openhands/agent_server/docker_runtime/container_manager.py @@ -131,21 +131,28 @@ def list(self) -> list[RunningContainer]: def get(self, conversation_id: UUID) -> RunningContainer | None: return self._containers.get(conversation_id) - async def start(self, conversation_id: UUID) -> RunningContainer: + async def start(self, conversation_id: UUID) -> tuple[RunningContainer, bool]: """Spawn a fresh container for ``conversation_id``. Idempotent: if a container is already registered for this conversation the existing :class:`RunningContainer` is returned and no new container is started. + + Returns: + ``(running, is_new)``: ``is_new`` is ``True`` when this call + actually spawned a new container, and ``False`` when an existing + registered container was returned. Callers that want to clean + up after a failed startup MUST gate the teardown on ``is_new`` + so retried requests don't tear down a live conversation. 
""" async with self._lock: existing = self._containers.get(conversation_id) if existing is not None: - return existing + return existing, False running = await asyncio.to_thread(self._start_blocking, conversation_id) self._containers[conversation_id] = running - return running + return running, True async def stop(self, conversation_id: UUID) -> bool: """Stop and forget the container for ``conversation_id``. @@ -211,7 +218,11 @@ def _start_blocking(self, conversation_id: UUID) -> RunningContainer: "--name", container_name, "-p", - f"{host_port}:8000", + # Bind only to loopback. The outer agent-server reaches the + # inner one via 127.0.0.1; exposing it on all host interfaces + # would bypass the outer auth and turn every per-conversation + # container into a publicly addressable agent-server. + f"127.0.0.1:{host_port}:8000", *flags, image, "--host", diff --git a/openhands-agent-server/openhands/agent_server/docker_runtime/proxy.py b/openhands-agent-server/openhands/agent_server/docker_runtime/proxy.py index 9e0d4fc07b..33c5d95015 100644 --- a/openhands-agent-server/openhands/agent_server/docker_runtime/proxy.py +++ b/openhands-agent-server/openhands/agent_server/docker_runtime/proxy.py @@ -151,6 +151,11 @@ async def bridge_websocket( (the agent-server's WebSocket auth also accepts that header — see :mod:`openhands.agent_server.sockets`). + Precondition: ``client_ws`` MUST already be accepted by the caller. The + bridge does not call ``accept()`` itself because the outer server's + WebSocket-auth helper (which accepts on success) needs to run first. + Calling ``accept()`` a second time would raise. + Closure semantics: when either side closes (or errors), we close the other side and return. We do not attempt to reconnect. 
""" @@ -159,8 +164,6 @@ async def bridge_websocket( + upstream_path ) - await client_ws.accept() - extra_headers = {"X-Session-API-Key": running.session_api_key} try: diff --git a/openhands-agent-server/openhands/agent_server/docker_runtime/routers.py b/openhands-agent-server/openhands/agent_server/docker_runtime/routers.py index ee60efc024..8bb89e26be 100644 --- a/openhands-agent-server/openhands/agent_server/docker_runtime/routers.py +++ b/openhands-agent-server/openhands/agent_server/docker_runtime/routers.py @@ -33,7 +33,7 @@ WebSocket, status, ) -from starlette.responses import JSONResponse, StreamingResponse +from starlette.responses import JSONResponse, Response, StreamingResponse from openhands.agent_server.docker_runtime.container_manager import ( ContainerManager, @@ -139,7 +139,7 @@ async def docker_start_conversation( body["workspace"] = workspace try: - running = await manager.start(conversation_id) + running, is_new = await manager.start(conversation_id) except DockerUnavailableError as exc: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=str(exc) @@ -167,23 +167,27 @@ async def docker_start_conversation( ) except httpx.HTTPError as exc: # If we managed to start the container but the very first request - # failed, that's a startup race. Treat as 502 and tear down so we - # don't leak a half-initialized container. + # failed, that's a startup race. Tear down only the container WE + # just created — otherwise a retry against an existing + # conversation would kill the live one. logger.warning( "Initial request to fresh container %s failed: %s", running.container_id[:12], exc, ) - await manager.stop(conversation_id) + if is_new: + await manager.stop(conversation_id) raise HTTPException( status_code=status.HTTP_502_BAD_GATEWAY, detail=f"Conversation container could not accept the request: {exc}", ) from exc - if response.status_code >= 400: + if response.status_code >= 400 and is_new: # The inner server rejected the create. 
Don't leave the container # behind in that case — it'd be orphaned, since no client will know - # to send DELETE. + # to send DELETE. But only if we were the ones who started it: + # a retried create against an existing conversation must not tear + # down the live conversation. await manager.stop(conversation_id) return JSONResponse( @@ -229,22 +233,74 @@ async def docker_delete_conversation( ) -_FANOUT_LIST_PATHS = {"", "/", "/search", "/count"} - - @docker_conversation_router.get("/search") async def docker_search_conversations(request: Request) -> JSONResponse: - return await _fanout_get_aggregate(request, "/api/conversations/search") + """Fan-out listing endpoint — preserves the local + :class:`ConversationPage` wire shape (``{"items": [...], "next_page_id": + null}``). Each inner agent-server has at most one conversation, so we + just concatenate the inner ``items`` lists. ``page_id`` / ``limit`` / + ``sort_order`` are not honored across containers in this first cut. + """ + return await _fanout_search(request) @docker_conversation_router.get("/count") -async def docker_count_conversations(request: Request) -> JSONResponse: +async def docker_count_conversations(request: Request) -> Response: + """Fan-out count — preserves the local contract of returning a bare + JSON integer (not ``{"count": N}``). Honors the ``?status=`` filter by + forwarding it to each container, so containers whose conversation does + not match contribute 0. 
+ """ return await _fanout_count(request) @docker_conversation_router.get("") -async def docker_list_conversations(request: Request) -> JSONResponse: - return await _fanout_get_aggregate(request, "/api/conversations") +async def docker_batch_get_conversations( + request: Request, + ids: Annotated[list[UUID], Query()], + include_skills: Annotated[bool, Query()] = False, +) -> JSONResponse: + """Batch-get conversations by id — preserves the local + ``GET /api/conversations?ids=...`` contract (returns + ``list[ConversationInfo | None]`` with ``None`` for missing ids). + + Each id is looked up in the container registry; matched ids are + fetched from their respective container, mismatched ids slot in as + ``None``. ``ids`` is required (same as local mode). + """ + if len(ids) >= 100: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Too many ids requested (limit 100).", + ) + + manager = get_container_manager(request) + + async def _fetch_one(cid: UUID): + running = manager.get(cid) + if running is None: + return None + suffix = "?include_skills=true" if include_skills else "" + url = f"{running.base_url}/api/conversations/{cid}{suffix}" + try: + async with httpx.AsyncClient(timeout=15.0) as client: + resp = await client.get( + url, headers={"X-Session-API-Key": running.session_api_key} + ) + except httpx.HTTPError as exc: + logger.warning("Batch-get failed for %s: %s", cid, exc) + return None + if resp.status_code == 404: + return None + if resp.status_code != 200: + return None + try: + return resp.json() + except json.JSONDecodeError: + return None + + results = await asyncio.gather(*[_fetch_one(cid) for cid in ids]) + return JSONResponse(content=list(results), status_code=200) @docker_conversation_router.api_route( @@ -289,59 +345,62 @@ async def docker_proxy_conversation_subpath( # --------------------------------------------------------------------------- -# HTTP fan-out helpers (list / search / count) +# HTTP fan-out helpers (search / 
count) # --------------------------------------------------------------------------- -async def _fanout_get_aggregate(request: Request, path: str) -> JSONResponse: - """Aggregate ``items`` lists from every container. +async def _inner_get_json(running: RunningContainer, upstream_path: str): + """Issue a GET to one container's inner agent-server and decode JSON. + + Returns ``None`` on any failure so the caller can treat that container + as contributing nothing to the aggregate. + """ + try: + async with httpx.AsyncClient(timeout=15.0) as client: + resp = await client.get( + running.base_url + upstream_path, + headers={"X-Session-API-Key": running.session_api_key}, + ) + except httpx.HTTPError as exc: + logger.warning( + "Fan-out GET %s failed for %s: %s", + upstream_path, + running.container_id[:12], + exc, + ) + return None + if resp.status_code != 200: + return None + try: + return resp.json() + except json.JSONDecodeError: + return None + - Each inner agent-server has at most one conversation, so the inner - response always looks like ``{"items": [...], "next_page_id": null}``. - We concatenate the items in deterministic order (by container start - time, which is the registry insertion order). +async def _fanout_search(request: Request) -> JSONResponse: + """Fan-out for ``/api/conversations/search``. - Pagination is intentionally not implemented in this first cut: ``GET - /api/conversations`` returns at most ``len(containers)`` rows, which - is bounded by however many conversations the operator allows. + Returns the same wire shape as :class:`ConversationPage` — + ``{"items": [...], "next_page_id": null}``. Each inner server runs at + most one conversation, so we just concatenate ``items`` from every + container's response. 
""" manager = get_container_manager(request) query = request.url.query - upstream_path = f"{path}?{query}" if query else path - - async def _fetch(running: RunningContainer): - try: - async with httpx.AsyncClient(timeout=15.0) as client: - resp = await client.get( - running.base_url + upstream_path, - headers={"X-Session-API-Key": running.session_api_key}, - ) - except httpx.HTTPError as exc: - logger.warning( - "Fan-out GET %s failed for %s: %s", - upstream_path, - running.container_id[:12], - exc, - ) - return None - if resp.status_code != 200: - return None - try: - return resp.json() - except json.JSONDecodeError: - return None + upstream_path = ( + f"/api/conversations/search?{query}" if query else "/api/conversations/search" + ) results = await asyncio.gather( - *[_fetch(c) for c in manager.list()], return_exceptions=False + *[_inner_get_json(c, upstream_path) for c in manager.list()] ) aggregated_items: list = [] for result in results: - if not result: - continue - items = result.get("items") - if isinstance(items, list): - aggregated_items.extend(items) + if isinstance(result, dict): + items = result.get("items") + if isinstance(items, list): + aggregated_items.extend(items) return JSONResponse( content={"items": aggregated_items, "next_page_id": None}, @@ -349,9 +408,66 @@ async def _fetch(running: RunningContainer): ) -async def _fanout_count(request: Request) -> JSONResponse: +async def _fanout_count(request: Request) -> Response: + """Fan-out for ``/api/conversations/count``. + + The local endpoint returns a bare JSON integer. We forward the same + query string (``?status=...``) to every inner container, sum the + returned integers, and emit a bare integer. 
+ """ + manager = get_container_manager(request) + query = request.url.query + upstream_path = ( + f"/api/conversations/count?{query}" if query else "/api/conversations/count" + ) + + results = await asyncio.gather( + *[_inner_get_json(c, upstream_path) for c in manager.list()] + ) + + total = 0 + for result in results: + if isinstance(result, int): + total += result + # Be tolerant of an inner server that ever returns ``{"count": N}``. + elif isinstance(result, dict) and isinstance(result.get("count"), int): + total += result["count"] + + return Response(content=json.dumps(total), media_type="application/json") + + +# --------------------------------------------------------------------------- +# Workspace static files — same path as the local workspace_router, but +# served under the workspace-cookie auth group so that browser iframe / img +# embeds work without the X-Session-API-Key header. Registered +# BEFORE ``docker_conversation_router``'s catch-all so the more specific +# workspace path wins. We deliberately do NOT pull in the cookie +# dependency here — ``api.py`` mounts this router under the existing +# ``workspace_api_router`` whose dependencies already implement +# cookie-or-header auth. +# --------------------------------------------------------------------------- + +docker_workspace_router = APIRouter(prefix="/conversations", tags=["Docker Workspace"]) + + +@docker_workspace_router.get("/{conversation_id}/workspace/{file_path:path}") +async def docker_proxy_workspace_file( + conversation_id: UUID, file_path: str, request: Request +) -> StreamingResponse: + """Proxy workspace static-file reads to the per-conversation container. + + The local :class:`workspace_router` resolves ``file_path`` against the + conversation's working dir on the host. In docker mode the canonical + filesystem lives inside the container, so we just hand the request + through to the inner server's identical route. 
+ """ manager = get_container_manager(request) - return JSONResponse(content={"count": len(manager.list())}, status_code=200) + running = _container_or_404(manager, conversation_id) + upstream_path = _build_upstream_path( + request, + f"/api/conversations/{conversation_id}/workspace/{file_path}", + ) + return await proxy_http(request, running, upstream_path=upstream_path) # --------------------------------------------------------------------------- @@ -362,7 +478,30 @@ async def _fanout_count(request: Request) -> JSONResponse: @docker_sockets_router.websocket("/events/{conversation_id}") -async def docker_events_websocket(websocket: WebSocket, conversation_id: UUID) -> None: +async def docker_events_websocket( + websocket: WebSocket, + conversation_id: UUID, + session_api_key: Annotated[str | None, Query(alias="session_api_key")] = None, +) -> None: + """Authenticated WebSocket bridge to the per-conversation container. + + Auth must succeed against the OUTER server's session keys before we + reach the inner container — otherwise a request with no key would be + indistinguishable from one the outer server has already authorized, + since the bridge re-signs with the container's session key. We reuse + :func:`openhands.agent_server.sockets._accept_authenticated_websocket`, + which supports the same three auth methods the local sockets router + accepts (header / query / first-message ``{"type": "auth", ...}``). + On success the helper has already ``accept()``ed the socket, so the + downstream bridge must NOT accept again. + """ + # Imported lazily to avoid a circular import: the sockets module pulls + # in the in-process conversation service at module scope. 
+ from openhands.agent_server.sockets import _accept_authenticated_websocket + + if not await _accept_authenticated_websocket(websocket, session_api_key): + return + manager = getattr(websocket.app.state, "container_manager", None) if manager is None: await websocket.close(code=1011) @@ -372,7 +511,30 @@ async def docker_events_websocket(websocket: WebSocket, conversation_id: UUID) - # 1008 == policy violation; closest standard code for "no such conv". await websocket.close(code=1008) return + + # Strip the auth query param before forwarding upstream — the inner + # server is reached via the container session key (in the header), + # never the outer-facing one. upstream_path = f"/sockets/events/{conversation_id}" - if websocket.url.query: - upstream_path = f"{upstream_path}?{websocket.url.query}" + forwarded_query = _strip_auth_query(websocket.url.query) + if forwarded_query: + upstream_path = f"{upstream_path}?{forwarded_query}" await bridge_websocket(websocket, running, upstream_path=upstream_path) + + +def _strip_auth_query(query: str) -> str: + """Remove ``session_api_key`` from a urlencoded query string. + + The outer server's session key must never leak into the inner + container's logs or request history. 
+ """ + if not query: + return "" + from urllib.parse import parse_qsl, urlencode + + keep = [ + (k, v) + for k, v in parse_qsl(query, keep_blank_values=True) + if k != "session_api_key" + ] + return urlencode(keep) diff --git a/tests/agent_server/docker_runtime/test_container_manager.py b/tests/agent_server/docker_runtime/test_container_manager.py index 3304048a6e..6dbf2bd697 100644 --- a/tests/agent_server/docker_runtime/test_container_manager.py +++ b/tests/agent_server/docker_runtime/test_container_manager.py @@ -111,8 +111,9 @@ async def test_start_invokes_docker_run_and_waits_for_health(monkeypatch): ) cid = uuid4() - running = await manager.start(cid) + running, is_new = await manager.start(cid) + assert is_new is True assert running.conversation_id == cid assert running.container_id == "abcdef1234567890" assert running.host_port == port @@ -122,7 +123,9 @@ async def test_start_invokes_docker_run_and_waits_for_health(monkeypatch): run_argv = next(call for call in run.calls if call[1] == "run") assert "-p" in run_argv port_arg_index = run_argv.index("-p") + 1 - assert run_argv[port_arg_index] == f"{port}:8000" + # Inner container ports MUST be bound only to loopback so the only + # path to them is via the outer agent-server's authenticated proxy. + assert run_argv[port_arg_index] == f"127.0.0.1:{port}:8000" assert "ghcr.io/openhands/agent-server:test" in run_argv # Session key got injected via env so the inner server requires it. 
env_args = [run_argv[i + 1] for i, arg in enumerate(run_argv) if arg == "-e"] @@ -149,9 +152,13 @@ async def test_start_is_idempotent_per_conversation_id(monkeypatch): _docker_config(), run_command=run, sleep=lambda _s: None ) cid = uuid4() - first = await manager.start(cid) - second = await manager.start(cid) + first, first_new = await manager.start(cid) + second, second_new = await manager.start(cid) assert first is second + assert first_new is True + # Second call reused the existing container; callers depend on this + # ``False`` to skip teardown after a retried-create failure. + assert second_new is False # Only one ``docker run`` was issued. assert sum(1 for c in run.calls if c[1] == "run") == 1 diff --git a/tests/agent_server/docker_runtime/test_docker_routers.py b/tests/agent_server/docker_runtime/test_docker_routers.py index 1461810680..ab81a65ea9 100644 --- a/tests/agent_server/docker_runtime/test_docker_routers.py +++ b/tests/agent_server/docker_runtime/test_docker_routers.py @@ -44,6 +44,12 @@ async def create_conversation( ): if not _check(x_session_api_key): return {"detail": "unauthorized"}, 401 + # Magic flag used by ``test_post_retry_does_not_stop_existing_container`` + # to drive the inner-server-rejects-the-create branch deterministically. 
+ if payload.get("_force_400"): + from fastapi import HTTPException + + raise HTTPException(status_code=400, detail="forced") return {"id": payload.get("conversation_id"), "echoed": payload} @api.delete("/conversations/{cid}") @@ -70,15 +76,25 @@ async def serve_workspace( return {"detail": "unauthorized"}, 401 return {"file": file_path, "cid": cid} - @api.get("/conversations") - async def list_conversations(x_session_api_key: str = Header(default="")): - if not _check(x_session_api_key): - return {"detail": "unauthorized"}, 401 - return {"items": [{"id": "inner-1"}], "next_page_id": None} - + # NB: more specific paths must be registered before ``/{cid}`` so + # FastAPI's first-match wins doesn't treat "search" / "count" as a cid. @api.get("/conversations/search") async def search_conversations(x_session_api_key: str = Header(default="")): - return {"items": [{"id": "inner-1"}], "next_page_id": None} + return { + "items": [{"id": "inner-1", "workspace": {"working_dir": "/workspace"}}], + "next_page_id": None, + } + + # Inner agent-server's /count returns a BARE JSON integer (not an object). + @api.get("/conversations/count") + async def inner_count(x_session_api_key: str = Header(default="")): + return 1 + + @api.get("/conversations/{cid}") + async def get_conversation(cid: str, x_session_api_key: str = Header(default="")): + if not _check(x_session_api_key): + return {"detail": "unauthorized"}, 401 + return {"id": cid, "workspace": {"working_dir": "/workspace"}} app.include_router(api) @@ -154,16 +170,24 @@ def _make(self, cid: UUID) -> RunningContainer: image="fake", ) + def preregister(self, cid: UUID) -> RunningContainer: + """Test helper: seed the registry with a pre-existing container so + the next ``manager.start(cid)`` call hits the ``is_new=False`` path. 
+ """ + self._containers[cid] = self._make(cid) + return self._containers[cid] + def get(self, cid: UUID) -> RunningContainer | None: return self._containers.get(cid) def list(self): return list(self._containers.values()) - async def start(self, cid: UUID) -> RunningContainer: + async def start(self, cid: UUID) -> tuple[RunningContainer, bool]: if cid not in self._containers: self._containers[cid] = self._make(cid) - return self._containers[cid] + return self._containers[cid], True + return self._containers[cid], False async def stop(self, cid: UUID) -> bool: return self._containers.pop(cid, None) is not None @@ -258,24 +282,67 @@ def test_delete_proxies_then_stops_container(docker_app): assert app.state.container_manager.get(cid) is None -def test_list_aggregates_across_containers(docker_app): +def test_search_aggregates_across_containers(docker_app): + """``GET /api/conversations/search`` fans out and concatenates items. + + The wire shape must match the local ``ConversationPage`` + (``{"items": [...], "next_page_id": null}``). + """ client, _ = docker_app # Spawn two conversations client.post("/api/conversations", json={"workspace": {}, "agent": {}}) client.post("/api/conversations", json={"workspace": {}, "agent": {}}) - resp = client.get("/api/conversations") + resp = client.get("/api/conversations/search") assert resp.status_code == 200 payload = resp.json() + assert payload["next_page_id"] is None # Each inner server returns one item -> we should see two. assert len(payload["items"]) == 2 -def test_count_uses_local_registry(docker_app): +def test_batch_get_preserves_local_contract(docker_app): + """``GET /api/conversations?ids=...`` must keep the local contract: + + * ``ids`` is required (no ``ids`` -> 422), + * the response is a JSON list (NOT a page object), + * missing ids slot in as ``null``. 
+ """ client, _ = docker_app - assert client.get("/api/conversations/count").json() == {"count": 0} + + # No ids -> 422 (FastAPI validation), matching local behaviour. + no_ids = client.get("/api/conversations") + assert no_ids.status_code == 422 + + # Spawn one conversation; look up alongside a fake id. + created = client.post("/api/conversations", json={"workspace": {}, "agent": {}}) + cid = UUID(created.json()["echoed"]["conversation_id"]) + missing = uuid4() + + resp = client.get(f"/api/conversations?ids={cid}&ids={missing}") + assert resp.status_code == 200 + body = resp.json() + assert isinstance(body, list) + assert len(body) == 2 + assert body[0] is not None and body[0]["id"] == str(cid) + assert body[1] is None + + +def test_count_returns_bare_integer(docker_app): + """``GET /api/conversations/count`` must return a bare integer + (matching the local-mode wire contract), not ``{"count": N}``. + """ + client, _ = docker_app + + # Empty registry: sum across zero containers is 0. + zero = client.get("/api/conversations/count") + assert zero.status_code == 200 + assert zero.json() == 0 # bare int, not {"count": 0} + client.post("/api/conversations", json={"workspace": {}, "agent": {}}) - assert client.get("/api/conversations/count").json() == {"count": 1} + one = client.get("/api/conversations/count") + assert one.json() == 1 + assert one.headers["content-type"].startswith("application/json") def test_websocket_bridges_to_inner_server(docker_app): @@ -294,11 +361,20 @@ def test_websocket_bridges_to_inner_server(docker_app): def test_websocket_closes_when_conversation_unknown(docker_app): + """When no container exists for the requested conversation, the bridge + must close the (already-accepted) socket. Auth-on-accept happens FIRST, + so the close fires after the handshake, not instead of it. 
+ """ + from starlette.websockets import WebSocketDisconnect + client, _ = docker_app cid = uuid4() - with pytest.raises(Exception): - with client.websocket_connect(f"/sockets/events/{cid}"): - pass + with client.websocket_connect(f"/sockets/events/{cid}") as ws: + # Server accepts then immediately closes with 1008. Reading any + # frame raises ``WebSocketDisconnect``. + with pytest.raises(WebSocketDisconnect) as exc_info: + ws.receive_text() + assert exc_info.value.code == 1008 def test_local_mode_routes_are_unchanged(): @@ -311,3 +387,186 @@ def test_local_mode_routes_are_unchanged(): # appear in local mode. assert "/api/conversations" in paths assert "/api/conversations/{conversation_id}/{tail:path}" not in paths + + +# --------------------------------------------------------------------------- +# Authentication: WebSocket bridge MUST enforce the outer server's session +# keys before opening a connection to the inner container. This was a +# critical review finding. +# --------------------------------------------------------------------------- + + +@pytest.fixture +def docker_app_with_auth(): + """Docker-mode app with ``session_api_keys`` configured, for auth tests. + + Like ``docker_app`` we deliberately skip entering the lifespan (no + ``with TestClient(app) as ...``): the agent-server lifespan starts a + tmux/vscode/desktop bundle we don't want to drag into these tests. + """ + session_key = "inner-secret" + outer_key = "outer-secret" + with _run_inner_app(session_key) as port: + app = create_app( + Config( + conversation_runtime="docker", + session_api_keys=[outer_key], + ) + ) + app.state.container_manager = _StubContainerManager(port, session_key) + client = TestClient(app) + try: + yield client, app, outer_key + finally: + client.close() + + +def test_websocket_rejects_wrong_session_key(docker_app_with_auth): + """With ``session_api_keys`` set, a WS upgrade carrying a wrong key in + the query string must be rejected BEFORE the connection is accepted. 
+ + The auth helper's "key provided but invalid" branch rejects pre-accept, + so TestClient surfaces this as an exception on ``websocket_connect``. + + Regression guard for review finding R3311480598. + """ + client, app, _outer_key = docker_app_with_auth + cid = uuid4() + app.state.container_manager.preregister(cid) + + with pytest.raises(Exception): + with client.websocket_connect( + f"/sockets/events/{cid}?session_api_key=wrong", + ): + pass + + +def test_websocket_rejects_missing_first_message_auth(docker_app_with_auth): + """With ``session_api_keys`` set and no key supplied at upgrade time, + the auth helper falls through to first-message-auth: it accepts the + socket, then waits for ``{"type": "auth", ...}``. A client that + instead disconnects or sends a non-auth frame must be cut off with + a 4001 close BEFORE the bridge to the inner server is created. + + Regression guard for review finding R3311480598. + """ + from starlette.websockets import WebSocketDisconnect + + client, app, _outer_key = docker_app_with_auth + cid = uuid4() + app.state.container_manager.preregister(cid) + + # Connect with no key; the server accepts (so first-message-auth can + # read a frame) but will close 4001 once we send a non-auth frame. + with client.websocket_connect(f"/sockets/events/{cid}") as ws: + ws.send_text("not an auth frame") + with pytest.raises(WebSocketDisconnect) as exc_info: + ws.receive_text() + assert exc_info.value.code == 4001 + + +def test_websocket_accepts_with_valid_outer_key(docker_app_with_auth): + """A WS with the correct outer session key must bridge to the inner server.""" + client, app, outer_key = docker_app_with_auth + cid = uuid4() + app.state.container_manager.preregister(cid) + + with client.websocket_connect( + f"/sockets/events/{cid}?session_api_key={outer_key}", + ) as ws: + # Inner app's echo loop is reached, so we end up with a standard + # hello + echo round-trip. 
+ assert ws.receive_text() == f"hello {cid}" + ws.send_text("ping") + assert ws.receive_text() == "echo:ping" + + +# --------------------------------------------------------------------------- +# Idempotent POST: a retried-create against an existing conversation must +# NOT tear down the live container if the inner server returns 4xx. +# --------------------------------------------------------------------------- + + +def test_post_retry_does_not_stop_existing_container_on_inner_4xx(docker_app): + """If ``manager.start()`` returns an existing container (``is_new=False``) + and the inner server then returns a 4xx, we MUST leave the container + running. Regression guard for review finding R3311480570. + """ + client, app = docker_app + + # Seed a container so the next POST hits the "already running" branch. + cid = uuid4() + app.state.container_manager.preregister(cid) + assert app.state.container_manager.get(cid) is not None + + # Force the inner server to reject the create with 400. Because the + # container already existed (``is_new=False``), the outer route must + # NOT tear it down. + resp = client.post( + "/api/conversations", + json={ + "conversation_id": str(cid), + "workspace": {}, + "agent": {}, + "_force_400": True, + }, + ) + assert resp.status_code == 400 + # The live container survived the failed retry. + assert app.state.container_manager.get(cid) is not None + + +def test_post_first_create_tears_down_on_inner_4xx(docker_app): + """When ``manager.start()`` actually spawned a fresh container + (``is_new=True``) and the inner server then rejects the create, we + DO tear it down — otherwise an orphan container would leak. 
+ """ + client, app = docker_app + + cid = uuid4() + assert app.state.container_manager.get(cid) is None + + resp = client.post( + "/api/conversations", + json={ + "conversation_id": str(cid), + "workspace": {}, + "agent": {}, + "_force_400": True, + }, + ) + assert resp.status_code == 400 + # Fresh container that the inner server rejected got cleaned up. + assert app.state.container_manager.get(cid) is None + + +# --------------------------------------------------------------------------- +# Workspace static-file proxy is registered under the cookie-auth group +# so iframe/img embeds can authenticate via the workspace cookie. +# --------------------------------------------------------------------------- + + +def test_workspace_router_registered_under_cookie_auth_in_docker_mode(): + """In docker mode the workspace path must be routed via the + workspace-cookie auth group, not via the header-only catch-all. + + Regression guard for review finding R3311480555. + """ + app = create_app(Config(conversation_runtime="docker")) + + workspace_path = "/api/conversations/{conversation_id}/workspace/{file_path:path}" + catchall_path = "/api/conversations/{conversation_id}/{tail:path}" + + workspace_route_index = next( + i + for i, route in enumerate(app.routes) + if getattr(route, "path", None) == workspace_path + ) + catchall_route_index = next( + i + for i, route in enumerate(app.routes) + if getattr(route, "path", None) == catchall_path + ) + # The more specific workspace route MUST be registered before the + # catch-all so starlette's first-match wins picks it. 
+ assert workspace_route_index < catchall_route_index From 151cd5256f33d2a191db11573a54520450d32668 Mon Sep 17 00:00:00 2001 From: openhands Date: Wed, 27 May 2026 19:16:37 +0000 Subject: [PATCH 3/3] refactor(agent-server,docker_runtime): reuse DockerWorkspace; drop fan-out for shared-disk read-only metadata MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Drop the 339-line ContainerManager and its bespoke docker run wrapping; replace with DockerConversationRegistry (190 lines) that's a thin shell around DockerWorkspace. Image pulls, GPU, network, port allocation, log streaming, healthchecks, lifecycle cleanup are all delegated. * Add bind_host to DockerWorkspace so the outer can publish the inner agent-server on 127.0.0.1 only (defense-in-depth: only the outer reaches the inner; other hosts on the network can't bypass outer auth). * Replace the docker fan-out across containers (batch_get / count / search) with shared-disk metadata reads. Outer's ConversationService runs in a new read_only_metadata mode: skips lease acquisition and EventService startup; get / search / count / batch_get re-read meta.json / base_state.json off disk on every call so sub-container writes show up immediately. * Bind-mount layout: per-cid conversations/{cid_hex} is the only conversation dir each sub-container can see (the outer sees all of them and reads on-disk metadata). Settings/secrets dir is shared via OH_PERSISTENCE_DIR so cipher keys match. * Global per-host routers (bash/git/file/vscode/desktop/hooks/mcp/ skills/tools/llm) are reverse-proxied via a required ?cid=… query parameter — registered one route per prefix so the catch-all doesn't shadow /api/conversations, /api/settings, etc. * Auth: outer and inner share OH_SESSION_API_KEYS_0 via conversation_container_forward_env. 
The proxy synthesizes the X-Session-API-Key header from the shared workspace key when the inbound request authenticated via the workspace-session cookie (so iframe/img embeds still reach the inner static file server). * Drop the fan-out tests; add tests for the read-only mode + ?cid= routing. Net diff: -132 LoC across the package while adding new behaviour-level tests. Co-authored-by: openhands --- .../openhands/agent_server/api.py | 63 +- .../openhands/agent_server/config.py | 17 +- .../agent_server/conversation_service.py | 187 +++++- .../agent_server/docker_runtime/__init__.py | 43 +- .../docker_runtime/container_manager.py | 339 ----------- .../agent_server/docker_runtime/proxy.py | 80 ++- .../agent_server/docker_runtime/registry.py | 205 +++++++ .../agent_server/docker_runtime/routers.py | 552 ++++++++---------- openhands-agent-server/pyproject.toml | 1 + .../openhands/workspace/docker/workspace.py | 33 +- .../docker_runtime/test_container_manager.py | 253 -------- .../docker_runtime/test_docker_routers.py | 394 ++++++------- .../agent_server/test_conversation_service.py | 131 +++++ 13 files changed, 1083 insertions(+), 1215 deletions(-) delete mode 100644 openhands-agent-server/openhands/agent_server/docker_runtime/container_manager.py create mode 100644 openhands-agent-server/openhands/agent_server/docker_runtime/registry.py delete mode 100644 tests/agent_server/docker_runtime/test_container_manager.py diff --git a/openhands-agent-server/openhands/agent_server/api.py b/openhands-agent-server/openhands/agent_server/api.py index 1bc995df72..c56647a38e 100644 --- a/openhands-agent-server/openhands/agent_server/api.py +++ b/openhands-agent-server/openhands/agent_server/api.py @@ -34,11 +34,12 @@ ) from openhands.agent_server.desktop_router import desktop_router from openhands.agent_server.desktop_service import get_desktop_service -from openhands.agent_server.docker_runtime.container_manager import ContainerManager +from openhands.agent_server.docker_runtime import
DockerConversationRegistry from openhands.agent_server.docker_runtime.routers import ( - docker_conversation_router, + docker_conversation_proxy_router, + docker_global_proxy_router, docker_sockets_router, - docker_workspace_router, + docker_workspace_proxy_router, ) from openhands.agent_server.event_router import event_router from openhands.agent_server.file_router import file_router @@ -209,15 +210,18 @@ async def start_tool_preload_service(): config.bash_events_retention_seconds, ) - # Docker runtime: install the per-conversation container manager. - # The proxy routers each construct their own short-lived - # ``httpx.AsyncClient`` per request, so there is no shared client - # to plumb in here. The in-process conversation_service stays - # live as well — in docker mode it just isn't routed to. - container_manager: ContainerManager | None = None + # Docker runtime: per-conversation container registry, backed by + # ``DockerWorkspace``. The proxy routers each construct their own + # short-lived ``httpx.AsyncClient`` per request, so there is no + # shared HTTP client to plumb in here. The in-process + # ``ConversationService`` stays live but runs in + # ``read_only_metadata`` mode — it answers list/count/search/get + # by reading the shared persistence directory off disk while the + # sub-containers own the actual conversations. 
+ docker_registry: DockerConversationRegistry | None = None if config.conversation_runtime == "docker": - container_manager = ContainerManager(config) - api.state.container_manager = container_manager + docker_registry = DockerConversationRegistry(config) + api.state.docker_registry = docker_registry logger.info( "Docker conversation runtime enabled (image=%s)", config.conversation_image, @@ -226,8 +230,8 @@ async def start_tool_preload_service(): try: yield finally: - if container_manager is not None: - await container_manager.shutdown() + if docker_registry is not None: + await docker_registry.shutdown() if retention_task is not None: retention_task.cancel() with suppress(asyncio.CancelledError): @@ -322,11 +326,30 @@ def _add_api_routes(app: FastAPI, config: Config) -> None: api_router = APIRouter(prefix="/api", dependencies=dependencies) if config.conversation_runtime == "docker": - # Docker mode: conversation/event/workspace traffic is reverse-proxied - # to a per-conversation container. The non-conversation routers - # (settings, profiles, workspaces, auth, ...) still run in-process on - # the outer server. - api_router.include_router(docker_conversation_router) + # Docker mode: per-conversation mutations and the global per-host + # routers reverse-proxy to a per-conversation container. Metadata + # (list / count / search / get) and the cross-conversation routers + # (settings, profiles, workspaces, auth, …) still run in-process + # on the outer server, reading the shared on-disk persistence dir. + # + # Order matters: + # 1. ``docker_global_proxy_router`` catches ``/api/{bash,git, + # file,vscode,desktop,hooks,mcp,skills,tools,llm}/...`` and + # forwards them based on the required ``?cid=`` query param. + # For any other path it raises 404 so the local routers get a + # chance to match. + # 2. ``docker_conversation_proxy_router`` claims the mutation + # verbs on ``/api/conversations/...``. 
It intentionally does + # NOT register ``GET`` on the metadata paths so the local + # ``conversation_router``'s GETs match. + # 3. The unchanged ``conversation_router`` / ``event_router`` + # provide GET metadata and serve the workspace static-file + # tree for sub-container conversations via the same routes. + api_router.include_router(docker_global_proxy_router) + api_router.include_router(docker_conversation_proxy_router) + api_router.include_router(event_router) + api_router.include_router(conversation_router) + api_router.include_router(conversation_router_acp) else: api_router.include_router(event_router) api_router.include_router(conversation_router) @@ -340,7 +363,7 @@ def _add_api_routes(app: FastAPI, config: Config) -> None: api_router.include_router(skills_router) api_router.include_router(hooks_router) api_router.include_router(mcp_router) - api_router.include_router(llm_router) + api_router.include_router(llm_router) api_router.include_router(settings_router) api_router.include_router(workspaces_router) api_router.include_router(profiles_router) @@ -363,7 +386,7 @@ def _add_api_routes(app: FastAPI, config: Config) -> None: if config.conversation_runtime == "docker": # Proxy workspace static files via the per-conversation container, # but under the workspace-cookie auth group so iframe/img embeds work. 
- workspace_api_router.include_router(docker_workspace_router) + workspace_api_router.include_router(docker_workspace_proxy_router) else: workspace_api_router.include_router(workspace_router) diff --git a/openhands-agent-server/openhands/agent_server/config.py b/openhands-agent-server/openhands/agent_server/config.py index 5a43ba998f..d06a63b393 100644 --- a/openhands-agent-server/openhands/agent_server/config.py +++ b/openhands-agent-server/openhands/agent_server/config.py @@ -249,10 +249,23 @@ class Config(BaseModel): ), ) conversation_container_forward_env: list[str] = Field( - default_factory=lambda: ["DEBUG"], + default_factory=lambda: [ + "DEBUG", + "OH_SECRET_KEY", + "OH_SESSION_API_KEYS_0", + ], description=( "Environment variable names to forward from this server's " - "environment into every per-conversation container." + "environment into every per-conversation container.\n\n" + "Defaults explained:\n" + "* ``OH_SECRET_KEY`` — outer server and sub-containers share " + "the same persisted settings/secrets directory and must derive " + "the same cipher key, otherwise encrypted values won't " + "round-trip.\n" + "* ``OH_SESSION_API_KEYS_0`` — the outer's reverse-proxy " + "forwards the client's ``X-Session-API-Key`` header verbatim, " + "so the inner must accept the same key. 
(When the outer has " + "no session-key requirement at all, omitting this is fine.)" ), ) conversation_container_platform: str = Field( diff --git a/openhands-agent-server/openhands/agent_server/conversation_service.py b/openhands-agent-server/openhands/agent_server/conversation_service.py index ecd3d1d891..2f60a7b6a6 100644 --- a/openhands-agent-server/openhands/agent_server/conversation_service.py +++ b/openhands-agent-server/openhands/agent_server/conversation_service.py @@ -323,6 +323,16 @@ class ConversationService: cipher: Cipher | None = None owner_instance_id: str = field(default_factory=lambda: uuid4().hex) max_concurrent_runs: int = 10 + # When True, ``__aenter__`` skips starting per-conversation EventServices + # and never acquires conversation leases. Metadata-only methods + # (``get_conversation``, ``search_conversations``, ``count_conversations``, + # ``batch_get_conversations``) read ``(meta.json, base_state.json)`` + # snapshots directly off disk on every call. Mutation methods raise. + # + # Used by docker-runtime mode: the outer agent-server reads the shared + # persistence directory (bind-mounted into every sub-container) for + # listings while sub-containers own the per-conversation work. + read_only_metadata: bool = False _event_services: dict[UUID, EventService] | None = field(default=None, init=False) _conversation_webhook_subscribers: list["ConversationWebhookSubscriber"] = field( default_factory=list, init=False @@ -330,7 +340,105 @@ class ConversationService: _lease_renewal_task: asyncio.Task | None = field(default=None, init=False) _run_executor: ThreadPoolExecutor | None = field(default=None, init=False) + def _load_disk_snapshot( + self, conversation_id: UUID + ) -> tuple[StoredConversation, ConversationState] | None: + """Read ``(meta.json, base_state.json)`` for one conversation off disk. + + Used by :data:`read_only_metadata` mode (docker-runtime). Returns + ``None`` if the conversation directory or its ``meta.json`` is + missing. 
``base_state.json`` not existing is treated as "conversation + was created but hasn't been started yet" — we return a minimal + in-memory state seeded from the stored ``agent`` / ``workspace`` so + a listing still includes the conversation. + """ + from openhands.sdk.conversation.persistence_const import BASE_STATE + + conv_dir = self.conversations_dir / conversation_id.hex + meta_file = conv_dir / "meta.json" + if not meta_file.exists(): + return None + context = {"cipher": self.cipher} if self.cipher else None + try: + stored = StoredConversation.model_validate_json( + meta_file.read_text(), context=context + ) + except Exception: + logger.exception( + "Failed to load meta.json for conversation %s", conversation_id + ) + return None + + base_state_file = conv_dir / BASE_STATE + if base_state_file.exists(): + try: + state = ConversationState.model_validate_json( + base_state_file.read_text(), context=context + ) + except Exception: + logger.exception( + "Failed to load base_state.json for conversation %s; " + "falling back to a synthetic snapshot", + conversation_id, + ) + state = self._synthesize_state(stored) + else: + state = self._synthesize_state(stored) + return stored, state + + def _synthesize_state(self, stored: StoredConversation) -> ConversationState: + """Build a minimal :class:`ConversationState` from a + :class:`StoredConversation` when ``base_state.json`` is missing. + + This happens briefly between ``meta.json`` being written and the + first ``base_state.json`` snapshot — for example, immediately after + the outer server's POST proxy returns to the client. Producing a + thin state here keeps the listing endpoints consistent with the + local-mode contract (a freshly-created conversation always shows + up in ``search`` / ``count``). 
+ """ + return ConversationState( + id=stored.id, + agent=stored.agent, + workspace=stored.workspace, + persistence_dir=str(self.conversations_dir), + ) + + def _iter_disk_snapshots( + self, + ) -> list[tuple[UUID, StoredConversation, ConversationState]]: + """Walk ``conversations_dir`` and load every conversation snapshot. + + Re-reads the directory on every call so that conversations created + by sub-containers (in docker mode) show up immediately without any + outer-side cache invalidation. + """ + results: list[tuple[UUID, StoredConversation, ConversationState]] = [] + if not self.conversations_dir.exists(): + return results + for conv_dir in self.conversations_dir.iterdir(): + if not conv_dir.is_dir(): + continue + try: + cid = UUID(conv_dir.name) + except ValueError: + continue + snapshot = self._load_disk_snapshot(cid) + if snapshot is None: + continue + stored, state = snapshot + results.append((cid, stored, state)) + return results + async def get_conversation(self, conversation_id: UUID) -> ConversationInfo | None: + if self.read_only_metadata: + snapshot = await asyncio.to_thread( + self._load_disk_snapshot, conversation_id + ) + if snapshot is None: + return None + stored, state = snapshot + return _compose_conversation_info(stored, state) if self._event_services is None: raise ValueError("inactive_service") event_service = self._event_services.get(conversation_id) @@ -342,13 +450,9 @@ async def get_conversation(self, conversation_id: UUID) -> ConversationInfo | No async def get_acp_conversation( self, conversation_id: UUID ) -> ConversationInfo | None: - if self._event_services is None: - raise ValueError("inactive_service") - event_service = self._event_services.get(conversation_id) - if event_service is None: - return None - state = await event_service.get_state() - return _compose_conversation_info(event_service.stored, state) + # Same contract as ``get_conversation`` — historically there was a + # separate ACP variant, but both branches resolve to 
the same data. + return await self.get_conversation(conversation_id) async def search_conversations( self, @@ -393,22 +497,35 @@ async def _search_conversations( execution_status: ConversationExecutionStatus | None, sort_order: ConversationSortOrder, ) -> tuple[list[ConversationInfo], str | None]: - if self._event_services is None: - raise ValueError("inactive_service") - # Collect all conversations with their info - all_conversations = [] - for id, event_service in self._event_services.items(): - state = await event_service.get_state() - conversation_info = _compose_conversation_info(event_service.stored, state) - # Apply status filter if provided - if ( - execution_status is not None - and conversation_info.execution_status != execution_status - ): - continue + all_conversations: list[tuple[UUID, ConversationInfo]] = [] + if self.read_only_metadata: + snapshots = await asyncio.to_thread(self._iter_disk_snapshots) + pairs: list[tuple[UUID, StoredConversation, ConversationState]] = snapshots + for cid, stored, state in pairs: + conversation_info = _compose_conversation_info(stored, state) + if ( + execution_status is not None + and conversation_info.execution_status != execution_status + ): + continue + all_conversations.append((cid, conversation_info)) + else: + if self._event_services is None: + raise ValueError("inactive_service") + for id, event_service in self._event_services.items(): + state = await event_service.get_state() + conversation_info = _compose_conversation_info( + event_service.stored, state + ) + # Apply status filter if provided + if ( + execution_status is not None + and conversation_info.execution_status != execution_status + ): + continue - all_conversations.append((id, conversation_info)) + all_conversations.append((id, conversation_info)) # Sort conversations based on sort_order if sort_order == ConversationSortOrder.CREATED_AT: @@ -454,6 +571,16 @@ async def _count_conversations( execution_status: ConversationExecutionStatus | None, ) -> 
int: """Count conversations matching the given filters.""" + if self.read_only_metadata: + snapshots = await asyncio.to_thread(self._iter_disk_snapshots) + if execution_status is None: + return len(snapshots) + return sum( + 1 + for _, _, state in snapshots + if state.execution_status == execution_status + ) + if self._event_services is None: raise ValueError("inactive_service") @@ -887,6 +1014,20 @@ async def __aenter__(self): thread_name_prefix="conversation-run", ) self._event_services = {} + # In read-only metadata mode the outer doesn't own conversations: + # sub-containers do, via their own bind-mounted persistence dir. + # Skip the eager EventService startup (no lease acquisition, no + # LocalConversation) — metadata methods read snapshots off disk + # on demand instead. + if self.read_only_metadata: + self._conversation_webhook_subscribers = [ + ConversationWebhookSubscriber( + spec=webhook_spec, + session_api_key=self.session_api_key, + ) + for webhook_spec in self.webhook_specs + ] + return self for conversation_dir in self.conversations_dir.iterdir(): stored: StoredConversation | None = None try: @@ -1014,6 +1155,10 @@ def get_instance(cls, config: Config) -> "ConversationService": ), cipher=config.cipher, max_concurrent_runs=config.max_concurrent_runs, + # Outer agent-server reads disk for metadata in docker mode; + # the actual conversations are owned by sub-containers, each + # of which sees only its own subdirectory. 
+ read_only_metadata=(config.conversation_runtime == "docker"), ) async def _start_event_service(self, stored: StoredConversation) -> EventService: diff --git a/openhands-agent-server/openhands/agent_server/docker_runtime/__init__.py b/openhands-agent-server/openhands/agent_server/docker_runtime/__init__.py index f938053c8d..6718b113ca 100644 --- a/openhands-agent-server/openhands/agent_server/docker_runtime/__init__.py +++ b/openhands-agent-server/openhands/agent_server/docker_runtime/__init__.py @@ -1,26 +1,39 @@ """Docker-runtime mode for the agent-server. When ``Config.conversation_runtime == "docker"`` the outer agent-server stops -running conversations in-process and instead spawns a Docker container per -conversation. Each container hosts its own agent-server (configured in -``local`` mode), and this outer server acts as a thin reverse proxy in front -of those containers. +running conversations in-process and instead spawns one Docker container +per conversation. Each container hosts its own (``local`` mode) agent-server +that actually runs the agent loop, and this outer server: + +* reverse-proxies every conversation-scoped HTTP / WebSocket request to the + matching sub-container, and +* answers list / count / search / get queries directly from the shared + on-disk persistence directory (no fan-out across containers needed). + +The outer and the sub-containers share the same ``conversations_path`` and +``.openhands`` (settings/secrets/workspaces) directories via bind-mounts. +The outer NEVER acquires a conversation lease — sub-containers own the +work, the outer only reads metadata and proxies mutations. Submodules: -* :mod:`.container_manager` — spawns / tracks / stops per-conversation - containers. Wraps ``docker run`` via subprocess. -* :mod:`.proxy` — low-level HTTP and WebSocket forwarding helpers that - stream bytes between the outer server and the appropriate container. 
-* :mod:`.routers` — FastAPI routers that replace the in-process - ``conversation_router``/``event_router``/``workspace_router``/``sockets_router`` - when docker mode is active. +* :mod:`.registry` — per-conversation :class:`DockerWorkspace` registry. +* :mod:`.proxy` — low-level HTTP and WebSocket forwarding helpers. +* :mod:`.routers` — FastAPI routes that intercept conversation-mutation + paths in docker mode (POST create, per-cid catch-all proxy, WS bridge). """ -from openhands.agent_server.docker_runtime.container_manager import ( - ContainerManager, - RunningContainer, +from openhands.agent_server.docker_runtime.registry import ( + DockerConversationRegistry, + container_conv_dir, + container_persist_dir, + host_conv_subdir, ) -__all__ = ["ContainerManager", "RunningContainer"] +__all__ = [ + "DockerConversationRegistry", + "container_conv_dir", + "container_persist_dir", + "host_conv_subdir", +] diff --git a/openhands-agent-server/openhands/agent_server/docker_runtime/container_manager.py b/openhands-agent-server/openhands/agent_server/docker_runtime/container_manager.py deleted file mode 100644 index 265c3e7f23..0000000000 --- a/openhands-agent-server/openhands/agent_server/docker_runtime/container_manager.py +++ /dev/null @@ -1,339 +0,0 @@ -"""Spawn and track per-conversation Docker containers. - -This is a deliberately small module: it owns the lifecycle of the inner -agent-server containers (one per conversation), the in-memory map from -``conversation_id`` to the container's local URL + session key, and the -``docker`` CLI calls needed to make that work. Everything else (HTTP -proxying, WebSocket bridging, request validation) lives in sibling modules. - -The shape of ``docker run`` invoked here mirrors what -:class:`openhands.workspace.docker.workspace.DockerWorkspace` does in the SDK, -just adapted to live inside the agent-server. 
-""" - -from __future__ import annotations - -import asyncio -import random -import secrets -import socket -import subprocess -import time -import uuid -from dataclasses import dataclass, field -from urllib.request import urlopen -from uuid import UUID - -from openhands.agent_server.config import Config -from openhands.sdk.logger import get_logger - - -logger = get_logger(__name__) - -# Port range to allocate host ports from for inner-container forwards. -# Mirrors DockerWorkspace's range so the two implementations don't fight. -_PORT_MIN = 30000 -_PORT_MAX = 39999 -_PORT_MAX_ATTEMPTS = 50 - - -def _check_port_available(port: int) -> bool: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - try: - sock.bind(("0.0.0.0", port)) - return True - except OSError: - return False - finally: - sock.close() - - -def _find_available_tcp_port() -> int: - rng = random.SystemRandom() - ports = list(range(_PORT_MIN, _PORT_MAX + 1)) - rng.shuffle(ports) - for port in ports[:_PORT_MAX_ATTEMPTS]: - if _check_port_available(port): - return port - raise RuntimeError( - f"No available TCP port found in [{_PORT_MIN},{_PORT_MAX}] after " - f"{_PORT_MAX_ATTEMPTS} attempts" - ) - - -@dataclass -class RunningContainer: - """Bookkeeping for one running per-conversation agent-server container. - - Attributes: - conversation_id: The conversation that owns this container. - container_id: Docker container id (long form) returned by ``docker run``. - host_port: Host port the container's ``:8000`` is mapped to. - session_api_key: The session API key the inner agent-server was - configured with. The outer server injects this on every proxied - request so the inner container cannot be reached without going - through us (assuming the host port is not exposed externally). - """ - - conversation_id: UUID - container_id: str - host_port: int - session_api_key: str - image: str = "" - # Locking primitive so concurrent requests for the same conversation don't - # race during shutdown. 
- lock: asyncio.Lock = field(default_factory=asyncio.Lock, repr=False) - - @property - def base_url(self) -> str: - return f"http://127.0.0.1:{self.host_port}" - - -class DockerUnavailableError(RuntimeError): - """Raised when ``docker`` is not reachable from this process.""" - - -class ContainerStartupError(RuntimeError): - """Raised when a freshly-started container fails to become healthy.""" - - -class ContainerManager: - """Tracks per-conversation Docker containers. - - The manager is in-memory only — restarting the outer agent-server forgets - every running container. That's intentional for a first cut: durable - container <-> conversation tracking is a follow-up concern. - """ - - def __init__( - self, - config: Config, - *, - run_command=subprocess.run, # injectable for tests - sleep=time.sleep, - ) -> None: - self._config = config - self._containers: dict[UUID, RunningContainer] = {} - self._lock = asyncio.Lock() - # Injected so tests can stub out ``docker`` invocations cleanly. - self._run_command = run_command - self._sleep = sleep - - # -- public API -------------------------------------------------------- - - @property - def config(self) -> Config: - return self._config - - def list(self) -> list[RunningContainer]: - return list(self._containers.values()) - - def get(self, conversation_id: UUID) -> RunningContainer | None: - return self._containers.get(conversation_id) - - async def start(self, conversation_id: UUID) -> tuple[RunningContainer, bool]: - """Spawn a fresh container for ``conversation_id``. - - Idempotent: if a container is already registered for this conversation - the existing :class:`RunningContainer` is returned and no new - container is started. - - Returns: - ``(running, is_new)``: ``is_new`` is ``True`` when this call - actually spawned a new container, and ``False`` when an existing - registered container was returned. 
Callers that want to clean - up after a failed startup MUST gate the teardown on ``is_new`` - so retried requests don't tear down a live conversation. - """ - async with self._lock: - existing = self._containers.get(conversation_id) - if existing is not None: - return existing, False - - running = await asyncio.to_thread(self._start_blocking, conversation_id) - self._containers[conversation_id] = running - return running, True - - async def stop(self, conversation_id: UUID) -> bool: - """Stop and forget the container for ``conversation_id``. - - Returns ``True`` if a container was running, ``False`` otherwise. - """ - async with self._lock: - running = self._containers.pop(conversation_id, None) - if running is None: - return False - await asyncio.to_thread(self._stop_blocking, running) - return True - - async def shutdown(self) -> None: - """Stop every tracked container. Best-effort; logs errors and - continues so a single broken container doesn't block the rest.""" - async with self._lock: - containers = list(self._containers.values()) - self._containers.clear() - for running in containers: - try: - await asyncio.to_thread(self._stop_blocking, running) - except Exception: - logger.exception( - "Failed to stop container %s during shutdown", - running.container_id, - ) - - # -- internals --------------------------------------------------------- - - def _start_blocking(self, conversation_id: UUID) -> RunningContainer: - self._ensure_docker_available() - - host_port = _find_available_tcp_port() - session_api_key = secrets.token_urlsafe(32) - container_name = f"oh-conv-{conversation_id.hex}-{uuid.uuid4().hex[:8]}" - image = self._config.conversation_image - - flags: list[str] = [] - for env_name in self._config.conversation_container_forward_env: - value = self._env_for_forward(env_name) - if value is not None: - flags += ["-e", f"{env_name}={value}"] - # Always tell the inner agent-server to require this key. 
We inject - # it on every proxied request from the outer server. - flags += ["-e", f"OH_SESSION_API_KEYS_0={session_api_key}"] - - for volume in self._config.conversation_container_volumes: - flags += ["-v", volume] - - if self._config.conversation_container_network: - flags += ["--network", self._config.conversation_container_network] - - run_cmd = [ - "docker", - "run", - "-d", - "--platform", - self._config.conversation_container_platform, - "--rm", - "--ulimit", - "nofile=65536:65536", - "--name", - container_name, - "-p", - # Bind only to loopback. The outer agent-server reaches the - # inner one via 127.0.0.1; exposing it on all host interfaces - # would bypass the outer auth and turn every per-conversation - # container into a publicly addressable agent-server. - f"127.0.0.1:{host_port}:8000", - *flags, - image, - "--host", - "0.0.0.0", - "--port", - "8000", - ] - logger.info( - "Starting conversation container for %s on host port %d", - conversation_id, - host_port, - ) - proc = self._run_command(run_cmd, capture_output=True, text=True, check=False) - if proc.returncode != 0: - raise ContainerStartupError( - f"docker run failed: {proc.stderr.strip() or proc.stdout.strip()}" - ) - container_id = (proc.stdout or "").strip() - if not container_id: - raise ContainerStartupError("docker run returned no container id") - - running = RunningContainer( - conversation_id=conversation_id, - container_id=container_id, - host_port=host_port, - session_api_key=session_api_key, - image=image, - ) - - try: - self._wait_for_health( - running, timeout=self._config.conversation_container_startup_timeout - ) - except Exception: - # Don't leave a stuck container behind. 
- self._stop_blocking(running) - raise - - logger.info( - "Conversation container ready: id=%s port=%d cid=%s", - container_id[:12], - host_port, - conversation_id, - ) - return running - - def _stop_blocking(self, running: RunningContainer) -> None: - logger.info("Stopping conversation container %s", running.container_id[:12]) - self._run_command( - ["docker", "stop", running.container_id], - capture_output=True, - text=True, - check=False, - ) - - def _ensure_docker_available(self) -> None: - proc = self._run_command( - ["docker", "version"], capture_output=True, text=True, check=False - ) - if proc.returncode != 0: - raise DockerUnavailableError( - "Docker is not available; cannot start conversation containers" - ) - - def _wait_for_health(self, running: RunningContainer, *, timeout: float) -> None: - deadline = time.monotonic() + timeout - health_url = f"{running.base_url}/health" - while time.monotonic() < deadline: - try: - with urlopen(health_url, timeout=1.0) as resp: - if 200 <= getattr(resp, "status", 200) < 300: - return - except Exception: - pass - # Bail out early if the container has already died: avoids - # ticking down the entire timeout when ``docker run`` accepted - # the command but the process inside exited immediately. - inspect = self._run_command( - [ - "docker", - "inspect", - "-f", - "{{.State.Running}}", - running.container_id, - ], - capture_output=True, - text=True, - check=False, - ) - if (inspect.stdout or "").strip() != "true": - logs = self._run_command( - ["docker", "logs", running.container_id], - capture_output=True, - text=True, - check=False, - ) - raise ContainerStartupError( - "Container stopped during startup. Logs:\n" - f"{(logs.stdout or '')}\n{(logs.stderr or '')}" - ) - self._sleep(1) - raise ContainerStartupError( - f"Container {running.container_id[:12]} did not become healthy " - f"within {timeout}s" - ) - - def _env_for_forward(self, name: str) -> str | None: - """Look up an env var value to forward into a container. 
- - Pulled out so tests can override without monkeypatching ``os.environ``. - """ - import os - - return os.environ.get(name) diff --git a/openhands-agent-server/openhands/agent_server/docker_runtime/proxy.py b/openhands-agent-server/openhands/agent_server/docker_runtime/proxy.py index 33c5d95015..c15ed20920 100644 --- a/openhands-agent-server/openhands/agent_server/docker_runtime/proxy.py +++ b/openhands-agent-server/openhands/agent_server/docker_runtime/proxy.py @@ -2,8 +2,13 @@ Both helpers are deliberately dumb: they stream bytes between the outer agent-server and an inner per-conversation container, without inspecting -request bodies or response shapes. The auth header for the inner container -is injected here (see ``X-Session-API-Key``) so callers don't have to know. +request bodies or response shapes. + +The inner agent-server is reached on ``127.0.0.1:{host_port}`` (loopback +only — see :attr:`DockerWorkspace.bind_host`). When session API keys are +configured it expects the same ``X-Session-API-Key`` as the outer server; +the loopback binding is defense-in-depth, and the OUTER server has already +enforced its session-API-key checks (or other auth) before these helpers run. These helpers are *only* used by routes in :mod:`openhands.agent_server.docker_runtime.routers`; nothing outside the @@ -22,8 +27,8 @@ from starlette.responses import StreamingResponse from starlette.websockets import WebSocket, WebSocketDisconnect -from openhands.agent_server.docker_runtime.container_manager import RunningContainer from openhands.sdk.logger import get_logger +from openhands.workspace.docker.workspace import DockerWorkspace logger = get_logger(__name__) @@ -57,7 +62,7 @@ def _filter_headers(headers) -> dict[str, str]: async def proxy_http( request: Request, - running: RunningContainer, + workspace: DockerWorkspace, *, upstream_path: str, timeout: float | None = None, @@ -66,7 +71,7 @@ async def proxy_http( Args: request: Incoming Starlette request on the outer agent-server. - running: Bookkeeping for the target container.
+ workspace: The :class:`DockerWorkspace` for the target container. upstream_path: Path (including any query string) on the inner agent-server to forward to. Typically the same path the outer server received, since the inner agent-server exposes the same @@ -74,25 +79,22 @@ async def proxy_http( timeout: Per-request timeout in seconds. ``None`` (the default) means no read timeout — conversation event streams can be long-lived. - Returns: - A :class:`starlette.responses.StreamingResponse` that streams the - inner container's response body back to the original caller. - Notes: A fresh :class:`httpx.AsyncClient` is created per request. We avoid a long-lived pool because the outer server can serve many concurrent conversations and each one talks to a different upstream port — and because making the client per-request keeps the lifespan/teardown - story trivial. If profiling later shows per-request client setup is a - bottleneck we can revisit. + story trivial. """ - url = running.base_url + upstream_path + url = workspace.host + upstream_path headers = _filter_headers(request.headers) - # Inject the per-container session API key so the inner server accepts - # us. We deliberately replace any X-Session-API-Key the *client* sent — - # the outer server has already validated the user's key by the time - # this helper runs (via FastAPI's session_api_key dependency). - headers["X-Session-API-Key"] = running.session_api_key + # If the client authenticated via the workspace-session cookie (used by + # iframe / img embeds that can't attach custom headers), there's no + # ``X-Session-API-Key`` on the inbound request — but the inner + # agent-server only knows about the header. Synthesize one from the + # workspace's stored key so the inner accepts the proxied request. 
+ if workspace.api_key and "x-session-api-key" not in {k.lower() for k in headers}: + headers["X-Session-API-Key"] = workspace.api_key async def _request_body() -> AsyncIterator[bytes]: async for chunk in request.stream(): @@ -114,9 +116,7 @@ async def _request_body() -> AsyncIterator[bytes]: upstream = await client.send(req, stream=True) except (httpx.ConnectError, httpx.ReadError) as exc: await client.aclose() - logger.warning( - "Upstream connection error to %s: %s", running.container_id[:12], exc - ) + logger.warning("Upstream connection error to %s: %s", workspace.host, exc) raise HTTPException( status_code=status.HTTP_502_BAD_GATEWAY, detail=f"Conversation container unreachable: {exc}", @@ -140,49 +140,49 @@ async def _response_body() -> AsyncIterator[bytes]: async def bridge_websocket( client_ws: WebSocket, - running: RunningContainer, + workspace: DockerWorkspace, *, upstream_path: str, ) -> None: """Bridge a WebSocket session between the browser and an inner container. - The bridge speaks both text and binary frames. Auth for the inner server - is injected via the ``X-Session-API-Key`` header on the connect handshake - (the agent-server's WebSocket auth also accepts that header — see - :mod:`openhands.agent_server.sockets`). - Precondition: ``client_ws`` MUST already be accepted by the caller. The bridge does not call ``accept()`` itself because the outer server's - WebSocket-auth helper (which accepts on success) needs to run first. - Calling ``accept()`` a second time would raise. + WebSocket-auth helper accepts on success and calling ``accept()`` a + second time would raise. Closure semantics: when either side closes (or errors), we close the - other side and return. We do not attempt to reconnect. + other side and return. No reconnect. 
""" upstream_url = ( - running.base_url.replace("http://", "ws://").replace("https://", "wss://") + workspace.host.replace("http://", "ws://").replace("https://", "wss://") + upstream_path ) - extra_headers = {"X-Session-API-Key": running.session_api_key} + # The local sockets router accepts auth via header / query param / first + # message. By the time we get here the outer has already accepted the + # socket — but the inner is a separate server that requires its own + # auth. Mint the inner-side ``X-Session-API-Key`` from the workspace's + # shared key. (If both outer and inner have no key requirement, the + # workspace.api_key is None and we send no header — that's fine.) + upstream_headers: dict[str, str] = {} + if workspace.api_key: + upstream_headers["X-Session-API-Key"] = workspace.api_key try: async with websockets.connect( - upstream_url, additional_headers=extra_headers + upstream_url, + additional_headers=upstream_headers or None, ) as upstream_ws: await _bridge_websocket_loop(client_ws, upstream_ws) except websockets.exceptions.InvalidStatus as exc: - logger.warning( - "Upstream WebSocket rejected (%s) to %s", exc, running.container_id[:12] - ) + logger.warning("Upstream WebSocket rejected (%s) to %s", exc, workspace.host) # 1011 == "internal error"; closest match for an upstream HTTP failure # since browsers can't see HTTP status codes from a failed upgrade. 
await client_ws.close(code=1011) except (OSError, websockets.exceptions.WebSocketException) as exc: logger.warning( - "Upstream WebSocket connect failed to %s: %s", - running.container_id[:12], - exc, + "Upstream WebSocket connect failed to %s: %s", workspace.host, exc ) await client_ws.close(code=1011) @@ -213,7 +213,7 @@ async def _upstream_to_client() -> None: task_a = asyncio.create_task(_client_to_upstream()) task_b = asyncio.create_task(_upstream_to_client()) - done, pending = await asyncio.wait( + _, pending = await asyncio.wait( {task_a, task_b}, return_when=asyncio.FIRST_COMPLETED ) for task in pending: @@ -223,8 +223,6 @@ async def _upstream_to_client() -> None: await task except (asyncio.CancelledError, Exception): pass - # Whichever side closed first dictates the close. Ensure the other side - # also closes cleanly so neither leaks file descriptors. try: await upstream_ws.close() except Exception: diff --git a/openhands-agent-server/openhands/agent_server/docker_runtime/registry.py b/openhands-agent-server/openhands/agent_server/docker_runtime/registry.py new file mode 100644 index 0000000000..1c0249a64c --- /dev/null +++ b/openhands-agent-server/openhands/agent_server/docker_runtime/registry.py @@ -0,0 +1,205 @@ +"""Per-conversation Docker container registry. + +A thin async wrapper around :class:`DockerWorkspace`. The workspace owns +the heavy lifting (port allocation, ``docker run``, health checks, ``docker +stop`` on cleanup, optional log streaming, pause/resume); this class just +hands out one workspace per conversation id and serializes concurrent +start/stop calls. + +The outer agent-server and every sub-container share the same on-disk +persistence directories (``conversations_path`` and the sibling +``.openhands`` settings dir) via bind-mounts. Each sub-container only +sees its OWN conversation subdirectory under the shared +``conversations_path``, so leases never collide. 
The outer never claims +a lease — it reads metadata off disk and proxies all mutations. +""" + +from __future__ import annotations + +import asyncio +import os +from pathlib import Path +from typing import cast +from uuid import UUID + +from openhands.agent_server.config import V1_SESSION_API_KEY_ENV, Config +from openhands.agent_server.persistence.store import _get_persistence_dir +from openhands.sdk.logger import get_logger +from openhands.sdk.workspace import PlatformType +from openhands.workspace.docker.workspace import DockerWorkspace + + +logger = get_logger(__name__) + + +# Canonical path inside every sub-container. Doesn't have to match the +# host-side path — the agent-server inside the container is reconfigured +# via ``OH_CONVERSATIONS_PATH`` / ``OH_PERSISTENCE_DIR`` to use these. +_CONTAINER_CONV_DIR = "/var/openhands/conversations" +_CONTAINER_PERSIST_DIR = "/var/openhands/.openhands" + + +class DockerConversationRegistry: + """Hand out one :class:`DockerWorkspace` per conversation id. + + The registry is in-memory: restarting the outer agent-server forgets + every running container. That's deliberate — sub-containers are + short-lived agent-server processes whose canonical state lives on the + shared persistence volume, so any restart can re-claim them by spawning + fresh containers against the same on-disk state. + """ + + def __init__(self, config: Config) -> None: + self._config = config + self._workspaces: dict[UUID, DockerWorkspace] = {} + self._lock = asyncio.Lock() + + @property + def config(self) -> Config: + return self._config + + def get(self, conversation_id: UUID) -> DockerWorkspace | None: + return self._workspaces.get(conversation_id) + + def items(self) -> list[tuple[UUID, DockerWorkspace]]: + return list(self._workspaces.items()) + + async def get_or_create( + self, conversation_id: UUID + ) -> tuple[DockerWorkspace, bool]: + """Idempotently spawn the container for ``conversation_id``. + + Returns ``(workspace, is_new)``. 
Callers that need to clean up on a + failed startup must gate the teardown on ``is_new`` so retried + requests don't tear down a live conversation. + """ + async with self._lock: + existing = self._workspaces.get(conversation_id) + if existing is not None: + return existing, False + + ws = await asyncio.to_thread(self._build_workspace, conversation_id) + self._workspaces[conversation_id] = ws + return ws, True + + async def stop(self, conversation_id: UUID) -> bool: + async with self._lock: + ws = self._workspaces.pop(conversation_id, None) + if ws is None: + return False + await asyncio.to_thread(ws.cleanup) + return True + + async def shutdown(self) -> None: + """Stop every tracked container. Best-effort: a single broken + container must not block the rest from being cleaned up.""" + async with self._lock: + wss = list(self._workspaces.values()) + self._workspaces.clear() + for ws in wss: + try: + await asyncio.to_thread(ws.cleanup) + except Exception: + logger.exception( + "Failed to stop conversation container during shutdown" + ) + + # -- internals --------------------------------------------------------- + + def _build_workspace(self, conversation_id: UUID) -> DockerWorkspace: + """Construct the :class:`DockerWorkspace` for one conversation. + + Blocking: spawns the container and waits for the inner + agent-server's ``/health`` to come up. Must be called from a worker + thread. + """ + cfg = self._config + host_conv_dir = cfg.conversations_path.resolve() + host_persist_dir = _get_persistence_dir(cfg).resolve() + + # Per-cid bind-mount: the sub-container can only see ITS OWN + # conversation subdirectory under the shared dir. The outer server + # sees every subdirectory and reads metadata off disk for listings. 
+ host_cid_dir = host_conv_dir / conversation_id.hex + host_cid_dir.mkdir(parents=True, exist_ok=True) + container_cid_dir = f"{_CONTAINER_CONV_DIR}/{conversation_id.hex}" + + volumes = list(cfg.conversation_container_volumes) + [ + f"{host_cid_dir}:{container_cid_dir}", + f"{host_persist_dir}:{_CONTAINER_PERSIST_DIR}", + ] + + # Point the inner agent-server at the canonical in-container paths. + # Mirrors how the outer server resolves them via ``Config`` / + # ``_get_persistence_dir(config)``. + extra_env = { + "OH_CONVERSATIONS_PATH": _CONTAINER_CONV_DIR, + "OH_PERSISTENCE_DIR": _CONTAINER_PERSIST_DIR, + } + + logger.info( + "Spawning conversation container: cid=%s image=%s", + conversation_id, + cfg.conversation_image, + ) + # The reverse-proxy needs an ``X-Session-API-Key`` to authenticate + # with the inner agent-server. Outer and inner share that key by + # default via ``conversation_container_forward_env`` (see + # :attr:`Config.conversation_container_forward_env`), so read it + # straight out of the outer's env. None means "no auth required", + # which matches the inner's behavior when the env is unset. + shared_session_key = os.environ.get(V1_SESSION_API_KEY_ENV) + + ws = DockerWorkspace( + server_image=cfg.conversation_image, + api_key=shared_session_key, + # The agent's tool workspace ("where bash/file ops execute") + # is separate from the conversation persistence directory we + # bind-mount above. Leave it at the container default. + working_dir="/workspace", + # Loopback-only: the outer reaches the inner over 127.0.0.1. + # Any other binding would let other hosts on the network talk + # to the inner agent-server, bypassing outer auth. + bind_host="127.0.0.1", + # ``Config.conversation_container_platform`` is a plain ``str`` so + # users can plug in any platform Docker accepts; DockerWorkspace + # narrows that to a ``Literal``. Trust the user's choice here. 
+ platform=cast(PlatformType, cfg.conversation_container_platform), + health_check_timeout=cfg.conversation_container_startup_timeout, + volumes=volumes, + network=cfg.conversation_container_network, + forward_env=list(cfg.conversation_container_forward_env), + extra_env=extra_env, + # The outer doesn't manage image lifecycle; whoever produced + # the image is responsible for its retention. + cleanup_image=False, + # Each container hosts only one conversation; no need to expose + # the auxiliary VSCode/VNC ports outwards. The catch-all proxy + # forwards conversation-scoped HTTP via ``ws.host`` which is + # plenty. + extra_ports=False, + detach_logs=False, + ) + logger.info( + "Conversation container ready: cid=%s host=%s", + conversation_id, + ws.host, + ) + return ws + + +def container_persist_dir() -> str: + """Canonical settings/secrets path inside every sub-container.""" + return _CONTAINER_PERSIST_DIR + + +def container_conv_dir() -> str: + """Canonical conversations-root path inside every sub-container.""" + return _CONTAINER_CONV_DIR + + +def host_conv_subdir(config: Config, conversation_id: UUID) -> Path: + """Return the host-side per-conversation directory under + ``config.conversations_path``. Used by tests and by the outer's + read-only metadata loader.""" + return config.conversations_path.resolve() / conversation_id.hex diff --git a/openhands-agent-server/openhands/agent_server/docker_runtime/routers.py b/openhands-agent-server/openhands/agent_server/docker_runtime/routers.py index 8bb89e26be..97d465fa01 100644 --- a/openhands-agent-server/openhands/agent_server/docker_runtime/routers.py +++ b/openhands-agent-server/openhands/agent_server/docker_runtime/routers.py @@ -1,25 +1,35 @@ -"""FastAPI routers used when ``Config.conversation_runtime == "docker"``. - -These replace ``conversation_router``, ``event_router``, ``workspace_router`` -and the conversation half of ``sockets_router`` from the local-mode app. 
-Settings, profiles, workspaces, auth, the cloud proxy, the static frontend -and ``/server_info`` all continue to be served by the outer server unchanged -— they're not conversation-scoped. - -The flow on each request is: - -1. Extract the ``conversation_id`` from the path (or, for ``POST - /api/conversations``, generate one and remember it). -2. Look up — or, on creation, spawn — the matching Docker container in the - :class:`ContainerManager`. -3. Forward the request body and headers via - :func:`openhands.agent_server.docker_runtime.proxy.proxy_http` (or, for - WebSockets, :func:`bridge_websocket`). +"""FastAPI routes that intercept conversation-mutation traffic in +``Config.conversation_runtime == "docker"`` mode. + +What this module provides (the *new* surface): + +* ``docker_conversation_proxy_router`` — spawns a per-conversation + container on ``POST /api/conversations`` and forwards every per-cid + HTTP route to that container. Mounted BEFORE + :data:`openhands.agent_server.conversation_router.conversation_router` + so the proxy claims the mutation methods first; the unchanged + ``conversation_router`` keeps serving ``GET`` metadata routes from the + shared on-disk persistence dir (the outer's + :class:`ConversationService` runs in :attr:`read_only_metadata` mode). +* ``docker_sockets_router`` — authenticates WebSocket clients against + the outer's session keys, then bridges to the inner container. +* ``docker_global_proxy_router`` — reverse-proxies the *global* + (non-conversation-scoped) routers (bash, file, git, vscode, desktop, + hooks, mcp, skills, tool, llm) to a chosen sub-container. Each request + must include a ``?cid=…`` query param identifying which conversation's + container to talk to. + +What's intentionally NOT here: + +* No batch-get / count / search routes — those are served by the + unchanged ``conversation_router`` against the shared filesystem. 
+* No workspace static router — the catch-all + ``/{cid}/{tail:path}`` proxy covers ``/{cid}/workspace/...`` already, + and the outer mounts a thin cookie-auth wrapper that delegates here. """ from __future__ import annotations -import asyncio import json from typing import Annotated from uuid import UUID, uuid4 @@ -35,81 +45,78 @@ ) from starlette.responses import JSONResponse, Response, StreamingResponse -from openhands.agent_server.docker_runtime.container_manager import ( - ContainerManager, - ContainerStartupError, - DockerUnavailableError, - RunningContainer, -) from openhands.agent_server.docker_runtime.proxy import ( bridge_websocket, proxy_http, ) +from openhands.agent_server.docker_runtime.registry import ( + DockerConversationRegistry, +) from openhands.sdk.logger import get_logger +from openhands.workspace.docker.workspace import DockerWorkspace logger = get_logger(__name__) -def get_container_manager(request: Request) -> ContainerManager: - manager = getattr(request.app.state, "container_manager", None) - if manager is None: +def get_registry(request: Request) -> DockerConversationRegistry: + registry = getattr(request.app.state, "docker_registry", None) + if registry is None: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, - detail="Container manager is not available", + detail="Docker conversation registry is not available", + ) + return registry + + +def _ws_get_registry(websocket: WebSocket) -> DockerConversationRegistry | None: + return getattr(websocket.app.state, "docker_registry", None) + + +def _workspace_or_404( + registry: DockerConversationRegistry, conversation_id: UUID +) -> DockerWorkspace: + ws = registry.get(conversation_id) + if ws is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Conversation not found: {conversation_id}", ) - return manager + return ws def _build_upstream_path(request: Request, path: str) -> str: """Reconstruct the inner-container path from the outer request. 
The inner agent-server exposes the same API surface, so we forward the - same path verbatim. Only difference: the outer path is rooted at - ``/api/conversations/...`` and so is the inner one, so we just pass it - through. + same path verbatim and append the original query string. """ query = request.url.query return f"{path}?{query}" if query else path -def _container_or_404( - manager: ContainerManager, conversation_id: UUID -) -> RunningContainer: - running = manager.get(conversation_id) - if running is None: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Conversation not found: {conversation_id}", - ) - return running - - # --------------------------------------------------------------------------- -# HTTP: /api/conversations +# HTTP: /api/conversations (mutation half) # --------------------------------------------------------------------------- -docker_conversation_router = APIRouter( +docker_conversation_proxy_router = APIRouter( prefix="/conversations", tags=["Docker Conversations"] ) -@docker_conversation_router.post("") +@docker_conversation_proxy_router.post("") async def docker_start_conversation( request: Request, include_skills: Annotated[bool, Query()] = False, ) -> JSONResponse: - """Spawn a fresh per-conversation container, then forward the request. + """Spawn a fresh per-conversation container, then forward the create. - The container is registered against the *resolved* conversation id (either - the one the client supplied or a fresh UUID4 minted here). The body is - rewritten to: - - * pin ``conversation_id`` so the inner agent-server agrees on the id, - * rewrite ``workspace.working_dir`` to ``/workspace`` — the inner - container's filesystem is the canonical one, not the outer host's. + The container is registered against the *resolved* conversation id + (either the one the client supplied or a fresh UUID4 minted here). + The body is rewritten to pin ``conversation_id`` so the inner + agent-server agrees on the id. 
""" - manager = get_container_manager(request) + registry = get_registry(request) try: body_bytes = await request.body() @@ -128,25 +135,14 @@ async def docker_start_conversation( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid conversation_id: {raw_cid!r}", ) from exc - body["conversation_id"] = str(conversation_id) - # Inside the container, the working dir is always /workspace. Whatever - # the caller passed in points to a host path we can't reach from the - # outer server's vantage point. - workspace = body.get("workspace") or {} - workspace["working_dir"] = "/workspace" - body["workspace"] = workspace - try: - running, is_new = await manager.start(conversation_id) - except DockerUnavailableError as exc: - raise HTTPException( - status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=str(exc) - ) from exc - except ContainerStartupError as exc: + workspace, is_new = await registry.get_or_create(conversation_id) + except Exception as exc: raise HTTPException( - status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc) + status_code=status.HTTP_502_BAD_GATEWAY, + detail=f"Failed to start conversation container: {exc}", ) from exc upstream_path = ( @@ -154,14 +150,21 @@ async def docker_start_conversation( ) headers = { "content-type": request.headers.get("content-type", "application/json"), - "X-Session-API-Key": running.session_api_key, "accept": request.headers.get("accept", "application/json"), } + # Forward auth: prefer whatever the client sent (header takes precedence), + # fall back to the workspace's stored key (set from the outer's shared + # ``OH_SESSION_API_KEYS_0``). The inner agent-server only ever knows + # about the header, never the cookie. 
+ inbound_key = request.headers.get("x-session-api-key") + proxied_key = inbound_key or workspace.api_key + if proxied_key: + headers["X-Session-API-Key"] = proxied_key try: async with httpx.AsyncClient(timeout=60.0) as client: response = await client.post( - running.base_url + upstream_path, + workspace.host + upstream_path, headers=headers, content=json.dumps(body).encode("utf-8"), ) @@ -171,12 +174,10 @@ async def docker_start_conversation( # just created — otherwise a retry against an existing # conversation would kill the live one. logger.warning( - "Initial request to fresh container %s failed: %s", - running.container_id[:12], - exc, + "Initial request to fresh container %s failed: %s", workspace.host, exc ) if is_new: - await manager.stop(conversation_id) + await registry.stop(conversation_id) raise HTTPException( status_code=status.HTTP_502_BAD_GATEWAY, detail=f"Conversation container could not accept the request: {exc}", @@ -184,11 +185,8 @@ async def docker_start_conversation( if response.status_code >= 400 and is_new: # The inner server rejected the create. Don't leave the container - # behind in that case — it'd be orphaned, since no client will know - # to send DELETE. But only if we were the ones who started it: - # a retried create against an existing conversation must not tear - # down the live conversation. - await manager.stop(conversation_id) + # behind in that case. 
+ await registry.stop(conversation_id) return JSONResponse( content=response.json() if response.content else None, @@ -196,14 +194,14 @@ async def docker_start_conversation( ) -@docker_conversation_router.delete("/{conversation_id}") +@docker_conversation_proxy_router.delete("/{conversation_id}") async def docker_delete_conversation( conversation_id: UUID, request: Request, -) -> JSONResponse: - manager = get_container_manager(request) - running = manager.get(conversation_id) - if running is None: +) -> Response: + registry = get_registry(request) + workspace = registry.get(conversation_id) + if workspace is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Conversation not found: {conversation_id}", @@ -214,260 +212,110 @@ async def docker_delete_conversation( # delete failed. delete_status = 200 delete_body: bytes = b"" + delete_headers: dict[str, str] = {} + inbound_key = request.headers.get("x-session-api-key") + proxied_key = inbound_key or workspace.api_key + if proxied_key: + delete_headers["X-Session-API-Key"] = proxied_key try: async with httpx.AsyncClient(timeout=30.0) as client: upstream = await client.delete( - f"{running.base_url}/api/conversations/{conversation_id}", - headers={"X-Session-API-Key": running.session_api_key}, + f"{workspace.host}/api/conversations/{conversation_id}", + headers=delete_headers, ) delete_status = upstream.status_code delete_body = upstream.content except httpx.HTTPError as exc: logger.warning("Inner DELETE failed for %s: %s", conversation_id, exc) finally: - await manager.stop(conversation_id) + await registry.stop(conversation_id) - return JSONResponse( - content=json.loads(delete_body) if delete_body else None, + return Response( + content=delete_body, status_code=delete_status, + media_type="application/json", ) -@docker_conversation_router.get("/search") -async def docker_search_conversations(request: Request) -> JSONResponse: - """Fan-out listing endpoint — preserves the local - 
:class:`ConversationPage` wire shape (``{"items": [...], "next_page_id": - null}``). Each inner agent-server has at most one conversation, so we - just concatenate the inner ``items`` lists. ``page_id`` / ``limit`` / - ``sort_order`` are not honored across containers in this first cut. - """ - return await _fanout_search(request) - - -@docker_conversation_router.get("/count") -async def docker_count_conversations(request: Request) -> Response: - """Fan-out count — preserves the local contract of returning a bare - JSON integer (not ``{"count": N}``). Honors the ``?status=`` filter by - forwarding it to each container, so containers whose conversation does - not match contribute 0. - """ - return await _fanout_count(request) - - -@docker_conversation_router.get("") -async def docker_batch_get_conversations( - request: Request, - ids: Annotated[list[UUID], Query()], - include_skills: Annotated[bool, Query()] = False, -) -> JSONResponse: - """Batch-get conversations by id — preserves the local - ``GET /api/conversations?ids=...`` contract (returns - ``list[ConversationInfo | None]`` with ``None`` for missing ids). - - Each id is looked up in the container registry; matched ids are - fetched from their respective container, mismatched ids slot in as - ``None``. ``ids`` is required (same as local mode). 
- """ - if len(ids) >= 100: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="Too many ids requested (limit 100).", - ) - - manager = get_container_manager(request) - - async def _fetch_one(cid: UUID): - running = manager.get(cid) - if running is None: - return None - suffix = "?include_skills=true" if include_skills else "" - url = f"{running.base_url}/api/conversations/{cid}{suffix}" - try: - async with httpx.AsyncClient(timeout=15.0) as client: - resp = await client.get( - url, headers={"X-Session-API-Key": running.session_api_key} - ) - except httpx.HTTPError as exc: - logger.warning("Batch-get failed for %s: %s", cid, exc) - return None - if resp.status_code == 404: - return None - if resp.status_code != 200: - return None - try: - return resp.json() - except json.JSONDecodeError: - return None - - results = await asyncio.gather(*[_fetch_one(cid) for cid in ids]) - return JSONResponse(content=list(results), status_code=200) - - -@docker_conversation_router.api_route( +@docker_conversation_proxy_router.api_route( "/{conversation_id}", - methods=["GET", "PATCH"], + methods=["PATCH"], ) -async def docker_proxy_conversation_root( +async def docker_proxy_conversation_root_mutation( conversation_id: UUID, request: Request ) -> StreamingResponse: - manager = get_container_manager(request) - running = _container_or_404(manager, conversation_id) + """Proxy mutating verbs on ``/api/conversations/{cid}``. + + ``GET`` is intentionally NOT included — the outer's + ``conversation_router`` handles it locally by reading the shared + persistence dir. 
+ """ + registry = get_registry(request) + workspace = _workspace_or_404(registry, conversation_id) return await proxy_http( request, - running, + workspace, upstream_path=_build_upstream_path( request, f"/api/conversations/{conversation_id}" ), ) -@docker_conversation_router.api_route( +@docker_conversation_proxy_router.api_route( "/{conversation_id}/{tail:path}", methods=["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS", "HEAD"], ) async def docker_proxy_conversation_subpath( conversation_id: UUID, tail: str, request: Request ) -> StreamingResponse: - """Catch-all that proxies every conversation-scoped HTTP route. + """Catch-all that proxies every conversation-scoped sub-route. Covers ``/run``, ``/pause``, ``/interrupt``, ``/secrets``, ``/confirmation_policy``, ``/switch_profile``, ``/switch_llm``, ``/condense``, ``/fork``, ``/agent_final_response``, all of - ``/events/...`` (from ``event_router``), and all of ``/workspace/...`` - (from ``workspace_router``). + ``/events/...``, and all of ``/workspace/...`` (static file + server). """ - manager = get_container_manager(request) - running = _container_or_404(manager, conversation_id) + registry = get_registry(request) + workspace = _workspace_or_404(registry, conversation_id) upstream_path = _build_upstream_path( request, f"/api/conversations/{conversation_id}/{tail}" ) - return await proxy_http(request, running, upstream_path=upstream_path) + return await proxy_http(request, workspace, upstream_path=upstream_path) # --------------------------------------------------------------------------- -# HTTP fan-out helpers (search / count) +# Workspace static files — same path as the local ``workspace_router``, +# but served under the workspace-cookie auth group so that