Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
333 changes: 38 additions & 295 deletions clawloop/environments/_car_purple.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,36 @@

from __future__ import annotations

import asyncio
import json
import logging
import threading
from typing import Any
from typing import Any, ClassVar
from uuid import uuid4

import litellm
import uvicorn
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route

from clawloop.environments._purple_base import (
_PurpleAgentBase,
create_app,
start_purple_server,
)
from clawloop.learning_layers.harness import Harness

log = logging.getLogger(__name__)

__all__ = ["CarPurpleAgent", "create_app", "start_purple_server"]


class CarPurpleAgent(_PurpleAgentBase):
"""Purple agent for CAR-bench: parses System/User prompt, wraps replies in ``message``."""

class CarPurpleAgent:
"""A2A-compliant purple agent that injects clawloop harness state into LLM calls."""
default_bench: ClassVar[str] = "car"
agent_card_name: ClassVar[str] = "clawloop-purple-agent"
agent_card_description: ClassVar[str] = "ClawLoop harness-optimized agent under test"
agent_card_skills: ClassVar[list[dict]] = [
{
"id": "car_assistant",
"name": "In-Car Voice Assistant",
"description": "Agent under test for CAR-bench evaluation",
"tags": ["benchmark", "car-bench"],
}
]

def __init__(
self,
Expand All @@ -33,310 +42,44 @@ def __init__(
api_base: str | None = None,
api_key: str | None = None,
):
self.model = model
self.harness = harness
self.bench = bench
self.api_base = api_base
self.api_key = api_key
self._sessions: dict[str, list[dict]] = {}
self._tool_cache: dict[str, list[dict]] = {}
super().__init__(model, harness, bench, api_base, api_key)
self._captured: dict[str, list[dict]] = {}

def update_harness(self, harness: Harness) -> None:
self.harness = harness

def clear_all_sessions(self) -> None:
self._sessions.clear()
self._tool_cache.clear()
super().clear_all_sessions()
self._captured.clear()

# -- Message parsing --

@staticmethod
def _parse_first_message(raw_text: str) -> tuple[str, str]:
"""Parse 'System: ...\\n\\nUser: ...' format from green agent."""
"""Parse ``System: ...\\n\\nUser: ...`` format from green agent."""
if "System:" in raw_text and "\n\nUser:" in raw_text:
parts = raw_text.split("\n\nUser:", 1)
system = parts[0].replace("System:", "", 1).strip()
user = parts[1].strip()
return system, user
return "", raw_text

@staticmethod
def _convert_tools_to_openai(car_tools: list[dict]) -> list[dict]:
"""Normalize tool schemas to OpenAI function-calling format.
def _build_initial_messages(self, text_parts: list[str]) -> list[dict]:
raw_text = text_parts[0] if text_parts else ""
system_prompt, user_text = self._parse_first_message(raw_text)

Green agent may send tools already wrapped ({type: function, function: ...})
or flat ({name, description, parameters}). Handle both.
"""
result = []
for t in car_tools:
if t.get("type") == "function" and "function" in t:
# Already in OpenAI format
result.append(t)
else:
result.append(
{
"type": "function",
"function": {
"name": t["name"],
"description": t.get("description", ""),
"parameters": t.get("parameters", {}),
},
}
)
return result
harness_prompt = self.harness.system_prompt(self.bench)
if harness_prompt:
system_prompt = f"{harness_prompt}\n\n{system_prompt}"

@staticmethod
def _normalize_assistant_msg(litellm_msg: Any) -> dict:
"""Normalize litellm response to stable internal format."""
normalized: dict[str, Any] = {
"role": "assistant",
"content": litellm_msg.content or "",
}
if litellm_msg.tool_calls:
normalized["tool_calls"] = [
{
"id": tc.id,
"type": "function",
"function": {
"name": tc.function.name,
"arguments": tc.function.arguments,
},
}
for tc in litellm_msg.tool_calls
]
return normalized
out: list[dict] = [{"role": "system", "content": system_prompt}]
if user_text:
out.append({"role": "user", "content": user_text})
return out

def _format_a2a_response(self, assistant_msg: Any) -> dict:
"""Format LLM response as A2A result body."""
parts: list[dict] = [{"kind": "text", "text": assistant_msg.content or ""}]

if assistant_msg.tool_calls:
tool_calls = []
for tc in assistant_msg.tool_calls:
args = tc.function.arguments
if args is None:
args = {}
elif isinstance(args, str):
try:
args = json.loads(args)
except json.JSONDecodeError:
log.warning("Malformed tool args for %s", tc.function.name)
args = {"raw": args}
tool_calls.append({"tool_name": tc.function.name, "arguments": args})
parts.append({"kind": "data", "data": {"tool_calls": tool_calls}})

return {
"message": {
"messageId": uuid4().hex,
"role": "agent",
"parts": parts,
"parts": self._build_message_parts(assistant_msg),
}
}

# -- Core message handling --

def handle_message_sync(self, jsonrpc_request: dict) -> dict:
"""Handle one message/send request (sync — litellm.completion is sync)."""
params = jsonrpc_request["params"]
msg = params["message"]
context_id = params.get("contextId", "default")

text_parts = [p["text"] for p in msg["parts"] if p.get("kind") == "text"]
data_parts = [p["data"] for p in msg["parts"] if p.get("kind") == "data"]

# Initialize session
if context_id not in self._sessions:
self._sessions[context_id] = []
self._captured[context_id] = []

messages = self._sessions[context_id]

if not messages:
# First message: extract system prompt + tools
raw_text = text_parts[0] if text_parts else ""
system_prompt, user_text = self._parse_first_message(raw_text)

# HARNESS INJECTION
harness_prompt = self.harness.system_prompt(self.bench)
if harness_prompt:
system_prompt = f"{harness_prompt}\n\n{system_prompt}"

messages.append({"role": "system", "content": system_prompt})
if user_text:
messages.append({"role": "user", "content": user_text})

# Cache tools
for d in data_parts:
if "tools" in d:
self._tool_cache[context_id] = self._convert_tools_to_openai(d["tools"])
else:
# Subsequent: tool results and/or user text
for d in data_parts:
if "tool_results" in d:
for tr in d["tool_results"]:
# Reconcile tool_call_ids: rewrite last assistant msg's
# tool_call ids to match green's generated ids
green_id = tr["tool_call_id"]
tool_name = tr.get("tool_name", "")
self._reconcile_tool_call_id(messages, tool_name, green_id)

messages.append(
{
"role": "tool",
"tool_call_id": green_id,
"content": tr["content"],
}
)
for text in text_parts:
if text.strip():
messages.append({"role": "user", "content": text})

# Call LLM
tools = self._tool_cache.get(context_id)
completion_kwargs: dict[str, Any] = {
"model": self.model,
"messages": messages,
"temperature": 0.0,
}
if tools:
completion_kwargs["tools"] = tools
if self.api_base:
completion_kwargs["api_base"] = self.api_base
if self.api_key:
completion_kwargs["api_key"] = self.api_key

response = litellm.completion(**completion_kwargs)
assistant_msg = response.choices[0].message

# Normalize and store
normalized = self._normalize_assistant_msg(assistant_msg)
messages.append(normalized)
self._captured[context_id].append(normalized)

return self._format_a2a_response(assistant_msg)

@staticmethod
def _reconcile_tool_call_id(messages: list[dict], tool_name: str, green_id: str) -> None:
"""Rewrite last assistant message's tool_call id to match green's id.

Green generates its own tool_call_ids. The LLM needs matching ids between
assistant tool_calls and tool-role messages. We rewrite the assistant msg's
id to match what green sent back.

Handles duplicate tool names by only rewriting tool calls that haven't
already been reconciled (i.e., still have their original LLM-generated id).
"""
# Collect green IDs already used in existing tool-role messages
used_green_ids = {
m["tool_call_id"] for m in messages if m.get("role") == "tool" and "tool_call_id" in m
}
# Walk backwards to find the last assistant message with tool_calls
for msg in reversed(messages):
if msg.get("role") != "assistant" or "tool_calls" not in msg:
continue
for tc in msg["tool_calls"]:
# Match by name, skip if already reconciled (id is a known green id)
if tc["function"]["name"] == tool_name and tc["id"] not in used_green_ids:
tc["id"] = green_id
return
return # found assistant msg but no matching tool name


def create_app(agent: CarPurpleAgent, port: int = 0) -> Starlette:
"""Create the A2A Starlette app."""

async def agent_card(request: Request) -> JSONResponse:
return JSONResponse(
{
"name": "clawloop-purple-agent",
"description": "ClawLoop harness-optimized agent under test",
"url": f"http://127.0.0.1:{port}/",
"version": "0.1.0",
"protocol_version": "0.3.0",
"preferred_transport": "JSONRPC",
"default_input_modes": ["text/plain"],
"default_output_modes": ["text/plain"],
"capabilities": {"streaming": False, "push_notifications": False},
"skills": [
{
"id": "car_assistant",
"name": "In-Car Voice Assistant",
"description": "Agent under test for CAR-bench evaluation",
"tags": ["benchmark", "car-bench"],
}
],
}
)

async def handle_jsonrpc(request: Request) -> JSONResponse:
body = await request.json()
if body.get("jsonrpc") != "2.0" or "id" not in body:
return JSONResponse(
{
"jsonrpc": "2.0",
"id": None,
"error": {"code": -32600, "message": "Invalid Request"},
}
)

method = body.get("method")
if method != "message/send":
return JSONResponse(
{
"jsonrpc": "2.0",
"id": body["id"],
"error": {"code": -32601, "message": f"Method not found: {method}"},
}
)

# Run sync litellm call in thread to avoid blocking event loop
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, agent.handle_message_sync, body)

return JSONResponse({"jsonrpc": "2.0", "id": body["id"], "result": result})

return Starlette(
routes=[
Route("/.well-known/agent.json", agent_card, methods=["GET"]),
Route("/.well-known/agent-card.json", agent_card, methods=["GET"]),
Route("/", handle_jsonrpc, methods=["POST"]),
]
)


def start_purple_server(
agent: CarPurpleAgent, host: str = "127.0.0.1", port: int = 0
) -> tuple[threading.Thread, int]:
"""Start the purple agent server in a background thread. Returns (thread, actual_port)."""
import socket
import time

# Bind socket to get a free port; pass it directly to uvicorn to avoid race.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((host, port))
actual_port = sock.getsockname()[1]

app = create_app(agent, actual_port)
config = uvicorn.Config(app, host=host, port=actual_port, log_level="warning")
server = uvicorn.Server(config)

thread = threading.Thread(target=server.run, kwargs={"sockets": [sock]}, daemon=True)
thread.start()

# Poll for readiness
import httpx

for _ in range(50):
try:
r = httpx.get(f"http://{host}:{actual_port}/.well-known/agent-card.json", timeout=0.5)
if r.status_code == 200:
break
except httpx.ConnectError:
time.sleep(0.1)
else:
log.warning("Purple server did not become ready within 5s")

return thread, actual_port
def _capture_assistant(self, context_id: str, normalized: dict) -> None:
self._captured.setdefault(context_id, []).append(normalized)
Loading
Loading