diff --git a/.gitignore b/.gitignore index 02feb9c..da58962 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ __pycache__/ .venv/ .agentflow/ reference/ +docs/superpowers/ node_modules/ playwright-report/ test-results/ @@ -11,4 +12,3 @@ test-results/ dist/ *.egg-info/ .worktrees/ -docs/superpowers/ diff --git a/agentflow/cli.py b/agentflow/cli.py index 7a9860c..2d95365 100644 --- a/agentflow/cli.py +++ b/agentflow/cli.py @@ -1,10 +1,12 @@ from __future__ import annotations import asyncio -import os +from contextlib import contextmanager import json +import os import subprocess import sys +import time from dataclasses import dataclass, replace from datetime import datetime try: @@ -16,9 +18,15 @@ class StrEnum(str, Enum): pass from pathlib import Path +import httpx import typer from jinja2 import TemplateError from pydantic import ValidationError + +try: + import fcntl +except ImportError: # pragma: no cover - Windows fallback + fcntl = None from agentflow.defaults import ( bundled_templates, bundled_template_names, @@ -60,7 +68,7 @@ class StrEnum(str, Enum): target_uses_login_bash, ) from agentflow.prepared import resolve_local_workdir -from agentflow.specs import AgentKind, LocalTarget, PipelineSpec, normalize_agent_name, provider_uses_kimi_anthropic_auth, resolve_provider +from agentflow.specs import AgentKind, LocalTarget, PipelineSpec, RunRecord, normalize_agent_name, provider_uses_kimi_anthropic_auth, resolve_provider from agentflow.tuned_agents import list_tuned_agent_records, resolve_tuned_agent_version, run_evolution_from_payload app = typer.Typer(add_completion=False) @@ -118,6 +126,149 @@ def _build_store(runs_dir: str) -> object: return RunStore(runs_dir) +def _daemon_metadata_path(runs_dir: str) -> Path: + override = os.getenv("AGENTFLOW_DAEMON_METADATA_PATH") + if override: + return Path(override).expanduser().resolve() + return (Path(runs_dir).expanduser().resolve() / "daemon.json") + + +def _resolve_daemon_host() -> str: + return os.getenv("AGENTFLOW_DAEMON_HOST", "127.0.0.1") + + +def _resolve_daemon_port() -> int: + raw = os.getenv("AGENTFLOW_DAEMON_PORT", "8000") + try: + return int(raw) + except ValueError as exc: + raise typer.BadParameter(f"`AGENTFLOW_DAEMON_PORT` must be an integer, got `{raw}`.") from exc + + +def _daemon_base_url(host: str, port: int) -> str: + return f"http://{host}:{port}" + + +def _load_daemon_metadata(metadata_path: Path) -> dict[str, object] | None: + try: + payload = json.loads(metadata_path.read_text(encoding="utf-8")) + except (FileNotFoundError, json.JSONDecodeError, OSError): + return None + if not isinstance(payload, dict): + return None + return payload + + +def _write_daemon_metadata(metadata_path: Path, *, host: str, port: int, pid: int) -> None: + metadata_path.parent.mkdir(parents=True, exist_ok=True) + payload = {"host": host, "port": port, "pid": pid} + temp_path = metadata_path.parent / f"{metadata_path.name}.tmp" + temp_path.write_text(json.dumps(payload, indent=2), encoding="utf-8") + temp_path.replace(metadata_path) + + +@contextmanager +def _daemon_startup_lock(metadata_path: Path): + """Serialize daemon startup and metadata updates across local processes.""" + lock_path = metadata_path.parent / f"{metadata_path.name}.lock" + lock_path.parent.mkdir(parents=True, exist_ok=True) + with lock_path.open("a+", encoding="utf-8") as handle: + if fcntl is not None: + fcntl.flock(handle.fileno(), fcntl.LOCK_EX) + try: + yield + finally: + if fcntl is not None: + fcntl.flock(handle.fileno(), fcntl.LOCK_UN) + + +def _daemon_is_healthy(base_url: str) -> bool: + try: + response = httpx.get(f"{base_url}/api/runs", timeout=0.5) + except httpx.RequestError: + return False + return response.status_code == 200 + + +def _start_daemon(*, host: str, port: int, runs_dir: str, max_concurrent_runs: int) -> subprocess.Popen: + command = [sys.executable, "-m", "agentflow.cli", "serve", "--host", host, "--port", str(port)] + env = dict(os.environ) + env["AGENTFLOW_RUNS_DIR"] = runs_dir + env["AGENTFLOW_MAX_CONCURRENT_RUNS"] = str(max_concurrent_runs) + return subprocess.Popen( + command, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + env=env, + start_new_session=True, + ) + + +def _wait_for_daemon(base_url: str, *, timeout_seconds: float = 5.0) -> None: + deadline = time.monotonic() + timeout_seconds + while time.monotonic() < deadline: + if _daemon_is_healthy(base_url): + return + time.sleep(0.1) + typer.echo( + ( + f"Timed out waiting for the daemon at {base_url} to become healthy. " + "Check whether that host/port is already in use, inspect daemon startup errors, " + "or try starting `agentflow serve` manually with the same --host/--port values." + ), + err=True, + ) + raise typer.Exit(code=1) + + +def _ensure_daemon( + runs_dir: str, + max_concurrent_runs: int, + *, + host: str, + port: int, + metadata_path: Path, +) -> str: + base_url = _daemon_base_url(host, port) + with _daemon_startup_lock(metadata_path): + metadata = _load_daemon_metadata(metadata_path) + metadata_host = metadata.get("host") if isinstance(metadata, dict) else None + metadata_port = metadata.get("port") if isinstance(metadata, dict) else None + if isinstance(metadata_host, str) and isinstance(metadata_port, int) and (metadata_host, metadata_port) == (host, port): + metadata_base_url = _daemon_base_url(metadata_host, metadata_port) + if _daemon_is_healthy(metadata_base_url): + return metadata_base_url + + process = _start_daemon(host=host, port=port, runs_dir=runs_dir, max_concurrent_runs=max_concurrent_runs) + _wait_for_daemon(base_url) + _write_daemon_metadata(metadata_path, host=host, port=port, pid=process.pid) + return base_url + + +def _submit_detached_run(pipeline: object, base_url: str) -> RunRecord: + payload: dict[str, object] = {"pipeline": pipeline.model_dump(mode="json")} + base_dir = getattr(pipeline, "base_dir", None) + if isinstance(base_dir, str) and base_dir: + payload["base_dir"] = base_dir + try: + response = httpx.post(f"{base_url}/api/runs", json=payload, timeout=10.0) + except httpx.RequestError as exc: + typer.echo(f"Failed to submit run to daemon at {base_url}: {exc}", err=True) + raise typer.Exit(code=1) from exc + try: + response.raise_for_status() + except httpx.HTTPStatusError as exc: + detail = exc.response.text.strip() + typer.echo(f"Failed to submit run to daemon at {base_url}: {detail}", err=True) + raise typer.Exit(code=1) from exc + try: + data = response.json() + except ValueError as exc: + typer.echo(f"Failed to submit run to daemon at {base_url}: invalid JSON response", err=True) + raise typer.Exit(code=1) from exc + return RunRecord.model_validate(data) + + def _render_tuned_agents_summary(records: list[object]) -> str: if not records: return "No tuned agents found." @@ -406,6 +557,315 @@ def _build_run_summary(record: object, run_dir: Path | str | None = None) -> dic return summary +_STATUS_INACTIVE_NODE_STATUSES = {"pending", "queued", "ready"} +_STATUS_ACTIVE_NODE_STATUSES = {"running", "retrying", "cancelling"} +_EVOLUTION_PROGRESS_KEYS = {"agentflow_event", "stage", "attempt", "status", "command", "detail", "node_id"} +_EVOLUTION_PROGRESS_PREVIEW_LIMIT = 5 + + +def _normalize_event_payload(event: object) -> dict[str, object]: + if isinstance(event, dict): + payload = dict(event) + else: + model_dump = getattr(event, "model_dump", None) + if callable(model_dump): + payload = model_dump(mode="json") + else: + payload = { + "timestamp": getattr(event, "timestamp", None), + "run_id": getattr(event, "run_id", None), + "type": getattr(event, "type", None), + "node_id": getattr(event, "node_id", None), + "data": getattr(event, "data", None), + } + if not isinstance(payload, dict): + return {} + data = payload.get("data") + if not isinstance(data, dict): + payload["data"] = {} + return payload + + +def _event_data_summary(data: object) -> str | None: + if not isinstance(data, dict): + return None + pieces: list[str] = [] + if data.get("status") is not None: + pieces.append(f"status={data['status']}") + if data.get("attempt") is not None: + pieces.append(f"attempt={data['attempt']}") + if data.get("round_number") is not None: + pieces.append(f"round={data['round_number']}") + if data.get("total_rounds") is not None: + pieces.append(f"of={data['total_rounds']}") + if data.get("child_run_id") is not None: + pieces.append(f"child={data['child_run_id']}") + if data.get("reason") is not None: + pieces.append(f"reason={data['reason']}") + if data.get("error") is not None: + pieces.append(f"error={data['error']}") + return " ".join(pieces) if pieces else None + + +def _render_status_event(event_payload: dict[str, object]) -> str: + timestamp = event_payload.get("timestamp") + event_type = event_payload.get("type") + node_id = event_payload.get("node_id") + parts: list[str] = [] + if timestamp: + parts.append(str(timestamp)) + if event_type: + parts.append(str(event_type)) + if node_id: + parts.append(f"node={node_id}") + detail = _event_data_summary(event_payload.get("data")) + if detail: + parts.append(detail) + return " ".join(parts) + + +def _parse_evolution_progress_line(line: str) -> dict[str, object] | None: + try: + payload = json.loads(line) + except (TypeError, json.JSONDecodeError): + return None + if not isinstance(payload, dict): + return None + if payload.get("agentflow_event") != "evolution_progress": + return None + stage = payload.get("stage") + attempt = payload.get("attempt") + if not stage or attempt is None: + return None + return {key: payload[key] for key in _EVOLUTION_PROGRESS_KEYS if key in payload} + + +def _build_status_evolution_progress(record: object, events: list[object]) -> list[dict[str, object]]: + nodes: dict[str, object] = getattr(record, "nodes", {}) or {} + parsed_events: list[dict[str, object]] = [] + for node_id, node in nodes.items(): + for line in getattr(node, "stderr_lines", []) or []: + if not isinstance(line, str): + continue + event = _parse_evolution_progress_line(line) + if event: + event["node_id"] = node_id + parsed_events.append(event) + + for event in events: + payload = _normalize_event_payload(event) + if payload.get("type") != "node_trace": + continue + node_id = payload.get("node_id") + if not isinstance(node_id, str) or not node_id: + continue + trace = payload.get("data", {}).get("trace") + if not isinstance(trace, dict): + continue + if trace.get("source") != "stderr": + continue + content = trace.get("content") + if not isinstance(content, str): + continue + parsed = _parse_evolution_progress_line(content) + if parsed is None: + continue + parsed["node_id"] = node_id + parsed_events.append(parsed) + + deduped: list[dict[str, object]] = [] + seen: set[str] = set() + for event in parsed_events: + key = json.dumps(event, sort_keys=True, ensure_ascii=False) + if key in seen: + continue + seen.add(key) + deduped.append(event) + return deduped + + +def _render_evolution_progress(event: dict[str, object]) -> str: + node_id = event.get("node_id") or "-" + stage = event.get("stage") or "-" + status = event.get("status") + label = f"{stage} {status}" if status else str(stage) + pieces: list[str] = [] + attempt = event.get("attempt") + if attempt is not None: + pieces.append(f"attempt {attempt}") + command = event.get("command") + if command: + pieces.append(f"command={command}") + detail = event.get("detail") + if detail: + pieces.append(f"detail={detail}") + if not pieces: + return f"{node_id}: {label}" + return f"{node_id}: {label} ({', '.join(pieces)})" + + +def _build_status_progress(record: object) -> dict[str, object]: + nodes: dict[str, object] = getattr(record, "nodes", {}) or {} + pipeline_nodes = _pipeline_node_map(record) + node_ids = list(pipeline_nodes) if pipeline_nodes else list(nodes) + total_nodes = len(node_ids) + status_counts: dict[str, int] = {} + progressed_nodes = 0 + active_nodes: list[dict[str, object]] = [] + + for node_id in node_ids: + node = nodes.get(node_id) + status = _status_value(getattr(node, "status", "pending")).lower() + status_counts[status] = status_counts.get(status, 0) + 1 + if status not in _STATUS_INACTIVE_NODE_STATUSES: + progressed_nodes += 1 + if status in _STATUS_ACTIVE_NODE_STATUSES: + entry: dict[str, object] = {"id": node_id, "status": status} + attempt = _node_attempt_count(node) if node is not None else 0 + if attempt: + entry["attempt"] = attempt + active_nodes.append(entry) + + progress_percent = 0.0 + if total_nodes: + progress_percent = max(0.0, min(100.0, (progressed_nodes / total_nodes) * 100)) + + return { + "total_nodes": total_nodes, + "progressed_nodes": progressed_nodes, + "active_nodes": active_nodes, + "status_counts": status_counts, + "progress_percent": progress_percent, + } + + +def _build_status_optimization(record: object) -> dict[str, object] | None: + payload: dict[str, object] = {} + parent_run_id = getattr(record, "optimization_parent_run_id", None) + if parent_run_id: + payload["parent_run_id"] = parent_run_id + round_number = getattr(record, "optimization_round", None) + if round_number: + payload["round"] = round_number + session = getattr(record, "optimization_session", None) + if isinstance(session, dict) and session: + payload["session"] = session + return payload or None + + +def _build_status_summary( + record: object, + events: list[object], + *, + run_dir: Path | str | None = None, +) -> dict[str, object]: + summary = _build_run_summary(record, run_dir=run_dir) + normalized_events = [_normalize_event_payload(event) for event in events] + summary["events"] = normalized_events + summary["recent_events"] = normalized_events[-5:] + summary["progress"] = _build_status_progress(record) + summary["evolution_progress"] = _build_status_evolution_progress(record, events) + optimization = _build_status_optimization(record) + if optimization is not None: + summary["optimization"] = optimization + return summary + + +def _render_status_optimization(optimization: dict[str, object]) -> str | None: + session = optimization.get("session") + pieces: list[str] = [] + if isinstance(session, dict): + kind = session.get("kind") + if kind: + pieces.append(str(kind)) + optimizer = session.get("optimizer") + if optimizer: + pieces.append(f"optimizer={optimizer}") + current_round = session.get("current_round") + total_rounds = session.get("total_rounds") + if current_round and total_rounds: + pieces.append(f"round {current_round}/{total_rounds}") + elif current_round: + pieces.append(f"round {current_round}") + child_run_ids = session.get("child_run_ids") + if isinstance(child_run_ids, list): + pieces.append(f"child_runs={len(child_run_ids)}") + if "round" in optimization and not any(piece.startswith("round ") for piece in pieces): + pieces.append(f"round {optimization['round']}") + if optimization.get("parent_run_id"): + pieces.append(f"parent={optimization['parent_run_id']}") + if not pieces: + return None + return f"Optimization: {' '.join(pieces)}" + + +def _render_status_summary( + record: object, + events: list[object], + *, + run_dir: Path | str | None = None, +) -> str: + summary = _build_status_summary(record, events, run_dir=run_dir) + lines = [f"Run {summary['id']}: {summary['status']}"] + pipeline = summary.get("pipeline") + if isinstance(pipeline, dict) and pipeline.get("name"): + lines.append(f"Pipeline: {pipeline['name']}") + duration = summary.get("duration") + if duration is not None: + lines.append(f"Duration: {duration}") + started_at = summary.get("started_at") + if started_at: + lines.append(f"Started: {started_at}") + run_dir_value = summary.get("run_dir") + if run_dir_value is not None: + lines.append(f"Run dir: {run_dir_value}") + optimization = summary.get("optimization") + if isinstance(optimization, dict): + rendered = _render_status_optimization(optimization) + if rendered: + lines.append(rendered) + + progress = summary.get("progress") + if isinstance(progress, dict): + total_nodes = progress.get("total_nodes", 0) + progressed_nodes = progress.get("progressed_nodes", 0) + active_nodes = progress.get("active_nodes", []) + if total_nodes: + lines.append(f"Progress: {progressed_nodes}/{total_nodes} nodes, active {len(active_nodes)}") + if isinstance(active_nodes, list) and active_nodes: + active_entries: list[str] = [] + for node in active_nodes: + node_id = node.get("id") + status = node.get("status") + if not node_id or not status: + continue + rendered = f"{node_id} ({status}" + attempt = node.get("attempt") + if attempt: + rendered += f", attempt {attempt}" + rendered += ")" + active_entries.append(rendered) + if active_entries: + lines.append(f"Active: {', '.join(active_entries)}") + + evolution_progress = summary.get("evolution_progress") + if isinstance(evolution_progress, list) and evolution_progress: + lines.append("Evolution progress:") + for event in evolution_progress[-_EVOLUTION_PROGRESS_PREVIEW_LIMIT:]: + if not isinstance(event, dict): + continue + lines.append(f"- {_render_evolution_progress(event)}") + + recent_events = summary.get("recent_events") + if isinstance(recent_events, list) and recent_events: + lines.append("Recent events:") + for event_payload in recent_events: + if not isinstance(event_payload, dict): + continue + lines.append(f"- {_render_status_event(event_payload)}") + return "\n".join(lines) + + def _render_run_summary(record: object, run_dir: Path | str | None = None) -> str: summary = _build_run_summary(record, run_dir=run_dir) lines = [f"Run {summary['id']}: {summary['status']}"] @@ -473,6 +933,27 @@ def _echo_run_result(record: object, *, output: RunOutputFormat, run_dir: Path | typer.echo(json.dumps(record.model_dump(mode="json"), indent=2)) +def _echo_status_result( + record: object, + events: list[object], + *, + output: RunOutputFormat, + run_dir: Path | str | None = None, +) -> None: + resolved_output = _resolve_run_output(output) + if resolved_output == RunOutputFormat.SUMMARY: + typer.echo(_render_status_summary(record, events, run_dir=run_dir)) + return + if resolved_output == RunOutputFormat.JSON_SUMMARY: + typer.echo(json.dumps(_build_status_summary(record, events, run_dir=run_dir), indent=2)) + return + model_dump = getattr(record, "model_dump", None) + if callable(model_dump): + typer.echo(json.dumps(model_dump(mode="json"), indent=2)) + return + typer.echo(json.dumps(_build_run_summary(record, run_dir=run_dir), indent=2)) + + def _run_dir_for_record(store: object | None, run_id: str) -> Path | str | None: if store is None: return None @@ -2136,6 +2617,25 @@ def show( _echo_run_result(record, output=output, run_dir=_run_dir_for_record(store, run_id)) +@app.command() +def status( + run_id: str, + runs_dir: str = typer.Option(".agentflow/runs", envvar="AGENTFLOW_RUNS_DIR"), + output: RunOutputFormat = typer.Option( + RunOutputFormat.AUTO, + "--output", + help="Result output format. Defaults to `summary` on a terminal and `json` otherwise.", + ), +) -> None: + store = _build_store(runs_dir) + record = _get_run_or_exit(store, run_id, runs_dir=runs_dir) + events = [] + get_events = getattr(store, "get_events", None) + if callable(get_events): + events = get_events(run_id) + _echo_status_result(record, events, output=output, run_dir=_run_dir_for_record(store, run_id)) + + @app.command() def cancel( run_id: str, @@ -2354,6 +2854,12 @@ def run( path: str, runs_dir: str = typer.Option(".agentflow/runs", envvar="AGENTFLOW_RUNS_DIR"), max_concurrent_runs: int = typer.Option(2, envvar="AGENTFLOW_MAX_CONCURRENT_RUNS"), + detach: bool = typer.Option( + False, + "--detach", + "-d", + help="Submit the run to the local daemon and exit without waiting for completion.", + ), output: RunOutputFormat = typer.Option( RunOutputFormat.AUTO, "--output", @@ -2377,6 +2883,20 @@ def run( output, show_preflight=show_preflight, ) + if detach: + host = _resolve_daemon_host() + port = _resolve_daemon_port() + metadata_path = _daemon_metadata_path(runs_dir) + base_url = _ensure_daemon( + runs_dir, + max_concurrent_runs, + host=host, + port=port, + metadata_path=metadata_path, + ) + record = _submit_detached_run(pipeline, base_url) + _echo_run_result(record, output=output) + raise typer.Exit(code=0) _run_pipeline(pipeline, runs_dir, max_concurrent_runs, output) diff --git a/agentflow/dsl.py b/agentflow/dsl.py index b62408f..369329c 100644 --- a/agentflow/dsl.py +++ b/agentflow/dsl.py @@ -6,6 +6,7 @@ from contextvars import ContextVar, Token from dataclasses import dataclass, field import json +from pathlib import Path from types import TracebackType from typing import Any @@ -445,11 +446,16 @@ def evolve( "workspace_dir": "{{ pipeline.working_dir }}", } payload_json = json.dumps(payload, ensure_ascii=False) + source_root = Path(__file__).resolve().parents[1] code = ( "import json\n" + "import sys\n" + f"sys.path.insert(0, {json.dumps(str(source_root), ensure_ascii=False)})\n" "from agentflow.tuned_agents import run_evolution_from_payload\n\n" + "def _evolution_progress(event):\n" + " print(json.dumps(event, ensure_ascii=False), file=sys.stderr, flush=True)\n\n" f"payload = json.loads(r'''{payload_json}''')\n" - "result = run_evolution_from_payload(payload)\n" + "result = run_evolution_from_payload(payload, progress=_evolution_progress)\n" "print(json.dumps(result, ensure_ascii=False))\n" ) evolve_task_id = task_id or f"evolve_{profile.replace('-', '_')}" diff --git a/agentflow/graph_optimizer.py b/agentflow/graph_optimizer.py index e2dd31a..0a226b8 100644 --- a/agentflow/graph_optimizer.py +++ b/agentflow/graph_optimizer.py @@ -2,10 +2,13 @@ import json import shutil +import subprocess +import sys from pathlib import Path from pprint import pformat from typing import Any +from agentflow.loader import load_pipeline_from_data from agentflow.specs import PipelineSpec, RunRecord, normalize_agent_name from agentflow.store import RunStore from agentflow.utils import ensure_dir, json_dumps @@ -19,6 +22,7 @@ OPTIMIZER_RESULT_FILENAME = "optimizer-result.json" OPTIMIZER_VALIDATION_FILENAME = "optimizer-validation.json" GRAPH_OPTIMIZER_MAX_ATTEMPTS = 3 +CHILD_PIPELINE_LOAD_TIMEOUT_SECONDS = 5.0 def editable_pipeline_payload(pipeline: PipelineSpec) -> dict[str, Any]: @@ -204,3 +208,45 @@ def write_validation_result(path: Path, *, ok: bool, error: str | None = None) - if error is not None: payload["error"] = error path.write_text(json_dumps(payload), encoding="utf-8") + + +def load_child_pipeline_from_path(path: Path) -> PipelineSpec: + """Load optimizer-edited pipeline as a child-run shape (`optimizer=None`, `n_run=1`).""" + + path = Path(path) + if path.suffix == ".py": + try: + result = subprocess.run( + [sys.executable, str(path)], + capture_output=True, + text=True, + cwd=str(path.parent), + timeout=CHILD_PIPELINE_LOAD_TIMEOUT_SECONDS, + ) + except subprocess.TimeoutExpired as exc: + details: list[str] = [] + if exc.stdout: + details.append(f"stdout:\n{exc.stdout.strip()}") + if exc.stderr: + details.append(f"stderr:\n{exc.stderr.strip()}") + rendered_details = "" if not details else "\n" + "\n\n".join(details) + raise ValueError( + f"pipeline script `{path}` timed out after {CHILD_PIPELINE_LOAD_TIMEOUT_SECONDS:.1f}s.{rendered_details}" + ) from exc + if result.returncode != 0: + raise ValueError(f"pipeline script `{path}` failed:\n{result.stderr.strip()}") + raw_text = result.stdout + else: + raw_text = path.read_text(encoding="utf-8") + + try: + parsed = json.loads(raw_text) + except json.JSONDecodeError as exc: + raise ValueError(f"optimized pipeline `{path}` did not produce JSON: {exc}") from exc + + if not isinstance(parsed, dict): + raise ValueError(f"optimized pipeline `{path}` did not produce an object payload") + + parsed["optimizer"] = None + parsed["n_run"] = 1 + return load_pipeline_from_data(parsed, base_dir=path.parent) diff --git a/agentflow/orchestrator.py b/agentflow/orchestrator.py index 0a1efb5..273cef2 100644 --- a/agentflow/orchestrator.py +++ b/agentflow/orchestrator.py @@ -31,12 +31,12 @@ OPTIMIZER_VALIDATION_FILENAME, build_graph_report, copy_run_traces, + load_child_pipeline_from_path, render_graph_optimizer_prompt, write_editable_pipeline_python, write_optimizer_result, write_validation_result, ) -from agentflow.loader import load_pipeline_from_path from agentflow.prepared import ExecutionPaths, PreparedExecution, build_execution_paths from agentflow.runners.registry import RunnerRegistry, default_runner_registry from agentflow.specs import ( @@ -441,7 +441,7 @@ def _optimizer_failure_summary( write_validation_result(round_dir / OPTIMIZER_VALIDATION_FILENAME, ok=False, error=failure_summary) else: try: - loaded_pipeline = load_pipeline_from_path(pipeline_path) + loaded_pipeline = load_child_pipeline_from_path(pipeline_path) except Exception as exc: failure_summary = _optimizer_failure_summary( "Optimized pipeline", diff --git a/agentflow/tuned_agents.py b/agentflow/tuned_agents.py index 517def1..64238e6 100644 --- a/agentflow/tuned_agents.py +++ b/agentflow/tuned_agents.py @@ -6,7 +6,7 @@ import subprocess from dataclasses import dataclass from pathlib import Path -from typing import Any, Literal +from typing import Any, Callable, Literal from uuid import uuid4 import yaml @@ -641,7 +641,10 @@ def _write_failure_metadata( _write_json(version_dir / "version.json", failed_version.model_dump(mode="json")) -def run_evolution_from_payload(payload: dict[str, Any]) -> dict[str, Any]: +def run_evolution_from_payload( + payload: dict[str, Any], + progress: Callable[[dict[str, object]], None] | None = None, +) -> dict[str, Any]: request = EvolutionRequest.model_validate(payload) workspace = Path(request.workspace_dir or os.getcwd()).expanduser().resolve() resolved_config = load_tuner_config(workspace, request.profile) @@ -659,6 +662,29 @@ def run_evolution_from_payload(payload: dict[str, Any]) -> dict[str, Any]: if not request.source_nodes: raise ValueError("evolution requires at least one source node") + def _emit_progress( + stage: str, + *, + attempt: int, + status: str | None = None, + command: str | None = None, + detail: str | None = None, + ) -> None: + if progress is None: + return + payload: dict[str, object] = { + "agentflow_event": "evolution_progress", + "stage": stage, + "attempt": attempt, + } + if status is not None: + payload["status"] = status + if command is not None: + payload["command"] = command + if detail is not None: + payload["detail"] = detail + progress(payload) + version_id = uuid4().hex[:12] version_dir = tuned_agent_version_dir(workspace, resolved_config.agent_name, version_id) traces_dir = version_dir / "traces" @@ -680,8 +706,11 @@ def run_evolution_from_payload(payload: dict[str, Any]) -> dict[str, Any]: resolved_executable = _resolved_executable_path(resolved_config.config, repo_workdir) failure_summary: str | None = None + _emit_progress("start", attempt=1) + for attempt_number in range(1, resolved_config.config.max_attempts + 1): attempt_dir = ensure_dir(attempt_root / f"attempt-{attempt_number}") + _emit_progress("attempt", attempt=attempt_number, status="started") prompt = _optimizer_prompt( resolved_config, repo_root=repo_dir, @@ -692,6 +721,7 @@ def run_evolution_from_payload(payload: dict[str, Any]) -> dict[str, Any]: ) _write_text(attempt_dir / "optimizer-prompt.txt", prompt) + _emit_progress("optimizer", attempt=attempt_number, status="started", command="optimizer") optimizer_result = _run_optimizer( optimizer_kind, prompt=prompt, @@ -702,8 +732,16 @@ def run_evolution_from_payload(payload: dict[str, Any]) -> dict[str, Any]: _write_attempt_artifact(attempt_dir, "optimizer", optimizer_result) if optimizer_result.exit_code != 0: failure_summary = _attempt_summary("Optimizer", optimizer_result) + _emit_progress("optimizer", attempt=attempt_number, status="failed", detail=failure_summary) continue + _emit_progress("optimizer", attempt=attempt_number, status="completed") + _emit_progress( + "build", + attempt=attempt_number, + status="started", + command=resolved_config.config.build_command, + ) build_result = _run_shell_command( resolved_config.config.build_command, repo_dir=repo_workdir, @@ -715,8 +753,16 @@ def run_evolution_from_payload(payload: dict[str, Any]) -> dict[str, Any]: _write_attempt_artifact(attempt_dir, "build", build_result) if build_result.exit_code != 0: failure_summary = _attempt_summary("Build", build_result) + _emit_progress("build", attempt=attempt_number, status="failed", detail=failure_summary) continue + _emit_progress("build", attempt=attempt_number, status="completed") + _emit_progress( + "test", + attempt=attempt_number, + status="started", + command=resolved_config.config.test_command, + ) test_result = _run_shell_command( resolved_config.config.test_command, repo_dir=repo_workdir, @@ -728,8 +774,16 @@ def run_evolution_from_payload(payload: dict[str, Any]) -> dict[str, Any]: _write_attempt_artifact(attempt_dir, "test", test_result) if test_result.exit_code != 0: failure_summary = _attempt_summary("Test", test_result) + _emit_progress("test", attempt=attempt_number, status="failed", detail=failure_summary) continue + _emit_progress("test", attempt=attempt_number, status="completed") + _emit_progress( + "smoke", + attempt=attempt_number, + status="started", + command=resolved_config.config.smoke_command, + ) smoke_result = _run_shell_command( resolved_config.config.smoke_command, repo_dir=repo_workdir, @@ -741,14 +795,20 @@ def run_evolution_from_payload(payload: dict[str, Any]) -> dict[str, Any]: _write_attempt_artifact(attempt_dir, "smoke", smoke_result) if smoke_result.exit_code != 0: failure_summary = _attempt_summary("Smoke", smoke_result) + _emit_progress("smoke", attempt=attempt_number, status="failed", detail=failure_summary) continue + _emit_progress("smoke", attempt=attempt_number, status="completed") executable_path = Path(resolved_executable) if not executable_path.exists(): - raise FileNotFoundError( + detail = ( f"successful evolution did not produce executable `{executable_path}`; " "set `executable_path` in the tuner config or make the build produce the default path" ) + _emit_progress("final", attempt=attempt_number, status="failed", detail=detail) + raise FileNotFoundError( + detail + ) version = TunedAgentVersion( id=version_id, @@ -764,6 +824,7 @@ def run_evolution_from_payload(payload: dict[str, Any]) -> dict[str, Any]: summary=_parse_agent_output(optimizer_kind, f"optimizer_{version_id}", optimizer_result.stdout), ) register_tuned_agent_version(workspace, version) + _emit_progress("final", attempt=attempt_number, status="success") return { "ok": True, "agent_name": version.agent_name, @@ -775,6 +836,12 @@ def run_evolution_from_payload(payload: dict[str, Any]) -> dict[str, Any]: "traces": copied_traces, } + _emit_progress( + "final", + attempt=resolved_config.config.max_attempts, + status="failed", + detail=failure_summary or "evolution failed without diagnostics", + ) _write_failure_metadata( version_dir, agent_name=resolved_config.agent_name, diff --git a/scripts/verify_async_codex.sh b/scripts/verify_async_codex.sh new file mode 100755 index 0000000..9a54cf7 --- /dev/null +++ b/scripts/verify_async_codex.sh @@ -0,0 +1,301 @@ +#!/usr/bin/env bash +set -euo pipefail + +usage() { + cat <<'EOF' +verify_async_codex.sh + +Smoke-like validation for the async AgentFlow mainline using Codex only. + +What it verifies: +- Detached submission: `agentflow run ... -d` +- Store-backed process view: `agentflow status ` +- PR11 process visibility: evolution progress rendered in status +- PR12 process visibility: optimization session / round events rendered in status + +Required environment: +- OPENAI_API_KEY +- Optional if you use a custom gateway: + - OPENAI_BASE_URL + - AGENTFLOW_OPENAI_BASE_URL + +Usage: + bash scripts/verify_async_codex.sh + bash scripts/verify_async_codex.sh /tmp/agentflow-async-verify + +Outputs: +- Writes all generated pipelines, run ids, summaries, and json payloads under the chosen workdir. +- Prints the key artifact paths you should use for screenshots in the PR description. +EOF +} + +if [[ "${1:-}" == "--help" || "${1:-}" == "-h" ]]; then + usage + exit 0 +fi + +if [[ -z "${OPENAI_API_KEY:-}" ]]; then + echo "OPENAI_API_KEY is required." >&2 + exit 1 +fi + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)" +WORKDIR="${1:-${REPO_ROOT}/.tmp/verify_async_codex}" +ARTIFACT_DIR="${WORKDIR}/artifacts" +PR11_WORKSPACE="${WORKDIR}/pr11_workspace" +PR11_SEED_REPO="${WORKDIR}/pr11_seed_repo" +PR12_WORKSPACE="${WORKDIR}/pr12_workspace" + +if [[ -x "${REPO_ROOT}/.venv/bin/python" ]]; then + PYTHON_BIN="${REPO_ROOT}/.venv/bin/python" +else + PYTHON_BIN="$(command -v python3)" +fi + +choose_port() { + "${PYTHON_BIN}" - <<'PY' +import socket +s = socket.socket() +s.bind(("127.0.0.1", 0)) +print(s.getsockname()[1]) +s.close() +PY +} + +AF() { + "${PYTHON_BIN}" -m agentflow.cli "$@" +} + +json_field() { + local json_path="$1" + local expr="$2" + "${PYTHON_BIN}" - "$json_path" "$expr" <<'PY' +import json +import sys +from pathlib import Path + +json_path = Path(sys.argv[1]) +expr = sys.argv[2] +data = json.loads(json_path.read_text(encoding="utf-8")) +value = eval(expr, {"__builtins__": {}}, {"data": data}) +if isinstance(value, (dict, list)): + print(json.dumps(value, ensure_ascii=False)) +else: + print(value) +PY +} + +wait_for_condition() { + local run_id="$1" + local output_json="$2" + local python_expr="$3" + local timeout_seconds="$4" + local start_ts + start_ts="$(date +%s)" + + while true; do + AF status "$run_id" --output json-summary > "$output_json" + if "${PYTHON_BIN}" - "$output_json" "$python_expr" <<'PY' +import json +import sys +from pathlib import Path + +payload = json.loads(Path(sys.argv[1]).read_text(encoding="utf-8")) +expr = sys.argv[2] +safe_globals = { + "__builtins__": {}, + "len": len, + "any": any, + "all": all, + "bool": bool, +} +ok = bool(eval(expr, safe_globals, {"data": payload})) +raise SystemExit(0 if ok else 1) +PY + then + return 0 + fi + + if (( "$(date +%s)" - start_ts >= timeout_seconds )); then + echo "Timed out waiting for condition on run ${run_id}: ${python_expr}" >&2 + return 1 + fi + sleep 2 + done +} + +wait_for_terminal() { + local run_id="$1" + local output_json="$2" + local timeout_seconds="$3" + wait_for_condition \ + "$run_id" \ + "$output_json" \ + 'data["status"] in {"completed", "failed", "cancelled"}' \ + "$timeout_seconds" +} + +capture_latest_status() { + local run_id="$1" + local json_path="$2" + local summary_path="$3" + AF status "$run_id" --output json-summary > "$json_path" + AF status "$run_id" --output summary > "$summary_path" +} + +best_effort_terminal_snapshot() { + local run_id="$1" + local json_path="$2" + local summary_path="$3" + local timeout_seconds="$4" + + if wait_for_terminal "$run_id" "$json_path" "$timeout_seconds"; then + AF status "$run_id" --output summary > "$summary_path" + return 0 + fi + + capture_latest_status "$run_id" "$json_path" "$summary_path" + return 0 +} + +rm -rf "$ARTIFACT_DIR" "$PR11_WORKSPACE" "$PR11_SEED_REPO" "$PR12_WORKSPACE" "${WORKDIR}/runs" +mkdir -p "$ARTIFACT_DIR" "$PR11_WORKSPACE" "$PR12_WORKSPACE" + +export AGENTFLOW_RUNS_DIR="${WORKDIR}/runs" +export AGENTFLOW_DAEMON_HOST="${AGENTFLOW_DAEMON_HOST:-127.0.0.1}" +export AGENTFLOW_DAEMON_PORT="${AGENTFLOW_DAEMON_PORT:-$(choose_port)}" +export AGENTFLOW_DAEMON_METADATA_PATH="${AGENTFLOW_RUNS_DIR}/daemon.json" +VERIFY_LATEST_TIMEOUT_SECONDS="${VERIFY_LATEST_TIMEOUT_SECONDS:-15}" + +echo "[verify] repo root: ${REPO_ROOT}" +echo "[verify] python: ${PYTHON_BIN}" +echo "[verify] runs dir: ${AGENTFLOW_RUNS_DIR}" +echo "[verify] daemon: ${AGENTFLOW_DAEMON_HOST}:${AGENTFLOW_DAEMON_PORT}" + +echo "[verify] preparing PR11 local seed repo" +rm -rf "$PR11_SEED_REPO" +mkdir -p "$PR11_SEED_REPO" +git -C "$PR11_SEED_REPO" init -b main >/dev/null +cat > "${PR11_SEED_REPO}/README.md" <<'EOF' +# local-codex-smoke +EOF +git -C "$PR11_SEED_REPO" add README.md +git -c user.name="AgentFlow Verify" -c user.email="verify@example.invalid" -C "$PR11_SEED_REPO" commit -m "init local smoke repo" >/dev/null + +mkdir -p "${PR11_WORKSPACE}/agent_tuner" +cat > "${PR11_WORKSPACE}/agent_tuner/local_codex_smoke.yaml" < .venv/bin/codex && chmod +x .venv/bin/codex +test_command: test -f README.md +smoke_command: "{executable} >/dev/null" +executable_path: .venv/bin/codex +max_attempts: 2 +evolution_prompt: | + Make the smallest coherent change you can based on the copied traces. + Keep the repository valid and preserve a working local smoke flow. +EOF + +cat > "${PR11_WORKSPACE}/pr11_evolve_demo.py" < "${PR12_WORKSPACE}/pr12_optimize_demo.py" <> summarize + +print(g.to_json()) +EOF + +echo "[verify] submitting PR11 evolve demo via detached run" +AF run "${PR11_WORKSPACE}/pr11_evolve_demo.py" -d --output json > "${ARTIFACT_DIR}/pr11.run.json" +PR11_RUN_ID="$(json_field "${ARTIFACT_DIR}/pr11.run.json" 'data["id"]')" +echo "[verify] PR11 run id: ${PR11_RUN_ID}" + +wait_for_condition \ + "$PR11_RUN_ID" \ + "${ARTIFACT_DIR}/pr11.status.process.json" \ + 'len(data.get("evolution_progress", [])) >= 3 and any(event.get("stage") == "optimizer" for event in data.get("evolution_progress", []))' \ + 900 +AF status "$PR11_RUN_ID" --output summary > "${ARTIFACT_DIR}/pr11.status.process.summary.txt" + +best_effort_terminal_snapshot \ + "$PR11_RUN_ID" \ + "${ARTIFACT_DIR}/pr11.status.latest.json" \ + "${ARTIFACT_DIR}/pr11.status.latest.summary.txt" \ + "$VERIFY_LATEST_TIMEOUT_SECONDS" +PR11_LATEST_STATUS="$(json_field "${ARTIFACT_DIR}/pr11.status.latest.json" 'data["status"]')" + +echo "[verify] submitting PR12 optimization demo via detached run" +AF run "${PR12_WORKSPACE}/pr12_optimize_demo.py" -d --output json > "${ARTIFACT_DIR}/pr12.run.json" +PR12_RUN_ID="$(json_field "${ARTIFACT_DIR}/pr12.run.json" 'data["id"]')" +echo "[verify] PR12 run id: ${PR12_RUN_ID}" + +wait_for_condition \ + "$PR12_RUN_ID" \ + "${ARTIFACT_DIR}/pr12.status.process.json" \ + 'bool(data.get("optimization")) and any(event.get("type") == "optimization_optimizer_started" for event in data.get("events", []))' \ + 900 +AF status "$PR12_RUN_ID" --output summary > "${ARTIFACT_DIR}/pr12.status.process.summary.txt" + +best_effort_terminal_snapshot \ + "$PR12_RUN_ID" \ + "${ARTIFACT_DIR}/pr12.status.latest.json" \ + "${ARTIFACT_DIR}/pr12.status.latest.summary.txt" \ + "$VERIFY_LATEST_TIMEOUT_SECONDS" +PR12_LATEST_STATUS="$(json_field "${ARTIFACT_DIR}/pr12.status.latest.json" 'data["status"]')" + +cat > "${ARTIFACT_DIR}/verification_report.txt" < None: + return None + + def json(self) -> object: + raise ValueError("not json") + + monkeypatch.setattr(agentflow.cli.httpx, "post", lambda *args, **kwargs: FakeResponse()) + monkeypatch.setattr(agentflow.cli.typer, "echo", lambda message, err=False: messages.append((message, err))) + + with pytest.raises(typer.Exit) as exc_info: + agentflow.cli._submit_detached_run(fake_pipeline, "http://daemon.test") + + assert exc_info.value.exit_code == 1 + assert messages == [("Failed to submit run to daemon at http://daemon.test: invalid JSON response", True)] + + def test_run_defaults_to_summary_on_tty(monkeypatch): class FakeOrchestrator: async def submit(self, pipeline: object): @@ -3160,6 +3408,417 @@ def _missing(run_id: str): assert "Run `missing-run` not found in `.agentflow/runs`." in result.stderr +def test_status_command_exits_for_missing_run(monkeypatch): + def _missing(run_id: str): + raise KeyError(run_id) + + monkeypatch.setattr( + agentflow.cli, + "_build_store", + lambda runs_dir: SimpleNamespace(get_run=_missing), + ) + + result = runner.invoke(app, ["status", "missing-run"]) + + assert result.exit_code == 1 + assert "Run `missing-run` not found in `.agentflow/runs`." in result.stderr + + +def test_status_command_renders_summary_with_recent_events(monkeypatch): + record = _completed_run( + "run-status", + pipeline_name="status-pipeline", + status="running", + pipeline_nodes=[ + SimpleNamespace(id="plan", agent=SimpleNamespace(value="codex")), + SimpleNamespace(id="review", agent=SimpleNamespace(value="claude")), + ], + nodes={ + "plan": SimpleNamespace( + status=SimpleNamespace(value="running"), + current_attempt=2, + attempts=[SimpleNamespace(number=1), SimpleNamespace(number=2)], + stderr_lines=[], + stdout_lines=[], + ), + "review": SimpleNamespace( + status=SimpleNamespace(value="pending"), + current_attempt=0, + attempts=[], + stderr_lines=[], + stdout_lines=[], + ), + }, + ) + record.finished_at = None + events = [ + _run_event("old_event", timestamp="2026-04-12T10:00:00+00:00"), + _run_event("node_started", timestamp="2026-04-12T10:00:01+00:00", node_id="plan"), + _run_event("node_retrying", timestamp="2026-04-12T10:00:02+00:00", node_id="plan", data={"attempt": 2}), + _run_event("node_trace", timestamp="2026-04-12T10:00:03+00:00", node_id="plan"), + _run_event("node_waiting", timestamp="2026-04-12T10:00:04+00:00", node_id="review"), + _run_event("node_skipped", timestamp="2026-04-12T10:00:05+00:00", node_id="review", data={"reason": "upstream_failure"}), + ] + + monkeypatch.setattr( + agentflow.cli, + "_build_store", + lambda runs_dir: SimpleNamespace( + get_run=lambda run_id: record, + get_events=lambda run_id: events, + run_dir=lambda run_id: Path(runs_dir) / run_id, + ), + ) + monkeypatch.setattr(agentflow.cli, "_stream_supports_tty_summary", lambda *, err: True) + + result = runner.invoke(app, ["status", "run-status"]) + + assert result.exit_code == 0 + assert "Run run-status: running" in result.stdout + assert "Pipeline: status-pipeline" in result.stdout + assert "Progress: 1/2 nodes, active 1" in result.stdout + assert "Active: plan (running, attempt 2)" in result.stdout + assert "Recent events:" in result.stdout + assert "old_event" not in result.stdout + for event_type in ("node_started", "node_retrying", "node_trace", "node_waiting", "node_skipped"): + assert event_type in result.stdout + + +def test_status_command_supports_json_summary_output(monkeypatch): + record = _completed_run( + "run-status-json", + pipeline_name="status-pipeline", + status="running", + pipeline_nodes=[ + SimpleNamespace(id="plan", agent=SimpleNamespace(value="codex")), + SimpleNamespace(id="review", agent=SimpleNamespace(value="claude")), + ], + nodes={ + "plan": SimpleNamespace( + status=SimpleNamespace(value="running"), + current_attempt=2, + attempts=[SimpleNamespace(number=1), SimpleNamespace(number=2)], + stderr_lines=[], + stdout_lines=[], + ), + "review": SimpleNamespace( + status=SimpleNamespace(value="pending"), + current_attempt=0, + attempts=[], + stderr_lines=[], + stdout_lines=[], + ), + }, + ) + record.finished_at = None + events = [ + _run_event("run_started", timestamp="2026-04-12T10:00:01+00:00"), + _run_event("node_started", timestamp="2026-04-12T10:00:02+00:00", node_id="plan"), + ] + + monkeypatch.setattr( + agentflow.cli, + "_build_store", + lambda runs_dir: SimpleNamespace( + get_run=lambda run_id: record, + get_events=lambda run_id: events, + run_dir=lambda run_id: Path(runs_dir) / run_id, + ), + ) + + result = runner.invoke(app, ["status", "run-status-json", "--output", "json-summary"]) + + assert result.exit_code == 0 + payload = json.loads(result.stdout) + assert payload["id"] == "run-status-json" + assert [event["type"] for event in payload["events"]] == ["run_started", "node_started"] + assert [event["type"] for event in payload["recent_events"]] == ["run_started", "node_started"] + assert payload["progress"] == { + "total_nodes": 2, + "progressed_nodes": 1, + "active_nodes": [{"id": "plan", "status": "running", "attempt": 2}], + "status_counts": {"running": 1, "pending": 1}, + "progress_percent": 50.0, + } + + +def test_status_command_renders_evolution_progress(monkeypatch): + record = _completed_run( + "run-status-evolve", + pipeline_name="status-pipeline", + status="running", + pipeline_nodes=[ + SimpleNamespace(id="plan", agent=SimpleNamespace(value="codex")), + SimpleNamespace(id="evolve", agent=SimpleNamespace(value="python")), + SimpleNamespace(id="evolve_b", agent=SimpleNamespace(value="python")), + ], + nodes={ + "plan": SimpleNamespace( + status=SimpleNamespace(value="completed"), + current_attempt=1, + attempts=[SimpleNamespace(number=1)], + stderr_lines=[], + stdout_lines=[], + ), + "evolve": SimpleNamespace( + status=SimpleNamespace(value="running"), + current_attempt=1, + attempts=[SimpleNamespace(number=1)], + stderr_lines=[ + "not-json", + json.dumps({"agentflow_event": "evolution_progress", "stage": "start", "attempt": 1}), + json.dumps( + { + "agentflow_event": "evolution_progress", + "stage": "build", + "attempt": 1, + "status": "started", + "command": "build", + } + ), + json.dumps( + { + "agentflow_event": "evolution_progress", + "stage": "build", + "attempt": 1, + "status": "completed", + } + ), + json.dumps( + { + "agentflow_event": "evolution_progress", + "stage": "final", + "attempt": 1, + "status": "success", + } + ), + ], + stdout_lines=[], + ), + "evolve_b": SimpleNamespace( + status=SimpleNamespace(value="running"), + current_attempt=1, + attempts=[SimpleNamespace(number=1)], + stderr_lines=[ + json.dumps( + { + "agentflow_event": "evolution_progress", + "stage": "build", + "attempt": 1, + "status": "started", + "command": "build-b", + } + ) + ], + stdout_lines=[], + ), + }, + ) + record.finished_at = None + events = [] + + monkeypatch.setattr( + agentflow.cli, + "_build_store", + lambda runs_dir: SimpleNamespace( + get_run=lambda run_id: record, + get_events=lambda run_id: events, + run_dir=lambda run_id: Path(runs_dir) / run_id, + ), + ) + monkeypatch.setattr(agentflow.cli, "_stream_supports_tty_summary", lambda *, err: True) + + result = runner.invoke(app, ["status", "run-status-evolve"]) + + assert result.exit_code == 0 + assert "Evolution progress:" in result.stdout + assert "evolve: start (attempt 1)" in result.stdout + assert "evolve: build started (attempt 1, command=build)" in result.stdout + assert "evolve: build completed (attempt 1)" in result.stdout + assert "evolve: final success (attempt 1)" in result.stdout + assert "evolve_b: build started (attempt 1, command=build-b)" in result.stdout + + +def test_status_command_returns_evolution_progress_json(monkeypatch): + record = _completed_run( + "run-status-evolve-json", + pipeline_name="status-pipeline", + status="running", + pipeline_nodes=[ + SimpleNamespace(id="plan", agent=SimpleNamespace(value="codex")), + SimpleNamespace(id="evolve", agent=SimpleNamespace(value="python")), + ], + nodes={ + "plan": SimpleNamespace( + status=SimpleNamespace(value="completed"), + current_attempt=1, + attempts=[SimpleNamespace(number=1)], + stderr_lines=[], + stdout_lines=[], + ), + "evolve": SimpleNamespace( + status=SimpleNamespace(value="running"), + current_attempt=1, + attempts=[SimpleNamespace(number=1)], + stderr_lines=[ + json.dumps({"agentflow_event": "evolution_progress", "stage": "start", "attempt": 1}), + "plain text", + json.dumps( + { + "agentflow_event": "evolution_progress", + "stage": "build", + "attempt": 1, + "status": "started", + "command": "build", + } + ), + json.dumps( + { + "agentflow_event": "evolution_progress", + "stage": "build", + "attempt": 1, + "status": "failed", + "detail": "exit 1", + } + ), + ], + stdout_lines=[], + ), + }, + ) + record.finished_at = None + + monkeypatch.setattr( + agentflow.cli, + "_build_store", + lambda runs_dir: SimpleNamespace( + get_run=lambda run_id: record, + get_events=lambda run_id: [], + run_dir=lambda run_id: Path(runs_dir) / run_id, + ), + ) + + result = runner.invoke(app, ["status", "run-status-evolve-json", "--output", "json-summary"]) + + assert result.exit_code == 0 + payload = json.loads(result.stdout) + assert payload["evolution_progress"] == [ + { + "agentflow_event": "evolution_progress", + "stage": "start", + "attempt": 1, + "node_id": "evolve", + }, + { + "agentflow_event": "evolution_progress", + "stage": "build", + "attempt": 1, + "status": "started", + "command": "build", + "node_id": "evolve", + }, + { + "agentflow_event": "evolution_progress", + "stage": "build", + "attempt": 1, + "status": "failed", + "detail": "exit 1", + "node_id": "evolve", + }, + ] + + +def test_status_command_uses_live_node_trace_for_evolution_progress(monkeypatch): + record = _completed_run( + "run-status-evolve-live", + pipeline_name="status-pipeline", + status="running", + pipeline_nodes=[ + SimpleNamespace(id="evolve", agent=SimpleNamespace(value="python")), + ], + nodes={ + "evolve": SimpleNamespace( + status=SimpleNamespace(value="pending"), + current_attempt=1, + attempts=[SimpleNamespace(number=1)], + stderr_lines=[], + stdout_lines=[], + ), + }, + ) + record.finished_at = None + live_events = [ + _run_event( + "node_trace", + timestamp="2026-04-12T10:00:02+00:00", + node_id="evolve", + data={ + "trace": { + "source": "stderr", + "content": json.dumps( + { + "agentflow_event": "evolution_progress", + "stage": "optimizer", + "attempt": 1, + "status": "started", + "command": "optimizer", + } + ), + } + }, + ) + ] + + monkeypatch.setattr( + agentflow.cli, + "_build_store", + lambda runs_dir: SimpleNamespace( + get_run=lambda run_id: record, + get_events=lambda run_id: live_events, + run_dir=lambda run_id: Path(runs_dir) / run_id, + ), + ) + monkeypatch.setattr(agentflow.cli, "_stream_supports_tty_summary", lambda *, err: True) + + result = runner.invoke(app, ["status", "run-status-evolve-live"]) + + assert result.exit_code == 0 + assert "Evolution progress:" in result.stdout + assert "evolve: optimizer started (attempt 1, command=optimizer)" in result.stdout + + +def test_status_command_shows_optimization_session(monkeypatch): + record = _completed_run( + "run-status-opt", + pipeline_name="status-pipeline", + status="running", + ) + record.finished_at = None + record.optimization_session = { + "kind": "graph", + "optimizer": "codex", + "total_rounds": 3, + "current_round": 2, + "child_run_ids": ["child-1", "child-2"], + } + + monkeypatch.setattr( + agentflow.cli, + "_build_store", + lambda runs_dir: SimpleNamespace( + get_run=lambda run_id: record, + get_events=lambda run_id: [], + run_dir=lambda run_id: Path(runs_dir) / run_id, + ), + ) + monkeypatch.setattr(agentflow.cli, "_stream_supports_tty_summary", lambda *, err: True) + + result = runner.invoke(app, ["status", "run-status-opt"]) + + assert result.exit_code == 0 + assert "Optimization: graph optimizer=codex round 2/3 child_runs=2" in result.stdout + + def test_cancel_outputs_summary_for_existing_run(monkeypatch): captured: dict[str, object] = {} diff --git a/tests/test_graph_optimizer.py b/tests/test_graph_optimizer.py index 89273be..c53b82c 100644 --- a/tests/test_graph_optimizer.py +++ b/tests/test_graph_optimizer.py @@ -2,17 +2,20 @@ import asyncio import json +import subprocess from pathlib import Path import pytest from pydantic import ValidationError from agentflow.graph_optimizer import ( + CHILD_PIPELINE_LOAD_TIMEOUT_SECONDS, GRAPH_OPTIMIZER_MAX_ATTEMPTS, GENERATED_PIPELINE_EDITED_FILENAME, GENERATED_PIPELINE_ORIGINAL_FILENAME, OPTIMIZER_VALIDATION_FILENAME, editable_pipeline_payload, + load_child_pipeline_from_path, render_graph_optimizer_prompt, write_editable_pipeline_python, ) @@ -90,6 +93,28 @@ def test_graph_optimizer_prompt_includes_goal_guardrails_and_validation(tmp_path assert "The resulting pipeline validates cleanly and contains at least one node." in prompt +def test_load_child_pipeline_from_path_times_out_with_debug_output(tmp_path, monkeypatch): + pipeline_path = tmp_path / "pipeline.py" + + def fake_run(*args, **kwargs): + raise subprocess.TimeoutExpired( + cmd=args[0], + timeout=kwargs["timeout"], + output="partial stdout", + stderr="partial stderr", + ) + + monkeypatch.setattr("agentflow.graph_optimizer.subprocess.run", fake_run) + + with pytest.raises(ValueError, match="timed out") as exc_info: + load_child_pipeline_from_path(pipeline_path) + + message = str(exc_info.value) + assert f"{CHILD_PIPELINE_LOAD_TIMEOUT_SECONDS:.1f}s" in message + assert "partial stdout" in message + assert "partial stderr" in message + + def test_orchestrator_runs_graph_optimization_rounds(tmp_path, monkeypatch): orchestrator = make_orchestrator(tmp_path) @@ -216,3 +241,48 @@ def fake_optimizer(_optimizer, *, prompt: str, repo_dir: Path, runtime_dir: Path ) assert validation_payload["ok"] is False assert "failed to load" in validation_payload["error"] + + +def test_orchestrator_normalizes_optimizer_edits_to_iteration_controls(tmp_path, monkeypatch): + orchestrator = make_orchestrator(tmp_path) + + def fake_optimizer(_optimizer, *, prompt: str, repo_dir: Path, runtime_dir: Path, env: dict[str, str]): + pipeline_path = repo_dir / "pipeline.py" + pipeline_path.write_text( + ( + "from __future__ import annotations\n\n" + "import json\n\n" + "PIPELINE = {\n" + " 'name': 'graph-opt-nrun-edit',\n" + f" 'working_dir': {str(tmp_path)!r},\n" + " 'n_run': 3,\n" + " 'nodes': [\n" + " {'id': 'plan', 'agent': 'codex', 'prompt': 'round two'},\n" + " ],\n" + "}\n\n" + "if __name__ == '__main__':\n" + " print(json.dumps(PIPELINE, ensure_ascii=False, indent=2))\n" + ), + encoding="utf-8", + ) + return CommandExecution(command="optimizer", exit_code=0, stdout="updated pipeline", stderr="") + + monkeypatch.setattr("agentflow.orchestrator._run_optimizer", fake_optimizer) + + pipeline = PipelineSpec.model_validate( + { + "name": "graph-opt-nrun-edit", + "working_dir": str(tmp_path), + "optimizer": "codex", + "n_run": 2, + "nodes": [{"id": "plan", "agent": "codex", "prompt": "round one"}], + } + ) + + run = asyncio.run(orchestrator.submit(pipeline)) + completed = asyncio.run(orchestrator.wait(run.id, timeout=5)) + + assert completed.status == RunStatus.COMPLETED + assert completed.nodes["plan"].output == "round two" + assert completed.optimization_session is not None + assert completed.optimization_session["current_round"] == pipeline.n_run diff --git a/tests/test_tuned_agents.py b/tests/test_tuned_agents.py index f2fd69f..80e515f 100644 --- a/tests/test_tuned_agents.py +++ b/tests/test_tuned_agents.py @@ -2,7 +2,6 @@ import asyncio import json -import subprocess from pathlib import Path import agentflow.cli @@ -13,7 +12,7 @@ from agentflow.orchestrator import Orchestrator from agentflow.prepared import ExecutionPaths, PreparedExecution from agentflow.runners.registry import RunnerRegistry -from agentflow.specs import AgentKind, NodeSpec, PipelineSpec, RepoInstructionsMode, RunRecord, RunStatus +from agentflow.specs import AgentKind, NodeSpec, PipelineSpec, RunRecord, RunStatus from agentflow.store import RunStore from agentflow.tuned_agents import ( CommandExecution, @@ -22,10 +21,7 @@ list_tuned_agent_records, load_tuner_config, load_tuned_agent_registry, - _execution_paths, _optimizer_prompt, - _run_optimizer, - _run_prepared, register_tuned_agent_version, ResolvedTunerConfig, TunerConfig, @@ -49,138 +45,6 @@ def prepare(self, node, prompt: str, paths: ExecutionPaths) -> PreparedExecution ) -def test_run_prepared_closes_stdin_when_none(monkeypatch): - captured: dict[str, object] = {} - - def fake_run(*args, **kwargs): - captured["args"] = args - captured["kwargs"] = kwargs - return subprocess.CompletedProcess(args[0], 0, "", "") - - monkeypatch.setattr("agentflow.tuned_agents.subprocess.run", fake_run) - - prepared = PreparedExecution( - command=["echo", "hi"], - env={}, - cwd=".", - trace_kind="codex", - stdin=None, - ) - - _run_prepared(prepared) - - assert captured["kwargs"]["stdin"] is subprocess.DEVNULL - - -def test_run_optimizer_ignores_repo_instructions(monkeypatch, tmp_path): - captured: dict[str, object] = {} - - class FakeAdapter(AgentAdapter): - def prepare(self, node, prompt: str, paths: ExecutionPaths) -> PreparedExecution: - captured["node"] = node - return PreparedExecution( - command=["echo", "optimizer"], - env={}, - cwd=str(tmp_path), - trace_kind="codex", - ) - - def fake_get(_agent): - return FakeAdapter() - - def fake_paths(repo_dir: Path, runtime_dir: Path) -> ExecutionPaths: - return ExecutionPaths( - host_workdir=repo_dir, - host_runtime_dir=runtime_dir, - target_workdir=str(repo_dir), - target_runtime_dir=str(runtime_dir), - app_root=repo_dir, - ) - - def fake_materialize(*_args, **_kwargs) -> None: - return None - - def fake_run_prepared(_prepared: PreparedExecution) -> subprocess.CompletedProcess[str]: - return subprocess.CompletedProcess(["echo"], 0, "", "") - - monkeypatch.setattr("agentflow.tuned_agents.default_adapter_registry.get", fake_get) - monkeypatch.setattr("agentflow.tuned_agents._execution_paths", fake_paths) - monkeypatch.setattr("agentflow.tuned_agents._materialize_runtime_files", fake_materialize) - monkeypatch.setattr("agentflow.tuned_agents._run_prepared", fake_run_prepared) - - _run_optimizer( - AgentKind.CODEX, - prompt="optimize", - repo_dir=tmp_path / "repo", - runtime_dir=tmp_path / "runtime", - env={}, - ) - - assert captured["node"].repo_instructions_mode == RepoInstructionsMode.IGNORE - - -def test_run_optimizer_uses_openai_provider_when_base_url_is_present(monkeypatch, tmp_path): - captured: dict[str, object] = {} - - class FakeAdapter(AgentAdapter): - def prepare(self, node, prompt: str, paths: ExecutionPaths) -> PreparedExecution: - captured["node"] = node - return PreparedExecution( - command=["echo", "optimizer"], - env={}, - cwd=str(tmp_path), - trace_kind="codex", - ) - - def fake_get(_agent): - return FakeAdapter() - - def fake_paths(repo_dir: Path, runtime_dir: Path) -> ExecutionPaths: - return ExecutionPaths( - host_workdir=repo_dir, - host_runtime_dir=runtime_dir, - target_workdir=str(repo_dir), - target_runtime_dir=str(runtime_dir), - app_root=repo_dir, - ) - - def fake_materialize(*_args, **_kwargs) -> None: - return None - - def fake_run_prepared(_prepared: PreparedExecution) -> subprocess.CompletedProcess[str]: - return subprocess.CompletedProcess(["echo"], 0, "", "") - - monkeypatch.setattr("agentflow.tuned_agents.default_adapter_registry.get", fake_get) - monkeypatch.setattr("agentflow.tuned_agents._execution_paths", fake_paths) - monkeypatch.setattr("agentflow.tuned_agents._materialize_runtime_files", fake_materialize) - monkeypatch.setattr("agentflow.tuned_agents._run_prepared", fake_run_prepared) - - _run_optimizer( - AgentKind.CODEX, - prompt="optimize", - repo_dir=tmp_path / "repo", - runtime_dir=tmp_path / "runtime", - env={"AGENTFLOW_OPENAI_BASE_URL": "http://relay.example/openai"}, - ) - - assert captured["node"].provider is not None - assert captured["node"].provider.name == "openai-custom" - assert captured["node"].provider.base_url == "http://relay.example/openai" - assert captured["node"].provider.wire_api == "responses" - assert captured["node"].env["OPENAI_BASE_URL"] == "http://relay.example/openai" - - -def test_execution_paths_resolve_relative_inputs(monkeypatch, tmp_path): - monkeypatch.chdir(tmp_path) - - paths = _execution_paths(Path("repo"), Path("runtime")) - - assert paths.target_workdir == str(tmp_path / "repo") - assert paths.target_runtime_dir == str(tmp_path / "runtime") - assert Path(paths.target_workdir).is_absolute() - assert Path(paths.target_runtime_dir).is_absolute() - - def test_loader_supports_yaml_pipeline(tmp_path): pipeline_path = tmp_path / "pipeline.yaml" pipeline_path.write_text( @@ -218,6 +82,8 @@ def test_evolve_helper_filters_source_nodes_and_builds_payload(): assert evolve_node["depends_on"] == ["plan"] assert '"source_nodes": ["plan"]' in evolve_node["prompt"] assert "{{ nodes.plan.artifacts.trace_jsonl }}" in evolve_node["prompt"] + assert "sys.path.insert(0," in evolve_node["prompt"] + assert "progress=_evolution_progress" in evolve_node["prompt"] def test_resolve_node_for_execution_uses_latest_registry_entry(tmp_path): @@ -383,7 +249,7 @@ def fake_shell(command_template: str, *, repo_dir: Path, version_dir: Path, trac assert Path(result["executable"]).exists() -def test_run_evolution_from_payload_uses_repo_root_for_optimizer_when_workdir_subpath_is_set(tmp_path, monkeypatch): +def test_run_evolution_from_payload_reports_progress(tmp_path, monkeypatch): workspace = tmp_path config_dir = workspace / "agent_tuner" config_dir.mkdir() @@ -393,29 +259,28 @@ def test_run_evolution_from_payload_uses_repo_root_for_optimizer_when_workdir_su "name: codex_tuned", "base_agent: codex", "repo_url: https://example.invalid/repo.git", - "workdir_subpath: codex-rs", "build_command: build", "test_command: test", "smoke_command: smoke", "evolution_prompt: improve the agent", - "executable_path: target/debug/codex", - "max_attempts: 1", + "executable_path: .venv/bin/codex", + "max_attempts: 2", ] ), encoding="utf-8", ) trace_path = workspace / "trace.jsonl" trace_path.write_text('{"kind":"assistant_message","content":"hello"}\n', encoding="utf-8") - captured: dict[str, object] = {} def fake_clone(_config, repo_dir: Path) -> None: - (repo_dir / "codex-rs").mkdir(parents=True) - (repo_dir / "README.md").write_text("root", encoding="utf-8") - (repo_dir / "codex-rs" / "Cargo.toml").write_text("[workspace]\n", encoding="utf-8") + repo_dir.mkdir(parents=True) + (repo_dir / "README.md").write_text("base", encoding="utf-8") + + attempt_state = {"smoke": 0} def fake_optimizer(_optimizer: AgentKind, *, prompt: str, repo_dir: Path, runtime_dir: Path, env: dict[str, str]): - captured["optimizer_repo_dir"] = str(repo_dir) - captured["optimizer_prompt"] = prompt + runtime_dir.mkdir(parents=True, exist_ok=True) + (repo_dir / "README.md").write_text(prompt, encoding="utf-8") return CommandExecution( command="optimizer", exit_code=0, @@ -424,17 +289,25 @@ def fake_optimizer(_optimizer: AgentKind, *, prompt: str, repo_dir: Path, runtim ) def fake_shell(command_template: str, *, repo_dir: Path, version_dir: Path, traces_dir: Path, executable: str, env: dict[str, str]): - captured.setdefault("shell_repo_dirs", []).append(str(repo_dir)) if command_template == "build": executable_path = Path(executable) executable_path.parent.mkdir(parents=True, exist_ok=True) executable_path.write_text("#!/bin/sh\nexit 0\n", encoding="utf-8") + if command_template == "smoke": + attempt_state["smoke"] += 1 + if attempt_state["smoke"] == 1: + return CommandExecution(command="smoke", exit_code=1, stdout="", stderr="ping failed") return CommandExecution(command=command_template, exit_code=0, stdout="ok", stderr="") monkeypatch.setattr("agentflow.tuned_agents._clone_repo", fake_clone) monkeypatch.setattr("agentflow.tuned_agents._run_optimizer", fake_optimizer) monkeypatch.setattr("agentflow.tuned_agents._run_shell_command", fake_shell) + progress: list[dict[str, object]] = [] + + def capture(event: dict[str, object]) -> None: + progress.append(event) + result = run_evolution_from_payload( { "profile": "codex", @@ -443,13 +316,30 @@ def fake_shell(command_template: str, *, repo_dir: Path, version_dir: Path, trac "source_nodes": ["plan"], "trace_paths": {"plan": str(trace_path)}, "workspace_dir": str(workspace), - "run_id": "run123", - } + "run_id": "run-progress", + }, + progress=capture, ) - assert captured["optimizer_repo_dir"] == result["repo_path"] - assert result["workdir"] == str(Path(result["repo_path"]) / "codex-rs") - assert captured["shell_repo_dirs"] == [result["workdir"], result["workdir"], result["workdir"]] + assert result["ok"] is True + assert any(event.get("agentflow_event") == "evolution_progress" for event in progress) + stages = [(event.get("stage"), event.get("status"), event.get("attempt")) for event in progress] + assert stages[0][0] == "start" + assert ("attempt", "started", 1) in stages + assert ("optimizer", "started", 1) in stages + assert ("optimizer", "completed", 1) in stages + assert ("build", "started", 1) in stages + assert ("build", "completed", 1) in stages + assert ("test", "started", 1) in stages + assert ("test", "completed", 1) in stages + assert ("smoke", "started", 1) in stages + assert ("smoke", "failed", 1) in stages + assert ("attempt", "started", 2) in stages + assert ("final", "success", 2) in stages + smoke_failure = next(event for event in progress if event.get("stage") == "smoke" and event.get("status") == "failed") + assert "Smoke" in str(smoke_failure.get("detail", "")) + build_start = next(event for event in progress if event.get("stage") == "build" and event.get("status") == "started") + assert build_start.get("command") == "build" def test_optimizer_prompt_explicitly_allows_prompt_and_tool_edits(tmp_path): @@ -477,8 +367,7 @@ def test_optimizer_prompt_explicitly_allows_prompt_and_tool_edits(tmp_path): prompt = _optimizer_prompt( resolved, - repo_root=tmp_path / "repo", - repo_workdir=tmp_path / "repo", + repo_dir=tmp_path / "repo", traces_dir=tmp_path / "traces", source_nodes=["plan"], previous_failure=None, @@ -490,9 +379,6 @@ def test_optimizer_prompt_explicitly_allows_prompt_and_tool_edits(tmp_path): assert "Known tunable surfaces and implementing files" in prompt assert "core/gpt_5_codex_prompt.md" in prompt assert "tools/src/local_tool.rs" in prompt - assert "Do not write design docs, implementation plans, or other planning artifacts" in prompt - assert "Do not wait for user confirmation" in prompt - assert "Ignore installed process skills such as brainstorming, writing-plans, systematic-debugging, and test-driven-development" in prompt def test_repo_includes_codex_tuner_profile(): @@ -505,19 +391,11 @@ def test_repo_includes_codex_tuner_profile(): assert resolved.config.repo_url == "https://github.com/openai/codex.git" assert resolved.config.workdir_subpath == "codex-rs" assert resolved.config.executable_path == "target/debug/codex" - assert resolved.config.env["AGENTFLOW_CODEX_SANDBOX_MODE"] == "danger-full-access" assert "System prompts" in resolved.config.evolution_prompt assert "tool descriptions" in resolved.config.evolution_prompt assert len(resolved.config.tunable_surfaces) >= 10 - # The first surface must be the real BASE_INSTRUCTIONS prompt that gets - # baked into the binary via `include_str!` in models-manager — not the - # legacy `core/gpt_5_codex_prompt.md` documentation files which never made - # it into the compiled binary. - # The primary surface is now the agentflow-side wrapper file, because - # gateway-proxied model providers can override server-side prompts and the - # wrapper is concatenated into the user message by agentflow before invoke. - assert resolved.config.tunable_surfaces[0].name.startswith("Agentflow-side prompt wrapper") - assert "codex-rs/agentflow_wrapper.md" in resolved.config.tunable_surfaces[0].paths + assert resolved.config.tunable_surfaces[0].name == "Base model prompts and prompt assembly" + assert "core/gpt_5_codex_prompt.md" in resolved.config.tunable_surfaces[0].paths def test_cli_lists_tuned_agents(tmp_path, capsys): diff --git a/tests/test_validation_script.py b/tests/test_validation_script.py new file mode 100644 index 0000000..fe76695 --- /dev/null +++ b/tests/test_validation_script.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +import subprocess +from pathlib import Path + + +def test_verify_async_codex_script_help() -> None: + repo_root = Path(__file__).resolve().parents[1] + script_path = repo_root / "scripts" / "verify_async_codex.sh" + + assert script_path.exists() + + completed = subprocess.run( + ["bash", str(script_path), "--help"], + capture_output=True, + text=True, + check=False, + ) + + assert completed.returncode == 0 + assert "PR11" in completed.stdout + assert "PR12" in completed.stdout + assert "OPENAI_API_KEY" in completed.stdout + + +def test_verify_async_codex_script_sets_local_git_identity() -> None: + repo_root = Path(__file__).resolve().parents[1] + script_path = repo_root / "scripts" / "verify_async_codex.sh" + + script_text = script_path.read_text(encoding="utf-8") + + assert 'user.name="AgentFlow Verify"' in script_text + assert 'user.email="verify@example.invalid"' in script_text