diff --git a/openhands-agent-server/openhands/agent_server/api.py b/openhands-agent-server/openhands/agent_server/api.py index 2040ae84c0..c56647a38e 100644 --- a/openhands-agent-server/openhands/agent_server/api.py +++ b/openhands-agent-server/openhands/agent_server/api.py @@ -34,6 +34,13 @@ ) from openhands.agent_server.desktop_router import desktop_router from openhands.agent_server.desktop_service import get_desktop_service +from openhands.agent_server.docker_runtime import DockerConversationRegistry +from openhands.agent_server.docker_runtime.routers import ( + docker_conversation_proxy_router, + docker_global_proxy_router, + docker_sockets_router, + docker_workspace_proxy_router, +) from openhands.agent_server.event_router import event_router from openhands.agent_server.file_router import file_router from openhands.agent_server.git_router import git_router @@ -203,9 +210,28 @@ async def start_tool_preload_service(): config.bash_events_retention_seconds, ) + # Docker runtime: per-conversation container registry, backed by + # ``DockerWorkspace``. The proxy routers each construct their own + # short-lived ``httpx.AsyncClient`` per request, so there is no + # shared HTTP client to plumb in here. The in-process + # ``ConversationService`` stays live but runs in + # ``read_only_metadata`` mode — it answers list/count/search/get + # by reading the shared persistence directory off disk while the + # sub-containers own the actual conversations. + docker_registry: DockerConversationRegistry | None = None + if config.conversation_runtime == "docker": + docker_registry = DockerConversationRegistry(config) + api.state.docker_registry = docker_registry + logger.info( + "Docker conversation runtime enabled (image=%s)", + config.conversation_image, + ) + try: yield finally: + if docker_registry is not None: + await docker_registry.shutdown() if retention_task is not None: retention_task.cancel() with suppress(asyncio.CancelledError): @@ -298,19 +324,46 @@ def _add_api_routes(app: FastAPI, config: Config) -> None: dependencies.append(Depends(create_session_api_key_dependency(config))) api_router = APIRouter(prefix="/api", dependencies=dependencies) - api_router.include_router(event_router) - api_router.include_router(conversation_router) - api_router.include_router(conversation_router_acp) - api_router.include_router(tool_router) - api_router.include_router(bash_router) - api_router.include_router(git_router) - api_router.include_router(file_router) - api_router.include_router(vscode_router) - api_router.include_router(desktop_router) - api_router.include_router(skills_router) - api_router.include_router(hooks_router) - api_router.include_router(llm_router) - api_router.include_router(mcp_router) + + if config.conversation_runtime == "docker": + # Docker mode: per-conversation mutations and the global per-host + # routers reverse-proxy to a per-conversation container. Metadata + # (list / count / search / get) and the cross-conversation routers + # (settings, profiles, workspaces, auth, …) still run in-process + # on the outer server, reading the shared on-disk persistence dir. + # + # Order matters: + # 1. ``docker_global_proxy_router`` catches ``/api/{bash,git, + # file,vscode,desktop,hooks,mcp,skills,tools,llm}/...`` and + # forwards them based on the required ``?cid=`` query param. + # For any other path it raises 404 so the local routers get a + # chance to match. + # 2. ``docker_conversation_proxy_router`` claims the mutation + # verbs on ``/api/conversations/...``. It intentionally does + # NOT register ``GET`` on the metadata paths so the local + # ``conversation_router``'s GETs match. + # 3. The unchanged ``conversation_router`` / ``event_router`` + # provide GET metadata and serve the workspace static-file + # tree for sub-container conversations via the same routes. + api_router.include_router(docker_global_proxy_router) + api_router.include_router(docker_conversation_proxy_router) + api_router.include_router(event_router) + api_router.include_router(conversation_router) + api_router.include_router(conversation_router_acp) + else: + api_router.include_router(event_router) + api_router.include_router(conversation_router) + api_router.include_router(conversation_router_acp) + api_router.include_router(tool_router) + api_router.include_router(bash_router) + api_router.include_router(git_router) + api_router.include_router(file_router) + api_router.include_router(vscode_router) + api_router.include_router(desktop_router) + api_router.include_router(skills_router) + api_router.include_router(hooks_router) + api_router.include_router(mcp_router) + api_router.include_router(llm_router) api_router.include_router(settings_router) api_router.include_router(workspaces_router) api_router.include_router(profiles_router) @@ -318,7 +371,6 @@ def _add_api_routes(app: FastAPI, config: Config) -> None: # /api/auth/* mints workspace cookies and requires the header to bootstrap, # so it lives under the header-only auth group. api_router.include_router(auth_router) - app.include_router(api_router) # Workspace static-file routes get their own auth group that accepts # EITHER the X-Session-API-Key header OR the workspace session cookie. @@ -331,10 +383,26 @@ def _add_api_routes(app: FastAPI, config: Config) -> None: Depends(create_workspace_session_dependency(config)) ) workspace_api_router = APIRouter(prefix="/api", dependencies=workspace_dependencies) - workspace_api_router.include_router(workspace_router) + if config.conversation_runtime == "docker": + # Proxy workspace static files via the per-conversation container, + # but under the workspace-cookie auth group so iframe/img embeds work. + workspace_api_router.include_router(docker_workspace_proxy_router) + else: + workspace_api_router.include_router(workspace_router) + + # Order matters: the workspace router is more specific + # (``/conversations/{cid}/workspace/...``) than the docker catch-all + # (``/conversations/{cid}/{tail:path}``). Starlette matches in + # registration order, so we MUST include the cookie-auth workspace + # router before the header-auth api_router; otherwise the catch-all + # would shadow workspace requests and demand the header. app.include_router(workspace_api_router) + app.include_router(api_router) - app.include_router(sockets_router) + if config.conversation_runtime == "docker": + app.include_router(docker_sockets_router) + else: + app.include_router(sockets_router) def _setup_static_files(app: FastAPI, config: Config) -> None: diff --git a/openhands-agent-server/openhands/agent_server/config.py b/openhands-agent-server/openhands/agent_server/config.py index b76a18f460..d06a63b393 100644 --- a/openhands-agent-server/openhands/agent_server/config.py +++ b/openhands-agent-server/openhands/agent_server/config.py @@ -1,7 +1,7 @@ import logging import os from pathlib import Path -from typing import ClassVar +from typing import ClassVar, Literal from pydantic import BaseModel, ConfigDict, Field, SecretStr @@ -212,6 +212,75 @@ class Config(BaseModel): "The URL where this agent server instance is available externally" ), ) + + # ---- Docker runtime mode ----------------------------------------------- + # When ``conversation_runtime == "docker"``, every conversation runs in + # its own Docker container hosting another agent-server. This outer + # agent-server then acts as a thin reverse proxy in front of the per- + # conversation containers. See ``docker_runtime/`` for the implementation. + conversation_runtime: Literal["local", "docker"] = Field( + default="local", + description=( + "How to host conversations. ``local`` runs each conversation " + "in-process on this server (the default; current behavior). " + "``docker`` spawns a fresh Docker container running an " + "agent-server image per conversation and proxies " + "conversation-scoped HTTP and WebSocket traffic to it." + ), + ) + conversation_image: str = Field( + default="ghcr.io/openhands/agent-server:latest-python", + description=( + "Container image used to host each conversation when " + "``conversation_runtime == 'docker'``. Ignored otherwise." + ), + ) + conversation_container_network: str | None = Field( + default=None, + description=( + "Optional Docker network to attach per-conversation containers to." + ), + ) + conversation_container_volumes: list[str] = Field( + default_factory=list, + description=( + "Additional ``-v`` volume mounts to apply to every per-conversation " + "container (e.g. ``'/host/cache:/cache'``). Ignored in local mode." + ), + ) + conversation_container_forward_env: list[str] = Field( + default_factory=lambda: [ + "DEBUG", + "OH_SECRET_KEY", + "OH_SESSION_API_KEYS_0", + ], + description=( + "Environment variable names to forward from this server's " + "environment into every per-conversation container.\n\n" + "Defaults explained:\n" + "* ``OH_SECRET_KEY`` — outer server and sub-containers share " + "the same persisted settings/secrets directory and must derive " + "the same cipher key, otherwise encrypted values won't " + "round-trip.\n" + "* ``OH_SESSION_API_KEYS_0`` — the outer's reverse-proxy " + "forwards the client's ``X-Session-API-Key`` header verbatim, " + "so the inner must accept the same key. (When the outer has " + "no session-key requirement at all, omitting this is fine.)" + ), + ) + conversation_container_platform: str = Field( + default="linux/amd64", + description="``--platform`` flag passed to ``docker run``.", + ) + conversation_container_startup_timeout: float = Field( + default=120.0, + gt=0.0, + description=( + "Seconds to wait for a freshly-spawned conversation container " + "to pass its /health check before giving up." + ), + ) + model_config: ClassVar[ConfigDict] = {"frozen": True} @property diff --git a/openhands-agent-server/openhands/agent_server/conversation_service.py b/openhands-agent-server/openhands/agent_server/conversation_service.py index ecd3d1d891..2f60a7b6a6 100644 --- a/openhands-agent-server/openhands/agent_server/conversation_service.py +++ b/openhands-agent-server/openhands/agent_server/conversation_service.py @@ -323,6 +323,16 @@ class ConversationService: cipher: Cipher | None = None owner_instance_id: str = field(default_factory=lambda: uuid4().hex) max_concurrent_runs: int = 10 + # When True, ``__aenter__`` skips starting per-conversation EventServices + # and never acquires conversation leases. Metadata-only methods + # (``get_conversation``, ``search_conversations``, ``count_conversations``, + # ``batch_get_conversations``) read ``(meta.json, base_state.json)`` + # snapshots directly off disk on every call. Mutation methods raise. + # + # Used by docker-runtime mode: the outer agent-server reads the shared + # persistence directory (bind-mounted into every sub-container) for + # listings while sub-containers own the per-conversation work. + read_only_metadata: bool = False _event_services: dict[UUID, EventService] | None = field(default=None, init=False) _conversation_webhook_subscribers: list["ConversationWebhookSubscriber"] = field( default_factory=list, init=False @@ -330,7 +340,105 @@ class ConversationService: _lease_renewal_task: asyncio.Task | None = field(default=None, init=False) _run_executor: ThreadPoolExecutor | None = field(default=None, init=False) + def _load_disk_snapshot( + self, conversation_id: UUID + ) -> tuple[StoredConversation, ConversationState] | None: + """Read ``(meta.json, base_state.json)`` for one conversation off disk. + + Used by :data:`read_only_metadata` mode (docker-runtime). Returns + ``None`` if the conversation directory or its ``meta.json`` is + missing. ``base_state.json`` not existing is treated as "conversation + was created but hasn't been started yet" — we return a minimal + in-memory state seeded from the stored ``agent`` / ``workspace`` so + a listing still includes the conversation. + """ + from openhands.sdk.conversation.persistence_const import BASE_STATE + + conv_dir = self.conversations_dir / conversation_id.hex + meta_file = conv_dir / "meta.json" + if not meta_file.exists(): + return None + context = {"cipher": self.cipher} if self.cipher else None + try: + stored = StoredConversation.model_validate_json( + meta_file.read_text(), context=context + ) + except Exception: + logger.exception( + "Failed to load meta.json for conversation %s", conversation_id + ) + return None + + base_state_file = conv_dir / BASE_STATE + if base_state_file.exists(): + try: + state = ConversationState.model_validate_json( + base_state_file.read_text(), context=context + ) + except Exception: + logger.exception( + "Failed to load base_state.json for conversation %s; " + "falling back to a synthetic snapshot", + conversation_id, + ) + state = self._synthesize_state(stored) + else: + state = self._synthesize_state(stored) + return stored, state + + def _synthesize_state(self, stored: StoredConversation) -> ConversationState: + """Build a minimal :class:`ConversationState` from a + :class:`StoredConversation` when ``base_state.json`` is missing. + + This happens briefly between ``meta.json`` being written and the + first ``base_state.json`` snapshot — for example, immediately after + the outer server's POST proxy returns to the client. Producing a + thin state here keeps the listing endpoints consistent with the + local-mode contract (a freshly-created conversation always shows + up in ``search`` / ``count``). + """ + return ConversationState( + id=stored.id, + agent=stored.agent, + workspace=stored.workspace, + persistence_dir=str(self.conversations_dir), + ) + + def _iter_disk_snapshots( + self, + ) -> list[tuple[UUID, StoredConversation, ConversationState]]: + """Walk ``conversations_dir`` and load every conversation snapshot. + + Re-reads the directory on every call so that conversations created + by sub-containers (in docker mode) show up immediately without any + outer-side cache invalidation. + """ + results: list[tuple[UUID, StoredConversation, ConversationState]] = [] + if not self.conversations_dir.exists(): + return results + for conv_dir in self.conversations_dir.iterdir(): + if not conv_dir.is_dir(): + continue + try: + cid = UUID(conv_dir.name) + except ValueError: + continue + snapshot = self._load_disk_snapshot(cid) + if snapshot is None: + continue + stored, state = snapshot + results.append((cid, stored, state)) + return results + async def get_conversation(self, conversation_id: UUID) -> ConversationInfo | None: + if self.read_only_metadata: + snapshot = await asyncio.to_thread( + self._load_disk_snapshot, conversation_id + ) + if snapshot is None: + return None + stored, state = snapshot + return _compose_conversation_info(stored, state) if self._event_services is None: raise ValueError("inactive_service") event_service = self._event_services.get(conversation_id) @@ -342,13 +450,9 @@ async def get_conversation(self, conversation_id: UUID) -> ConversationInfo | No async def get_acp_conversation( self, conversation_id: UUID ) -> ConversationInfo | None: - if self._event_services is None: - raise ValueError("inactive_service") - event_service = self._event_services.get(conversation_id) - if event_service is None: - return None - state = await event_service.get_state() - return _compose_conversation_info(event_service.stored, state) + # Same contract as ``get_conversation`` — historically there was a + # separate ACP variant, but both branches resolve to the same data. + return await self.get_conversation(conversation_id) async def search_conversations( self, @@ -393,22 +497,35 @@ async def _search_conversations( execution_status: ConversationExecutionStatus | None, sort_order: ConversationSortOrder, ) -> tuple[list[ConversationInfo], str | None]: - if self._event_services is None: - raise ValueError("inactive_service") - # Collect all conversations with their info - all_conversations = [] - for id, event_service in self._event_services.items(): - state = await event_service.get_state() - conversation_info = _compose_conversation_info(event_service.stored, state) - # Apply status filter if provided - if ( - execution_status is not None - and conversation_info.execution_status != execution_status - ): - continue + all_conversations: list[tuple[UUID, ConversationInfo]] = [] + if self.read_only_metadata: + snapshots = await asyncio.to_thread(self._iter_disk_snapshots) + pairs: list[tuple[UUID, StoredConversation, ConversationState]] = snapshots + for cid, stored, state in pairs: + conversation_info = _compose_conversation_info(stored, state) + if ( + execution_status is not None + and conversation_info.execution_status != execution_status + ): + continue + all_conversations.append((cid, conversation_info)) + else: + if self._event_services is None: + raise ValueError("inactive_service") + for id, event_service in self._event_services.items(): + state = await event_service.get_state() + conversation_info = _compose_conversation_info( + event_service.stored, state + ) + # Apply status filter if provided + if ( + execution_status is not None + and conversation_info.execution_status != execution_status + ): + continue - all_conversations.append((id, conversation_info)) + all_conversations.append((id, conversation_info)) # Sort conversations based on sort_order if sort_order == ConversationSortOrder.CREATED_AT: @@ -454,6 +571,16 @@ async def _count_conversations( execution_status: ConversationExecutionStatus | None, ) -> int: """Count conversations matching the given filters.""" + if self.read_only_metadata: + snapshots = await asyncio.to_thread(self._iter_disk_snapshots) + if execution_status is None: + return len(snapshots) + return sum( + 1 + for _, _, state in snapshots + if state.execution_status == execution_status + ) + if self._event_services is None: raise ValueError("inactive_service") @@ -887,6 +1014,20 @@ async def __aenter__(self): thread_name_prefix="conversation-run", ) self._event_services = {} + # In read-only metadata mode the outer doesn't own conversations: + # sub-containers do, via their own bind-mounted persistence dir. + # Skip the eager EventService startup (no lease acquisition, no + # LocalConversation) — metadata methods read snapshots off disk + # on demand instead. + if self.read_only_metadata: + self._conversation_webhook_subscribers = [ + ConversationWebhookSubscriber( + spec=webhook_spec, + session_api_key=self.session_api_key, + ) + for webhook_spec in self.webhook_specs + ] + return self for conversation_dir in self.conversations_dir.iterdir(): stored: StoredConversation | None = None try: @@ -1014,6 +1155,10 @@ def get_instance(cls, config: Config) -> "ConversationService": ), cipher=config.cipher, max_concurrent_runs=config.max_concurrent_runs, + # Outer agent-server reads disk for metadata in docker mode; + # the actual conversations are owned by sub-containers, each + # of which sees only its own subdirectory. + read_only_metadata=(config.conversation_runtime == "docker"), ) async def _start_event_service(self, stored: StoredConversation) -> EventService: diff --git a/openhands-agent-server/openhands/agent_server/docker_runtime/__init__.py b/openhands-agent-server/openhands/agent_server/docker_runtime/__init__.py new file mode 100644 index 0000000000..6718b113ca --- /dev/null +++ b/openhands-agent-server/openhands/agent_server/docker_runtime/__init__.py @@ -0,0 +1,39 @@ +"""Docker-runtime mode for the agent-server. + +When ``Config.conversation_runtime == "docker"`` the outer agent-server stops +running conversations in-process and instead spawns one Docker container +per conversation. Each container hosts its own (``local`` mode) agent-server +that actually runs the agent loop, and this outer server: + +* reverse-proxies every conversation-scoped HTTP / WebSocket request to the + matching sub-container, and +* answers list / count / search / get queries directly from the shared + on-disk persistence directory (no fan-out across containers needed). + +The outer and the sub-containers share the same ``conversations_path`` and +``.openhands`` (settings/secrets/workspaces) directories via bind-mounts. +The outer NEVER acquires a conversation lease — sub-containers own the +work, the outer only reads metadata and proxies mutations. + +Submodules: + +* :mod:`.registry` — per-conversation :class:`DockerWorkspace` registry. +* :mod:`.proxy` — low-level HTTP and WebSocket forwarding helpers. +* :mod:`.routers` — FastAPI routes that intercept conversation-mutation + paths in docker mode (POST create, per-cid catch-all proxy, WS bridge). +""" + +from openhands.agent_server.docker_runtime.registry import ( + DockerConversationRegistry, + container_conv_dir, + container_persist_dir, + host_conv_subdir, +) + + +__all__ = [ + "DockerConversationRegistry", + "container_conv_dir", + "container_persist_dir", + "host_conv_subdir", +] diff --git a/openhands-agent-server/openhands/agent_server/docker_runtime/proxy.py b/openhands-agent-server/openhands/agent_server/docker_runtime/proxy.py new file mode 100644 index 0000000000..c15ed20920 --- /dev/null +++ b/openhands-agent-server/openhands/agent_server/docker_runtime/proxy.py @@ -0,0 +1,233 @@ +"""HTTP and WebSocket reverse-proxy helpers used in docker runtime mode. + +Both helpers are deliberately dumb: they stream bytes between the outer +agent-server and an inner per-conversation container, without inspecting +request bodies or response shapes. + +The inner agent-server is reached on ``127.0.0.1:`` (loopback +only — see :attr:`DockerWorkspace.bind_host`) and currently doesn't +require any auth header: defense-in-depth is provided by the loopback +binding, and the OUTER server has already enforced its session-API-key +checks (or other auth) before these helpers run. + +These helpers are *only* used by routes in +:mod:`openhands.agent_server.docker_runtime.routers`; nothing outside the +docker runtime needs to know about them. +""" + +from __future__ import annotations + +import asyncio +from collections.abc import AsyncIterator + +import httpx +import websockets +from fastapi import HTTPException, status +from starlette.requests import Request +from starlette.responses import StreamingResponse +from starlette.websockets import WebSocket, WebSocketDisconnect + +from openhands.sdk.logger import get_logger +from openhands.workspace.docker.workspace import DockerWorkspace + + +logger = get_logger(__name__) + +# Hop-by-hop headers (RFC 7230) — must not be forwarded by a proxy. +_HOP_BY_HOP_HEADERS = frozenset( + { + "connection", + "keep-alive", + "proxy-authenticate", + "proxy-authorization", + "te", + "trailer", + "transfer-encoding", + "upgrade", + # ``host`` and ``content-length`` are recomputed by httpx; forwarding + # the original values causes spurious 400s when bodies are re-chunked. + "host", + "content-length", + } +) + +# Stream chunk size for request/response bodies. 64 KiB is the same default +# httpx uses internally; we pin it so behavior is stable across versions. +_CHUNK_SIZE = 64 * 1024 + + +def _filter_headers(headers) -> dict[str, str]: + return {k: v for k, v in headers.items() if k.lower() not in _HOP_BY_HOP_HEADERS} + + +async def proxy_http( + request: Request, + workspace: DockerWorkspace, + *, + upstream_path: str, + timeout: float | None = None, +) -> StreamingResponse: + """Forward ``request`` to the per-conversation container. + + Args: + request: Incoming Starlette request on the outer agent-server. + workspace: The :class:`DockerWorkspace` for the target container. + upstream_path: Path (including any query string) on the inner + agent-server to forward to. Typically the same path the outer + server received, since the inner agent-server exposes the same + API surface. + timeout: Per-request timeout in seconds. ``None`` (the default) means + no read timeout — conversation event streams can be long-lived. + + Notes: + A fresh :class:`httpx.AsyncClient` is created per request. We avoid a + long-lived pool because the outer server can serve many concurrent + conversations and each one talks to a different upstream port — and + because making the client per-request keeps the lifespan/teardown + story trivial. + """ + url = workspace.host + upstream_path + headers = _filter_headers(request.headers) + # If the client authenticated via the workspace-session cookie (used by + # iframe / img embeds that can't attach custom headers), there's no + # ``X-Session-API-Key`` on the inbound request — but the inner + # agent-server only knows about the header. Synthesize one from the + # workspace's stored key so the inner accepts the proxied request. + if workspace.api_key and "x-session-api-key" not in {k.lower() for k in headers}: + headers["X-Session-API-Key"] = workspace.api_key + + async def _request_body() -> AsyncIterator[bytes]: + async for chunk in request.stream(): + if chunk: + yield chunk + + client = httpx.AsyncClient( + timeout=httpx.Timeout(connect=10.0, read=timeout, write=30.0, pool=10.0) + ) + req = client.build_request( + request.method, + url, + headers=headers, + params=None, # query string is already part of upstream_path + content=_request_body(), + ) + + try: + upstream = await client.send(req, stream=True) + except (httpx.ConnectError, httpx.ReadError) as exc: + await client.aclose() + logger.warning("Upstream connection error to %s: %s", workspace.host, exc) + raise HTTPException( + status_code=status.HTTP_502_BAD_GATEWAY, + detail=f"Conversation container unreachable: {exc}", + ) from exc + + async def _response_body() -> AsyncIterator[bytes]: + try: + async for chunk in upstream.aiter_raw(chunk_size=_CHUNK_SIZE): + yield chunk + finally: + await upstream.aclose() + await client.aclose() + + return StreamingResponse( + _response_body(), + status_code=upstream.status_code, + headers=_filter_headers(upstream.headers), + media_type=upstream.headers.get("content-type"), + ) + + +async def bridge_websocket( + client_ws: WebSocket, + workspace: DockerWorkspace, + *, + upstream_path: str, +) -> None: + """Bridge a WebSocket session between the browser and an inner container. + + Precondition: ``client_ws`` MUST already be accepted by the caller. The + bridge does not call ``accept()`` itself because the outer server's + WebSocket-auth helper accepts on success and calling ``accept()`` a + second time would raise. + + Closure semantics: when either side closes (or errors), we close the + other side and return. No reconnect. + """ + upstream_url = ( + workspace.host.replace("http://", "ws://").replace("https://", "wss://") + + upstream_path + ) + + # The local sockets router accepts auth via header / query param / first + # message. By the time we get here the outer has already accepted the + # socket — but the inner is a separate server that requires its own + # auth. Mint the inner-side ``X-Session-API-Key`` from the workspace's + # shared key. (If both outer and inner have no key requirement, the + # workspace.api_key is None and we send no header — that's fine.) + upstream_headers: dict[str, str] = {} + if workspace.api_key: + upstream_headers["X-Session-API-Key"] = workspace.api_key + + try: + async with websockets.connect( + upstream_url, + additional_headers=upstream_headers or None, + ) as upstream_ws: + await _bridge_websocket_loop(client_ws, upstream_ws) + except websockets.exceptions.InvalidStatus as exc: + logger.warning("Upstream WebSocket rejected (%s) to %s", exc, workspace.host) + # 1011 == "internal error"; closest match for an upstream HTTP failure + # since browsers can't see HTTP status codes from a failed upgrade. + await client_ws.close(code=1011) + except (OSError, websockets.exceptions.WebSocketException) as exc: + logger.warning( + "Upstream WebSocket connect failed to %s: %s", workspace.host, exc + ) + await client_ws.close(code=1011) + + +async def _bridge_websocket_loop(client_ws: WebSocket, upstream_ws) -> None: + async def _client_to_upstream() -> None: + try: + while True: + message = await client_ws.receive() + if message.get("type") == "websocket.disconnect": + return + if "bytes" in message and message["bytes"] is not None: + await upstream_ws.send(message["bytes"]) + elif "text" in message and message["text"] is not None: + await upstream_ws.send(message["text"]) + except WebSocketDisconnect: + return + + async def _upstream_to_client() -> None: + try: + async for message in upstream_ws: + if isinstance(message, (bytes, bytearray)): + await client_ws.send_bytes(bytes(message)) + else: + await client_ws.send_text(message) + except websockets.exceptions.ConnectionClosed: + return + + task_a = asyncio.create_task(_client_to_upstream()) + task_b = asyncio.create_task(_upstream_to_client()) + _, pending = await asyncio.wait( + {task_a, task_b}, return_when=asyncio.FIRST_COMPLETED + ) + for task in pending: + task.cancel() + for task in pending: + try: + await task + except (asyncio.CancelledError, Exception): + pass + try: + await upstream_ws.close() + except Exception: + pass + try: + await client_ws.close() + except Exception: + pass diff --git a/openhands-agent-server/openhands/agent_server/docker_runtime/registry.py b/openhands-agent-server/openhands/agent_server/docker_runtime/registry.py new file mode 100644 index 0000000000..1c0249a64c --- /dev/null +++ b/openhands-agent-server/openhands/agent_server/docker_runtime/registry.py @@ -0,0 +1,205 @@ +"""Per-conversation Docker container registry. + +A thin async wrapper around :class:`DockerWorkspace`. The workspace owns +the heavy lifting (port allocation, ``docker run``, health checks, ``docker +stop`` on cleanup, optional log streaming, pause/resume); this class just +hands out one workspace per conversation id and serializes concurrent +start/stop calls. + +The outer agent-server and every sub-container share the same on-disk +persistence directories (``conversations_path`` and the sibling +``.openhands`` settings dir) via bind-mounts. Each sub-container only +sees its OWN conversation subdirectory under the shared +``conversations_path``, so leases never collide. The outer never claims +a lease — it reads metadata off disk and proxies all mutations. +""" + +from __future__ import annotations + +import asyncio +import os +from pathlib import Path +from typing import cast +from uuid import UUID + +from openhands.agent_server.config import V1_SESSION_API_KEY_ENV, Config +from openhands.agent_server.persistence.store import _get_persistence_dir +from openhands.sdk.logger import get_logger +from openhands.sdk.workspace import PlatformType +from openhands.workspace.docker.workspace import DockerWorkspace + + +logger = get_logger(__name__) + + +# Canonical path inside every sub-container. Doesn't have to match the +# host-side path — the agent-server inside the container is reconfigured +# via ``OH_CONVERSATIONS_PATH`` / ``OH_PERSISTENCE_DIR`` to use these. +_CONTAINER_CONV_DIR = "/var/openhands/conversations" +_CONTAINER_PERSIST_DIR = "/var/openhands/.openhands" + + +class DockerConversationRegistry: + """Hand out one :class:`DockerWorkspace` per conversation id. + + The registry is in-memory: restarting the outer agent-server forgets + every running container. That's deliberate — sub-containers are + short-lived agent-server processes whose canonical state lives on the + shared persistence volume, so any restart can re-claim them by spawning + fresh containers against the same on-disk state. + """ + + def __init__(self, config: Config) -> None: + self._config = config + self._workspaces: dict[UUID, DockerWorkspace] = {} + self._lock = asyncio.Lock() + + @property + def config(self) -> Config: + return self._config + + def get(self, conversation_id: UUID) -> DockerWorkspace | None: + return self._workspaces.get(conversation_id) + + def items(self) -> list[tuple[UUID, DockerWorkspace]]: + return list(self._workspaces.items()) + + async def get_or_create( + self, conversation_id: UUID + ) -> tuple[DockerWorkspace, bool]: + """Idempotently spawn the container for ``conversation_id``. + + Returns ``(workspace, is_new)``. Callers that need to clean up on a + failed startup must gate the teardown on ``is_new`` so retried + requests don't tear down a live conversation. + """ + async with self._lock: + existing = self._workspaces.get(conversation_id) + if existing is not None: + return existing, False + + ws = await asyncio.to_thread(self._build_workspace, conversation_id) + self._workspaces[conversation_id] = ws + return ws, True + + async def stop(self, conversation_id: UUID) -> bool: + async with self._lock: + ws = self._workspaces.pop(conversation_id, None) + if ws is None: + return False + await asyncio.to_thread(ws.cleanup) + return True + + async def shutdown(self) -> None: + """Stop every tracked container. Best-effort: a single broken + container must not block the rest from being cleaned up.""" + async with self._lock: + wss = list(self._workspaces.values()) + self._workspaces.clear() + for ws in wss: + try: + await asyncio.to_thread(ws.cleanup) + except Exception: + logger.exception( + "Failed to stop conversation container during shutdown" + ) + + # -- internals --------------------------------------------------------- + + def _build_workspace(self, conversation_id: UUID) -> DockerWorkspace: + """Construct the :class:`DockerWorkspace` for one conversation. + + Blocking: spawns the container and waits for the inner + agent-server's ``/health`` to come up. Must be called from a worker + thread. + """ + cfg = self._config + host_conv_dir = cfg.conversations_path.resolve() + host_persist_dir = _get_persistence_dir(cfg).resolve() + + # Per-cid bind-mount: the sub-container can only see ITS OWN + # conversation subdirectory under the shared dir. The outer server + # sees every subdirectory and reads metadata off disk for listings. + host_cid_dir = host_conv_dir / conversation_id.hex + host_cid_dir.mkdir(parents=True, exist_ok=True) + container_cid_dir = f"{_CONTAINER_CONV_DIR}/{conversation_id.hex}" + + volumes = list(cfg.conversation_container_volumes) + [ + f"{host_cid_dir}:{container_cid_dir}", + f"{host_persist_dir}:{_CONTAINER_PERSIST_DIR}", + ] + + # Point the inner agent-server at the canonical in-container paths. + # Mirrors how the outer server resolves them via ``Config`` / + # ``_get_persistence_dir(config)``. + extra_env = { + "OH_CONVERSATIONS_PATH": _CONTAINER_CONV_DIR, + "OH_PERSISTENCE_DIR": _CONTAINER_PERSIST_DIR, + } + + logger.info( + "Spawning conversation container: cid=%s image=%s", + conversation_id, + cfg.conversation_image, + ) + # The reverse-proxy needs an ``X-Session-API-Key`` to authenticate + # with the inner agent-server. Outer and inner share that key by + # default via ``conversation_container_forward_env`` (see + # :attr:`Config.conversation_container_forward_env`), so read it + # straight out of the outer's env. None means "no auth required", + # which matches the inner's behavior when the env is unset. + shared_session_key = os.environ.get(V1_SESSION_API_KEY_ENV) + + ws = DockerWorkspace( + server_image=cfg.conversation_image, + api_key=shared_session_key, + # The agent's tool workspace ("where bash/file ops execute") + # is separate from the conversation persistence directory we + # bind-mount above. Leave it at the container default. + working_dir="/workspace", + # Loopback-only: the outer reaches the inner over 127.0.0.1. + # Any other binding would let other hosts on the network talk + # to the inner agent-server, bypassing outer auth. + bind_host="127.0.0.1", + # ``Config.conversation_container_platform`` is a plain ``str`` so + # users can plug in any platform Docker accepts; DockerWorkspace + # narrows that to a ``Literal``. Trust the user's choice here. + platform=cast(PlatformType, cfg.conversation_container_platform), + health_check_timeout=cfg.conversation_container_startup_timeout, + volumes=volumes, + network=cfg.conversation_container_network, + forward_env=list(cfg.conversation_container_forward_env), + extra_env=extra_env, + # The outer doesn't manage image lifecycle; whoever produced + # the image is responsible for its retention. + cleanup_image=False, + # Each container hosts only one conversation; no need to expose + # the auxiliary VSCode/VNC ports outwards. The catch-all proxy + # forwards conversation-scoped HTTP via ``ws.host`` which is + # plenty. + extra_ports=False, + detach_logs=False, + ) + logger.info( + "Conversation container ready: cid=%s host=%s", + conversation_id, + ws.host, + ) + return ws + + +def container_persist_dir() -> str: + """Canonical settings/secrets path inside every sub-container.""" + return _CONTAINER_PERSIST_DIR + + +def container_conv_dir() -> str: + """Canonical conversations-root path inside every sub-container.""" + return _CONTAINER_CONV_DIR + + +def host_conv_subdir(config: Config, conversation_id: UUID) -> Path: + """Return the host-side per-conversation directory under + ``config.conversations_path``. Used by tests and by the outer's + read-only metadata loader.""" + return config.conversations_path.resolve() / conversation_id.hex diff --git a/openhands-agent-server/openhands/agent_server/docker_runtime/routers.py b/openhands-agent-server/openhands/agent_server/docker_runtime/routers.py new file mode 100644 index 0000000000..97d465fa01 --- /dev/null +++ b/openhands-agent-server/openhands/agent_server/docker_runtime/routers.py @@ -0,0 +1,482 @@ +"""FastAPI routes that intercept conversation-mutation traffic in +``Config.conversation_runtime == "docker"`` mode. + +What this module provides (the *new* surface): + +* ``docker_conversation_proxy_router`` — spawns a per-conversation + container on ``POST /api/conversations`` and forwards every per-cid + HTTP route to that container. Mounted BEFORE + :data:`openhands.agent_server.conversation_router.conversation_router` + so the proxy claims the mutation methods first; the unchanged + ``conversation_router`` keeps serving ``GET`` metadata routes from the + shared on-disk persistence dir (the outer's + :class:`ConversationService` runs in :attr:`read_only_metadata` mode). +* ``docker_sockets_router`` — authenticates WebSocket clients against + the outer's session keys, then bridges to the inner container. +* ``docker_global_proxy_router`` — reverse-proxies the *global* + (non-conversation-scoped) routers (bash, file, git, vscode, desktop, + hooks, mcp, skills, tool, llm) to a chosen sub-container. Each request + must include a ``?cid=…`` query param identifying which conversation's + container to talk to. + +What's intentionally NOT here: + +* No batch-get / count / search routes — those are served by the + unchanged ``conversation_router`` against the shared filesystem. +* No workspace static router — the catch-all + ``/{cid}/{tail:path}`` proxy covers ``/{cid}/workspace/...`` already, + and the outer mounts a thin cookie-auth wrapper that delegates here. +""" + +from __future__ import annotations + +import json +from typing import Annotated +from uuid import UUID, uuid4 + +import httpx +from fastapi import ( + APIRouter, + HTTPException, + Query, + Request, + WebSocket, + status, +) +from starlette.responses import JSONResponse, Response, StreamingResponse + +from openhands.agent_server.docker_runtime.proxy import ( + bridge_websocket, + proxy_http, +) +from openhands.agent_server.docker_runtime.registry import ( + DockerConversationRegistry, +) +from openhands.sdk.logger import get_logger +from openhands.workspace.docker.workspace import DockerWorkspace + + +logger = get_logger(__name__) + + +def get_registry(request: Request) -> DockerConversationRegistry: + registry = getattr(request.app.state, "docker_registry", None) + if registry is None: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail="Docker conversation registry is not available", + ) + return registry + + +def _ws_get_registry(websocket: WebSocket) -> DockerConversationRegistry | None: + return getattr(websocket.app.state, "docker_registry", None) + + +def _workspace_or_404( + registry: DockerConversationRegistry, conversation_id: UUID +) -> DockerWorkspace: + ws = registry.get(conversation_id) + if ws is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Conversation not found: {conversation_id}", + ) + return ws + + +def _build_upstream_path(request: Request, path: str) -> str: + """Reconstruct the inner-container path from the outer request. + + The inner agent-server exposes the same API surface, so we forward the + same path verbatim and append the original query string. + """ + query = request.url.query + return f"{path}?{query}" if query else path + + +# --------------------------------------------------------------------------- +# HTTP: /api/conversations (mutation half) +# --------------------------------------------------------------------------- + +docker_conversation_proxy_router = APIRouter( + prefix="/conversations", tags=["Docker Conversations"] +) + + +@docker_conversation_proxy_router.post("") +async def docker_start_conversation( + request: Request, + include_skills: Annotated[bool, Query()] = False, +) -> JSONResponse: + """Spawn a fresh per-conversation container, then forward the create. + + The container is registered against the *resolved* conversation id + (either the one the client supplied or a fresh UUID4 minted here). + The body is rewritten to pin ``conversation_id`` so the inner + agent-server agrees on the id. + """ + registry = get_registry(request) + + try: + body_bytes = await request.body() + body = json.loads(body_bytes) if body_bytes else {} + except json.JSONDecodeError as exc: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Invalid JSON body: {exc}", + ) from exc + + raw_cid = body.get("conversation_id") + try: + conversation_id = UUID(raw_cid) if raw_cid else uuid4() + except (TypeError, ValueError) as exc: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Invalid conversation_id: {raw_cid!r}", + ) from exc + body["conversation_id"] = str(conversation_id) + + try: + workspace, is_new = await registry.get_or_create(conversation_id) + except Exception as exc: + raise HTTPException( + status_code=status.HTTP_502_BAD_GATEWAY, + detail=f"Failed to start conversation container: {exc}", + ) from exc + + upstream_path = ( + f"/api/conversations?include_skills={'true' if include_skills else 'false'}" + ) + headers = { + "content-type": request.headers.get("content-type", "application/json"), + "accept": request.headers.get("accept", "application/json"), + } + # Forward auth: prefer whatever the client sent (header takes precedence), + # fall back to the workspace's stored key (set from the outer's shared + # ``OH_SESSION_API_KEYS_0``). The inner agent-server only ever knows + # about the header, never the cookie. + inbound_key = request.headers.get("x-session-api-key") + proxied_key = inbound_key or workspace.api_key + if proxied_key: + headers["X-Session-API-Key"] = proxied_key + + try: + async with httpx.AsyncClient(timeout=60.0) as client: + response = await client.post( + workspace.host + upstream_path, + headers=headers, + content=json.dumps(body).encode("utf-8"), + ) + except httpx.HTTPError as exc: + # If we managed to start the container but the very first request + # failed, that's a startup race. Tear down only the container WE + # just created — otherwise a retry against an existing + # conversation would kill the live one. + logger.warning( + "Initial request to fresh container %s failed: %s", workspace.host, exc + ) + if is_new: + await registry.stop(conversation_id) + raise HTTPException( + status_code=status.HTTP_502_BAD_GATEWAY, + detail=f"Conversation container could not accept the request: {exc}", + ) from exc + + if response.status_code >= 400 and is_new: + # The inner server rejected the create. Don't leave the container + # behind in that case. + await registry.stop(conversation_id) + + return JSONResponse( + content=response.json() if response.content else None, + status_code=response.status_code, + ) + + +@docker_conversation_proxy_router.delete("/{conversation_id}") +async def docker_delete_conversation( + conversation_id: UUID, + request: Request, +) -> Response: + registry = get_registry(request) + workspace = registry.get(conversation_id) + if workspace is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Conversation not found: {conversation_id}", + ) + + # Best-effort: ask the inner server to delete its own state first, then + # always tear the container down so we don't leak it even if the inner + # delete failed. + delete_status = 200 + delete_body: bytes = b"" + delete_headers: dict[str, str] = {} + inbound_key = request.headers.get("x-session-api-key") + proxied_key = inbound_key or workspace.api_key + if proxied_key: + delete_headers["X-Session-API-Key"] = proxied_key + try: + async with httpx.AsyncClient(timeout=30.0) as client: + upstream = await client.delete( + f"{workspace.host}/api/conversations/{conversation_id}", + headers=delete_headers, + ) + delete_status = upstream.status_code + delete_body = upstream.content + except httpx.HTTPError as exc: + logger.warning("Inner DELETE failed for %s: %s", conversation_id, exc) + finally: + await registry.stop(conversation_id) + + return Response( + content=delete_body, + status_code=delete_status, + media_type="application/json", + ) + + +@docker_conversation_proxy_router.api_route( + "/{conversation_id}", + methods=["PATCH"], +) +async def docker_proxy_conversation_root_mutation( + conversation_id: UUID, request: Request +) -> StreamingResponse: + """Proxy mutating verbs on ``/api/conversations/{cid}``. + + ``GET`` is intentionally NOT included — the outer's + ``conversation_router`` handles it locally by reading the shared + persistence dir. + """ + registry = get_registry(request) + workspace = _workspace_or_404(registry, conversation_id) + return await proxy_http( + request, + workspace, + upstream_path=_build_upstream_path( + request, f"/api/conversations/{conversation_id}" + ), + ) + + +@docker_conversation_proxy_router.api_route( + "/{conversation_id}/{tail:path}", + methods=["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS", "HEAD"], +) +async def docker_proxy_conversation_subpath( + conversation_id: UUID, tail: str, request: Request +) -> StreamingResponse: + """Catch-all that proxies every conversation-scoped sub-route. + + Covers ``/run``, ``/pause``, ``/interrupt``, ``/secrets``, + ``/confirmation_policy``, ``/switch_profile``, ``/switch_llm``, + ``/condense``, ``/fork``, ``/agent_final_response``, all of + ``/events/...``, and all of ``/workspace/...`` (static file + server). + """ + registry = get_registry(request) + workspace = _workspace_or_404(registry, conversation_id) + upstream_path = _build_upstream_path( + request, f"/api/conversations/{conversation_id}/{tail}" + ) + return await proxy_http(request, workspace, upstream_path=upstream_path) + + +# --------------------------------------------------------------------------- +# Workspace static files — same path as the local ``workspace_router``, +# but served under the workspace-cookie auth group so that