diff --git a/README.md b/README.md
index a025d72..d3be943 100644
--- a/README.md
+++ b/README.md
@@ -39,6 +39,12 @@ It simulates what actually happens when you deploy coding agents: multi-turn con
Works against any OpenAI-compatible endpoint — vLLM, TGI, OpenAI, Ollama, LMStudio.
+> **Inspired by** Fabian Wesner's [One-Shot Shop Challenge](https://agentic-engineers.dev)
+> ([announcement](https://www.linkedin.com/posts/fabian-wesner_oneshotshop-share-7442096217976897536-SRI9/)) —
+> the study that showed orchestration architecture beats model choice (Team Mode 85% vs Sub-Agents 57% on the same model).
+> Pawbench's orchestration × complexity matrix operationalizes that finding inside a reproducible benchmark.
+> See [spec 009](https://github.com/zenprocess/switchyard/blob/main/specs/009-pawbench-orchestration-axis/spec.md).
+
## Meet Lola
@@ -84,7 +90,7 @@ pawbench --scenario my_scenario.json
## What It Measures
-### 4 Dimensions
+### 4 Dimensions + Spec 009 Matrix
| Dimension | Metrics |
|---|---|
@@ -92,6 +98,22 @@ pawbench --scenario my_scenario.json
| **Quality** | Tool call accuracy, instruction following, format compliance, keyword matching |
| **Efficiency** | Useful token ratio (code in tool args vs filler preamble), tokens per turn |
| **Adaptability** | Steering event response, mid-conversation context injection, nudge quality delta |
+| **Artifact Quality** *(spec 009)* | Static analysis over changed files (ruff/mypy/radon for Python, generic fallback otherwise). Orthogonal to AC pass. |
+| **Complexity Tier** *(spec 009)* | Per-task tagging — `display` / `crud` / `transactional` / `cross_cutting` — with stratified `quality_by_tier` reporting. |
+| **Orchestration Shape** *(spec 009)* | Same scenario × 5 shapes (`flat` / `waves` / `scatter-gather` / `team-mode` / `subagents`) → `orchestration_dqs_spread` SLI. |
+| **DQS** *(spec 009)* | Composite Dispatch Quality Score v1.0.0 with auditable weights + post-hoc ablation matrix. |
+
+### New flags (spec 009)
+
+```bash
+pawbench --orchestration flat,waves,scatter-gather,team-mode,subagents
+pawbench --ablate quality,format_compliance,tool_accuracy,useful_ratio,steering_rate
+pawbench --context-tier manifest-only
+pawbench --verification-runs 2
+pawbench --no-quality-analysis
+```
+
+The orchestration matrix scenario (`pawstyle-orchestration-matrix`) is designed to differentiate shapes — four independent feature blocks, one per complexity tier. Inspired by Fabian Wesner's [One-Shot Shop Challenge](https://agentic-engineers.dev) (orchestration > model). See [switchyard spec 009](https://github.com/zenprocess/switchyard/blob/main/specs/009-pawbench-orchestration-axis/spec.md).
diff --git a/src/pawbench/ablation.py b/src/pawbench/ablation.py
new file mode 100644
index 0000000..db505ae
--- /dev/null
+++ b/src/pawbench/ablation.py
@@ -0,0 +1,150 @@
+"""Ablation matrix — spec 009 / B7.
+
+The publishable counter-intuitive: Fabian's study showed quality-focused
+instructions *decreased* performance and the fastest build scored worst.
+Pawbench has accumulated five scoring/quality components over time, none
+of which have ever been measured in isolation.
+
+The ablation runner takes a base BenchmarkReport and recomputes DQS with
+one component disabled at a time, returning the delta. Components with
+consistently negative deltas across consecutive runs are removal
+candidates.
+
+This module is **pure** — it operates on already-collected results, never
+re-runs the benchmark. That keeps ablation cheap (no extra GPU time) and
+deterministic (same inputs → same deltas).
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from typing import Any
+
+from pawbench.dqs import DQSBreakdown, compute_dqs
+
+# Components that can be ablated. Each maps to a kwarg of compute_dqs that
+# gets pinned to a "neutral" value (1.0 = component contributes its max,
+# i.e., it's invisible to the comparison; 0.0 = component is silenced).
+# We use 1.0-pinning so the ablation answers "what if this component
+# always reported success", which is the meaningful counterfactual: a
+# component is dead weight if its absence doesn't reduce the score.
+ABLATABLE_COMPONENTS: dict[str, str] = {
+ "format_compliance": "format_compliance",
+ "tool_accuracy": "tool_accuracy",
+ "useful_ratio": "useful_ratio",
+ "steering_rate": "steering_rate",
+ "quality": "quality",
+}
+
+
+@dataclass
+class AblationDelta:
+ """Per-component DQS delta from disabling that component."""
+
+ component: str
+ baseline_dqs: float
+ ablated_dqs: float
+ delta: float
+ interpretation: str
+
+ def to_dict(self) -> dict[str, Any]:
+ return {
+ "component": self.component,
+ "baseline_dqs": round(self.baseline_dqs, 4),
+ "ablated_dqs": round(self.ablated_dqs, 4),
+ "delta": round(self.delta, 4),
+ "interpretation": self.interpretation,
+ }
+
+
+@dataclass
+class AblationReport:
+ """Complete ablation matrix for a single scenario or scenario aggregate."""
+
+ scenario_id: str
+ baseline: DQSBreakdown
+ deltas: list[AblationDelta] = field(default_factory=list)
+
+ def to_dict(self) -> dict[str, Any]:
+ return {
+ "scenario_id": self.scenario_id,
+ "baseline": self.baseline.to_dict(),
+ "deltas": [d.to_dict() for d in self.deltas],
+ "removal_candidates": [d.component for d in self.deltas if d.delta <= 0.0],
+ }
+
+
+def _interpret(delta: float) -> str:
+ """Plain-English read of a delta. Conservative thresholds."""
+ if delta > 0.05:
+ return "load-bearing — component contributes meaningfully"
+ if delta > 0.01:
+ return "marginal — small but real contribution"
+ if delta > -0.01:
+ return "neutral — within noise floor"
+ return "DEAD WEIGHT — score improves when component is silenced"
+
+
+def ablate(
+ *,
+ scenario_id: str,
+ quality: float,
+ format_compliance: float,
+ tool_accuracy: float,
+ useful_ratio: float,
+ steering_rate: float,
+ components: list[str] | None = None,
+) -> AblationReport:
+ """Compute the ablation delta for each requested component.
+
+ `components=None` means "all ablatable components". Unknown components
+ are ignored (logged into the interpretation field).
+
+ Pinning convention: when a component is "ablated", its input is set to
+ 1.0 — i.e., we ask "what if this signal were perfect?". A negative
+ delta then means: turning the signal perfect made the score *worse*,
+ which is impossible under the current additive formula. So this
+ counterfactual surfaces components that are *zero-information*: if
+ pinning them to 1.0 gives the same score as the real value, the
+ component is contributing nothing in this run.
+ """
+ base_inputs = {
+ "quality": quality,
+ "format_compliance": format_compliance,
+ "tool_accuracy": tool_accuracy,
+ "useful_ratio": useful_ratio,
+ "steering_rate": steering_rate,
+ }
+ baseline = compute_dqs(**base_inputs)
+
+ requested = components if components else list(ABLATABLE_COMPONENTS)
+ deltas: list[AblationDelta] = []
+
+ for comp in requested:
+ if comp not in ABLATABLE_COMPONENTS:
+ deltas.append(
+ AblationDelta(
+ component=comp,
+ baseline_dqs=baseline.composite,
+ ablated_dqs=baseline.composite,
+ delta=0.0,
+ interpretation=f"unknown component '{comp}' — skipped",
+ )
+ )
+ continue
+
+ ablated_inputs = dict(base_inputs)
+ ablated_inputs[comp] = 1.0 # pin to perfect
+ ablated = compute_dqs(**ablated_inputs)
+ delta = ablated.composite - baseline.composite
+ deltas.append(
+ AblationDelta(
+ component=comp,
+ baseline_dqs=baseline.composite,
+ ablated_dqs=ablated.composite,
+ delta=delta,
+ interpretation=_interpret(delta),
+ )
+ )
+
+ return AblationReport(scenario_id=scenario_id, baseline=baseline, deltas=deltas)
diff --git a/src/pawbench/cli.py b/src/pawbench/cli.py
index f4275ea..391c926 100644
--- a/src/pawbench/cli.py
+++ b/src/pawbench/cli.py
@@ -12,8 +12,10 @@
import requests
from pawbench import __version__
+from pawbench.ablation import ablate
from pawbench.banner import print_banner
from pawbench.capture import capture_model_card, scrape_server_metrics
+from pawbench.dqs import compute_dqs, dqs_spread
from pawbench.engine import run_parallel_dispatch, run_saturation_test
from pawbench.mock import (
MockEndpoint,
@@ -21,13 +23,17 @@
load_fixture,
save_fixture,
)
+from pawbench.orchestration import OrchestrationShape, run_with_shape
+from pawbench.quality import analyze_artifact
from pawbench.report import print_report
-from pawbench.scoring import useful_ratio
+from pawbench.scoring import quality_by_tier, useful_ratio
from pawbench.types import BenchmarkReport, ScenarioReport
SCENARIOS_DIR = Path(__file__).parent / "scenarios"
FIXTURES_DIR = Path(__file__).parent / "fixtures"
+from pawbench.context_tier import apply_context_tier as _apply_context_tier # noqa: E402
+
def _get_model(endpoint: str) -> str:
try:
@@ -143,6 +149,36 @@ def main():
help="Export results as a ServingCard file (.json or .yaml) for servingcard.dev",
)
parser.add_argument("--version", action="version", version=f"%(prog)s {__version__}")
+ # Spec 009 — orchestration × complexity matrix
+ parser.add_argument(
+ "--orchestration",
+ default=None,
+ help="Comma-separated orchestration shapes (flat,waves,scatter-gather,team-mode,subagents). "
+ "Runs the same scenario under each shape and reports per-shape DQS + spread.",
+ )
+ parser.add_argument(
+ "--ablate",
+ default=None,
+ help="Comma-separated component names to ablate (quality,format_compliance,tool_accuracy,"
+ "useful_ratio,steering_rate). Recomputes DQS with each component pinned to perfect.",
+ )
+ parser.add_argument(
+ "--context-tier",
+ default="standard",
+ choices=["standard", "manifest-only"],
+ help="Spec 009/B6 — manifest-only strips embedded code from prompts to test exploration-driven solving.",
+ )
+ parser.add_argument(
+ "--verification-runs",
+ type=int,
+ default=1,
+ help="Spec 009/B3 — re-score the same outputs N times to measure verifier reliability (>=1).",
+ )
+ parser.add_argument(
+ "--no-quality-analysis",
+ action="store_true",
+ help="Skip artifact_quality static analysis (spec 009/B4). Useful in CI without ruff/mypy.",
+ )
args = parser.parse_args()
conc_levels = [int(c) for c in args.concurrency.split(",")]
@@ -177,9 +213,21 @@ def main():
if not args.saturation_only:
scenario_paths = [Path(p) for p in args.scenario] if args.scenario else None
scenarios = _load_scenarios(scenario_paths)
+ # Spec 009/B6 — apply context tier transformation
+ scenarios = [_apply_context_tier(s, args.context_tier) for s in scenarios]
if not args.json:
- print(f" Scenarios: {len(scenarios)}")
+ print(f" Scenarios: {len(scenarios)} | Context tier: {args.context_tier}")
+
+ # Per-scenario aggregate state for spec 009 reporting
+ scenario_artifact_quality: list[dict] = []
+ scenario_quality_by_tier: list[dict] = []
+ scenario_orchestration: list[dict] = []
+ scenario_dqs: list[float] = []
+
+ orchestration_shapes: list[OrchestrationShape] = []
+ if args.orchestration:
+ orchestration_shapes = [OrchestrationShape.parse(s) for s in args.orchestration.split(",") if s.strip()]
for scenario in scenarios:
if not args.json:
@@ -191,8 +239,58 @@ def main():
for cr in crs:
all_concurrency_data.setdefault(cr.concurrency, []).append(cr)
+ # Spec 009/B2 — per-tier quality breakdown
+ all_agents_for_tier = [a for cr in crs for a in cr.agents if not a.error]
+ tier_breakdown = quality_by_tier(all_agents_for_tier, scenario)
+ if tier_breakdown:
+ scenario_quality_by_tier.append({"scenario_id": scenario["id"], "by_tier": tier_breakdown})
+
+ # Spec 009/B4 — artifact quality analysis on collected tool calls
+ if not args.no_quality_analysis:
+ all_tool_calls = [tc for a in all_agents_for_tier for t in a.turns for tc in t.tool_calls]
+ aq = analyze_artifact(all_tool_calls)
+ scenario_artifact_quality.append({"scenario_id": scenario["id"], **aq.__dict__})
+
+ # Spec 009/B1 — orchestration matrix (re-runs scenario per shape)
+ if orchestration_shapes and not args.mock:
+ shape_results = []
+ shape_dqs_list: list[float] = []
+ for shape in orchestration_shapes:
+ res = asyncio.run(run_with_shape(args.endpoint, model, scenario, shape))
+ shape_results.append(res.to_dict())
+ # Quick DQS on the orchestration result using its avg_quality
+ bd = compute_dqs(
+ quality=res.avg_quality,
+ format_compliance=sr.format_compliance_rate,
+ tool_accuracy=sr.tool_accuracy,
+ useful_ratio=sr.useful_ratio,
+ steering_rate=sr.steering_rate,
+ )
+ shape_dqs_list.append(bd.composite)
+ scenario_orchestration.append(
+ {
+ "scenario_id": scenario["id"],
+ "shapes": shape_results,
+ "dqs_per_shape": dict(zip([s.value for s in orchestration_shapes], shape_dqs_list)),
+ "dqs_spread": dqs_spread(shape_dqs_list),
+ }
+ )
+
+ # Per-scenario DQS
+ sd = compute_dqs(
+ quality=sr.avg_quality,
+ format_compliance=sr.format_compliance_rate,
+ tool_accuracy=sr.tool_accuracy,
+ useful_ratio=sr.useful_ratio,
+ steering_rate=sr.steering_rate,
+ )
+ scenario_dqs.append(sd.composite)
+
if not args.json:
- print(f" tok/s={sr.single_tok_s:.1f} quality={sr.avg_quality:.0%} steer={sr.steering_rate:.0%}")
+ print(
+ f" tok/s={sr.single_tok_s:.1f} quality={sr.avg_quality:.0%}"
+ f" steer={sr.steering_rate:.0%} dqs={sd.composite:.2f}"
+ )
# Raw saturation test
saturation_curve = []
@@ -290,6 +388,63 @@ def main():
},
)
+ # Spec 009 — attach orchestration × complexity matrix outputs
+ if not args.saturation_only and valid_scenarios:
+ # B4 — artifact quality (single aggregate row across scenarios)
+ if scenario_artifact_quality:
+ report.dim5_artifact_quality = {
+ "version": "spec-009",
+ "per_scenario": scenario_artifact_quality,
+ "aggregate_score": sum(r.get("score", 0.0) for r in scenario_artifact_quality)
+ / len(scenario_artifact_quality),
+ }
+ # B2 — quality_by_tier aggregate
+ agg_tiers: dict[str, list[float]] = {}
+ for entry in scenario_quality_by_tier:
+ for tier_name, score in entry["by_tier"].items():
+ agg_tiers.setdefault(tier_name, []).append(score)
+ report.quality_by_tier = {t: sum(v) / len(v) for t, v in agg_tiers.items()}
+ # B1 — orchestration matrix
+ if scenario_orchestration:
+ report.orchestration_results = scenario_orchestration
+ spreads = [e["dqs_spread"] for e in scenario_orchestration]
+ report.orchestration_dqs_spread = max(spreads) if spreads else 0.0
+ # DQS aggregate
+ agg_dqs = compute_dqs(
+ quality=report.dim2_quality["avg_quality"],
+ format_compliance=report.dim2_quality["format_compliance_rate"],
+ tool_accuracy=report.dim2_quality["tool_accuracy"],
+ useful_ratio=report.dim3_efficiency["avg_useful_ratio"],
+ steering_rate=report.dim4_adaptability["steering_rate"],
+ )
+ report.dqs = agg_dqs.to_dict()
+ # B7 — ablation matrix
+ if args.ablate is not None:
+ requested = [c.strip() for c in args.ablate.split(",") if c.strip()] if args.ablate else None
+ ab = ablate(
+ scenario_id="aggregate",
+ quality=report.dim2_quality["avg_quality"],
+ format_compliance=report.dim2_quality["format_compliance_rate"],
+ tool_accuracy=report.dim2_quality["tool_accuracy"],
+ useful_ratio=report.dim3_efficiency["avg_useful_ratio"],
+ steering_rate=report.dim4_adaptability["steering_rate"],
+ components=requested,
+ )
+ report.ablation = ab.to_dict()
+ # B3 — verification reliability via N-run scoring agreement
+ if args.verification_runs > 1:
+ # Scoring is deterministic given outputs, so the only "disagreement"
+ # source is non-determinism in score_turn (none today). We still
+ # surface the field so downstream consumers can plug in real
+ # LLM-judged verification later. Records: N runs, all agreeing.
+ report.dqs.setdefault("verification", {})
+ report.dqs["verification"] = {
+ "runs": args.verification_runs,
+ "agreement_rate": 1.0,
+ "notes": "deterministic scoring — agreement_rate is 1.0 by construction. "
+ "Plug in an LLM judge to surface real verifier flake.",
+ }
+
if args.json:
print(json.dumps(asdict(report), indent=2, default=str))
else:
diff --git a/src/pawbench/complexity.py b/src/pawbench/complexity.py
new file mode 100644
index 0000000..1d0e531
--- /dev/null
+++ b/src/pawbench/complexity.py
@@ -0,0 +1,127 @@
+"""Complexity tier taxonomy — spec 009 / B2.
+
+Stratifies scenario tasks so aggregate scores don't mask the cliff that
+Fabian Wesner's One-Shot Shop study surfaced: display tasks pass everywhere,
+transactional flows expose architectural weakness immediately.
+
+Canonical vocabulary lives in Axiom §17.2.
+"""
+
+from __future__ import annotations
+
+from enum import Enum
+from typing import Any
+
+
+class ComplexityTier(str, Enum):
+ """Canonical complexity tiers (Axiom §17.2)."""
+
+ DISPLAY = "display" # Read-only render of existing data
+ CRUD = "crud" # Single-entity create/read/update/delete
+ TRANSACTIONAL = "transactional" # Multi-entity flow with invariants
+ CROSS_CUTTING = "cross_cutting" # Spans multiple subsystems
+
+ @classmethod
+ def parse(cls, value: str | None) -> "ComplexityTier | None":
+ if not value:
+ return None
+ try:
+ return cls(value.lower().replace("-", "_"))
+ except ValueError:
+ return None
+
+
+# Heuristic keyword inference for legacy scenarios that pre-date tier tagging.
+# Used only as a fallback when scenarios don't carry an explicit tier.
+_TIER_KEYWORDS: dict[ComplexityTier, tuple[str, ...]] = {
+ ComplexityTier.CROSS_CUTTING: (
+ "auth",
+ "payment",
+ "checkout",
+ "email",
+ "webhook",
+ "subscription",
+ "oauth",
+ "saml",
+ "sso",
+ "rbac",
+ "audit log",
+ ),
+ ComplexityTier.TRANSACTIONAL: (
+ "transaction",
+ "rollback",
+ "atomic",
+ "invariant",
+ "transfer",
+ "checkout",
+ "booking",
+ "reservation",
+ "two-phase",
+ "saga",
+ ),
+ ComplexityTier.CRUD: (
+ "create",
+ "update",
+ "delete",
+ "validation",
+ "endpoint",
+ "rest",
+ "api",
+ "post",
+ "put",
+ "patch",
+ "crud",
+ ),
+ ComplexityTier.DISPLAY: (
+ "render",
+ "display",
+ "list",
+ "show",
+ "view",
+ "page",
+ "grid",
+ "card",
+ "html",
+ "css",
+ ),
+}
+
+
+def infer_tier(text: str) -> ComplexityTier:
+ """Heuristic tier inference for un-tagged scenarios.
+
+ Walks tiers from most-complex to least-complex and returns the first
+ match. Default falls back to CRUD because it's the largest bucket in
+ practice — display tier is often miscounted by keyword matches alone.
+ """
+ if not text:
+ return ComplexityTier.CRUD
+ lowered = text.lower()
+ for tier in (
+ ComplexityTier.CROSS_CUTTING,
+ ComplexityTier.TRANSACTIONAL,
+ ComplexityTier.CRUD,
+ ComplexityTier.DISPLAY,
+ ):
+ if any(kw in lowered for kw in _TIER_KEYWORDS[tier]):
+ return tier
+ return ComplexityTier.CRUD
+
+
+def tier_for_turn(turn_spec: dict[str, Any]) -> ComplexityTier:
+ """Resolve the complexity tier for a single turn spec.
+
+ Priority:
+ 1. Explicit `complexity_tier` field on the turn.
+ 2. Inherited from the parent scenario (handled by caller).
+ 3. Heuristic inference from the turn's content.
+ """
+ explicit = ComplexityTier.parse(turn_spec.get("complexity_tier"))
+ if explicit is not None:
+ return explicit
+ return infer_tier(turn_spec.get("content", ""))
+
+
+def tier_for_scenario(scenario: dict[str, Any]) -> ComplexityTier | None:
+ """Resolve a scenario-level default tier from explicit metadata."""
+ return ComplexityTier.parse(scenario.get("complexity_tier"))
diff --git a/src/pawbench/context_tier.py b/src/pawbench/context_tier.py
new file mode 100644
index 0000000..36c8b89
--- /dev/null
+++ b/src/pawbench/context_tier.py
@@ -0,0 +1,46 @@
+"""Context tier transformations — spec 009 / B6.
+
+Manifest-only mode strips embedded code from prompts so we can test whether
+agents can solve hard tasks via exploration when given only file inventories.
+Standard mode is a passthrough.
+
+Lives in its own module so tests and tools can import it without pulling in
+the full benchmark engine (which depends on aiohttp).
+"""
+
+from __future__ import annotations
+
+import copy
+import re
+from typing import Any
+
+_FENCED_CODE_RE = re.compile(r"```[\s\S]*?```")
+_LONG_LITERAL_RE = re.compile(r"\{[^{}]{200,}\}")
+
+CONTEXT_TIERS = ("standard", "manifest-only")
+
+
+def strip_code_from_content(text: str) -> str:
+ """Remove fenced code blocks and long inline literals from prompt text."""
+ text = _FENCED_CODE_RE.sub("[code block removed — manifest-only mode]", text)
+ text = _LONG_LITERAL_RE.sub("[long literal removed — manifest-only mode]", text)
+ return text
+
+
+def apply_context_tier(scenario: dict[str, Any], tier: str) -> dict[str, Any]:
+ """Return a scenario transformed for the requested context tier.
+
+ `standard` returns the input unchanged (identity preserved).
+ `manifest-only` returns a deep copy with code stripped from every turn.
+ """
+ if tier == "standard":
+ return scenario
+ if tier not in CONTEXT_TIERS:
+ raise ValueError(f"unknown context tier: {tier!r}; valid: {CONTEXT_TIERS}")
+ out = copy.deepcopy(scenario)
+ for agent in out.get("agents", []):
+ for turn in agent.get("turns", []):
+ content = turn.get("content")
+ if isinstance(content, str):
+ turn["content"] = strip_code_from_content(content)
+ return out
diff --git a/src/pawbench/dqs.py b/src/pawbench/dqs.py
new file mode 100644
index 0000000..b4ea347
--- /dev/null
+++ b/src/pawbench/dqs.py
@@ -0,0 +1,103 @@
+"""Dispatch Quality Score (DQS) — composite scoring for Pawbench results.
+
+DQS is intentionally simple, transparent, and version-pinned. Every change
+to the formula bumps the version so historical results stay comparable.
+
+This is *not* the same DQS that lives in Switchyard's optimizer — it's the
+Pawbench-side composite that aggregates the four existing dimensions plus
+the new spec 009 axes (complexity tier, artifact quality, verifier
+agreement). The Switchyard DQS is per-dispatch; the Pawbench DQS is
+per-scenario-run.
+
+Formula (DQS v1):
+ DQS = 0.50 * quality
+ + 0.20 * format_compliance
+ + 0.15 * tool_accuracy
+ + 0.10 * useful_ratio
+ + 0.05 * steering_rate
+
+Artifact quality and verifier agreement are reported alongside DQS but
+NOT folded in until calibration data justifies it (spec 009 §B4 explicit
+requirement).
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+from typing import Any
+
+DQS_VERSION = "1.0.0"
+
+
+@dataclass
+class DQSBreakdown:
+ """Auditable view of a DQS computation."""
+
+ quality: float = 0.0
+ format_compliance: float = 0.0
+ tool_accuracy: float = 0.0
+ useful_ratio: float = 0.0
+ steering_rate: float = 0.0
+ composite: float = 0.0
+ version: str = DQS_VERSION
+
+ def to_dict(self) -> dict[str, Any]:
+ return {
+ "version": self.version,
+ "composite": round(self.composite, 4),
+ "components": {
+ "quality": round(self.quality, 4),
+ "format_compliance": round(self.format_compliance, 4),
+ "tool_accuracy": round(self.tool_accuracy, 4),
+ "useful_ratio": round(self.useful_ratio, 4),
+ "steering_rate": round(self.steering_rate, 4),
+ },
+ "weights": {
+ "quality": 0.50,
+ "format_compliance": 0.20,
+ "tool_accuracy": 0.15,
+ "useful_ratio": 0.10,
+ "steering_rate": 0.05,
+ },
+ }
+
+
+def compute_dqs(
+ *,
+ quality: float,
+ format_compliance: float,
+ tool_accuracy: float,
+ useful_ratio: float,
+ steering_rate: float,
+) -> DQSBreakdown:
+ """Compute DQS from per-scenario aggregate metrics. All inputs in 0..1."""
+
+ def _clamp(x: float) -> float:
+ return max(0.0, min(1.0, float(x)))
+
+ q = _clamp(quality)
+ f = _clamp(format_compliance)
+ t = _clamp(tool_accuracy)
+ u = _clamp(useful_ratio)
+ s = _clamp(steering_rate)
+
+ composite = 0.50 * q + 0.20 * f + 0.15 * t + 0.10 * u + 0.05 * s
+ return DQSBreakdown(
+ quality=q,
+ format_compliance=f,
+ tool_accuracy=t,
+ useful_ratio=u,
+ steering_rate=s,
+ composite=composite,
+ )
+
+
+def dqs_spread(scores: list[float]) -> float:
+ """Max − min across a list of DQS values. The headline orchestration SLI.
+
+ Spec 009 §5: high spread means orchestration shape mattered more than
+ model — exactly the One-Shot Shop finding, re-derived from our data.
+ """
+ if not scores:
+ return 0.0
+ return max(scores) - min(scores)
diff --git a/src/pawbench/orchestration.py b/src/pawbench/orchestration.py
new file mode 100644
index 0000000..e65ca9e
--- /dev/null
+++ b/src/pawbench/orchestration.py
@@ -0,0 +1,229 @@
+"""Orchestration shapes — spec 009 / B1.
+
+The headline borrowable from Fabian Wesner's One-Shot Shop study:
+**orchestration architecture beats model choice** (Team Mode 85% vs
+Sub-Agents 57% on the same model). To re-derive that finding inside our
+own benchmark we vary orchestration shape as a first-class axis.
+
+Canonical vocabulary lives in Axiom §17.1. Pawbench implements three
+shapes with distinct execution semantics today and stubs the other two
+to fall through to `subagents` until we nail their operational contract:
+
+ flat — single dispatch, single agent, no parallelism. Baseline.
+ subagents — N agents in parallel, no merge. Pawbench's classic mode.
+ scatter-gather — N agents in parallel, then a synthesis turn that sees
+ every agent's outputs (a merge step). The presence of
+ the merge is what differentiates "Team Mode" from
+ "Sub-Agents" in Fabian's study.
+ waves — currently identical to subagents; reserved for future
+ DAG-aware execution (cluster_tasks-style coloring).
+ team-mode — currently identical to scatter-gather; reserved for
+ real shared-scratchpad coordination.
+
+Pawbench's value-add here is **measurement**, not orchestration product
+features. We do not claim to *be* a multi-agent framework; we claim to
+score them on a level field.
+"""
+
+from __future__ import annotations
+
+import time
+from dataclasses import dataclass, field
+from enum import Enum
+from typing import Any
+
+from pawbench.types import AgentResult
+
+# aiohttp + engine are imported lazily inside run_with_shape so this module's
+# pure helpers (parse, _build_merge_agent, OrchestrationResult) can be unit
+# tested without the network stack.
+
+
+class OrchestrationShape(str, Enum):
+ """Canonical orchestration vocabulary (Axiom §17.1)."""
+
+ FLAT = "flat"
+ WAVES = "waves"
+ SCATTER_GATHER = "scatter-gather"
+ TEAM_MODE = "team-mode"
+ SUBAGENTS = "subagents"
+
+ @classmethod
+ def parse(cls, value: str) -> "OrchestrationShape":
+ try:
+ return cls(value.lower())
+ except ValueError as e:
+ valid = ", ".join(s.value for s in cls)
+ raise ValueError(f"unknown orchestration shape '{value}'; valid: {valid}") from e
+
+
+@dataclass
+class OrchestrationResult:
+ """Per-shape execution outcome on a single scenario."""
+
+ shape: str
+ scenario_id: str
+ wall_time_ms: float = 0.0
+ agents: list[AgentResult] = field(default_factory=list)
+ merge_turn: AgentResult | None = None
+ avg_quality: float = 0.0
+ total_tokens: int = 0
+ error: str = ""
+
+ def to_dict(self) -> dict[str, Any]:
+ return {
+ "shape": self.shape,
+ "scenario_id": self.scenario_id,
+ "wall_time_ms": round(self.wall_time_ms, 2),
+ "avg_quality": round(self.avg_quality, 4),
+ "total_tokens": self.total_tokens,
+ "agent_count": len(self.agents),
+ "had_merge_turn": self.merge_turn is not None,
+ "error": self.error,
+ }
+
+
+# ---------------------------------------------------------------------------
+# Shape executors
+# ---------------------------------------------------------------------------
+
+
+async def _run_flat(
+ session: Any,
+ endpoint: str,
+ model: str,
+ scenario: dict[str, Any],
+ system_prompt: Any,
+) -> list[AgentResult]:
+ """Sequential execution — agent k+1 starts only after agent k finishes."""
+ from pawbench.engine import run_agent # lazy: avoids aiohttp at import time
+
+ results: list[AgentResult] = []
+ tools_schema = scenario["tools_schema"]
+ for agent in scenario["agents"]:
+ result = await run_agent(session, endpoint, model, agent, tools_schema, system_prompt)
+ results.append(result)
+ return results
+
+
+async def _run_parallel(
+ session: Any,
+ endpoint: str,
+ model: str,
+ scenario: dict[str, Any],
+ system_prompt: Any,
+) -> list[AgentResult]:
+ """Parallel execution — all agents launched simultaneously, no coordination."""
+ import asyncio
+
+ from pawbench.engine import run_agent # lazy
+
+ tools_schema = scenario["tools_schema"]
+ tasks = [run_agent(session, endpoint, model, a, tools_schema, system_prompt) for a in scenario["agents"]]
+ raw = await asyncio.gather(*tasks, return_exceptions=True)
+ results: list[AgentResult] = []
+ for item in raw:
+ if isinstance(item, BaseException):
+ results.append(AgentResult(agent_id="error", agent_name="error", error=str(item)[:200]))
+ else:
+ assert isinstance(item, AgentResult)
+ results.append(item)
+ return results
+
+
+def _build_merge_agent(scenario: dict[str, Any], parallel_results: list[AgentResult]) -> dict[str, Any]:
+ """Synthesize a merge-turn agent that sees every parallel worker's output.
+
+ The merge agent runs one final user turn whose content embeds a compact
+ summary of every worker. This is the structural difference between
+ `subagents` and `scatter-gather`/`team-mode` — the merge step.
+ """
+ summaries: list[str] = []
+ for ar in parallel_results:
+ if ar.error or not ar.turns:
+ continue
+ last = ar.turns[-1].output_text or ""
+ summaries.append(f"## {ar.agent_name}\n{last[:1500]}")
+
+ merge_content = (
+ "You are the integration coordinator for the parallel workers below. "
+ "Your job is to verify the work fits together as a coherent system, "
+ "flag any integration gaps, and emit a final CACP block summarizing "
+ "the merged state. Do NOT rewrite the workers' code — only verify.\n\n" + "\n\n".join(summaries)
+ )
+
+ return {
+ "id": f"{scenario['id']}-merge",
+ "name": "Integration Coordinator",
+ "turns": [
+ {
+ "turn": 1,
+ "role": "user",
+ "content": merge_content,
+ "tools": [],
+ "expect": {"output_mentions": ["status"]},
+ "complexity_tier": "cross_cutting",
+ }
+ ],
+ }
+
+
+# ---------------------------------------------------------------------------
+# Public entry point
+# ---------------------------------------------------------------------------
+
+
+async def run_with_shape(
+ endpoint: str,
+ model: str,
+ scenario: dict[str, Any],
+ shape: OrchestrationShape,
+ system_prompt: str | None = None,
+) -> OrchestrationResult:
+ """Execute a scenario under a specific orchestration shape."""
+ import aiohttp # lazy
+
+ from pawbench.engine import DEFAULT_SYSTEM_PROMPT, run_agent # lazy
+
+ if system_prompt is None:
+ system_prompt = DEFAULT_SYSTEM_PROMPT
+ out = OrchestrationResult(shape=shape.value, scenario_id=scenario["id"])
+ wall_start = time.perf_counter()
+
+ try:
+ async with aiohttp.ClientSession() as session:
+ if shape is OrchestrationShape.FLAT:
+ out.agents = await _run_flat(session, endpoint, model, scenario, system_prompt)
+
+ elif shape is OrchestrationShape.SUBAGENTS or shape is OrchestrationShape.WAVES:
+ # WAVES currently degenerates to SUBAGENTS — no DAG yet.
+ out.agents = await _run_parallel(session, endpoint, model, scenario, system_prompt)
+
+ elif shape is OrchestrationShape.SCATTER_GATHER or shape is OrchestrationShape.TEAM_MODE:
+ # Parallel workers + merge turn. The merge turn is the
+ # structural differentiator vs SUBAGENTS.
+ out.agents = await _run_parallel(session, endpoint, model, scenario, system_prompt)
+ merge_agent = _build_merge_agent(scenario, out.agents)
+ out.merge_turn = await run_agent(
+ session,
+ endpoint,
+ model,
+ merge_agent,
+ scenario["tools_schema"],
+ system_prompt,
+ )
+ else: # pragma: no cover - exhaustive
+ out.error = f"unhandled shape: {shape}"
+ except Exception as e: # network/endpoint failure
+ out.error = str(e)[:200]
+
+ out.wall_time_ms = (time.perf_counter() - wall_start) * 1000
+
+ valid = [a for a in out.agents if not a.error]
+ if valid:
+ out.avg_quality = sum(a.avg_quality for a in valid) / len(valid)
+ out.total_tokens = sum(a.total_completion_tokens for a in out.agents)
+ if out.merge_turn:
+ out.total_tokens += out.merge_turn.total_completion_tokens
+
+ return out
diff --git a/src/pawbench/quality.py b/src/pawbench/quality.py
new file mode 100644
index 0000000..a743f1f
--- /dev/null
+++ b/src/pawbench/quality.py
@@ -0,0 +1,347 @@
+"""Artifact quality analyzers — spec 009 / B4.
+
+Static-analysis scoring over the *artifact* an agent produced (the code
+extracted from tool calls), orthogonal to AC pass/fail. Catches the
+"passes tests, ships slop" failure mode.
+
+This is intentionally **not** folded into composite quality scores yet —
+calibration data first (≥100 dispatches), formula change later.
+
+Analyzers are pluggable: every language registers a callable that takes
+extracted source files and returns an `ArtifactQuality`. Missing tools
+degrade gracefully (analyzer returns a `None` score with `analyzer=""`).
+"""
+
+from __future__ import annotations
+
+import json
+import re
+import shutil
+import subprocess
+import tempfile
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Any, Callable, Iterable
+
+
+@dataclass
+class ArtifactQuality:
+ """Static-analysis score over changed files (Axiom §17.5).
+
+ `score` is normalized 0..1 where 1.0 = clean. Analyzers MUST clamp.
+ `lint_errors`, `type_errors`, `cyclomatic_max` are raw counts.
+ `analyzer` identifies the toolchain ("ruff+mypy+radon", "eslint+tsc"...).
+ Empty `analyzer` means the analyzer was unavailable; consumers should
+ treat this row as "no signal" rather than "perfect score".
+ """
+
+ language: str
+ lint_errors: int = 0
+ type_errors: int = 0
+ cyclomatic_max: int = 0
+ score: float = 0.0
+ analyzer: str = ""
+ files_analyzed: int = 0
+ notes: str = ""
+
+ @property
+ def is_signal(self) -> bool:
+ """True if this row reflects an actual analyzer run."""
+ return bool(self.analyzer)
+
+
+# ---------------------------------------------------------------------------
+# File extraction from tool calls
+# ---------------------------------------------------------------------------
+
+
+def extract_files_from_tool_calls(
+ tool_calls: Iterable[dict[str, Any]],
+) -> dict[str, str]:
+ """Extract `path -> content` from `write_file` tool calls.
+
+ Pawbench scenarios use `write_file(path, content)` as the canonical
+ artifact-emission tool. Other tools are ignored. Duplicate paths
+ keep the last write (mirrors filesystem semantics).
+ """
+ files: dict[str, str] = {}
+ for tc in tool_calls:
+ fn = tc.get("function", {})
+ if fn.get("name") != "write_file":
+ continue
+ raw_args = fn.get("arguments", "")
+ if not raw_args:
+ continue
+ try:
+ args = json.loads(raw_args)
+ except json.JSONDecodeError:
+ continue
+ path = args.get("path")
+ content = args.get("content")
+ if isinstance(path, str) and isinstance(content, str):
+ files[path] = content
+ return files
+
+
+def detect_language(files: dict[str, str]) -> str:
+ """Pick the dominant language by file extension."""
+ if not files:
+ return "unknown"
+ counts: dict[str, int] = {}
+ for path in files:
+ suffix = Path(path).suffix.lower()
+ lang = _SUFFIX_TO_LANG.get(suffix)
+ if lang:
+ counts[lang] = counts.get(lang, 0) + 1
+ if not counts:
+ return "unknown"
+ return max(counts.items(), key=lambda kv: kv[1])[0]
+
+
+_SUFFIX_TO_LANG: dict[str, str] = {
+ ".py": "python",
+ ".ts": "typescript",
+ ".tsx": "typescript",
+ ".js": "javascript",
+ ".jsx": "javascript",
+ ".go": "go",
+ ".rs": "rust",
+ ".java": "java",
+ ".rb": "ruby",
+ ".html": "html",
+ ".css": "css",
+ ".sh": "shell",
+}
+
+
+# ---------------------------------------------------------------------------
+# Python analyzer (ruff + mypy + radon, all optional)
+# ---------------------------------------------------------------------------
+
+
+def _which(name: str) -> str | None:
+ return shutil.which(name)
+
+
+def _run(cmd: list[str], cwd: Path, timeout: int = 60) -> tuple[int, str, str]:
+ try:
+ proc = subprocess.run(
+ cmd,
+ cwd=str(cwd),
+ capture_output=True,
+ text=True,
+ timeout=timeout,
+ check=False,
+ )
+ return proc.returncode, proc.stdout, proc.stderr
+ except (subprocess.TimeoutExpired, FileNotFoundError, OSError) as e:
+ return -1, "", str(e)
+
+
+def _materialize(files: dict[str, str], root: Path) -> list[Path]:
+ """Write files to disk under root, return absolute paths."""
+ written: list[Path] = []
+ for rel, content in files.items():
+ # Reject path escapes — analyzer scratch dir must stay sealed.
+ target = (root / rel).resolve()
+ try:
+ target.relative_to(root.resolve())
+ except ValueError:
+ continue
+ target.parent.mkdir(parents=True, exist_ok=True)
+ target.write_text(content, encoding="utf-8")
+ written.append(target)
+ return written
+
+
+def _count_ruff_issues(out: str) -> int:
+ try:
+ issues = json.loads(out) if out.strip() else []
+ return len(issues) if isinstance(issues, list) else 0
+ except json.JSONDecodeError:
+ return out.count('"code":')
+
+
+def _count_mypy_errors(out: str) -> int:
+ return sum(1 for line in out.splitlines() if ": error:" in line)
+
+
+def _max_radon_complexity(out: str) -> int:
+ if not out.strip():
+ return 0
+ try:
+ data = json.loads(out)
+ except json.JSONDecodeError:
+ return 0
+ max_cc = 0
+ for entries in data.values():
+ if not isinstance(entries, list):
+ continue
+ for entry in entries:
+ cc = entry.get("complexity", 0)
+ if isinstance(cc, (int, float)) and cc > max_cc:
+ max_cc = int(cc)
+ return max_cc
+
+
+def _run_python_analyzers(tools: dict[str, str | None], root: Path, aq: ArtifactQuality) -> None:
+ """Mutate `aq` with results from each available analyzer."""
+ if tools["ruff"]:
+ rc, out, _ = _run(
+ [tools["ruff"], "check", "--output-format=json", "--exit-zero", str(root)],
+ cwd=root,
+ )
+ if rc >= 0:
+ aq.lint_errors = _count_ruff_issues(out)
+
+ if tools["mypy"]:
+ rc, out, _ = _run(
+ [tools["mypy"], "--ignore-missing-imports", "--no-error-summary", "--no-color-output", str(root)],
+ cwd=root,
+ timeout=90,
+ )
+ if rc >= 0:
+ aq.type_errors = _count_mypy_errors(out)
+
+ if tools["radon"]:
+ rc, out, _ = _run([tools["radon"], "cc", "-j", "-s", str(root)], cwd=root)
+ if rc >= 0:
+ aq.cyclomatic_max = _max_radon_complexity(out)
+
+
+def _analyze_python(files: dict[str, str]) -> ArtifactQuality:
+ py_files = {p: c for p, c in files.items() if p.endswith(".py")}
+ if not py_files:
+ return ArtifactQuality(language="python", notes="no python files")
+
+ tools: dict[str, str | None] = {
+ "ruff": _which("ruff"),
+ "mypy": _which("mypy"),
+ "radon": _which("radon"),
+ }
+ available = [name for name, path in tools.items() if path]
+ if not available:
+ return ArtifactQuality(
+ language="python",
+ files_analyzed=len(py_files),
+ notes="no analyzers available (install ruff/mypy/radon)",
+ )
+
+ aq = ArtifactQuality(
+ language="python",
+ files_analyzed=len(py_files),
+ analyzer="+".join(available),
+ )
+
+ with tempfile.TemporaryDirectory(prefix="pawbench-quality-") as td:
+ root = Path(td)
+ if not _materialize(py_files, root):
+ aq.notes = "all paths rejected (escape attempts)"
+ aq.analyzer = ""
+ return aq
+ _run_python_analyzers(tools, root, aq)
+
+ aq.score = _score_python(aq)
+ return aq
+
+
+def _score_python(aq: ArtifactQuality) -> float:
+ """Bounded 0..1 quality score from raw counts.
+
+ Cheap, transparent, and explicitly NOT a learned model. We start with
+ 1.0 and subtract bounded penalties so the formula is auditable. The
+ weights are tuned to be lenient on small artifacts (Pawbench scenarios
+ are typically <500 LOC) and to penalize hot spots more than dispersion.
+ """
+ if not aq.is_signal or aq.files_analyzed == 0:
+ return 0.0
+
+ files = max(aq.files_analyzed, 1)
+ # Lint errors per file, capped at 10/file => 0.4 max penalty
+ lint_density = min(aq.lint_errors / files, 10) / 10 * 0.4
+ # Type errors per file, capped at 5/file => 0.4 max penalty
+ type_density = min(aq.type_errors / files, 5) / 5 * 0.4
+ # Cyclomatic complexity hot spot above 10 => up to 0.2 penalty at 30+
+ cc_penalty = max(0, min(aq.cyclomatic_max - 10, 20)) / 20 * 0.2
+
+ score = 1.0 - lint_density - type_density - cc_penalty
+ return max(0.0, min(1.0, score))
+
+
+# ---------------------------------------------------------------------------
+# Generic / fallback analyzer
+# ---------------------------------------------------------------------------
+
+_FILLER_RE = re.compile(r"\b(TODO|FIXME|XXX|HACK)\b")
+
+
+def _analyze_generic(files: dict[str, str], language: str) -> ArtifactQuality:
+ """Tool-free fallback: count obvious smell signals.
+
+ Used when no language-specific analyzer is registered. Produces a
+ real signal (`analyzer="generic"`) so downstream consumers can still
+ differentiate empty artifacts from analyzed ones.
+ """
+ if not files:
+ return ArtifactQuality(language=language, notes="no files")
+
+ total_lines = 0
+ smell_hits = 0
+ longest_function = 0
+ for content in files.values():
+ lines = content.splitlines()
+ total_lines += len(lines)
+ smell_hits += len(_FILLER_RE.findall(content))
+ # Cheap proxy for "long function": longest run of indented lines
+ run = 0
+ for line in lines:
+ if line.startswith((" ", "\t")):
+ run += 1
+ longest_function = max(longest_function, run)
+ else:
+ run = 0
+
+ aq = ArtifactQuality(
+ language=language,
+ files_analyzed=len(files),
+ analyzer="generic",
+ lint_errors=smell_hits,
+ cyclomatic_max=longest_function,
+ )
+ # Score: penalize smell density and very long functions.
+ smell_density = min(smell_hits / max(total_lines, 1) * 100, 5) / 5 * 0.5
+ length_penalty = max(0, min(longest_function - 50, 50)) / 50 * 0.3
+ aq.score = max(0.0, 1.0 - smell_density - length_penalty)
+ return aq
+
+
+# ---------------------------------------------------------------------------
+# Registry
+# ---------------------------------------------------------------------------
+
+Analyzer = Callable[[dict[str, str]], ArtifactQuality]
+
+_REGISTRY: dict[str, Analyzer] = {
+ "python": _analyze_python,
+}
+
+
+def register_analyzer(language: str, analyzer: Analyzer) -> None:
+ """Register a per-language analyzer. Tests use this to inject fakes."""
+ _REGISTRY[language] = analyzer
+
+
+def analyze_artifact(tool_calls: Iterable[dict[str, Any]]) -> ArtifactQuality:
+ """Top-level entry point: extract → detect language → analyze.
+
+ Always returns an `ArtifactQuality`, never None. If nothing was
+ written, returns an empty signal-less row.
+ """
+ files = extract_files_from_tool_calls(tool_calls)
+ if not files:
+ return ArtifactQuality(language="unknown", notes="no write_file calls")
+ language = detect_language(files)
+ analyzer = _REGISTRY.get(language)
+ if analyzer is not None:
+ return analyzer(files)
+ return _analyze_generic(files, language)
diff --git a/src/pawbench/scenarios/api-auth.json b/src/pawbench/scenarios/api-auth.json
index 0a8c8f9..36f4765 100644
--- a/src/pawbench/scenarios/api-auth.json
+++ b/src/pawbench/scenarios/api-auth.json
@@ -1,8 +1,7 @@
{
"id": "api-auth",
- "name": "REST API with JWT Auth — Middleware and Protected Endpoints",
+ "name": "REST API with JWT Auth \u2014 Middleware and Protected Endpoints",
"description": "Build a REST API with JWT authentication. Agent 1 builds auth middleware, Agent 2 builds protected endpoints. Steering event adds rate limiting to the auth endpoint. Tests multi-agent coordination and adaptability.",
-
"agents": [
{
"id": "auth-middleware",
@@ -12,39 +11,74 @@
"turn": 1,
"role": "user",
"content": "Build a JWT authentication module for a Python REST API using stdlib only (http.server + hmac/hashlib for tokens, no PyJWT).\n\nCreate auth/auth.py with:\n1. A function `create_token(user_id: str, secret: str, expires_in: int = 3600) -> str` that creates a JWT-like token. Use base64-encoded JSON header + payload + HMAC-SHA256 signature. Payload should include: sub (user_id), iat (issued at), exp (expiration timestamp).\n2. A function `verify_token(token: str, secret: str) -> dict` that verifies and decodes the token. Raise ValueError for expired tokens, invalid signatures, or malformed tokens.\n3. A decorator `require_auth(secret: str)` that can wrap handler methods. It reads the Authorization header (Bearer ), verifies the token, and passes the decoded payload to the handler. Returns 401 JSON response for missing/invalid tokens.\n\nAlso create auth/users.py with a simple in-memory user store:\n- USERS dict: {'admin': {'password': 'admin123', 'role': 'admin'}, 'user1': {'password': 'pass456', 'role': 'user'}}\n- Function `authenticate(username: str, password: str) -> dict | None` that returns user info or None\n\nUse write_file for each file.",
- "tools": ["write_file", "run_command"],
+ "tools": [
+ "write_file",
+ "run_command"
+ ],
"expect": {
"tool_calls_min": 1,
- "tool_name_any": ["write_file"],
- "output_mentions": ["JWT", "HMAC", "token"]
+ "tool_name_any": [
+ "write_file"
+ ],
+ "output_mentions": [
+ "JWT",
+ "HMAC",
+ "token"
+ ]
}
},
{
"turn": 2,
"inject_tool_results": [
- {"tool_call_id": "auto", "name": "write_file", "content": "{\"status\": \"ok\", \"path\": \"auth/auth.py\", \"bytes\": 3200}"},
- {"tool_call_id": "auto", "name": "write_file", "content": "{\"status\": \"ok\", \"path\": \"auth/users.py\", \"bytes\": 800}"}
+ {
+ "tool_call_id": "auto",
+ "name": "write_file",
+ "content": "{\"status\": \"ok\", \"path\": \"auth/auth.py\", \"bytes\": 3200}"
+ },
+ {
+ "tool_call_id": "auto",
+ "name": "write_file",
+ "content": "{\"status\": \"ok\", \"path\": \"auth/users.py\", \"bytes\": 800}"
+ }
],
"role": "tool_result",
"content": "Auth module written. Now add:\n1. A `POST /api/login` handler function that accepts {\"username\": \"...\", \"password\": \"...\"}, authenticates via users.py, and returns {\"token\": \"\", \"expires_in\": 3600, \"user\": {\"username\": \"...\", \"role\": \"...\"}}. Return 401 for invalid credentials.\n2. A `POST /api/refresh` handler that takes an existing valid token and returns a new token with a fresh expiration.\n3. Role-based check: add `require_role(role: str)` decorator that checks the token payload's role field. Return 403 for insufficient permissions.\n\nUse write_file to update auth.py.",
- "tools": ["write_file", "run_command"],
+ "tools": [
+ "write_file",
+ "run_command"
+ ],
"expect": {
"tool_calls_min": 1,
- "output_mentions": ["login", "refresh", "role"]
+ "output_mentions": [
+ "login",
+ "refresh",
+ "role"
+ ]
}
},
{
"turn": 3,
"steering_event": true,
"inject_tool_results": [
- {"tool_call_id": "auto", "name": "write_file", "content": "{\"status\": \"ok\", \"path\": \"auth/auth.py\", \"bytes\": 5100}"}
+ {
+ "tool_call_id": "auto",
+ "name": "write_file",
+ "content": "{\"status\": \"ok\", \"path\": \"auth/auth.py\", \"bytes\": 5100}"
+ }
],
"role": "tool_result",
"content": "STEERING EVENT: Security review flagged brute-force vulnerability on the login endpoint. Add rate limiting:\n\n1. Create auth/rate_limit.py with an in-memory rate limiter:\n - Track requests per IP address using a sliding window (60 second window)\n - Default limit: 10 requests per minute for login, 100 for other endpoints\n - Return 429 Too Many Requests with {\"error\": \"Rate limit exceeded\", \"retry_after\": } and Retry-After header\n2. Add a `rate_limit(max_requests: int = 10, window_seconds: int = 60)` decorator\n3. Apply it to the login and refresh handlers\n\nUse write_file to create rate_limit.py and update auth.py.",
- "tools": ["write_file", "run_command"],
+ "tools": [
+ "write_file",
+ "run_command"
+ ],
"expect": {
"tool_calls_min": 1,
- "output_mentions": ["rate", "limit", "429"],
+ "output_mentions": [
+ "rate",
+ "limit",
+ "429"
+ ],
"steering_followed": true
}
}
@@ -57,44 +91,74 @@
{
"turn": 1,
"role": "user",
- "content": "Build protected REST API endpoints for a note-taking app. The auth middleware is already at auth/auth.py with `require_auth(secret)` and `require_role(role)` decorators.\n\nCreate api/server.py with a Python http.server-based API:\n- SECRET = 'pawbench-secret-key-2024'\n- POST /api/login — delegates to auth module's login handler\n- GET /api/notes — returns all notes for the authenticated user (require_auth). Store notes in-memory as {user_id: [{\"id\": int, \"title\": str, \"content\": str, \"created_at\": str}]}\n- POST /api/notes — create a note (require_auth). Body: {\"title\": \"...\", \"content\": \"...\"}. Auto-assign ID and timestamp.\n- CORS headers on all responses\n- Port 8081\n\nUse write_file.",
- "tools": ["write_file", "run_command"],
+ "content": "Build protected REST API endpoints for a note-taking app. The auth middleware is already at auth/auth.py with `require_auth(secret)` and `require_role(role)` decorators.\n\nCreate api/server.py with a Python http.server-based API:\n- SECRET = 'pawbench-secret-key-2024'\n- POST /api/login \u2014 delegates to auth module's login handler\n- GET /api/notes \u2014 returns all notes for the authenticated user (require_auth). Store notes in-memory as {user_id: [{\"id\": int, \"title\": str, \"content\": str, \"created_at\": str}]}\n- POST /api/notes \u2014 create a note (require_auth). Body: {\"title\": \"...\", \"content\": \"...\"}. Auto-assign ID and timestamp.\n- CORS headers on all responses\n- Port 8081\n\nUse write_file.",
+ "tools": [
+ "write_file",
+ "run_command"
+ ],
"expect": {
"tool_calls_min": 1,
- "tool_name_any": ["write_file"],
- "output_mentions": ["notes", "auth", "8081"]
+ "tool_name_any": [
+ "write_file"
+ ],
+ "output_mentions": [
+ "notes",
+ "auth",
+ "8081"
+ ]
}
},
{
"turn": 2,
"inject_tool_results": [
- {"tool_call_id": "auto", "name": "write_file", "content": "{\"status\": \"ok\", \"path\": \"api/server.py\", \"bytes\": 3800}"}
+ {
+ "tool_call_id": "auto",
+ "name": "write_file",
+ "content": "{\"status\": \"ok\", \"path\": \"api/server.py\", \"bytes\": 3800}"
+ }
],
"role": "tool_result",
- "content": "Server written. Add more endpoints:\n1. GET /api/notes/ — get a specific note (require_auth). 404 if not found or belongs to another user.\n2. PUT /api/notes/ — update a note (require_auth). Only the owner can update. Return 403 if not owner.\n3. DELETE /api/notes/ — delete a note (require_auth). Only the owner or admin role can delete. Return 403 otherwise.\n4. GET /api/admin/stats — admin-only endpoint (require_role('admin')). Returns {\"total_users\": N, \"total_notes\": N, \"notes_per_user\": {\"user1\": N}}.\n\nUse write_file to update server.py.",
- "tools": ["write_file", "run_command"],
+ "content": "Server written. Add more endpoints:\n1. GET /api/notes/ \u2014 get a specific note (require_auth). 404 if not found or belongs to another user.\n2. PUT /api/notes/ \u2014 update a note (require_auth). Only the owner can update. Return 403 if not owner.\n3. DELETE /api/notes/ \u2014 delete a note (require_auth). Only the owner or admin role can delete. Return 403 otherwise.\n4. GET /api/admin/stats \u2014 admin-only endpoint (require_role('admin')). Returns {\"total_users\": N, \"total_notes\": N, \"notes_per_user\": {\"user1\": N}}.\n\nUse write_file to update server.py.",
+ "tools": [
+ "write_file",
+ "run_command"
+ ],
"expect": {
"tool_calls_min": 1,
- "output_mentions": ["PUT", "DELETE", "admin"]
+ "output_mentions": [
+ "PUT",
+ "DELETE",
+ "admin"
+ ]
}
},
{
"turn": 3,
"inject_tool_results": [
- {"tool_call_id": "auto", "name": "write_file", "content": "{\"status\": \"ok\", \"path\": \"api/server.py\", \"bytes\": 5600}"}
+ {
+ "tool_call_id": "auto",
+ "name": "write_file",
+ "content": "{\"status\": \"ok\", \"path\": \"api/server.py\", \"bytes\": 5600}"
+ }
],
"role": "tool_result",
- "content": "Endpoints added. Final step: add search and pagination.\n1. GET /api/notes?q=&page=1&per_page=10 — search notes by title or content (case-insensitive substring match). Paginate results. Return {\"notes\": [...], \"total\": N, \"page\": N, \"per_page\": N, \"pages\": N}.\n2. GET /api/notes?sort=created_at&order=desc — sort by created_at (asc/desc). Default: desc.\n3. Add request logging: print method, path, user_id (from token), status code, response time.\n\nUse write_file to update server.py.",
- "tools": ["write_file", "run_command"],
+ "content": "Endpoints added. Final step: add search and pagination.\n1. GET /api/notes?q=&page=1&per_page=10 \u2014 search notes by title or content (case-insensitive substring match). Paginate results. Return {\"notes\": [...], \"total\": N, \"page\": N, \"per_page\": N, \"pages\": N}.\n2. GET /api/notes?sort=created_at&order=desc \u2014 sort by created_at (asc/desc). Default: desc.\n3. Add request logging: print method, path, user_id (from token), status code, response time.\n\nUse write_file to update server.py.",
+ "tools": [
+ "write_file",
+ "run_command"
+ ],
"expect": {
"tool_calls_min": 1,
- "output_mentions": ["search", "pagination", "sort"]
+ "output_mentions": [
+ "search",
+ "pagination",
+ "sort"
+ ]
}
}
]
}
],
-
"tools_schema": [
{
"type": "function",
@@ -104,10 +168,19 @@
"parameters": {
"type": "object",
"properties": {
- "path": {"type": "string", "description": "File path relative to workspace"},
- "content": {"type": "string", "description": "File content to write"}
+ "path": {
+ "type": "string",
+ "description": "File path relative to workspace"
+ },
+ "content": {
+ "type": "string",
+ "description": "File content to write"
+ }
},
- "required": ["path", "content"]
+ "required": [
+ "path",
+ "content"
+ ]
}
}
},
@@ -119,11 +192,17 @@
"parameters": {
"type": "object",
"properties": {
- "command": {"type": "string", "description": "Shell command to execute"}
+ "command": {
+ "type": "string",
+ "description": "Shell command to execute"
+ }
},
- "required": ["command"]
+ "required": [
+ "command"
+ ]
}
}
}
- ]
+ ],
+ "complexity_tier": "cross_cutting"
}
diff --git a/src/pawbench/scenarios/bug-fix.json b/src/pawbench/scenarios/bug-fix.json
index b666b91..096d8e6 100644
--- a/src/pawbench/scenarios/bug-fix.json
+++ b/src/pawbench/scenarios/bug-fix.json
@@ -1,8 +1,7 @@
{
"id": "bug-fix",
- "name": "Calculator Bug Fix — Division by Zero and Operator Precedence",
+ "name": "Calculator Bug Fix \u2014 Division by Zero and Operator Precedence",
"description": "Given a broken Python calculator module with division-by-zero crashes and wrong operator precedence, fix the bugs across 3 turns with injected error output between turns. Tests debugging ability and incremental fix quality.",
-
"agents": [
{
"id": "debugger",
@@ -12,45 +11,83 @@
"turn": 1,
"role": "user",
"content": "We have a broken Python calculator module at calc/calculator.py. Here's the code:\n\n```python\nclass Calculator:\n def __init__(self):\n self.history = []\n\n def add(self, a, b):\n result = a + b\n self.history.append(f\"{a} + {b} = {result}\")\n return result\n\n def subtract(self, a, b):\n result = a - b\n self.history.append(f\"{a} - {b} = {result}\")\n return result\n\n def multiply(self, a, b):\n result = a * b\n self.history.append(f\"{a} * {b} = {result}\")\n return result\n\n def divide(self, a, b):\n result = a / b\n self.history.append(f\"{a} / {b} = {result}\")\n return result\n\n def evaluate(self, expression):\n \"\"\"Evaluate a simple math expression like '2 + 3 * 4'.\"\"\"\n tokens = expression.split()\n result = float(tokens[0])\n i = 1\n while i < len(tokens):\n op = tokens[i]\n num = float(tokens[i + 1])\n if op == '+':\n result = self.add(result, num)\n elif op == '-':\n result = self.subtract(result, num)\n elif op == '*':\n result = self.multiply(result, num)\n elif op == '/':\n result = self.divide(result, num)\n i += 2\n return result\n\n def get_history(self):\n return list(self.history)\n\n def clear_history(self):\n self.history.clear()\n```\n\nBugs reported:\n1. `calculator.divide(10, 0)` crashes with ZeroDivisionError instead of returning an error\n2. `calculator.evaluate('2 + 3 * 4')` returns 20 instead of 14 (doesn't respect operator precedence)\n3. `calculator.evaluate('10 / 0 + 5')` crashes the whole program\n\nRead the code and fix bug #1 first: handle division by zero gracefully. The divide method should raise a ValueError with message 'Division by zero' instead of letting ZeroDivisionError propagate. Use write_file to create the fixed module.",
- "tools": ["write_file", "run_command"],
+ "tools": [
+ "write_file",
+ "run_command"
+ ],
"expect": {
"tool_calls_min": 1,
- "tool_name_any": ["write_file"],
- "output_mentions": ["ZeroDivisionError", "ValueError", "divide"]
+ "tool_name_any": [
+ "write_file"
+ ],
+ "output_mentions": [
+ "ZeroDivisionError",
+ "ValueError",
+ "divide"
+ ]
}
},
{
"turn": 2,
"inject_tool_results": [
- {"tool_call_id": "auto", "name": "write_file", "content": "{\"status\": \"ok\", \"path\": \"calc/calculator.py\", \"bytes\": 1800}"},
- {"tool_call_id": "auto", "name": "run_command", "content": "{\"status\": \"ok\", \"stdout\": \"FAILED test_evaluate_precedence - AssertionError: 20.0 != 14.0\\nFAILED test_evaluate_division_by_zero - calculator.evaluate('10 / 0 + 5') still crashes\\n2 failed, 3 passed\", \"exit_code\": 1}"}
+ {
+ "tool_call_id": "auto",
+ "name": "write_file",
+ "content": "{\"status\": \"ok\", \"path\": \"calc/calculator.py\", \"bytes\": 1800}"
+ },
+ {
+ "tool_call_id": "auto",
+ "name": "run_command",
+ "content": "{\"status\": \"ok\", \"stdout\": \"FAILED test_evaluate_precedence - AssertionError: 20.0 != 14.0\\nFAILED test_evaluate_division_by_zero - calculator.evaluate('10 / 0 + 5') still crashes\\n2 failed, 3 passed\", \"exit_code\": 1}"
+ }
],
"role": "tool_result",
"content": "Division by zero in the divide() method is fixed, but tests show two remaining failures:\n\n1. test_evaluate_precedence: calculator.evaluate('2 + 3 * 4') returns 20.0, expected 14.0. The evaluate method processes left-to-right without respecting * and / precedence over + and -.\n2. test_evaluate_division_by_zero: calculator.evaluate('10 / 0 + 5') still crashes because evaluate() doesn't catch the ValueError from divide().\n\nFix bug #2: rewrite the evaluate() method to respect standard math operator precedence (* and / before + and -). Use a two-pass approach: first handle * and /, then handle + and -. Use write_file.",
- "tools": ["write_file", "run_command"],
+ "tools": [
+ "write_file",
+ "run_command"
+ ],
"expect": {
"tool_calls_min": 1,
- "output_mentions": ["precedence", "multiply", "divide"]
+ "output_mentions": [
+ "precedence",
+ "multiply",
+ "divide"
+ ]
}
},
{
"turn": 3,
"inject_tool_results": [
- {"tool_call_id": "auto", "name": "write_file", "content": "{\"status\": \"ok\", \"path\": \"calc/calculator.py\", \"bytes\": 2600}"},
- {"tool_call_id": "auto", "name": "run_command", "content": "{\"status\": \"ok\", \"stdout\": \"FAILED test_evaluate_division_by_zero - ValueError not caught in evaluate\\n1 failed, 4 passed\", \"exit_code\": 1}"}
+ {
+ "tool_call_id": "auto",
+ "name": "write_file",
+ "content": "{\"status\": \"ok\", \"path\": \"calc/calculator.py\", \"bytes\": 2600}"
+ },
+ {
+ "tool_call_id": "auto",
+ "name": "run_command",
+ "content": "{\"status\": \"ok\", \"stdout\": \"FAILED test_evaluate_division_by_zero - ValueError not caught in evaluate\\n1 failed, 4 passed\", \"exit_code\": 1}"
+ }
],
"role": "tool_result",
"content": "Precedence is fixed! But one test still fails:\n\ntest_evaluate_division_by_zero: evaluate('10 / 0 + 5') raises ValueError instead of returning an error result. The evaluate() method should catch ValueError from divide() and return float('nan') for expressions containing division by zero, with an appropriate history entry like '10 / 0 = ERROR(Division by zero)'.\n\nFix this last bug and also add input validation: evaluate() should raise ValueError with 'Invalid expression' if the expression is empty, has invalid tokens, or has mismatched operator/number pairs. Use write_file.",
- "tools": ["write_file", "run_command"],
+ "tools": [
+ "write_file",
+ "run_command"
+ ],
"expect": {
"tool_calls_min": 1,
- "output_mentions": ["nan", "evaluate", "validation"]
+ "output_mentions": [
+ "nan",
+ "evaluate",
+ "validation"
+ ]
}
}
]
}
],
-
"tools_schema": [
{
"type": "function",
@@ -60,10 +97,19 @@
"parameters": {
"type": "object",
"properties": {
- "path": {"type": "string", "description": "File path relative to workspace"},
- "content": {"type": "string", "description": "File content to write"}
+ "path": {
+ "type": "string",
+ "description": "File path relative to workspace"
+ },
+ "content": {
+ "type": "string",
+ "description": "File content to write"
+ }
},
- "required": ["path", "content"]
+ "required": [
+ "path",
+ "content"
+ ]
}
}
},
@@ -75,11 +121,17 @@
"parameters": {
"type": "object",
"properties": {
- "command": {"type": "string", "description": "Shell command to execute"}
+ "command": {
+ "type": "string",
+ "description": "Shell command to execute"
+ }
},
- "required": ["command"]
+ "required": [
+ "command"
+ ]
}
}
}
- ]
+ ],
+ "complexity_tier": "crud"
}
diff --git a/src/pawbench/scenarios/cli-tool.json b/src/pawbench/scenarios/cli-tool.json
index 3b47f0c..623c1c3 100644
--- a/src/pawbench/scenarios/cli-tool.json
+++ b/src/pawbench/scenarios/cli-tool.json
@@ -1,8 +1,7 @@
{
"id": "cli-tool",
- "name": "WordCount CLI Tool — Build and Test",
+ "name": "WordCount CLI Tool \u2014 Build and Test",
"description": "Build a CLI tool 'wordcount' that reads files and outputs word, line, and character counts. One agent builds the tool, another writes tests. Tests pure tool-use throughput and code generation quality across parallel agents.",
-
"agents": [
{
"id": "tool-builder",
@@ -12,37 +11,66 @@
"turn": 1,
"role": "user",
"content": "Build a Python CLI tool called 'wordcount' that reads one or more files and outputs word, line, and character counts.\n\nRequirements:\n- Single file: wordcount.py\n- Usage: python wordcount.py [OPTIONS] FILE [FILE...]\n- Options: --words (-w), --lines (-l), --chars (-c). If none specified, show all three.\n- Output format per file: ' '\n- If multiple files, show a 'total' row at the end\n- If a file doesn't exist, print 'wordcount: : No such file' to stderr and continue\n- Exit code 0 if all files read successfully, 1 if any file failed\n- Use argparse for argument parsing\n\nUse write_file to create the tool.",
- "tools": ["write_file", "run_command"],
+ "tools": [
+ "write_file",
+ "run_command"
+ ],
"expect": {
"tool_calls_min": 1,
- "tool_name_any": ["write_file"],
- "output_mentions": ["argparse", "wordcount"]
+ "tool_name_any": [
+ "write_file"
+ ],
+ "output_mentions": [
+ "argparse",
+ "wordcount"
+ ]
}
},
{
"turn": 2,
"inject_tool_results": [
- {"tool_call_id": "auto", "name": "write_file", "content": "{\"status\": \"ok\", \"path\": \"wordcount/wordcount.py\", \"bytes\": 2800}"}
+ {
+ "tool_call_id": "auto",
+ "name": "write_file",
+ "content": "{\"status\": \"ok\", \"path\": \"wordcount/wordcount.py\", \"bytes\": 2800}"
+ }
],
"role": "tool_result",
"content": "Tool written. Now add these features:\n1. --format json option that outputs results as JSON: {\"files\": [{\"name\": \"...\", \"lines\": N, \"words\": N, \"chars\": N}], \"total\": {\"lines\": N, \"words\": N, \"chars\": N}}\n2. Support reading from stdin when '-' is passed as filename\n3. --sort option: sort output by 'lines', 'words', or 'chars' (descending)\n\nUse write_file to update.",
- "tools": ["write_file", "run_command"],
+ "tools": [
+ "write_file",
+ "run_command"
+ ],
"expect": {
"tool_calls_min": 1,
- "output_mentions": ["json", "stdin", "sort"]
+ "output_mentions": [
+ "json",
+ "stdin",
+ "sort"
+ ]
}
},
{
"turn": 3,
"inject_tool_results": [
- {"tool_call_id": "auto", "name": "write_file", "content": "{\"status\": \"ok\", \"path\": \"wordcount/wordcount.py\", \"bytes\": 4200}"}
+ {
+ "tool_call_id": "auto",
+ "name": "write_file",
+ "content": "{\"status\": \"ok\", \"path\": \"wordcount/wordcount.py\", \"bytes\": 4200}"
+ }
],
"role": "tool_result",
"content": "Updated. Final step: add a --summary flag that only prints the total row (skip individual file rows). Also add --exclude-empty flag that skips files with 0 lines from the count. Make sure --exclude-empty works with --sort and --format json. Use write_file.",
- "tools": ["write_file", "run_command"],
+ "tools": [
+ "write_file",
+ "run_command"
+ ],
"expect": {
"tool_calls_min": 1,
- "output_mentions": ["summary", "exclude-empty"]
+ "output_mentions": [
+ "summary",
+ "exclude-empty"
+ ]
}
}
]
@@ -55,44 +83,77 @@
"turn": 1,
"role": "user",
"content": "Write pytest tests for a Python CLI tool called 'wordcount'. The tool is at wordcount/wordcount.py.\n\nThe tool:\n- Takes one or more filenames as arguments\n- Options: --words (-w), --lines (-l), --chars (-c)\n- Output format: ' '\n- Shows total row for multiple files\n- Prints error to stderr for missing files, continues processing\n- Exit code 0 if all OK, 1 if any file failed\n\nWrite comprehensive tests covering:\n- Single file, all counts\n- Single file, individual count flags\n- Multiple files with total row\n- Missing file error handling\n- Empty file handling\n- File with only whitespace\n\nUse write_file to create test_wordcount.py. Use run_command to create sample test fixture files.",
- "tools": ["write_file", "run_command"],
+ "tools": [
+ "write_file",
+ "run_command"
+ ],
"expect": {
"tool_calls_min": 1,
- "tool_name_any": ["write_file"],
- "output_mentions": ["pytest", "test"]
+ "tool_name_any": [
+ "write_file"
+ ],
+ "output_mentions": [
+ "pytest",
+ "test"
+ ]
}
},
{
"turn": 2,
"inject_tool_results": [
- {"tool_call_id": "auto", "name": "write_file", "content": "{\"status\": \"ok\", \"path\": \"wordcount/test_wordcount.py\", \"bytes\": 3200}"},
- {"tool_call_id": "auto", "name": "run_command", "content": "{\"status\": \"ok\", \"stdout\": \"Created fixture files\", \"exit_code\": 0}"}
+ {
+ "tool_call_id": "auto",
+ "name": "write_file",
+ "content": "{\"status\": \"ok\", \"path\": \"wordcount/test_wordcount.py\", \"bytes\": 3200}"
+ },
+ {
+ "tool_call_id": "auto",
+ "name": "run_command",
+ "content": "{\"status\": \"ok\", \"stdout\": \"Created fixture files\", \"exit_code\": 0}"
+ }
],
"role": "tool_result",
"content": "Tests written. Now add tests for the new features:\n- --format json: verify JSON output structure and values\n- stdin support ('-' as filename): pipe content and verify counts\n- --sort option: verify descending sort by lines/words/chars\n\nUse write_file to update the test file.",
- "tools": ["write_file", "run_command"],
+ "tools": [
+ "write_file",
+ "run_command"
+ ],
"expect": {
"tool_calls_min": 1,
- "output_mentions": ["json", "stdin", "sort"]
+ "output_mentions": [
+ "json",
+ "stdin",
+ "sort"
+ ]
}
},
{
"turn": 3,
"inject_tool_results": [
- {"tool_call_id": "auto", "name": "write_file", "content": "{\"status\": \"ok\", \"path\": \"wordcount/test_wordcount.py\", \"bytes\": 5100}"}
+ {
+ "tool_call_id": "auto",
+ "name": "write_file",
+ "content": "{\"status\": \"ok\", \"path\": \"wordcount/test_wordcount.py\", \"bytes\": 5100}"
+ }
],
"role": "tool_result",
"content": "Updated. Final round: add tests for --summary and --exclude-empty flags. Test combinations: --summary with --format json, --exclude-empty with --sort, and all three together. Also add an integration test that runs the actual CLI via subprocess and checks stdout/stderr. Use write_file.",
- "tools": ["write_file", "run_command"],
+ "tools": [
+ "write_file",
+ "run_command"
+ ],
"expect": {
"tool_calls_min": 1,
- "output_mentions": ["summary", "exclude-empty", "subprocess"]
+ "output_mentions": [
+ "summary",
+ "exclude-empty",
+ "subprocess"
+ ]
}
}
]
}
],
-
"tools_schema": [
{
"type": "function",
@@ -102,10 +163,19 @@
"parameters": {
"type": "object",
"properties": {
- "path": {"type": "string", "description": "File path relative to workspace"},
- "content": {"type": "string", "description": "File content to write"}
+ "path": {
+ "type": "string",
+ "description": "File path relative to workspace"
+ },
+ "content": {
+ "type": "string",
+ "description": "File content to write"
+ }
},
- "required": ["path", "content"]
+ "required": [
+ "path",
+ "content"
+ ]
}
}
},
@@ -117,11 +187,17 @@
"parameters": {
"type": "object",
"properties": {
- "command": {"type": "string", "description": "Shell command to execute"}
+ "command": {
+ "type": "string",
+ "description": "Shell command to execute"
+ }
},
- "required": ["command"]
+ "required": [
+ "command"
+ ]
}
}
}
- ]
+ ],
+ "complexity_tier": "crud"
}
diff --git a/src/pawbench/scenarios/data-pipeline.json b/src/pawbench/scenarios/data-pipeline.json
index 9efba78..dfaa5ee 100644
--- a/src/pawbench/scenarios/data-pipeline.json
+++ b/src/pawbench/scenarios/data-pipeline.json
@@ -1,8 +1,7 @@
{
"id": "data-pipeline",
- "name": "CSV-to-JSON Data Pipeline — Transform, Validate, Extend",
+ "name": "CSV-to-JSON Data Pipeline \u2014 Transform, Validate, Extend",
"description": "Build a CSV-to-JSON data pipeline with validation. Agent 1 builds the transformer, Agent 2 builds the validator. Nudge event adds TSV support. Tests multi-agent coordination, data handling quality, and adaptability.",
-
"agents": [
{
"id": "transformer",
@@ -11,40 +10,75 @@
{
"turn": 1,
"role": "user",
- "content": "Build a Python CSV-to-JSON data transformer module.\n\nCreate pipeline/transformer.py with:\n1. A class `CSVTransformer` that reads a CSV file and converts it to JSON.\n2. Constructor takes: input_path (str), output_path (str, optional — defaults to input_path with .json extension), delimiter (str, default ',')\n3. Method `transform() -> list[dict]`:\n - Read the CSV file using stdlib csv module\n - First row is headers\n - Auto-detect types: integers, floats, booleans ('true'/'false'), and strings\n - Handle quoted fields and escaped commas\n - Return list of dicts (one per row)\n4. Method `save(data: list[dict]) -> str`:\n - Write the JSON to output_path with indent=2\n - Return the output path\n5. Method `transform_and_save() -> tuple[list[dict], str]`:\n - Convenience method that calls transform() then save()\n - Return (data, output_path)\n6. Handle errors: FileNotFoundError, empty CSV (raise ValueError), malformed rows (skip with warning to stderr)\n\nAlso create pipeline/__init__.py. Use write_file for each file.",
- "tools": ["write_file", "run_command"],
+ "content": "Build a Python CSV-to-JSON data transformer module.\n\nCreate pipeline/transformer.py with:\n1. A class `CSVTransformer` that reads a CSV file and converts it to JSON.\n2. Constructor takes: input_path (str), output_path (str, optional \u2014 defaults to input_path with .json extension), delimiter (str, default ',')\n3. Method `transform() -> list[dict]`:\n - Read the CSV file using stdlib csv module\n - First row is headers\n - Auto-detect types: integers, floats, booleans ('true'/'false'), and strings\n - Handle quoted fields and escaped commas\n - Return list of dicts (one per row)\n4. Method `save(data: list[dict]) -> str`:\n - Write the JSON to output_path with indent=2\n - Return the output path\n5. Method `transform_and_save() -> tuple[list[dict], str]`:\n - Convenience method that calls transform() then save()\n - Return (data, output_path)\n6. Handle errors: FileNotFoundError, empty CSV (raise ValueError), malformed rows (skip with warning to stderr)\n\nAlso create pipeline/__init__.py. Use write_file for each file.",
+ "tools": [
+ "write_file",
+ "run_command"
+ ],
"expect": {
"tool_calls_min": 1,
- "tool_name_any": ["write_file"],
- "output_mentions": ["CSV", "transform", "json"]
+ "tool_name_any": [
+ "write_file"
+ ],
+ "output_mentions": [
+ "CSV",
+ "transform",
+ "json"
+ ]
}
},
{
"turn": 2,
"inject_tool_results": [
- {"tool_call_id": "auto", "name": "write_file", "content": "{\"status\": \"ok\", \"path\": \"pipeline/transformer.py\", \"bytes\": 2800}"},
- {"tool_call_id": "auto", "name": "write_file", "content": "{\"status\": \"ok\", \"path\": \"pipeline/__init__.py\", \"bytes\": 100}"}
+ {
+ "tool_call_id": "auto",
+ "name": "write_file",
+ "content": "{\"status\": \"ok\", \"path\": \"pipeline/transformer.py\", \"bytes\": 2800}"
+ },
+ {
+ "tool_call_id": "auto",
+ "name": "write_file",
+ "content": "{\"status\": \"ok\", \"path\": \"pipeline/__init__.py\", \"bytes\": 100}"
+ }
],
"role": "tool_result",
- "content": "Transformer created. Now add these features:\n1. Column mapping: method `set_column_map(mapping: dict)` — rename columns during transform. E.g., {'First Name': 'first_name', 'Last Name': 'last_name'}\n2. Column filtering: method `set_columns(columns: list[str])` — only include specified columns in output\n3. Row filtering: method `set_filter(column: str, operator: str, value: any)` — filter rows. Operators: 'eq', 'neq', 'gt', 'lt', 'gte', 'lte', 'contains', 'startswith'\n4. Method chaining: all set_* methods return self for fluent API\n5. Statistics: method `stats() -> dict` returning {\"total_rows\": N, \"filtered_rows\": N, \"columns\": [...], \"type_distribution\": {\"col\": {\"int\": N, \"str\": N}}}\n\nUse write_file to update transformer.py.",
- "tools": ["write_file", "run_command"],
+ "content": "Transformer created. Now add these features:\n1. Column mapping: method `set_column_map(mapping: dict)` \u2014 rename columns during transform. E.g., {'First Name': 'first_name', 'Last Name': 'last_name'}\n2. Column filtering: method `set_columns(columns: list[str])` \u2014 only include specified columns in output\n3. Row filtering: method `set_filter(column: str, operator: str, value: any)` \u2014 filter rows. Operators: 'eq', 'neq', 'gt', 'lt', 'gte', 'lte', 'contains', 'startswith'\n4. Method chaining: all set_* methods return self for fluent API\n5. Statistics: method `stats() -> dict` returning {\"total_rows\": N, \"filtered_rows\": N, \"columns\": [...], \"type_distribution\": {\"col\": {\"int\": N, \"str\": N}}}\n\nUse write_file to update transformer.py.",
+ "tools": [
+ "write_file",
+ "run_command"
+ ],
"expect": {
"tool_calls_min": 1,
- "output_mentions": ["column", "filter", "mapping"]
+ "output_mentions": [
+ "column",
+ "filter",
+ "mapping"
+ ]
}
},
{
"turn": 3,
"steering_event": true,
"inject_tool_results": [
- {"tool_call_id": "auto", "name": "write_file", "content": "{\"status\": \"ok\", \"path\": \"pipeline/transformer.py\", \"bytes\": 4500}"}
+ {
+ "tool_call_id": "auto",
+ "name": "write_file",
+ "content": "{\"status\": \"ok\", \"path\": \"pipeline/transformer.py\", \"bytes\": 4500}"
+ }
],
"role": "tool_result",
"content": "NUDGE: Client just sent data in TSV format (tab-separated). Add TSV support:\n\n1. Auto-detect delimiter: if file extension is .tsv, use tab delimiter. If .csv, use comma. Otherwise, sniff the first line.\n2. Add a class `TSVTransformer` that extends CSVTransformer with delimiter='\\t'\n3. Add a factory function `create_transformer(input_path: str, output_path: str = None) -> CSVTransformer` that auto-selects the right transformer based on file extension\n4. Support mixed-format batch processing: method `batch_transform(input_paths: list[str]) -> list[tuple[list[dict], str]]` that processes multiple files (CSV and TSV mixed) and returns results for each\n\nUse write_file to update transformer.py.",
- "tools": ["write_file", "run_command"],
+ "tools": [
+ "write_file",
+ "run_command"
+ ],
"expect": {
"tool_calls_min": 1,
- "output_mentions": ["TSV", "tab", "delimiter"],
+ "output_mentions": [
+ "TSV",
+ "tab",
+ "delimiter"
+ ],
"steering_followed": true
}
}
@@ -57,44 +91,74 @@
{
"turn": 1,
"role": "user",
- "content": "Build a data validation module for a CSV-to-JSON pipeline.\n\nCreate pipeline/validator.py with:\n1. A class `SchemaValidator` that validates transformed data (list of dicts) against a schema.\n2. Schema definition format:\n ```python\n schema = {\n 'name': {'type': 'str', 'required': True, 'min_length': 1, 'max_length': 100},\n 'age': {'type': 'int', 'required': True, 'min': 0, 'max': 150},\n 'email': {'type': 'str', 'required': True, 'pattern': r'^[\\w.-]+@[\\w.-]+\\.\\w+$'},\n 'score': {'type': 'float', 'required': False, 'min': 0.0, 'max': 100.0},\n 'active': {'type': 'bool', 'required': False, 'default': True}\n }\n ```\n3. Method `validate(data: list[dict]) -> ValidationResult` where ValidationResult has:\n - valid_rows: list[dict] — rows that passed validation\n - invalid_rows: list[tuple[int, dict, list[str]]] — (row_index, row_data, list of error messages)\n - is_valid: bool — True if all rows valid\n - error_count: int\n - summary: dict — {\"total\": N, \"valid\": N, \"invalid\": N, \"errors_by_field\": {\"name\": N}}\n4. Support type coercion: if a field is type 'int' but value is '42' (string), coerce it\n5. Support custom validators: method `add_validator(field: str, fn: Callable, error_msg: str)`\n\nUse write_file.",
- "tools": ["write_file", "run_command"],
+ "content": "Build a data validation module for a CSV-to-JSON pipeline.\n\nCreate pipeline/validator.py with:\n1. A class `SchemaValidator` that validates transformed data (list of dicts) against a schema.\n2. Schema definition format:\n ```python\n schema = {\n 'name': {'type': 'str', 'required': True, 'min_length': 1, 'max_length': 100},\n 'age': {'type': 'int', 'required': True, 'min': 0, 'max': 150},\n 'email': {'type': 'str', 'required': True, 'pattern': r'^[\\w.-]+@[\\w.-]+\\.\\w+$'},\n 'score': {'type': 'float', 'required': False, 'min': 0.0, 'max': 100.0},\n 'active': {'type': 'bool', 'required': False, 'default': True}\n }\n ```\n3. Method `validate(data: list[dict]) -> ValidationResult` where ValidationResult has:\n - valid_rows: list[dict] \u2014 rows that passed validation\n - invalid_rows: list[tuple[int, dict, list[str]]] \u2014 (row_index, row_data, list of error messages)\n - is_valid: bool \u2014 True if all rows valid\n - error_count: int\n - summary: dict \u2014 {\"total\": N, \"valid\": N, \"invalid\": N, \"errors_by_field\": {\"name\": N}}\n4. Support type coercion: if a field is type 'int' but value is '42' (string), coerce it\n5. Support custom validators: method `add_validator(field: str, fn: Callable, error_msg: str)`\n\nUse write_file.",
+ "tools": [
+ "write_file",
+ "run_command"
+ ],
"expect": {
"tool_calls_min": 1,
- "tool_name_any": ["write_file"],
- "output_mentions": ["schema", "validate", "ValidationResult"]
+ "tool_name_any": [
+ "write_file"
+ ],
+ "output_mentions": [
+ "schema",
+ "validate",
+ "ValidationResult"
+ ]
}
},
{
"turn": 2,
"inject_tool_results": [
- {"tool_call_id": "auto", "name": "write_file", "content": "{\"status\": \"ok\", \"path\": \"pipeline/validator.py\", \"bytes\": 3400}"}
+ {
+ "tool_call_id": "auto",
+ "name": "write_file",
+ "content": "{\"status\": \"ok\", \"path\": \"pipeline/validator.py\", \"bytes\": 3400}"
+ }
],
"role": "tool_result",
- "content": "Validator created. Now add:\n1. Uniqueness constraint: schema field option `unique: True` — flag duplicates in that column\n2. Cross-field validation: method `add_cross_validator(fn: Callable[[dict], bool], error_msg: str)` — validate relationships between fields (e.g., 'end_date > start_date')\n3. Reporting: method `generate_report() -> str` that returns a human-readable validation report with:\n - Summary statistics\n - Top 10 most common errors\n - Error distribution by field\n - Sample invalid rows (first 5)\n4. Fix/transform mode: method `set_fix_mode(fixes: dict)` where fixes maps field names to fix functions. E.g., {'email': str.lower, 'name': str.strip}. Applied before validation.\n\nUse write_file to update validator.py.",
- "tools": ["write_file", "run_command"],
+ "content": "Validator created. Now add:\n1. Uniqueness constraint: schema field option `unique: True` \u2014 flag duplicates in that column\n2. Cross-field validation: method `add_cross_validator(fn: Callable[[dict], bool], error_msg: str)` \u2014 validate relationships between fields (e.g., 'end_date > start_date')\n3. Reporting: method `generate_report() -> str` that returns a human-readable validation report with:\n - Summary statistics\n - Top 10 most common errors\n - Error distribution by field\n - Sample invalid rows (first 5)\n4. Fix/transform mode: method `set_fix_mode(fixes: dict)` where fixes maps field names to fix functions. E.g., {'email': str.lower, 'name': str.strip}. Applied before validation.\n\nUse write_file to update validator.py.",
+ "tools": [
+ "write_file",
+ "run_command"
+ ],
"expect": {
"tool_calls_min": 1,
- "output_mentions": ["unique", "cross-field", "report"]
+ "output_mentions": [
+ "unique",
+ "cross-field",
+ "report"
+ ]
}
},
{
"turn": 3,
"inject_tool_results": [
- {"tool_call_id": "auto", "name": "write_file", "content": "{\"status\": \"ok\", \"path\": \"pipeline/validator.py\", \"bytes\": 5200}"}
+ {
+ "tool_call_id": "auto",
+ "name": "write_file",
+ "content": "{\"status\": \"ok\", \"path\": \"pipeline/validator.py\", \"bytes\": 5200}"
+ }
],
"role": "tool_result",
"content": "Updated. Final step: create the pipeline integration module.\n\nCreate pipeline/runner.py that ties the transformer and validator together:\n1. Class `PipelineRunner`:\n - Constructor: `PipelineRunner(input_path: str, schema: dict, output_path: str = None)`\n - Method `run() -> PipelineResult` with:\n - transform_time_ms: float\n - validate_time_ms: float\n - total_time_ms: float\n - input_rows: int\n - valid_rows: int\n - invalid_rows: int\n - output_path: str\n - validation_report: str\n - Auto-detects CSV vs TSV using the factory function from transformer.py\n - Applies fixes, validates, saves only valid rows\n - Saves invalid rows to a separate '