diff --git a/pkg/server/api.go b/pkg/server/api.go index 8d412c77..f87195a2 100644 --- a/pkg/server/api.go +++ b/pkg/server/api.go @@ -883,12 +883,14 @@ func runtimeExecutionID(ctx context.Context) string { } func authOwnerID(r *http.Request) string { - user := auth.GetAuthUser(r.Context()) - if user == nil { - return "" + if user := auth.GetAuthUser(r.Context()); user != nil { + return fmt.Sprintf("%d", user.GitHubID) } - return fmt.Sprintf("%d", user.GitHubID) + // Fall back to caller attribution so sessions are owner-scoped even when + // the server runs unauthenticated (e.g. local mode, the eval harness). + // Empty when no attribution is present, preserving prior behavior. + return attribution.FromContext(r.Context()) } func parseOptionalInt(r *http.Request, key string) (int, error) { diff --git a/pkg/server/runtime_attribution_test.go b/pkg/server/runtime_attribution_test.go index 6bbd6bc5..18fd3e33 100644 --- a/pkg/server/runtime_attribution_test.go +++ b/pkg/server/runtime_attribution_test.go @@ -93,3 +93,15 @@ func TestRuntimeCallbacksInheritExecutionAttribution(t *testing.T) { require.Equal(t, "discord:sam", inherited, "runtime callbacks must inherit the spawning execution's attribution") } + +func TestAuthOwnerIDAttributionFallback(t *testing.T) { + // No auth user: fall back to the caller attribution so sessions are owner-scoped + // even on an unauthenticated server (local mode, the eval harness). + withAttr := httptest.NewRequest(http.MethodGet, "/api/v1/sessions", nil) + withAttr = withAttr.WithContext(attribution.WithValue(withAttr.Context(), "eval-worker-abc123")) + require.Equal(t, "eval-worker-abc123", authOwnerID(withAttr)) + + // Neither auth user nor attribution: empty owner, preserving prior behavior. + bare := httptest.NewRequest(http.MethodGet, "/api/v1/sessions", nil) + require.Equal(t, "", authOwnerID(bare)) +} diff --git a/tests/eval/agent/opencode_agent.py b/tests/eval/agent/opencode_agent.py index d979f740..9c3b6193 100644 --- a/tests/eval/agent/opencode_agent.py +++ b/tests/eval/agent/opencode_agent.py @@ -41,6 +41,20 @@ # Langfuse session. In CI that's the GitHub run id; locally a per-process uuid. _LANGFUSE_SESSION_ID = os.environ.get("GITHUB_RUN_ID") or uuid.uuid4().hex +# Caller-attribution env var the panda CLI forwards as the X-Panda-On-Behalf-Of +# header (pkg/attribution/attribution.go: const EnvVar). The server scopes sandbox +# sessions to this value (authOwnerID falls back to it when unauthenticated), so a +# per-worker identity lets the eval tear down exactly this worker's sessions. +_ATTRIBUTION_ENVVAR = "PANDA_ON_BEHALF_OF" + +# A unique owner id for THIS worker process. Each promptfoo worker runs its tests +# serially against its own opencode serve, so "this worker's sessions" == "the +# just-finished test's sessions". Set into the environment at import so the host +# serve inherits it; the docker serve is passed it explicitly (see _docker_serve). +# An externally-provided attribution (a real chat-agent test) is honored if set. +os.environ.setdefault(_ATTRIBUTION_ENVVAR, f"eval-worker-{uuid.uuid4().hex[:12]}") +EVAL_WORKER_OWNER_ID = os.environ[_ATTRIBUTION_ENVVAR] + SYSTEM_PROMPT_MCP = "You are an ethpandaops agent. You have access to panda via its MCP tools." SYSTEM_PROMPT_CLI = "You are an ethpandaops agent. You have access to the panda CLI." @@ -307,6 +321,9 @@ def _docker_serve( "-p", f"127.0.0.1:{port}:{port}", "--add-host=host.docker.internal:host-gateway", "-e", "OPENCODE_GO_API_KEY", # pass through (value stays out of the arg list) + # Caller attribution so the panda CLI inside carries X-Panda-On-Behalf-Of; + # the value is in os.environ (set at import), passed through by name. + "-e", _ATTRIBUTION_ENVVAR, "-v", f"{panda_dir}:/opt/pandabin:ro", "-v", f"{workdir / 'opencode.json'}:/work/opencode.json:ro", "-v", f"{workdir / 'auth.json'}:/root/.local/share/opencode/auth.json", diff --git a/tests/eval/promptfoo/provider.py b/tests/eval/promptfoo/provider.py index d9200a06..9672fc3a 100644 --- a/tests/eval/promptfoo/provider.py +++ b/tests/eval/promptfoo/provider.py @@ -13,18 +13,23 @@ import asyncio import json import os -import re import sys import urllib.request -sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) # tests/eval on path - import yaml +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) # tests/eval on path + +from agent.opencode_agent import EVAL_WORKER_OWNER_ID from config.settings import DEFAULT_AGENT_MODEL, DEFAULT_AGENT_ROUTE from harness.subject import OpencodeSubject from harness.trace import TOOLS_MARKER +# The HTTP header the panda CLI forwards the attribution env var as +# (pkg/attribution/attribution.go: const Header). The server reads it to scope +# sandbox sessions to the caller, so teardown carries it to find + delete ours. +_ATTRIBUTION_HEADER = "X-Panda-On-Behalf-Of" + _SUBJECTS: dict[tuple, OpencodeSubject] = {} @@ -37,6 +42,54 @@ def _subject(cfg: dict) -> OpencodeSubject: return _SUBJECTS[key] +def _server_base(subject) -> str: + """The base URL of the panda server THIS run talked to: the CLI route resolves it + from PANDA_CONFIG (the harden scratch config), else the subject's mcp_url.""" + cfg_path = os.environ.get("PANDA_CONFIG", "") + if cfg_path and os.path.exists(cfg_path): + try: + with open(cfg_path) as f: + cfg = yaml.safe_load(f) or {} + base = (cfg.get("server") or {}).get("base_url") + if base: + return str(base).rstrip("/") + except Exception: # noqa: BLE001 - fall through to the default + pass + return str(subject.settings.mcp_url).rstrip("/") + + +def _teardown_worker_sessions(subject) -> list[str]: + """Destroy the sandbox sessions owned by THIS worker, now its just-finished test is + done. Deterministic and server-authoritative: the agent's panda CLI carries + X-Panda-On-Behalf-Of=, so the server scopes those sessions to this worker + (authOwnerID's attribution fallback). We ask the server for this worker's sessions and + delete them, carrying the same header so DestroySession's owner check passes. Tests run + serially within a worker, so this targets exactly the test that just finished; the short + TTL reaper is only a backstop for crashed/missed workers. Best-effort throughout.""" + base = _server_base(subject) + headers = {_ATTRIBUTION_HEADER: EVAL_WORKER_OWNER_ID} + try: + req = urllib.request.Request(f"{base}/api/v1/sessions", headers=headers) # noqa: S310 + with urllib.request.urlopen(req, timeout=10) as r: # noqa: S310 + sessions = (json.loads(r.read().decode()) or {}).get("sessions") or [] + except Exception: # noqa: BLE001 - server gone / unreachable -> nothing to tear down + return [] + destroyed: list[str] = [] + for s in sessions: + sid = s.get("session_id") + if not sid: + continue + req = urllib.request.Request( # noqa: S310 + f"{base}/api/v1/sessions/{sid}", method="DELETE", headers=headers + ) + try: + with urllib.request.urlopen(req, timeout=10): # noqa: S310 + destroyed.append(sid) + except Exception: # noqa: BLE001 - already gone / not ours is fine + continue + return destroyed + + def _followups(vars_: dict) -> list: """Decode the `followups` var. It's JSON-encoded by the config (so promptfoo doesn't expand a raw list into a test matrix); tolerate a raw list too, for safety.""" @@ -87,54 +140,6 @@ def _graded_output(trace) -> str: return "\n".join(lines) -# Panda sandbox session ids are 12-char hex. We only count an id as THIS agent's if it -# appears next to a session keyword in the agent's own commands, or as a `session_id` -# in a command's response — and never from `session list` output, which shows OTHER -# agents' sessions too. -_SESSION_IN_ARGS = re.compile(r"session[^\n]{0,40}?\b([0-9a-f]{12})\b", re.IGNORECASE) -_SESSION_IN_OUTPUT = re.compile(r"session_id[\"'\s:=]{1,5}([0-9a-f]{12})", re.IGNORECASE) -_LIST_CMD = re.compile(r"session(s\b|\s+list)", re.IGNORECASE) - - -def _server_base(subject) -> str: - """The base URL of the panda server THIS run talked to: the CLI route resolves it - from PANDA_CONFIG (the harden scratch config), else the subject's mcp_url.""" - cfg_path = os.environ.get("PANDA_CONFIG", "") - if cfg_path and os.path.exists(cfg_path): - try: - with open(cfg_path) as f: - cfg = yaml.safe_load(f) or {} - base = (cfg.get("server") or {}).get("base_url") - if base: - return str(base).rstrip("/") - except Exception: # noqa: BLE001 - fall through to the default - pass - return str(subject.settings.mcp_url).rstrip("/") - - -def _teardown_agent_sessions(subject, trace) -> list[str]: - """Destroy the sandbox sessions THIS agent created/used, now that its run is over. - Per-agent by construction: only ids from this run's own transcript — concurrent - agents' sessions (or a human's) are never touched. Best-effort; already-gone is fine.""" - ids: set[str] = set() - for t in trace.tool_calls: - ids.update(_SESSION_IN_ARGS.findall(t.arguments or "")) - if not _LIST_CMD.search(t.arguments or ""): - ids.update(_SESSION_IN_OUTPUT.findall(t.output or "")) - if not ids: - return [] - base = _server_base(subject) - destroyed = [] - for sid in sorted(ids): - req = urllib.request.Request(f"{base}/api/v1/sessions/{sid}", method="DELETE") - try: - with urllib.request.urlopen(req, timeout=10): # noqa: S310 - destroyed.append(sid) - except Exception: # noqa: BLE001 - not a session / already gone - continue - return destroyed - - def call_api(prompt, options, context): cfg = (options or {}).get("config", {}) or {} # The first turn is the rendered prompt; extra turns come from the `followups` var @@ -151,7 +156,10 @@ def call_api(prompt, options, context): retried = True trace = asyncio.run(subject.run(prompts)) subject.flush() # push the run's Langfuse trace before promptfoo moves to the next - destroyed = _teardown_agent_sessions(subject, trace) # this agent's sandboxes only + # Primary cleanup: deterministically tear down THIS worker's sandbox sessions now the + # test is done (owner-scoped via attribution, server-authoritative). The server's short + # idle TTL (pkg/sandbox/session.go) is only a backstop for crashed/missed workers. + destroyed = _teardown_worker_sessions(subject) return { "output": _graded_output(trace), "tokenUsage": { diff --git a/tests/eval/scripts/_panda_env.py b/tests/eval/scripts/_panda_env.py index 98c5a8ce..5a7727e3 100644 --- a/tests/eval/scripts/_panda_env.py +++ b/tests/eval/scripts/_panda_env.py @@ -54,10 +54,18 @@ def write_scratch_config(port: int, *, source: Path | None = None) -> Path: sb["image"] = "ethpandaops-panda-sandbox:latest" sb["network"] = "ethpandaops-panda-harden" sb["host_shared_path"] = str(shared) - # Eval runs many agents at once (questions x phrasings x subjects, -j concurrent); - # the default 50-session cap can starve them. The sessions are torn down after - # every measure (purge_sessions), so a high cap costs nothing when idle. - sb.setdefault("sessions", {})["max_sessions"] = 150 + # Eval runs many agents at once (questions x phrasings x subjects, -j concurrent). + # Primary cleanup is per-test and owner-scoped (provider.py tears down each worker's + # sessions when its test finishes). This short TTL is only a BACKSTOP for crashed or + # missed workers: the server's idle reaper (pkg/sandbox/session.go) self-reaps a leaked + # session once it's idle past the TTL. 5m is comfortably longer than any inter-turn gap + # in a multi-turn run (an actively-used session's lastUsed is bumped per call and is + # never reaped mid-execution), yet far shorter than the 30m default. With both the + # per-test delete and the reaper, the cap no longer needs to be huge — 100 sits well + # above run concurrency (~24) plus a TTL-window backlog. + sessions = sb.setdefault("sessions", {}) + sessions["ttl"] = "5m" + sessions["max_sessions"] = 100 cfg["storage"] = {"base_dir": str(storage), "cache_dir": str(cache)} cfg.setdefault("observability", {})["metrics_enabled"] = False @@ -122,10 +130,10 @@ def _wait_ready(self) -> None: def purge_sessions(port: int) -> int: """Destroy all remaining sandbox sessions on the SCRATCH server, called only as it - shuts down — its containers would otherwise outlive it as orphans (per-agent - teardown in the provider handles the during-run case; this is the final sweep of a - dying server, never a server anyone else uses). Best-effort: a dead/unreachable - server just means nothing to purge.""" + shuts down — its containers would otherwise outlive it as orphans (the server's idle + reaper handles the during-run case via TTL; this is the final sweep of a dying server, + never a server anyone else uses). Best-effort: a dead/unreachable server just means + nothing to purge.""" base = f"http://localhost:{port}/api/v1/sessions" try: with urllib.request.urlopen(base, timeout=10) as r: # noqa: S310