feat(agent_service): add OpenClaw per-session agent runtime #1383

Open
IF007 wants to merge 1 commit into areal-project:main from IF007:feat/openclaw-agent-service

@IF007 commented Jun 2, 2026

Summary

Integrates OpenClaw as an agent_service runtime. OpenClawAgent spawns one OpenClaw gateway subprocess per RL session, each bound to its own upstream LLM key, so a session's turns are attributable to a distinct per-episode key (sk-sess-*). OpenClaw config is process-global, so per-session isolation requires one process per session.

  • examples/openclaw/agent.py: OpenClawAgent (AgentRunnable) with per-session subprocess lifecycle (spawn / health-wait / teardown). Each turn issues one OpenAI-style POST /v1/chat/completions request to the session's own subprocess, using a unique per-turn session key so the DataProxy-replayed history is the single source of truth (this avoids double-feeding OpenClaw's own per-session-key memory).
  • examples/openclaw/run_agent_service.py — runnable launcher with a random admin key and an auto-created fileroot.
  • data_proxy — reset history and reward on episode/start, and clear reward after episode/end, to prevent cross-episode state leakage.
  • agent_service types/worker/__init__ — backward-compatible lifecycle hooks (on_episode_start / on_episode_end, TrainingContext) for episode-aware runtimes; existing runtimes degrade to no-ops via getattr.
  • examples/openclaw/README — documents the OpenClaw agent_service design and usage.
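
The getattr-based degradation mentioned in the agent_service bullet could look roughly like the sketch below. It is an illustration under assumptions, not the PR's code: `call_hook`, `LegacyRuntime`, and `EpisodeAwareRuntime` are hypothetical names, and the real hook signatures (including how TrainingContext is passed) may differ.

```python
import asyncio
import inspect
from typing import Any


async def call_hook(runtime: Any, hook_name: str, *args: Any) -> bool:
    """Invoke an optional lifecycle hook; return True if the hook existed."""
    hook = getattr(runtime, hook_name, None)
    if hook is None:
        return False  # Runtimes written before the hooks existed are a no-op.
    result = hook(*args)
    if inspect.isawaitable(result):
        await result  # Accept both sync and async hook implementations.
    return True


class LegacyRuntime:
    """Hypothetical runtime that defines no episode hooks."""


class EpisodeAwareRuntime:
    """Hypothetical runtime that records episode boundaries."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def on_episode_start(self, session_key: str) -> None:
        self.events.append(f"start:{session_key}")

    async def on_episode_end(self, session_key: str) -> None:
        self.events.append(f"end:{session_key}")


async def demo() -> tuple[bool, list[str]]:
    legacy, aware = LegacyRuntime(), EpisodeAwareRuntime()
    legacy_called = await call_hook(legacy, "on_episode_start", "s1")
    await call_hook(aware, "on_episode_start", "s1")
    await call_hook(aware, "on_episode_end", "s1")
    return legacy_called, aware.events
```

Looking the hook up by name keeps the worker free of isinstance checks, so an existing runtime needs no changes to keep working.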

Design notes

  • Why one subprocess per session? OpenClaw's provider/upstream-key/model config is process-global; RL needs each session attributable to a distinct per-episode upstream key, so logical isolation in a single process is insufficient.
  • Per-turn fresh key — empirically OpenClaw is stateful by x-openclaw-session-key. Reusing a stable key would feed prior turns twice (OpenClaw memory + DataProxy replay), corrupting the captured training prompt. A unique per-turn key makes the replayed history authoritative.
  • Zero impact on core framework — all changes are additive and confined to examples/openclaw/ and experimental/agent_service; no training/inference path imports agent_service.
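
A minimal sketch of the per-turn key idea, assuming the header name x-openclaw-session-key from the note above. The helper `build_turn_request` and the `<session>-<random hex>` key format are hypothetical and only illustrate that every turn carries a key OpenClaw has not seen before.

```python
import uuid
from typing import Any


def build_turn_request(
    session_key: str,
    model: str,
    replayed_history: list[dict[str, Any]],
) -> tuple[dict[str, str], dict[str, Any]]:
    """Build the headers and JSON body for one chat-completions turn."""
    # A fresh key per turn means OpenClaw has no stored memory to prepend,
    # so the replayed history is the only context the model receives.
    turn_key = f"{session_key}-{uuid.uuid4().hex}"
    headers = {"x-openclaw-session-key": turn_key}
    body = {"model": model, "messages": list(replayed_history)}
    return headers, body
```

Two calls with the same `session_key` yield different header values, while the message list is forwarded unchanged.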

Test plan

  • Direct OpenClawAgent round-trip against a mock OpenAI-compatible upstream: spawn → run → clean teardown; verified replayed history forwarded exactly once (no double-feed).
  • Full stack via AgentController (Gateway/Router/DataProxy/Worker): /v1/responses two-turn conversation returns completed; invalid bearer rejected with 401; clean process teardown.
  • DataProxy cross-episode reset integration test: episode/start clears history, episode/end clears buffered reward.
  • pre-commit run --files green on all changed files.
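
The cross-episode reset behaviour checked by the DataProxy integration test can be pictured with a small in-memory sketch. `EpisodeState` and its method names are hypothetical stand-ins; the actual DataProxy exposes this through its episode/start and episode/end endpoints and may store state differently.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class EpisodeState:
    """Hypothetical per-session state held by a proxy between turns."""

    history: list[dict[str, Any]] = field(default_factory=list)
    reward: Optional[float] = None

    def on_episode_start(self) -> None:
        # Drop anything left over from the previous episode.
        self.history.clear()
        self.reward = None

    def on_episode_end(self) -> Optional[float]:
        # Hand the buffered reward back once, then clear it.
        reward, self.reward = self.reward, None
        return reward
```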

Note: end-to-end was validated with a local mock upstream (no GPU). Real RL training integration (per-episode sk-sess-* minting through the ProxyGateway) is the documented next layer.

🤖 Generated with Claude Code

Integrate OpenClaw as an agent_service runtime that spawns one OpenClaw
gateway subprocess per RL session, each bound to its own upstream LLM
key, so a session's turns are attributable to a distinct per-episode key.

- examples/openclaw/agent.py: OpenClawAgent (AgentRunnable) with per-session
  subprocess lifecycle; drives each turn via the OpenAI chat-completions
  endpoint using a unique per-turn session key so DataProxy-replayed history
  is the single source of truth (avoids double-feeding OpenClaw's own memory).
- examples/openclaw/run_agent_service.py: runnable launcher with a random
  admin key and auto-created fileroot.
- data_proxy: reset history and reward on episode/start, and clear reward
  after episode/end, to prevent cross-episode state leakage.
- agent_service types/worker/__init__: backward-compatible lifecycle hooks
  (on_episode_start/end, TrainingContext) for episode-aware runtimes.
- examples/openclaw/README: document the OpenClaw agent_service design and usage.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Contributor

@gemini-code-assist (bot) left a comment

Code Review

This pull request adds support for training episodes and rewards in the Agent Service, integrating with OpenClaw by spawning per-session subprocesses. It introduces TrainingContext and lifecycle hooks to manage these subprocesses, alongside new endpoints in the DataProxy and Worker. Feedback on the implementation identifies a resource leak if subprocess spawning fails, incorrect handling of streamed tool calls where chunks are not accumulated by index, and a memory leak due to un-cleared session locks.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +190 to +251
    async def _spawn(self, session_key: str, upstream: _Upstream) -> _SessionState:
        port = _free_port()
        token = secrets.token_hex(16)
        config_dir = tempfile.mkdtemp(prefix="openclaw-")
        config_path = os.path.join(config_dir, "openclaw.json")
        state_dir = os.path.join(config_dir, "state")
        os.makedirs(state_dir, exist_ok=True)
        with open(config_path, "w") as fh:
            json.dump(self._render_config(port, token, upstream), fh)

        env = dict(os.environ)
        env["OPENCLAW_CONFIG_PATH"] = config_path
        env["OPENCLAW_STATE_DIR"] = state_dir
        if self._node_extra_ca_certs:
            env["NODE_EXTRA_CA_CERTS"] = self._node_extra_ca_certs
        if self._tls_insecure:
            env["NODE_TLS_REJECT_UNAUTHORIZED"] = "0"

        log_file = open(os.path.join(config_dir, "gateway.log"), "w")
        proc = await asyncio.create_subprocess_exec(
            self._bin,
            "gateway",
            "--port",
            str(port),
            "--auth",
            "token",
            "--token",
            token,
            "--force",
            "--allow-unconfigured",
            env=env,
            stdout=log_file,
            stderr=asyncio.subprocess.STDOUT,
        )

        client = httpx.AsyncClient(
            base_url=f"http://127.0.0.1:{port}",
            timeout=self._timeout,
            headers={"Authorization": f"Bearer {token}"},
        )
        state = _SessionState(
            port=port,
            gateway_token=token,
            config_dir=config_dir,
            process=proc,
            client=client,
            log_file=log_file,
        )
        try:
            await self._wait_healthy(state)
        except Exception:
            await self._teardown_state(state)
            raise

        logger.info(
            "Spawned OpenClaw subprocess (session=%s, port=%d, pid=%s, model=%s)",
            session_key,
            port,
            proc.pid,
            upstream.model,
        )
        return state

high

If asyncio.create_subprocess_exec fails (e.g., if the openclaw binary is not found on PATH and raises FileNotFoundError), the _SessionState object is never constructed, and the except Exception block on line 240 is not reached. This results in a leaked open file descriptor (log_file) and a leaked temporary directory (config_dir) on disk. Wrap the entire setup in a try...except block to guarantee cleanup on any initialization failure.

    async def _spawn(self, session_key: str, upstream: _Upstream) -> _SessionState:
        port = _free_port()
        token = secrets.token_hex(16)
        config_dir = tempfile.mkdtemp(prefix="openclaw-")
        log_file = None
        client = None
        proc = None
        try:
            config_path = os.path.join(config_dir, "openclaw.json")
            state_dir = os.path.join(config_dir, "state")
            os.makedirs(state_dir, exist_ok=True)
            with open(config_path, "w") as fh:
                json.dump(self._render_config(port, token, upstream), fh)

            env = dict(os.environ)
            env["OPENCLAW_CONFIG_PATH"] = config_path
            env["OPENCLAW_STATE_DIR"] = state_dir
            if self._node_extra_ca_certs:
                env["NODE_EXTRA_CA_CERTS"] = self._node_extra_ca_certs
            if self._tls_insecure:
                env["NODE_TLS_REJECT_UNAUTHORIZED"] = "0"

            log_file = open(os.path.join(config_dir, "gateway.log"), "w")
            proc = await asyncio.create_subprocess_exec(
                self._bin,
                "gateway",
                "--port",
                str(port),
                "--auth",
                "token",
                "--token",
                token,
                "--force",
                "--allow-unconfigured",
                env=env,
                stdout=log_file,
                stderr=asyncio.subprocess.STDOUT,
            )

            client = httpx.AsyncClient(
                base_url=f"http://127.0.0.1:{port}",
                timeout=self._timeout,
                headers={"Authorization": f"Bearer {token}"},
            )
            state = _SessionState(
                port=port,
                gateway_token=token,
                config_dir=config_dir,
                process=proc,
                client=client,
                log_file=log_file,
            )
            await self._wait_healthy(state)
            logger.info(
                "Spawned OpenClaw subprocess (session=%s, port=%d, pid=%s, model=%s)",
                session_key,
                port,
                proc.pid,
                upstream.model,
            )
            return state
        except Exception:
            if client is not None:
                await client.aclose()
            if proc is not None and proc.returncode is None:
                with contextlib.suppress(ProcessLookupError):
                    proc.terminate()
                with contextlib.suppress(Exception):
                    await asyncio.wait_for(proc.wait(), timeout=5)
            if log_file is not None:
                log_file.close()
            shutil.rmtree(config_dir, ignore_errors=True)
            raise

Comment on lines +372 to +373
text_parts: list[str] = []
tool_calls: list[dict[str, Any]] = []

high

Initialize a dictionary to accumulate streamed tool call chunks by their index, rather than a flat list, to correctly handle streamed tool call arguments.

Suggested change
    - text_parts: list[str] = []
    - tool_calls: list[dict[str, Any]] = []
    + text_parts: list[str] = []
    + active_tool_calls: dict[int, dict[str, Any]] = {}

Comment on lines +418 to +424
    for tc in delta.get("tool_calls") or []:
        fn = tc.get("function") or {}
        name = fn.get("name")
        args = fn.get("arguments", "")
        if name:
            await emitter.emit_tool_call(name=name, args=args)
            tool_calls.append({"name": name, "input": args})

high

Accumulate the streamed tool call name and arguments by their index. In OpenAI-compatible streaming, the tool call name is typically only sent in the first chunk, while arguments are streamed across subsequent chunks. Checking if name: on every chunk and emitting immediately would discard the streamed arguments and emit incomplete tool calls.

Suggested change
    - for tc in delta.get("tool_calls") or []:
    -     fn = tc.get("function") or {}
    -     name = fn.get("name")
    -     args = fn.get("arguments", "")
    -     if name:
    -         await emitter.emit_tool_call(name=name, args=args)
    -         tool_calls.append({"name": name, "input": args})
    + for tc in delta.get("tool_calls") or []:
    +     idx = tc.get("index")
    +     if idx is None:
    +         continue
    +     fn = tc.get("function") or {}
    +     name = fn.get("name")
    +     args = fn.get("arguments", "")
    +     if idx not in active_tool_calls:
    +         active_tool_calls[idx] = {"name": "", "arguments": []}
    +     if name:
    +         active_tool_calls[idx]["name"] = name
    +     if args:
    +         active_tool_calls[idx]["arguments"].append(args)

await emitter.emit_tool_call(name=name, args=args)
tool_calls.append({"name": name, "input": args})

summary = "".join(text_parts)

high

After the stream completes, emit the fully accumulated tool calls and populate the tool_calls metadata list.

Suggested change
    - summary = "".join(text_parts)
    + tool_calls: list[dict[str, Any]] = []
    + for tc_info in active_tool_calls.values():
    +     name = tc_info["name"]
    +     full_args = "".join(tc_info["arguments"])
    +     await emitter.emit_tool_call(name=name, args=full_args)
    +     tool_calls.append({"name": name, "input": full_args})
    + summary = "".join(text_parts)
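
The accumulate-by-index approach from the comments above can be tried in isolation with the sketch below. It assumes chunks shaped like OpenAI-compatible streaming responses (`choices[].delta.tool_calls[]` with an `index` and a partial `function`). The helper name `accumulate_tool_calls` and the sample chunks are made up for illustration and are not part of the PR.

```python
from typing import Any


def accumulate_tool_calls(chunks: list[dict[str, Any]]) -> list[dict[str, str]]:
    """Merge streamed tool-call deltas into complete calls, keyed by index."""
    active: dict[int, dict[str, Any]] = {}
    for chunk in chunks:
        for choice in chunk.get("choices") or []:
            delta = choice.get("delta") or {}
            for tc in delta.get("tool_calls") or []:
                idx = tc.get("index")
                if idx is None:
                    continue
                fn = tc.get("function") or {}
                entry = active.setdefault(idx, {"name": "", "arguments": []})
                if fn.get("name"):
                    entry["name"] = fn["name"]  # Usually only in the first chunk.
                if fn.get("arguments"):
                    entry["arguments"].append(fn["arguments"])
    # Emit calls in index order, with argument fragments joined into one string.
    return [
        {"name": active[i]["name"], "input": "".join(active[i]["arguments"])}
        for i in sorted(active)
    ]


# Invented sample stream: the name arrives first, the arguments in two fragments.
sample_chunks = [
    {"choices": [{"delta": {"tool_calls": [
        {"index": 0, "function": {"name": "get_weather", "arguments": ""}}]}}]},
    {"choices": [{"delta": {"tool_calls": [
        {"index": 0, "function": {"arguments": "{\"city\": "}}]}}]},
    {"choices": [{"delta": {"tool_calls": [
        {"index": 0, "function": {"arguments": "\"Paris\"}"}}]}}]},
]
calls = accumulate_tool_calls(sample_chunks)
```

With the per-chunk `if name:` check from the original code, the same stream would have produced a single call with empty arguments.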

Comment on lines +339 to +342
    async def close_session(self, session_key: str) -> None:
        state = self._sessions.pop(session_key, None)
        if state is not None:
            await self._teardown_state(state)

medium

Clean up the session's lock from self._locks when closing the session to prevent a memory leak over time as new sessions are created and destroyed.

Suggested change
    - async def close_session(self, session_key: str) -> None:
    -     state = self._sessions.pop(session_key, None)
    -     if state is not None:
    -         await self._teardown_state(state)
    + async def close_session(self, session_key: str) -> None:
    +     state = self._sessions.pop(session_key, None)
    +     if state is not None:
    +         await self._teardown_state(state)
    +     async with self._locks_guard:
    +         self._locks.pop(session_key, None)
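
To see why the lock map needs pruning, here is a self-contained sketch. `SessionRegistry`, `_lock_for`, and `open_session` are hypothetical stand-ins for the agent's bookkeeping; only the `_sessions`, `_locks`, and `_locks_guard` attribute names come from the code under review.

```python
import asyncio


class SessionRegistry:
    """Hypothetical stand-in for the agent's session and lock bookkeeping."""

    def __init__(self) -> None:
        self._sessions: dict[str, object] = {}
        self._locks: dict[str, asyncio.Lock] = {}
        self._locks_guard = asyncio.Lock()

    async def _lock_for(self, session_key: str) -> asyncio.Lock:
        # The guard serialises creation so each session gets exactly one lock.
        async with self._locks_guard:
            return self._locks.setdefault(session_key, asyncio.Lock())

    async def open_session(self, session_key: str) -> None:
        lock = await self._lock_for(session_key)
        async with lock:
            self._sessions.setdefault(session_key, object())

    async def close_session(self, session_key: str) -> None:
        self._sessions.pop(session_key, None)
        # Without this pop, one Lock per closed session would stay in the dict.
        async with self._locks_guard:
            self._locks.pop(session_key, None)


async def demo() -> tuple[int, int]:
    registry = SessionRegistry()
    for i in range(100):
        key = f"session-{i}"
        await registry.open_session(key)
        await registry.close_session(key)
    return len(registry._sessions), len(registry._locks)
```

After 100 open/close cycles both dictionaries are empty; removing the pop in `close_session` would leave 100 idle locks behind.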

Collaborator

This file should not live here; move it into agent_service or create a separate folder for it. The same applies to the files that follow.
