feat(agent_service): add OpenClaw per-session agent runtime #1383
Integrate OpenClaw as an agent_service runtime that spawns one OpenClaw gateway subprocess per RL session, each bound to its own upstream LLM key, so a session's turns are attributable to a distinct per-episode key.
- examples/openclaw/agent.py: OpenClawAgent (AgentRunnable) with per-session subprocess lifecycle; drives each turn via the OpenAI chat-completions endpoint using a unique per-turn session key so DataProxy-replayed history is the single source of truth (avoids double-feeding OpenClaw's own memory).
- examples/openclaw/run_agent_service.py: runnable launcher with a random admin key and auto-created fileroot.
- data_proxy: reset history and reward on episode/start, and clear reward after episode/end, to prevent cross-episode state leakage.
- agent_service types/worker/__init__: backward-compatible lifecycle hooks (on_episode_start/end, TrainingContext) for episode-aware runtimes.
- examples/openclaw/README: document the OpenClaw agent_service design and usage.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Code Review
This pull request adds support for training episodes and rewards in the Agent Service, integrating with OpenClaw by spawning per-session subprocesses. It introduces TrainingContext and lifecycle hooks to manage these subprocesses, alongside new endpoints in the DataProxy and Worker. Feedback on the implementation identifies a resource leak if subprocess spawning fails, incorrect handling of streamed tool calls where chunks are not accumulated by index, and a memory leak due to un-cleared session locks.
    async def _spawn(self, session_key: str, upstream: _Upstream) -> _SessionState:
        port = _free_port()
        token = secrets.token_hex(16)
        config_dir = tempfile.mkdtemp(prefix="openclaw-")
        config_path = os.path.join(config_dir, "openclaw.json")
        state_dir = os.path.join(config_dir, "state")
        os.makedirs(state_dir, exist_ok=True)
        with open(config_path, "w") as fh:
            json.dump(self._render_config(port, token, upstream), fh)

        env = dict(os.environ)
        env["OPENCLAW_CONFIG_PATH"] = config_path
        env["OPENCLAW_STATE_DIR"] = state_dir
        if self._node_extra_ca_certs:
            env["NODE_EXTRA_CA_CERTS"] = self._node_extra_ca_certs
        if self._tls_insecure:
            env["NODE_TLS_REJECT_UNAUTHORIZED"] = "0"

        log_file = open(os.path.join(config_dir, "gateway.log"), "w")
        proc = await asyncio.create_subprocess_exec(
            self._bin,
            "gateway",
            "--port",
            str(port),
            "--auth",
            "token",
            "--token",
            token,
            "--force",
            "--allow-unconfigured",
            env=env,
            stdout=log_file,
            stderr=asyncio.subprocess.STDOUT,
        )

        client = httpx.AsyncClient(
            base_url=f"http://127.0.0.1:{port}",
            timeout=self._timeout,
            headers={"Authorization": f"Bearer {token}"},
        )
        state = _SessionState(
            port=port,
            gateway_token=token,
            config_dir=config_dir,
            process=proc,
            client=client,
            log_file=log_file,
        )
        try:
            await self._wait_healthy(state)
        except Exception:
            await self._teardown_state(state)
            raise

        logger.info(
            "Spawned OpenClaw subprocess (session=%s, port=%d, pid=%s, model=%s)",
            session_key,
            port,
            proc.pid,
            upstream.model,
        )
        return state

If asyncio.create_subprocess_exec fails (e.g., if the openclaw binary is not found on PATH and raises FileNotFoundError), the _SessionState object is never constructed, and the except Exception block on line 240 is not reached. This results in a leaked open file descriptor (log_file) and a leaked temporary directory (config_dir) on disk. Wrap the entire setup in a try...except block to guarantee cleanup on any initialization failure.

    async def _spawn(self, session_key: str, upstream: _Upstream) -> _SessionState:
        port = _free_port()
        token = secrets.token_hex(16)
        config_dir = tempfile.mkdtemp(prefix="openclaw-")
        log_file = None
        client = None
        proc = None
        try:
            config_path = os.path.join(config_dir, "openclaw.json")
            state_dir = os.path.join(config_dir, "state")
            os.makedirs(state_dir, exist_ok=True)
            with open(config_path, "w") as fh:
                json.dump(self._render_config(port, token, upstream), fh)
            env = dict(os.environ)
            env["OPENCLAW_CONFIG_PATH"] = config_path
            env["OPENCLAW_STATE_DIR"] = state_dir
            if self._node_extra_ca_certs:
                env["NODE_EXTRA_CA_CERTS"] = self._node_extra_ca_certs
            if self._tls_insecure:
                env["NODE_TLS_REJECT_UNAUTHORIZED"] = "0"
            log_file = open(os.path.join(config_dir, "gateway.log"), "w")
            proc = await asyncio.create_subprocess_exec(
                self._bin,
                "gateway",
                "--port",
                str(port),
                "--auth",
                "token",
                "--token",
                token,
                "--force",
                "--allow-unconfigured",
                env=env,
                stdout=log_file,
                stderr=asyncio.subprocess.STDOUT,
            )
            client = httpx.AsyncClient(
                base_url=f"http://127.0.0.1:{port}",
                timeout=self._timeout,
                headers={"Authorization": f"Bearer {token}"},
            )
            state = _SessionState(
                port=port,
                gateway_token=token,
                config_dir=config_dir,
                process=proc,
                client=client,
                log_file=log_file,
            )
            await self._wait_healthy(state)
            logger.info(
                "Spawned OpenClaw subprocess (session=%s, port=%d, pid=%s, model=%s)",
                session_key,
                port,
                proc.pid,
                upstream.model,
            )
            return state
        except Exception:
            if client is not None:
                await client.aclose()
            if proc is not None and proc.returncode is None:
                with contextlib.suppress(ProcessLookupError):
                    proc.terminate()
                with contextlib.suppress(Exception):
                    await asyncio.wait_for(proc.wait(), timeout=5)
            if log_file is not None:
                log_file.close()
            shutil.rmtree(config_dir, ignore_errors=True)
            raise

    text_parts: list[str] = []
    tool_calls: list[dict[str, Any]] = []

Initialize a dictionary to accumulate streamed tool call chunks by their index, rather than a flat list, to correctly handle streamed tool call arguments.

    -text_parts: list[str] = []
    -tool_calls: list[dict[str, Any]] = []
    +text_parts: list[str] = []
    +active_tool_calls: dict[int, dict[str, Any]] = {}

    for tc in delta.get("tool_calls") or []:
        fn = tc.get("function") or {}
        name = fn.get("name")
        args = fn.get("arguments", "")
        if name:
            await emitter.emit_tool_call(name=name, args=args)
            tool_calls.append({"name": name, "input": args})

Accumulate the streamed tool call name and arguments by their index. In OpenAI-compatible streaming, the tool call name is typically only sent in the first chunk, while arguments are streamed across subsequent chunks. Checking if name: on every chunk and emitting immediately would discard the streamed arguments and emit incomplete tool calls.

    -for tc in delta.get("tool_calls") or []:
    -    fn = tc.get("function") or {}
    -    name = fn.get("name")
    -    args = fn.get("arguments", "")
    -    if name:
    -        await emitter.emit_tool_call(name=name, args=args)
    -        tool_calls.append({"name": name, "input": args})
    +for tc in delta.get("tool_calls") or []:
    +    idx = tc.get("index")
    +    if idx is None:
    +        continue
    +    fn = tc.get("function") or {}
    +    name = fn.get("name")
    +    args = fn.get("arguments", "")
    +    if idx not in active_tool_calls:
    +        active_tool_calls[idx] = {"name": "", "arguments": []}
    +    if name:
    +        active_tool_calls[idx]["name"] = name
    +    if args:
    +        active_tool_calls[idx]["arguments"].append(args)

            await emitter.emit_tool_call(name=name, args=args)
            tool_calls.append({"name": name, "input": args})

    summary = "".join(text_parts)

After the stream completes, emit the fully accumulated tool calls and populate the tool_calls metadata list.

    -summary = "".join(text_parts)
    +tool_calls: list[dict[str, Any]] = []
    +for tc_info in active_tool_calls.values():
    +    name = tc_info["name"]
    +    full_args = "".join(tc_info["arguments"])
    +    await emitter.emit_tool_call(name=name, args=full_args)
    +    tool_calls.append({"name": name, "input": full_args})
    +summary = "".join(text_parts)

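The two suggestions above (accumulate by index during the stream, emit after it ends) can be sketched together as one standalone function. This is a minimal illustration, not the PR's code: `accumulate_tool_calls` and the sample `chunks` are hypothetical, and the chunk shape assumes the usual OpenAI-compatible streaming layout in which each delta carries an `index` and the name arrives only in the first fragment.

```python
from typing import Any


def accumulate_tool_calls(chunks: list[dict[str, Any]]) -> list[dict[str, str]]:
    """Merge streamed tool-call fragments into complete calls, keyed by index."""
    active: dict[int, dict[str, Any]] = {}
    for chunk in chunks:
        for choice in chunk.get("choices") or []:
            delta = choice.get("delta") or {}
            for tc in delta.get("tool_calls") or []:
                idx = tc.get("index")
                if idx is None:
                    continue  # cannot attribute a fragment without an index
                fn = tc.get("function") or {}
                entry = active.setdefault(idx, {"name": "", "arguments": []})
                if fn.get("name"):
                    entry["name"] = fn["name"]
                if fn.get("arguments"):
                    entry["arguments"].append(fn["arguments"])
    # Only now, with the stream finished, are the argument strings complete.
    return [
        {"name": entry["name"], "input": "".join(entry["arguments"])}
        for _, entry in sorted(active.items())
    ]


# Hypothetical stream: the name arrives first, the JSON arguments in pieces.
chunks = [
    {"choices": [{"delta": {"tool_calls": [
        {"index": 0, "function": {"name": "search", "arguments": ""}}]}}]},
    {"choices": [{"delta": {"tool_calls": [
        {"index": 0, "function": {"arguments": "{\"q\": "}}]}}]},
    {"choices": [{"delta": {"tool_calls": [
        {"index": 0, "function": {"arguments": "\"rl\"}"}}]}}]},
]
calls = accumulate_tool_calls(chunks)
print(calls)
```

Sorting by index keeps the emitted order stable even if fragments for different calls interleave; whether the real emitter should also receive partial updates during the stream is a separate design choice not covered here.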
    async def close_session(self, session_key: str) -> None:
        state = self._sessions.pop(session_key, None)
        if state is not None:
            await self._teardown_state(state)

Clean up the session's lock from self._locks when closing the session to prevent a memory leak over time as new sessions are created and destroyed.

    -async def close_session(self, session_key: str) -> None:
    -    state = self._sessions.pop(session_key, None)
    -    if state is not None:
    -        await self._teardown_state(state)
    +async def close_session(self, session_key: str) -> None:
    +    state = self._sessions.pop(session_key, None)
    +    if state is not None:
    +        await self._teardown_state(state)
    +    async with self._locks_guard:
    +        self._locks.pop(session_key, None)

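To see why the lock entry needs to be dropped, the following standalone sketch models a per-session lock registry in isolation. `SessionLocks`, `get`, and `discard` are hypothetical names chosen for illustration; only `_locks` and `_locks_guard` mirror attributes that appear in the suggestion, and the real class may organize this differently.

```python
import asyncio


class SessionLocks:
    """Per-session asyncio locks, with explicit removal on session close."""

    def __init__(self) -> None:
        self._locks: dict[str, asyncio.Lock] = {}
        self._locks_guard = asyncio.Lock()  # serializes access to the dict

    async def get(self, session_key: str) -> asyncio.Lock:
        async with self._locks_guard:
            return self._locks.setdefault(session_key, asyncio.Lock())

    async def discard(self, session_key: str) -> None:
        # Without this step, every session ever created leaves one Lock behind.
        async with self._locks_guard:
            self._locks.pop(session_key, None)

    def __len__(self) -> int:
        return len(self._locks)


async def demo() -> int:
    locks = SessionLocks()
    for i in range(1000):
        key = f"session-{i}"
        async with await locks.get(key):
            pass  # a turn would run here
        await locks.discard(key)
    return len(locks)


remaining = asyncio.run(demo())
print(remaining)
```

One caveat worth checking in the real code: if another coroutine may still be waiting on a session's lock when the session closes, removing the entry lets a later caller create a fresh lock for the same key, so the close path should run only after in-flight turns have finished.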
This file should not be placed here; put it in agent_service or create a separate folder. The same applies to the files that follow.
Summary
Integrates OpenClaw as an agent_service runtime. OpenClawAgent spawns one OpenClaw gateway subprocess per RL session, each bound to its own upstream LLM key, so a session's turns are attributable to a distinct per-episode key (sk-sess-*). OpenClaw config is process-global, so per-session isolation requires one process per session.
- examples/openclaw/agent.py: OpenClawAgent (AgentRunnable) with per-session subprocess lifecycle (spawn / health-wait / teardown). Each turn issues one OpenAI POST /v1/chat/completions to the session's own subprocess, using a unique per-turn session key so the DataProxy-replayed history is the single source of truth (avoids double-feeding OpenClaw's own per-session-key memory).
- examples/openclaw/run_agent_service.py: runnable launcher with a random admin key and an auto-created fileroot.
- data_proxy: reset history and reward on episode/start, and clear reward after episode/end, to prevent cross-episode state leakage.
- agent_service types/worker/__init__: backward-compatible lifecycle hooks (on_episode_start/on_episode_end, TrainingContext) for episode-aware runtimes; existing runtimes degrade to no-ops via getattr.
- examples/openclaw/README: documents the OpenClaw agent_service design and usage.
Design notes
- x-openclaw-session-key. Reusing a stable key would feed prior turns twice (OpenClaw memory + DataProxy replay), corrupting the captured training prompt. A unique per-turn key makes the replayed history authoritative.
- examples/openclaw/ and experimental/agent_service; no training/inference path imports agent_service.
Test plan
- OpenClawAgent round-trip against a mock OpenAI-compatible upstream: spawn → run → clean teardown; verified replayed history forwarded exactly once (no double-feed).
- AgentController (Gateway/Router/DataProxy/Worker): /v1/responses two-turn conversation returns completed; invalid bearer rejected with 401; clean process teardown.
- episode/start clears history, episode/end clears buffered reward.
- pre-commit run --files green on all changed files.
🤖 Generated with Claude Code
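
The summary says existing runtimes degrade to no-ops via getattr when they lack the new lifecycle hooks. A minimal sketch of that dispatch pattern follows. The helper `call_optional_hook`, both runtime classes, and the dict standing in for TrainingContext are hypothetical; only the hook names on_episode_start and on_episode_end come from the PR description, and the worker's actual call site may differ.

```python
import asyncio
import inspect
from typing import Any


async def call_optional_hook(runtime: Any, hook_name: str, *args: Any) -> bool:
    """Invoke a lifecycle hook if the runtime defines it; otherwise do nothing."""
    hook = getattr(runtime, hook_name, None)
    if hook is None:
        return False  # older runtime: silently skipped
    result = hook(*args)
    if inspect.isawaitable(result):  # support both sync and async hooks
        await result
    return True


class LegacyRuntime:
    """Hypothetical runtime written before the hooks existed."""


class EpisodeAwareRuntime:
    """Hypothetical runtime that tracks episode boundaries."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def on_episode_start(self, ctx: dict[str, Any]) -> None:
        self.events.append(f"start:{ctx['session_key']}")

    async def on_episode_end(self, ctx: dict[str, Any]) -> None:
        self.events.append(f"end:{ctx['session_key']}")


async def demo() -> tuple[bool, list[str]]:
    ctx = {"session_key": "s1"}  # stand-in for TrainingContext; fields assumed
    legacy_called = await call_optional_hook(LegacyRuntime(), "on_episode_start", ctx)
    aware = EpisodeAwareRuntime()
    await call_optional_hook(aware, "on_episode_start", ctx)
    await call_optional_hook(aware, "on_episode_end", ctx)
    return legacy_called, aware.events


legacy_called, events = asyncio.run(demo())
print(legacy_called, events)
```

Looking hooks up by name keeps the base interface unchanged, at the cost that a misspelled hook on a runtime is skipped silently rather than reported.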