diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..760ffb0 --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,25 @@ +name: release + +on: + push: + tags: ["v*"] + +permissions: + contents: read + id-token: write # required for PyPI Trusted Publishing (no API token needed) + +jobs: + build-and-publish: + runs-on: ubuntu-latest + environment: pypi + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: "3.12" + - name: Build distribution + run: | + python -m pip install --upgrade build hatchling + python -m build + - name: Publish to PyPI + uses: pypa/gh-action-pypi-publish@release/v1 diff --git a/.github/workflows/sin-verify.yml b/.github/workflows/sin-verify.yml new file mode 100644 index 0000000..2ce680a --- /dev/null +++ b/.github/workflows/sin-verify.yml @@ -0,0 +1,33 @@ +name: sin-verify + +on: + pull_request: + branches: ["main"] + +jobs: + verify: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + - uses: actions/setup-python@v5 + with: + python-version: "3.12" + + - name: Install SIN-Code Bundle + run: pip install "sin-code-bundle[dev]" + + - name: Run test suite + run: pytest -q + + - name: Audit chain integrity + run: | + # Passes if no audit log exists yet (clean repo). 
/**
 * SIN-Code Bundle — opencode plugin
 *
 * Turns the AGENTS.md doctrine into an *enforced* protocol:
 *   - after every file edit      -> run semantic_diff + architectural_debt
 *   - before a session ends      -> require a GREEN Oracle verification
 *   - on a tripped ADW breaker   -> hard-stop the agent
 *
 * Docs: https://opencode.ai/docs/plugins
 *
 * The plugin talks to the SIN MCP tools that opencode already loaded via
 * `opencode.json` (mcp.sin). It does not shell out to `sin` itself; instead it
 * reads/writes a small session ledger under `.sin/session/` so the gate state
 * survives across tool calls.
 */

import type { Plugin } from "@opencode-ai/plugin"
import { mkdir, readFile, writeFile } from "node:fs/promises"
import { join } from "node:path"

// --------------------------------------------------------------------------- //
// Config (overridable via env)
// --------------------------------------------------------------------------- //
const SIN_DIR = ".sin"
const SESSION_DIR = join(SIN_DIR, "session")
const LEDGER = join(SESSION_DIR, "gate.json")

type RiskLevel = "low" | "medium" | "high"
type OracleVerdict = "pass" | "fail" | "unknown"

const RISK_ORDER: Record<RiskLevel, number> = { low: 0, medium: 1, high: 2 }

/**
 * Normalises a user-supplied risk level.
 *
 * @param raw - raw text (typically an environment variable), may be undefined
 * @param fallback - level returned when `raw` is not one of low/medium/high
 * @returns a level that is guaranteed to be a key of `RISK_ORDER`
 */
function parseRiskLevel(raw: string | undefined, fallback: RiskLevel): RiskLevel {
  const value = (raw ?? "").trim().toLowerCase()
  // Own-property check so inherited names such as "toString" are rejected.
  return Object.prototype.hasOwnProperty.call(RISK_ORDER, value)
    ? (value as RiskLevel)
    : fallback
}

/**
 * Parses a numeric threshold.
 *
 * @param raw - raw text (typically an environment variable), may be undefined
 * @param fallback - value returned when `raw` is missing, blank or not finite
 * @returns a finite number
 */
function parseThreshold(raw: string | undefined, fallback: number): number {
  if (raw === undefined || raw.trim() === "") return fallback
  const value = Number(raw)
  return Number.isFinite(value) ? value : fallback
}

// An unrecognised SIN_RISK_BLOCK used to make the comparison below evaluate
// against `undefined` (always false); a non-numeric SIN_DEBT_BREAKER became NaN
// (breaker never trips). Both now fall back to their defaults.
const RISK_BLOCK_LEVEL: RiskLevel = parseRiskLevel(process.env.SIN_RISK_BLOCK, "high")
const DEBT_BREAKER = parseThreshold(process.env.SIN_DEBT_BREAKER, 85) // 0-100
const ENFORCE = (process.env.SIN_ENFORCE ?? "1") !== "0"

interface Ledger {
  /** files edited but not yet verified green */
  dirty: string[]
  /** last Oracle verdict: "pass" | "fail" | "unknown" */
  oracle: OracleVerdict
  /** last architectural debt score 0-100 */
  debt: number
  /** highest risk seen since last green verification */
  risk: RiskLevel
  /** human-readable reasons accumulated for the current gate */
  notes: string[]
  updatedAt: string
}

/**
 * Builds a pristine ledger.
 *
 * A factory (rather than a shared constant that gets spread) so that every
 * caller receives its own `dirty` / `notes` arrays; pushing into one ledger can
 * never leak into the next "empty" one.
 */
function emptyLedger(): Ledger {
  return {
    dirty: [],
    oracle: "unknown",
    debt: 0,
    risk: "low",
    notes: [],
    updatedAt: new Date(0).toISOString(),
  }
}

// --------------------------------------------------------------------------- //
// Ledger persistence
// --------------------------------------------------------------------------- //

/**
 * Loads the gate ledger from disk.
 *
 * @returns the stored ledger merged over the defaults, or a fresh empty ledger
 *          when the file is missing, unreadable or not valid JSON
 */
async function readLedger(): Promise<Ledger> {
  try {
    const raw = await readFile(LEDGER, "utf8")
    const stored = JSON.parse(raw) as Partial<Ledger>
    const base = emptyLedger()
    return {
      ...base,
      ...stored,
      // Copy (and validate) the arrays: later code calls push/includes on them.
      dirty: Array.isArray(stored.dirty) ? [...stored.dirty] : base.dirty,
      notes: Array.isArray(stored.notes) ? [...stored.notes] : base.notes,
    }
  } catch {
    return emptyLedger()
  }
}

/**
 * Persists the ledger, stamping `updatedAt` on the passed object first.
 *
 * @param ledger - ledger to write; its `updatedAt` field is overwritten
 */
async function writeLedger(ledger: Ledger): Promise<void> {
  ledger.updatedAt = new Date().toISOString()
  await mkdir(SESSION_DIR, { recursive: true })
  await writeFile(LEDGER, JSON.stringify(ledger, null, 2), "utf8")
}

/** Returns the more severe of two risk levels (ties return `a`). */
function maxRisk(a: RiskLevel, b: RiskLevel): RiskLevel {
  return RISK_ORDER[a] >= RISK_ORDER[b] ? a : b
}

// --------------------------------------------------------------------------- //
// Helpers to call the SIN MCP tools through the opencode client
// --------------------------------------------------------------------------- //

/**
 * Invokes a tool on the "sin" MCP server.
 *
 * @param client - opencode client handed to the plugin
 * @param tool - tool name on the "sin" server
 * @param args - tool arguments
 * @returns the tool result, or `{ ok: false, error }` when the call throws
 */
async function callSin(
  // NOTE(review): `client.tool.call` is used exactly as in the original; confirm
  // it exists on the opencode client version this plugin targets.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  client: any,
  tool: string,
  args: Record<string, unknown>,
): Promise<unknown> {
  try {
    return await client.tool.call({ server: "sin", tool, arguments: args })
  } catch (err) {
    // Subsystem may be unavailable (graceful degradation). Never crash the agent.
    return { ok: false, error: String(err) }
  }
}

/** Views an arbitrary value as a property bag; non-objects become `{}`. */
function asRecord(value: unknown): Record<string, unknown> {
  return typeof value === "object" && value !== null
    ? (value as Record<string, unknown>)
    : {}
}

/** True when `result` has the `{ ok: false, error }` shape `callSin` returns on failure. */
function isCallFailure(result: unknown): boolean {
  const fields = asRecord(result)
  return fields.ok === false && "error" in fields
}

/**
 * Extracts a risk level from a tool result (`risk` or `risk_level` field).
 * "critical" maps to high, "moderate" to medium, anything else to low.
 */
function parseRisk(result: unknown): RiskLevel {
  const fields = asRecord(result)
  const r = String(fields.risk ?? fields.risk_level ?? "low").toLowerCase()
  if (r === "high" || r === "critical") return "high"
  if (r === "medium" || r === "moderate") return "medium"
  return "low"
}

/**
 * Extracts a debt score from a tool result (`score`, `debt` or `complexity`).
 * Returns 0 when no finite number is present.
 */
function parseDebt(result: unknown): number {
  const fields = asRecord(result)
  const d = Number(fields.score ?? fields.debt ?? fields.complexity ?? 0)
  return Number.isFinite(d) ? d : 0
}

/**
 * Extracts the Oracle verdict from a tool result.
 *
 * An explicit `verdict` / `status` always wins; the boolean `ok` flag is only
 * consulted when no recognised verdict text is present, so a result such as
 * `{ verdict: "fail", ok: true }` is reported as a failure.
 */
function parseOracle(result: unknown): OracleVerdict {
  const fields = asRecord(result)
  const v = String(fields.verdict ?? fields.status ?? "").toLowerCase()
  if (v === "fail" || v === "failed" || v === "red") return "fail"
  if (v === "pass" || v === "passed" || v === "green") return "pass"
  if (fields.ok === true) return "pass"
  if (fields.ok === false) return "fail"
  return "unknown"
}

// --------------------------------------------------------------------------- //
// Plugin
// --------------------------------------------------------------------------- //
export const SinPlugin: Plugin = async ({ client }) => {
  return {
    /**
     * After any file edit: assess the change semantically and update debt.
     * This is the "review" + "guard debt" steps of the SIN loop, automated.
     */
    "file.edited": async ({ file }) => {
      if (!file) return
      const ledger = await readLedger()

      // 1) semantic diff against git HEAD for this file
      const diff = await callSin(client, "semantic_diff", {
        file_a: `git:HEAD:${file}`,
        file_b: file,
      })
      const risk = parseRisk(diff)
      ledger.risk = maxRisk(ledger.risk, risk)

      // 2) architectural debt snapshot — keep the previous score when the tool
      //    call itself failed, otherwise an outage would reset the debt to 0.
      const debt = await callSin(client, "architectural_debt", {})
      if (!isCallFailure(debt)) ledger.debt = parseDebt(debt)

      // any edit invalidates the previous green verification
      ledger.oracle = "unknown"
      if (!ledger.dirty.includes(file)) ledger.dirty.push(file)

      const note = `edited ${file} (risk=${risk}, debt=${ledger.debt})`
      ledger.notes.push(note)
      await writeLedger(ledger)

      // 3) ADW breaker: hard stop
      if (ENFORCE && ledger.debt >= DEBT_BREAKER) {
        throw new Error(
          `[SIN] ADW breaker tripped: debt ${ledger.debt} >= ${DEBT_BREAKER}. ` +
            `Stop adding code and refactor. Re-run architectural_debt after refactor.`,
        )
      }

      // 4) risk gate: warn loudly (does not stop the edit, stops "done")
      if (RISK_ORDER[risk] >= RISK_ORDER[RISK_BLOCK_LEVEL]) {
        await client.session.log?.({
          level: "warn",
          message:
            `[SIN] High-risk change in ${file}. Justify it and run ` +
            `verify_tests before reporting done.`,
        })
      }
    },

    /**
     * Before a tool runs: if the agent tries to "finish" while the gate is not
     * green, intercept and force a verification first.
     */
    "tool.execute.before": async ({ tool }) => {
      if (!ENFORCE) return
      const name = (tool ?? "").toLowerCase()
      const isFinishSignal =
        name.includes("done") ||
        name.includes("finish") ||
        name.includes("complete")
      if (!isFinishSignal) return

      const ledger = await readLedger()
      if (ledger.dirty.length === 0) return

      if (ledger.oracle !== "pass") {
        throw new Error(
          `[SIN] Cannot report done: Oracle verification is "${ledger.oracle}". ` +
            `Files awaiting green verification: ${ledger.dirty.join(", ")}. ` +
            `Run the SIN "verify_tests" tool until it returns pass.`,
        )
      }
      // gate is green -> reset ledger for next task
      await writeLedger(emptyLedger())
    },

    /**
     * After a verification tool runs: record the Oracle verdict so the finish
     * gate can open. We watch for verify_tests / prove / verify_change results.
     */
    "tool.execute.after": async ({ tool }, output) => {
      const name = (tool ?? "").toLowerCase()
      const isVerify =
        name.includes("verify") || name.includes("prove") || name.includes("oracle")
      if (!isVerify) return

      const ledger = await readLedger()
      const verdict = parseOracle(output?.result ?? output)
      ledger.oracle = verdict
      if (verdict === "pass") {
        ledger.dirty = []
        ledger.risk = "low"
        ledger.notes.push("oracle: PASS")
      } else if (verdict === "fail") {
        ledger.notes.push("oracle: FAIL")
      }
      await writeLedger(ledger)
    },

    /**
     * Session idle: gentle reminder if there is unverified work on the table.
     */
    "session.idle": async () => {
      const ledger = await readLedger()
      if (ledger.dirty.length > 0 && ledger.oracle !== "pass") {
        await client.session.log?.({
          level: "info",
          message:
            `[SIN] ${ledger.dirty.length} file(s) edited without a green ` +
            `verification. Run verify_tests before finishing.`,
        })
      }
    },
  }
}

export default SinPlugin
+ +## Reproduce + +```bash +pip install "sin-code-bundle[bench]" + +# Smoke test (no LLM cost — validates the clone/apply/test pipeline) +sin bench --runner dry --limit 5 + +# Full A/B on SWE-bench Lite with opencode +sin bench --runner opencode --limit 100 --out report.json +``` + +## Methodology + +- **Dataset:** SWE-bench Lite (`princeton-nlp/SWE-bench_Lite`, test split). +- **Arms:** `control` (SIN_ENFORCE=0) vs `sin` (SIN_ENFORCE=1, MCP tools loaded). +- **Resolved:** patch applies cleanly AND all FAIL_TO_PASS tests pass. +- **Isolation:** each task runs in a fresh git clone at `base_commit`. + +## Results + +| Arm | Resolved | Rate | Mean time | +|-----|----------|------|-----------| +| control | *TBD* | *TBD* | *TBD* | +| sin | *TBD* | *TBD* | *TBD* | +| **delta** | | ***TBD* pp** | | + +> Fill this table from `report.json` after a full run and commit the +> `report.json` alongside the version tag so results are auditable. + +## Interpretation + +A positive delta means the SIN tools (impact analysis, semantic diff, Oracle +verification) caused the agent to produce more correct patches. The harness is +runner-agnostic — the same JSON report can compare opencode, codex, and hermes +on identical tasks. diff --git a/pyproject.toml b/pyproject.toml index d4707ee..ed80177 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,16 +4,24 @@ build-backend = "setuptools.build_meta" [project] name = "sin-code-bundle" -version = "0.1.0" -description = "Unified SOTA Agent-Engineering Stack" +version = "0.2.0" +description = "Structural intelligence for AI coding agents: impact analysis, semantic diff, architectural-debt breaker, and an independent verification Oracle — over MCP." 
+readme = "README.md" requires-python = ">=3.11" -dependencies = [ - "typer>=0.9.0", - "pyyaml>=6.0", +license = { text = "Apache-2.0" } +authors = [{ name = "OpenSIN-Code" }] +keywords = ["mcp", "ai-agents", "coding-agent", "opencode", "codex", "lsp", "swe-bench"] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Topic :: Software Development :: Quality Assurance", ] -# Die Subsysteme werden lokal per editable install verdrahtet, nicht von PyPI. -# Installation (Reihenfolge wichtig): +# Core runtime dependencies only — subsystems installed separately (editable). +# Full install order: # pip install -e ../SIN-Code-Semantic-Codebase-Knowledge-Graphs # pip install -e ../SIN-Code-Intent-Based-Diffing # pip install -e ../SIN-Code-Proof-of-Correctness @@ -23,9 +31,39 @@ dependencies = [ # pip install -e ../SIN-Code-Orchestration # pip install -e ../SIN-Code-Review-Interface # pip install -e . 
+dependencies = [ + "typer>=0.12", + "pyyaml>=6.0", +] + [project.optional-dependencies] -mcp = ["mcp>=1.0.0"] -test = ["pytest>=8.0"] +lsp = [ + "multilspy>=0.0.10", + "tree-sitter>=0.21", + "tree-sitter-languages>=1.10", +] +bench = [ + "datasets>=2.19", +] +mcp = [ + "mcp[cli]>=1.2", +] +otel = [ + "opentelemetry-sdk>=1.25", + "opentelemetry-exporter-otlp>=1.25", +] +dev = [ + "pytest>=8.0", + "pytest-asyncio>=0.23", + "ruff>=0.5", +] +all = [ + "sin-code-bundle[lsp,bench,mcp,otel]", +] + +[project.urls] +Homepage = "https://github.com/OpenSIN-Code/SIN-Code-Bundle" +Issues = "https://github.com/OpenSIN-Code/SIN-Code-Bundle/issues" [project.scripts] sin = "sin_code_bundle.cli:app" @@ -35,3 +73,11 @@ where = ["src"] [tool.setuptools.package-data] sin_code_bundle = ["data/codocs/*.md"] + +[tool.pytest.ini_options] +asyncio_mode = "auto" +testpaths = ["tests"] + +[tool.ruff] +line-length = 100 +target-version = "py311" diff --git a/skills/add-endpoint.md b/skills/add-endpoint.md new file mode 100644 index 0000000..74539fb --- /dev/null +++ b/skills/add-endpoint.md @@ -0,0 +1,20 @@ +--- +name: add-endpoint +description: Add an API endpoint with an ephemeral mock and verification. +arguments: + - name: spec + description: One-line description of the endpoint (method, path, behavior) + required: true +--- + +Add the endpoint described as: {{spec}}. + +1. Call `mock_env("up")` to get an ephemeral full-stack environment. +2. Implement the endpoint with input validation and error handling. +3. Call `semantic_review(before, after)` on each changed file; justify any + non-"low" risk. +4. Write tests covering success + failure paths. +5. Call `verify_tests(...)`; iterate until the verdict is `pass`. +6. Call `mock_env("down")` to tear down the environment. + +Do not report done while verification is red or the mock is still running. 
diff --git a/skills/safe-refactor.md b/skills/safe-refactor.md new file mode 100644 index 0000000..e1d6020 --- /dev/null +++ b/skills/safe-refactor.md @@ -0,0 +1,24 @@ +--- +name: safe-refactor +description: Refactor a symbol with full SIN impact analysis and Oracle verification. +arguments: + - name: symbol + description: Fully-qualified symbol to refactor (e.g. module.Class.method) + required: true +--- + +You are performing a SAFE REFACTOR of `{{symbol}}` using the SIN-Code tools. +Follow this loop exactly and do not skip a step. + +1. Call `impact("{{symbol}}")`. Read the callers, fan_in, and risk. + - If `touches_public_api` is true or risk is "high", state the blast radius + back to the user and plan accordingly. +2. Make the smallest refactor that satisfies the goal. Do not change behavior. +3. For each edited file, call `semantic_diff(before, after)`. + - If any diff reports more than one intent, split the change. +4. Call `architectural_debt()`. If the score regressed, simplify before moving on. +5. Call `verify_tests(...)` (and `prove(...)` for critical pure functions). +6. Do NOT report done until the Oracle verdict is `pass`. + +Report: the blast radius, the intents from each semantic_diff, the debt delta, +and the final Oracle verdict. diff --git a/src/sin_code_bundle/bench.py b/src/sin_code_bundle/bench.py new file mode 100644 index 0000000..095c20c --- /dev/null +++ b/src/sin_code_bundle/bench.py @@ -0,0 +1,375 @@ +"""SWE-bench-style A/B evaluation harness for the SIN-Code Bundle. + +Goal: produce an objective, reproducible number that answers +"do the SIN tools (impact / semantic_diff / verify / oracle) actually improve +an agent's pass-rate?" + +Design +------ +- Loads a task set (SWE-bench Lite subset by default, or a local JSONL file). 
"""SWE-bench-style A/B evaluation harness for the SIN-Code Bundle.

Goal: produce an objective, reproducible number that answers
"do the SIN tools (impact / semantic_diff / verify / oracle) actually improve
an agent's pass-rate?"

Design
------
- Loads a task set (SWE-bench Lite subset by default, or a local JSONL file).
- Runs each task twice through a pluggable agent runner:
    * arm "control" -> SIN tools DISABLED (SIN_ENFORCE=0)
    * arm "sin"     -> SIN tools ENABLED  (SIN_ENFORCE=1)
- Applies the produced patch in an isolated git clone and runs the task's
  FAIL_TO_PASS tests. PASS_TO_PASS ids are carried on the task but are not
  executed by this harness.
- Reports resolved-rate per arm, the delta, and a per-task breakdown.

The harness is intentionally runner-agnostic: you wire in opencode / codex /
hermes via a small AgentRunner. A DryRunRunner is included so `sin bench`
works end-to-end without any LLM credits.
"""
from __future__ import annotations

import json
import shlex
import shutil
import statistics
import subprocess
import tempfile
import time
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Callable, Iterable, Literal, Optional, Protocol

Arm = Literal["control", "sin"]


# --------------------------------------------------------------------------- #
# Task + result models
# --------------------------------------------------------------------------- #
@dataclass(frozen=True)
class Task:
    """One benchmark instance (SWE-bench compatible subset of fields)."""

    instance_id: str
    repo: str
    base_commit: str
    problem_statement: str
    fail_to_pass: list[str] = field(default_factory=list)
    pass_to_pass: list[str] = field(default_factory=list)
    setup_cmds: list[str] = field(default_factory=list)
    test_cmd: str = "pytest -q"


@dataclass
class TaskResult:
    """Outcome of one (task, arm) evaluation."""

    instance_id: str
    arm: Arm
    resolved: bool
    duration_s: float
    patch_applied: bool
    fail_to_pass_passed: int
    fail_to_pass_total: int
    error: Optional[str] = None


@dataclass
class ArmSummary:
    """Aggregate numbers for one arm."""

    arm: Arm
    total: int
    resolved: int
    resolved_rate: float
    mean_duration_s: float


@dataclass
class BenchReport:
    """Full result of a benchmark run: per-arm summaries plus every TaskResult."""

    arms: dict[str, ArmSummary]
    delta_resolved_rate: float
    per_task: list[TaskResult]
    started_at: str
    finished_at: str

    def to_json(self) -> str:
        """Serialise the report as indented JSON."""
        return json.dumps(
            {
                "arms": {k: asdict(v) for k, v in self.arms.items()},
                "delta_resolved_rate": self.delta_resolved_rate,
                "per_task": [asdict(r) for r in self.per_task],
                "started_at": self.started_at,
                "finished_at": self.finished_at,
            },
            indent=2,
        )


# --------------------------------------------------------------------------- #
# Agent runner protocol
# --------------------------------------------------------------------------- #
class AgentRunner(Protocol):
    """Produces a unified diff that attempts to solve `task` inside `workdir`.

    `sin_enabled` tells the runner whether to expose the SIN MCP tools to the
    underlying agent. Implementations should return a unified-diff string (may
    be empty if the agent produced no change).
    """

    def run(self, task: Task, workdir: Path, sin_enabled: bool) -> str: ...


class DryRunRunner:
    """Zero-cost runner for smoke-testing the harness itself.

    Produces no patch, so every task "fails" — but exercises the full
    clone/apply/test pipeline so you can validate without an LLM.
    """

    def run(self, task: Task, workdir: Path, sin_enabled: bool) -> str:  # noqa: ARG002
        return ""


class CommandRunner:
    """Runs an external agent CLI and captures the diff it leaves in the repo.

    The agent edits `workdir` in place, so the returned diff is already present
    in the working tree when the harness later calls `_apply_patch`.

    Example wiring for opencode:
        CommandRunner(
            build_cmd=lambda task, sin: [
                "opencode", "run",
                "-m", task.problem_statement,
            ],
        )
    """

    def __init__(
        self,
        build_cmd: Callable[[Task, bool], list[str]],
        timeout_s: int = 1800,
        env_for: Optional[Callable[[Task, bool], dict[str, str]]] = None,
    ) -> None:
        self._build_cmd = build_cmd
        self._timeout_s = timeout_s
        self._env_for = env_for

    def run(self, task: Task, workdir: Path, sin_enabled: bool) -> str:
        """Run the agent command in `workdir` and return `git diff` output."""
        import os

        cmd = self._build_cmd(task, sin_enabled)
        env = {**os.environ}
        if self._env_for:
            env.update(self._env_for(task, sin_enabled))
        # Set last so a custom env_for cannot flip the arm by accident.
        env["SIN_ENFORCE"] = "1" if sin_enabled else "0"

        subprocess.run(
            cmd,
            cwd=workdir,
            env=env,
            timeout=self._timeout_s,
            check=False,
            capture_output=True,
            text=True,
        )
        diff = subprocess.run(
            ["git", "diff"],
            cwd=workdir,
            check=False,
            capture_output=True,
            text=True,
        )
        return diff.stdout


# --------------------------------------------------------------------------- #
# Git / test plumbing
# --------------------------------------------------------------------------- #
def _sh(cmd: list[str], cwd: Path, timeout: int = 600) -> subprocess.CompletedProcess:
    """Run `cmd` in `cwd`, capturing text output; never raises on a non-zero exit."""
    return subprocess.run(
        cmd, cwd=cwd, check=False, capture_output=True, text=True, timeout=timeout
    )


def _checked(cmd: list[str], cwd: Path, timeout: int = 600) -> subprocess.CompletedProcess:
    """Like `_sh`, but raise RuntimeError (with trimmed stderr) on a non-zero exit."""
    res = _sh(cmd, cwd, timeout)
    if res.returncode != 0:
        detail = (res.stderr or res.stdout or "").strip()[:500]
        raise RuntimeError(
            f"command failed (exit {res.returncode}): {' '.join(cmd[:3])} ... {detail}"
        )
    return res


def _prepare_worktree(task: Task, root: Path) -> Path:
    """Create a fresh clone of `task.repo` at `task.base_commit` under `root`.

    Any directory left over from an earlier run is removed first, because
    `git clone` refuses a non-empty target. Raises RuntimeError when the clone,
    the checkout, or a setup command fails, so tests are never run against the
    wrong tree; `_eval_one` records that error on the TaskResult.
    """
    work = root / task.instance_id.replace("/", "__")
    if work.exists():
        shutil.rmtree(work)
    work.mkdir(parents=True)
    url = f"https://github.com/{task.repo}.git"
    _checked(["git", "clone", "--quiet", url, "."], cwd=work, timeout=900)
    _checked(["git", "checkout", "--quiet", task.base_commit], cwd=work)
    for cmd in task.setup_cmds:
        _checked(["bash", "-lc", cmd], cwd=work, timeout=1800)
    return work


def _apply_patch(diff: str, work: Path) -> bool:
    """Ensure `diff` is applied in `work`; return True when the tree contains it.

    Runners such as CommandRunner return a diff of edits that are already in the
    working tree. Applying it a second time always fails, so first check whether
    the patch reverse-applies cleanly (i.e. it is already present) and only
    apply it forward when it is not.
    """
    if not diff.strip():
        return False
    patch = work / ".sin_patch.diff"
    patch.write_text(diff, encoding="utf-8")
    base = ["git", "apply", "--whitespace=nowarn"]
    already_applied = _sh([*base, "--check", "--reverse", str(patch)], cwd=work)
    if already_applied.returncode == 0:
        return True
    res = _sh([*base, str(patch)], cwd=work)
    return res.returncode == 0


def _run_named_tests(work: Path, task: Task) -> tuple[int, int]:
    """Run the task's FAIL_TO_PASS tests; return (passed, total).

    With no FAIL_TO_PASS ids the whole `test_cmd` is run once and counted as a
    single test.
    """
    if not task.fail_to_pass:
        res = _sh(["bash", "-lc", task.test_cmd], cwd=work, timeout=1800)
        return (1, 1) if res.returncode == 0 else (0, 1)

    passed = 0
    for test_id in task.fail_to_pass:
        # Test ids may contain spaces, brackets or quotes (parametrised tests);
        # quote them so the shell passes each id through as one argument.
        res = _sh(
            ["bash", "-lc", f"{task.test_cmd} {shlex.quote(test_id)}"],
            cwd=work,
            timeout=900,
        )
        if res.returncode == 0:
            passed += 1
    return passed, len(task.fail_to_pass)


# --------------------------------------------------------------------------- #
# Core eval loop
# --------------------------------------------------------------------------- #
def _eval_one(task: Task, arm: Arm, runner: AgentRunner, root: Path) -> TaskResult:
    """Evaluate one task under one arm; any exception becomes `TaskResult.error`."""
    start = time.time()
    try:
        work = _prepare_worktree(task, root)
        diff = runner.run(task, work, sin_enabled=(arm == "sin"))
        applied = _apply_patch(diff, work)
        passed, total = (0, len(task.fail_to_pass) or 1)
        if applied:
            passed, total = _run_named_tests(work, task)
        resolved = applied and passed == total and total > 0
        return TaskResult(
            instance_id=task.instance_id,
            arm=arm,
            resolved=resolved,
            duration_s=round(time.time() - start, 2),
            patch_applied=applied,
            fail_to_pass_passed=passed,
            fail_to_pass_total=total,
        )
    except Exception as exc:  # noqa: BLE001
        return TaskResult(
            instance_id=task.instance_id,
            arm=arm,
            resolved=False,
            duration_s=round(time.time() - start, 2),
            patch_applied=False,
            fail_to_pass_passed=0,
            fail_to_pass_total=len(task.fail_to_pass) or 1,
            error=str(exc),
        )


def _summarize(arm: Arm, results: list[TaskResult]) -> ArmSummary:
    """Aggregate the results belonging to `arm`."""
    subset = [r for r in results if r.arm == arm]
    total = len(subset)
    resolved = sum(1 for r in subset if r.resolved)
    rate = (resolved / total) if total else 0.0
    mean_dur = statistics.mean([r.duration_s for r in subset]) if subset else 0.0
    return ArmSummary(
        arm=arm,
        total=total,
        resolved=resolved,
        resolved_rate=round(rate, 4),
        mean_duration_s=round(mean_dur, 2),
    )


def run_benchmark(
    tasks: Iterable[Task],
    runner: AgentRunner,
    arms: tuple[Arm, ...] = ("control", "sin"),
    workspace: Optional[Path] = None,
) -> BenchReport:
    """Run every task under every arm and build the report.

    Clones live under `workspace/<arm>/` when `workspace` is given, otherwise
    under a temporary directory that is removed when the run finishes.
    `delta_resolved_rate` is sin minus control, or 0.0 when either arm is absent.
    """
    started = time.strftime("%Y-%m-%dT%H:%M:%S")
    tasks = list(tasks)
    results: list[TaskResult] = []

    with tempfile.TemporaryDirectory(prefix="sin-bench-") as tmp:
        root = Path(workspace) if workspace else Path(tmp)
        root.mkdir(parents=True, exist_ok=True)
        for arm in arms:
            for task in tasks:
                results.append(_eval_one(task, arm, runner, root / arm))

    summaries = {arm: _summarize(arm, results) for arm in arms}
    delta = 0.0
    if "sin" in summaries and "control" in summaries:
        delta = round(
            summaries["sin"].resolved_rate - summaries["control"].resolved_rate, 4
        )
    return BenchReport(
        arms=summaries,
        delta_resolved_rate=delta,
        per_task=results,
        started_at=started,
        finished_at=time.strftime("%Y-%m-%dT%H:%M:%S"),
    )


# --------------------------------------------------------------------------- #
# Task loading
# --------------------------------------------------------------------------- #
def _as_test_list(value: Any) -> list[str]:
    """Normalise a FAIL_TO_PASS / PASS_TO_PASS field to a list of test ids.

    Accepts a real list, a JSON-encoded list in a string (the form both loaders
    have to cope with), a bare test id string, or None.
    """
    if value is None:
        return []
    if isinstance(value, str):
        text = value.strip()
        if not text:
            return []
        try:
            value = json.loads(text)
        except json.JSONDecodeError:
            return [text]  # not JSON: treat as a single test id
    if isinstance(value, (list, tuple)):
        return [str(item) for item in value]
    return [str(value)]


def load_tasks_jsonl(path: Path, limit: Optional[int] = None) -> list[Task]:
    """Load tasks from a JSONL file (SWE-bench compatible field names).

    Blank lines are skipped. A falsy `limit` (None or 0) means "no limit".
    """
    tasks: list[Task] = []
    for line in path.read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if not line:
            continue
        d = json.loads(line)
        tasks.append(
            Task(
                instance_id=d["instance_id"],
                repo=d["repo"],
                base_commit=d["base_commit"],
                problem_statement=d.get("problem_statement", ""),
                fail_to_pass=_as_test_list(
                    d.get("FAIL_TO_PASS", d.get("fail_to_pass", []))
                ),
                pass_to_pass=_as_test_list(
                    d.get("PASS_TO_PASS", d.get("pass_to_pass", []))
                ),
                setup_cmds=d.get("setup_cmds", []),
                test_cmd=d.get("test_cmd", "pytest -q"),
            )
        )
        if limit and len(tasks) >= limit:
            break
    return tasks


def load_swebench_lite(limit: Optional[int] = 20) -> list[Task]:
    """Load SWE-bench Lite via `datasets` if available; else raise a clear error."""
    try:
        from datasets import load_dataset  # type: ignore
    except ImportError as exc:
        raise RuntimeError(
            "SWE-bench Lite requires the 'datasets' package. "
            "Install with: pip install 'sin-code-bundle[bench]', "
            "or pass --tasks <file.jsonl>."
        ) from exc

    ds = load_dataset("princeton-nlp/SWE-bench_Lite", split="test")
    tasks: list[Task] = []
    for row in ds:
        tasks.append(
            Task(
                instance_id=row["instance_id"],
                repo=row["repo"],
                base_commit=row["base_commit"],
                problem_statement=row["problem_statement"],
                fail_to_pass=_as_test_list(row["FAIL_TO_PASS"]),
                pass_to_pass=_as_test_list(row["PASS_TO_PASS"]),
            )
        )
        if limit and len(tasks) >= limit:
            break
    return tasks


# --------------------------------------------------------------------------- #
# Pretty printing
# --------------------------------------------------------------------------- #
def format_report(report: BenchReport) -> str:
    """Render a short human-readable summary of `report`."""
    lines = ["", "SIN-Code Bench — A/B resolved-rate", "=" * 40]
    for arm, s in report.arms.items():
        lines.append(
            f" {arm:<8} {s.resolved}/{s.total} resolved "
            f"({s.resolved_rate * 100:5.1f}%) mean {s.mean_duration_s}s"
        )
    sign = "+" if report.delta_resolved_rate >= 0 else ""
    lines.append("-" * 40)
    lines.append(
        f" SIN delta: {sign}{report.delta_resolved_rate * 100:.1f} pp "
        "(percentage points)"
    )
    lines.append("=" * 40)
    return "\n".join(lines)
"""Keep MCP tool outputs compact so they don't blow the agent's context window.

Every tool result is passed through `trim()` before returning. Lists (and
tuples) are capped, long strings truncated, and an explicit `_truncated` marker
is appended to a capped sequence so the agent knows more data exists.
"""
from __future__ import annotations

from typing import Any

MAX_LIST = 25
MAX_STR = 2000


def trim(value: Any, max_list: int = MAX_LIST, max_str: int = MAX_STR) -> Any:
    """Recursively trim a tool output to safe sizes.

    Args:
        value: Any JSON-like value (str, list, tuple, dict, scalar).
        max_list: Maximum number of items kept from a list or tuple.
        max_str: Maximum number of characters kept from a string.

    Returns:
        - str: unchanged when short enough, otherwise the first `max_str`
          characters followed by " ...[truncated]".
        - list / tuple: a list of the first `max_list` trimmed items; when items
          were dropped, a final ``{"_truncated": True, "_omitted": n}`` entry is
          appended. Tuples come back as lists, which is how they serialise to
          JSON anyway; previously they skipped trimming altogether.
        - dict: same keys, values trimmed recursively (key count is not capped).
        - anything else: returned unchanged.
    """
    if isinstance(value, str):
        return value if len(value) <= max_str else value[:max_str] + " ...[truncated]"
    if isinstance(value, (list, tuple)):
        trimmed = [trim(v, max_list, max_str) for v in value[:max_list]]
        if len(value) > max_list:
            trimmed.append({"_truncated": True, "_omitted": len(value) - max_list})
        return trimmed
    if isinstance(value, dict):
        return {k: trim(v, max_list, max_str) for k, v in value.items()}
    return value
"""Incremental, fingerprint-keyed cache for SCKG / impact results.

Avoids rescanning the whole repo on every `impact()` call. Entries are stamped
with a hash of the tracked file set plus each file's mtime and size, and are
treated as stale as soon as that fingerprint changes. Stored under
.sin/cache/ as JSON.
"""
from __future__ import annotations

import hashlib
import json
import os
import time
from pathlib import Path
from typing import Any, Optional

_IGNORE = {".git", "node_modules", ".venv", "__pycache__", ".sin", "dist", "build"}


def _repo_fingerprint(root: Path, exts: tuple[str, ...]) -> str:
    """Hash (relative path, mtime_ns, size) of every tracked source file under `root`.

    Only directories *below* `root` are matched against `_IGNORE`; the location
    of the repository itself (e.g. a checkout under ``~/build/``) no longer
    causes every file to be skipped. Ignored directories are pruned from the
    walk instead of being traversed and filtered afterwards.

    Args:
        root: Repository root to scan.
        exts: Lower-case file suffixes (with leading dot) to include.

    Returns:
        Hex SHA-256 digest; identical for identical file metadata regardless of
        where the repository lives on disk.
    """
    h = hashlib.sha256()
    for dirpath, dirnames, filenames in os.walk(root):
        # In-place edit prunes the walk; sorting keeps the order deterministic.
        dirnames[:] = sorted(d for d in dirnames if d not in _IGNORE)
        for name in sorted(filenames):
            path = Path(dirpath) / name
            if path.suffix.lower() not in exts:
                continue
            try:
                st = path.stat()
            except OSError:
                continue  # vanished or unreadable (e.g. broken symlink)
            rel = path.relative_to(root).as_posix()
            # NUL separators keep adjacent fields from running into each other.
            h.update(f"{rel}\0{st.st_mtime_ns}\0{st.st_size}\0".encode())
    return h.hexdigest()


class GraphCache:
    """JSON file cache whose entries expire when the repo fingerprint changes."""

    def __init__(
        self,
        root: Path = Path("."),
        exts: tuple[str, ...] = (".py", ".ts", ".tsx", ".js", ".go", ".rs"),
    ) -> None:
        """Resolve `root` and create ``<root>/.sin/cache`` if it is missing."""
        self.root = Path(root).resolve()
        self.exts = exts
        self.dir = self.root / ".sin" / "cache"
        self.dir.mkdir(parents=True, exist_ok=True)

    def _file(self, key: str) -> Path:
        """Map a cache key to its JSON file (first 16 hex chars of SHA-1)."""
        safe = hashlib.sha1(key.encode()).hexdigest()[:16]
        return self.dir / f"{safe}.json"

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value for `key`, or None on a miss.

        A miss is: no file, an unreadable or corrupt file, an entry written for
        a different key (truncated-hash collision), or a fingerprint that no
        longer matches the repository.
        """
        fp = self._file(key)
        if not fp.exists():
            return None
        try:
            data = json.loads(fp.read_text(encoding="utf-8"))
        except (OSError, json.JSONDecodeError):
            return None  # corrupt cache must degrade to a miss, not crash the tool
        if not isinstance(data, dict):
            return None
        # Entries written before the "key" field existed are still accepted.
        if data.get("key", key) != key:
            return None
        if data.get("fingerprint") != _repo_fingerprint(self.root, self.exts):
            return None  # stale — repo changed
        return data.get("value")

    def set(self, key: str, value: Any) -> None:
        """Store `value` under `key`, stamped with the current repo fingerprint."""
        fp = self._file(key)
        fp.write_text(
            json.dumps(
                {
                    "key": key,
                    "fingerprint": _repo_fingerprint(self.root, self.exts),
                    "stored_at": time.time(),
                    "value": value,
                },
                indent=2,
            ),
            encoding="utf-8",
        )

    def clear(self) -> int:
        """Delete every cache entry and return how many files were removed."""
        n = 0
        for f in self.dir.glob("*.json"):
            f.unlink()
            n += 1
        return n
if __name__ == "__main__":
    # NOTE(review): this guard runs before the commands defined below are
    # registered, so executing this file as a script exposes none of them.
    # It should move to the very end of the module (not visible from here).
    app()


# --------------------------------------------------------------------------- #
# sin bench — SWE-bench A/B harness
# --------------------------------------------------------------------------- #
@app.command()
def bench(
    tasks: str | None = typer.Option(
        None, "--tasks", help="Path to a JSONL task file. Omit to use SWE-bench Lite."
    ),
    limit: int = typer.Option(20, help="Max number of tasks to run per arm."),
    runner: str = typer.Option(
        "dry", help="Agent runner: 'dry' | 'opencode' | 'codex' | 'hermes'."
    ),
    arms: str = typer.Option(
        "control,sin", help="Comma-separated arms to run."
    ),
    out: str | None = typer.Option(
        None, "--out", help="Write the full JSON report to this path."
    ),
):
    """Run the SIN-Code A/B benchmark and report the resolved-rate delta.

    Exits with code 2 when the tasks cannot be loaded, no task is found, the
    runner is unknown, or `--arms` names anything other than control / sin.
    """
    from sin_code_bundle.bench import (
        DryRunRunner,
        format_report,
        load_swebench_lite,
        load_tasks_jsonl,
        run_benchmark,
    )

    if tasks:
        task_list = load_tasks_jsonl(Path(tasks), limit=limit)
    else:
        try:
            task_list = load_swebench_lite(limit=limit)
        except RuntimeError as exc:
            typer.echo(f"[SIN-BUNDLE] {exc}", err=True)
            raise typer.Exit(code=2) from exc

    if not task_list:
        typer.echo("[SIN-BUNDLE] No tasks loaded.", err=True)
        raise typer.Exit(code=2)

    if runner == "dry":
        agent_runner = DryRunRunner()
    elif runner in ("opencode", "codex", "hermes"):
        agent_runner = _build_cli_runner(runner)
    else:
        typer.echo(f"[SIN-BUNDLE] Unknown runner '{runner}'.", err=True)
        raise typer.Exit(code=2)

    arm_tuple = tuple(a.strip() for a in arms.split(",") if a.strip())
    # The harness enables SIN only for the arm literally named "sin"; a typo
    # such as "sim" would otherwise run as a second, mislabelled control arm.
    unknown_arms = [a for a in arm_tuple if a not in ("control", "sin")]
    if unknown_arms or not arm_tuple:
        bad = ", ".join(unknown_arms) or "(none given)"
        typer.echo(
            f"[SIN-BUNDLE] Unknown arm(s): {bad}. Valid arms: control, sin.", err=True
        )
        raise typer.Exit(code=2)

    typer.echo(
        f"[SIN-BUNDLE] Running {len(task_list)} task(s) x {len(arm_tuple)} arm(s) "
        f"with '{runner}' runner..."
    )
    report = run_benchmark(task_list, agent_runner, arms=arm_tuple)  # type: ignore[arg-type]
    typer.echo(format_report(report))

    if out:
        out_path = Path(out)
        # Do not lose a long run's results because the report directory is missing.
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_text(report.to_json(), encoding="utf-8")
        typer.echo(f"[SIN-BUNDLE] Wrote full report -> {out}")


def _build_cli_runner(agent: str):
    """Return a CommandRunner that invokes the named agent CLI with the task prompt.

    The arm is not encoded in the command line; CommandRunner communicates it to
    the agent through the SIN_ENFORCE environment variable.
    Raises ValueError (from the command builder) for an unknown agent name.
    """
    from sin_code_bundle.bench import CommandRunner

    def build_cmd(task, sin_enabled: bool) -> list[str]:  # noqa: ARG001
        prompt = task.problem_statement
        if agent == "opencode":
            return ["opencode", "run", "-m", prompt]
        if agent == "codex":
            return ["codex", "exec", "--skip-git-repo-check", prompt]
        if agent == "hermes":
            return ["hermes", "run", "--prompt", prompt]
        raise ValueError(agent)

    return CommandRunner(build_cmd=build_cmd, timeout_s=1800)


# --------------------------------------------------------------------------- #
# sin skills — compile portable skills into an agent's native format
# --------------------------------------------------------------------------- #
@app.command()
def skills(
    target: str = typer.Argument(..., help="opencode | codex | claude | all"),
    source: str = typer.Option("skills", help="Source skills directory."),
    dry_run: bool = typer.Option(False, "--dry-run", help="Preview only."),
):
    """Compile portable SIN skills into an agent's native command/skill format.

    `all` expands to every entry of SUPPORTED_TARGETS. Exits with code 2 on an
    unknown target.
    """
    from sin_code_bundle.skills import SUPPORTED_TARGETS, compile_skills

    valid = SUPPORTED_TARGETS
    targets = list(valid) if target == "all" else [target]  # type: ignore[list-item]
    for t in targets:
        if t not in valid:
            typer.echo(f"[SIN-BUNDLE] Unknown target '{t}'.", err=True)
            raise typer.Exit(code=2)
        paths = compile_skills(t, Path(source), dry_run=dry_run)  # type: ignore[arg-type]
        verb = "Would write" if dry_run else "Wrote"
        for p in paths:
            typer.echo(f"[SIN-BUNDLE] {verb} {t} skill -> {p}")
        if not paths:
            typer.echo(f"[SIN-BUNDLE] No skills found in '{source}'.")
--------------------------------------------------------------------------- # +# sin policy — inspect / initialize the policy and audit log +# --------------------------------------------------------------------------- # +@app.command() +def policy( + action: str = typer.Argument("show", help="show | init | verify"), + root: str = typer.Option(".", help="Project root."), +): + """Inspect or initialize the SIN policy and audit log.""" + from sin_code_bundle.policy import DEFAULT_POLICY, AuditLog, Policy + + root_path = Path(root) + if action == "init": + path = root_path / ".sin" / "policy.yaml" + path.parent.mkdir(parents=True, exist_ok=True) + if path.exists(): + typer.echo(f"[SIN-BUNDLE] {path} already exists.") + return + try: + import yaml as _yaml + + path.write_text( + _yaml.safe_dump( + {"auto_approve": False, "rules": dict(DEFAULT_POLICY)}, + sort_keys=False, + ), + encoding="utf-8", + ) + except ImportError: + # Manual fallback if pyyaml missing + path.write_text( + "auto_approve: false\nrules:\n" + + "".join(f" {k}: {v}\n" for k, v in DEFAULT_POLICY.items()), + encoding="utf-8", + ) + typer.echo(f"[SIN-BUNDLE] Wrote default policy -> {path}") + return + + if action == "verify": + ok = AuditLog(root_path).verify_chain() + typer.echo(f"[SIN-BUNDLE] Audit chain {'intact' if ok else 'TAMPERED'}.") + raise typer.Exit(code=0 if ok else 1) + + p = Policy.load(root_path) + typer.echo("[SIN-BUNDLE] Effective policy:") + for risk, decision in p.rules.items(): + typer.echo(f" {risk:<8} -> {decision}") + typer.echo(f" auto_approve = {p.auto_approve}") + + +# --------------------------------------------------------------------------- # +# sin doctor — environment diagnostics +# --------------------------------------------------------------------------- # +@app.command() +def doctor(root: str = typer.Option(".", help="Project root.")): + """Diagnose the environment: detected languages, LSP servers, audit chain.""" + from sin_code_bundle.lsp_bootstrap import 
server_status + from sin_code_bundle.policy import AuditLog + + rows = server_status(Path(root)) + typer.echo("[SIN-BUNDLE] Language servers (for accurate impact analysis):") + if not rows: + typer.echo(" (no supported source files detected)") + for r in rows: + mark = "OK " if r["installed"] else "-- " + typer.echo( + f" {mark}{r['language']:<11} {r['files']:>5} files server={r['server']}" + ) + if not r["installed"]: + typer.echo(f" install: {r['install_hint']}") + + ok = AuditLog(Path(root)).verify_chain() + typer.echo(f"[SIN-BUNDLE] Audit chain: {'intact' if ok else 'TAMPERED'}") + + if __name__ == "__main__": app() diff --git a/src/sin_code_bundle/lsp_backend.py b/src/sin_code_bundle/lsp_backend.py new file mode 100644 index 0000000..fa1bcd1 --- /dev/null +++ b/src/sin_code_bundle/lsp_backend.py @@ -0,0 +1,279 @@ +"""LSP-backed symbol resolution for the SCKG. + +This makes `impact()` structural and type-accurate instead of textual: +- "what calls this symbol?" -> LSP references +- "where is it defined?" -> LSP definition +- blast-radius scoring -> ranked caller set + fan-in + +Primary backend: multilspy (drives real language servers: pyright, gopls, +typescript-language-server, rust-analyzer, jdtls, …). +Fallback backend: tree-sitter symbol scan (cheap, language-agnostic, no server). + +The module degrades gracefully: if no LSP is available it returns tree-sitter +results and flags `source="treesitter"`, so the agent still gets a useful signal +and the bundle keeps working (consistent with `sin status`). 
# LSP-backed symbol resolution with a textual fallback scan.
from __future__ import annotations

import asyncio
import re
from dataclasses import dataclass, field
from pathlib import Path, PurePosixPath
from typing import Literal, Optional

Source = Literal["lsp", "treesitter", "none"]

_LANG_BY_EXT = {
    ".py": "python",
    ".ts": "typescript",
    ".tsx": "typescript",
    ".js": "javascript",
    ".jsx": "javascript",
    ".go": "go",
    ".rs": "rust",
    ".java": "java",
    ".rb": "ruby",
    ".php": "php",
    ".cs": "csharp",
    ".c": "c",
    ".cpp": "cpp",
    ".h": "cpp",
}

# Directories skipped by the fallback scan (matched below the scan root only).
_SCAN_IGNORE = frozenset({".git", "node_modules", ".venv", "__pycache__"})
# Line prefixes the fallback scan treats as a definition.
_DEF_PREFIXES = ("def ", "class ", "func ", "fn ", "function ")
# Maximum number of caller locations returned in a payload.
_MAX_CALLERS = 25


@dataclass(frozen=True)
class Location:
    """A 1-based source position, optionally with the matching line's text."""

    file: str
    line: int
    column: int
    snippet: str = ""


@dataclass
class ImpactResult:
    """Compact, deterministic blast-radius payload for the agent."""

    symbol: str
    defined_at: Optional[Location]
    callers: list[Location] = field(default_factory=list)
    fan_in: int = 0
    touches_tests: bool = False
    touches_public_api: bool = False
    risk: Literal["low", "medium", "high"] = "low"
    source: Source = "none"
    notes: list[str] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Return a JSON-serialisable dict mirroring the dataclass fields."""
        return {
            "symbol": self.symbol,
            "defined_at": _loc_to_dict(self.defined_at),
            "callers": [_loc_to_dict(c) for c in self.callers],
            "fan_in": self.fan_in,
            "touches_tests": self.touches_tests,
            "touches_public_api": self.touches_public_api,
            "risk": self.risk,
            "source": self.source,
            "notes": self.notes,
        }


def _loc_to_dict(loc: Optional[Location]) -> Optional[dict]:
    """Convert a Location to a plain dict; None passes through."""
    if loc is None:
        return None
    return {"file": loc.file, "line": loc.line, "column": loc.column, "snippet": loc.snippet}


def _lang_for(path: Path) -> Optional[str]:
    """Return the language name for `path`'s suffix, or None if unsupported."""
    return _LANG_BY_EXT.get(path.suffix.lower())


def _score_risk(
    callers: int, touches_tests: bool, touches_api: bool
) -> Literal["low", "medium", "high"]:
    """Map fan-in and the test/API flags onto a three-level risk label."""
    if touches_api or callers > 10:
        return "high"
    if touches_tests or callers > 3:
        return "medium"
    return "low"


def _posix(p: str) -> PurePosixPath:
    """Lower-case `p` and normalise backslashes so checks are OS-independent."""
    return PurePosixPath(p.lower().replace("\\", "/"))


def _is_test_path(p: str) -> bool:
    """True if the file name contains "test" or any parent directory is `tests`.

    Directory components are compared exactly, so a relative path such as
    `tests/helpers.py` (no leading slash) is recognised too.
    """
    norm = _posix(p)
    return "test" in norm.name or "tests" in norm.parts[:-1]


def _is_public_api_path(p: str) -> bool:
    """True if the file name is one of the conventional package entry points."""
    return _posix(p).name in {
        "__init__.py", "api.py", "index.ts", "index.js", "mod.rs", "lib.rs",
    }


def _build_result(
    symbol: str,
    defined_at: Optional[Location],
    callers: list[Location],
    source: Source,
    notes: list[str],
) -> ImpactResult:
    """Derive fan-in, flags and risk from `callers` and cap the returned list."""
    touches_tests = any(_is_test_path(c.file) for c in callers)
    touches_api = any(_is_public_api_path(c.file) for c in callers)
    fan_in = len(callers)
    return ImpactResult(
        symbol=symbol,
        defined_at=defined_at,
        callers=callers[:_MAX_CALLERS],
        fan_in=fan_in,
        touches_tests=touches_tests,
        touches_public_api=touches_api,
        risk=_score_risk(fan_in, touches_tests, touches_api),
        source=source,
        notes=notes,
    )


# --------------------------------------------------------------------------- #
# LSP backend (multilspy)
# --------------------------------------------------------------------------- #
def _loc_from_lsp(item: dict) -> Location:
    """Convert a 0-based LSP location dict into a 1-based Location."""
    start = item["range"]["start"]
    return Location(
        file=item.get("relativePath", item.get("uri", "")),
        line=start["line"] + 1,
        column=start["character"] + 1,
    )


async def _lsp_impact(
    root: Path, file: Path, symbol: str, line: int, column: int
) -> Optional[ImpactResult]:
    """Query a language server for the definition and references at a position.

    Returns None when multilspy is not importable or the file's language is
    unsupported. `line`/`column` are 1-based and converted to 0-based for LSP.
    """
    try:
        from multilspy import LanguageServer  # type: ignore
        from multilspy.multilspy_config import MultilspyConfig  # type: ignore
        from multilspy.multilspy_logger import MultilspyLogger  # type: ignore
    except ImportError:
        return None

    lang = _lang_for(file)
    if not lang:
        return None

    config = MultilspyConfig.from_dict({"code_language": lang})
    logger = MultilspyLogger()
    server = LanguageServer.create(config, logger, str(root))

    rel = str(file.relative_to(root)) if file.is_absolute() else str(file)
    async with server.start_server():
        definition = await server.request_definition(rel, line - 1, column - 1)
        references = await server.request_references(rel, line - 1, column - 1)

    def_loc = _loc_from_lsp(definition[0]) if definition else None
    callers = [_loc_from_lsp(ref) for ref in references or []]
    fan_in = len(callers)
    notes = [] if fan_in <= 25 else [f"{fan_in} callers total; showing first 25"]
    return _build_result(symbol, def_loc, callers, "lsp", notes)


# --------------------------------------------------------------------------- #
# tree-sitter fallback (textual but symbol-aware)
# --------------------------------------------------------------------------- #
def _treesitter_impact(root: Path, symbol: str) -> ImpactResult:
    """Approximate the blast radius of `symbol` with a whole-word text scan.

    Only the last component of a dotted / `::`-qualified symbol is searched.
    The first line that starts with a definition keyword and names the symbol
    before its first "(" becomes `defined_at`; every other matching line is a
    caller. Files are visited in sorted order so the result is deterministic.
    """
    bare = symbol.split(".")[-1].split("::")[-1]
    callers: list[Location] = []
    defined_at: Optional[Location] = None

    # An empty name would match every line of every file; report nothing instead.
    if bare:
        # Whole-identifier match: "compute" must not hit "recompute_all".
        pattern = re.compile(
            r"(?<![A-Za-z0-9_])" + re.escape(bare) + r"(?![A-Za-z0-9_])"
        )
        for path in sorted(root.rglob("*")):
            if _lang_for(path) is None:
                continue
            rel = path.relative_to(root)
            if any(part in _SCAN_IGNORE for part in rel.parts):
                continue
            try:
                if not path.is_file():
                    continue
                text = path.read_text(encoding="utf-8", errors="ignore")
            except OSError:
                continue
            for lineno, raw in enumerate(text.splitlines(), start=1):
                hit = pattern.search(raw)
                if hit is None:
                    continue
                loc = Location(
                    file=str(rel),
                    line=lineno,
                    column=hit.start() + 1,
                    snippet=raw.strip()[:120],
                )
                stripped = raw.lstrip()
                is_definition = stripped.startswith(_DEF_PREFIXES) and bool(
                    pattern.search(stripped.split("(")[0])
                )
                if defined_at is None and is_definition:
                    defined_at = loc
                else:
                    callers.append(loc)

    return _build_result(
        symbol,
        defined_at,
        callers,
        "treesitter",
        ["LSP unavailable — textual approximation. Install 'sin[lsp]' for accuracy."],
    )


# --------------------------------------------------------------------------- #
# Public entry point
# --------------------------------------------------------------------------- #
def compute_impact(
    root: str | Path,
    symbol: str,
    file: Optional[str | Path] = None,
    line: Optional[int] = None,
    column: Optional[int] = None,
) -> ImpactResult:
    """Resolve the blast radius of `symbol`.

    If (file, line, column) are given and an LSP is available, returns precise
    LSP references. Otherwise falls back to a tree-sitter/textual scan. Any
    exception raised on the LSP path is recorded in `notes` and the fallback
    result is returned instead.

    Results are cached under .sin/cache/ and reused if the repo hasn't changed.
    """
    root_path = Path(root).resolve()

    # Cache layer
    from sin_code_bundle.cache import GraphCache

    cache = GraphCache(root_path)
    cache_key = f"impact:{symbol}:{file}:{line}:{column}"
    cached = cache.get(cache_key)
    if cached is not None:
        defined = cached.get("defined_at")
        return ImpactResult(
            symbol=cached["symbol"],
            defined_at=Location(**defined) if defined else None,
            callers=[Location(**c) for c in cached.get("callers", [])],
            fan_in=cached.get("fan_in", 0),
            touches_tests=cached.get("touches_tests", False),
            touches_public_api=cached.get("touches_public_api", False),
            risk=cached.get("risk", "low"),
            source=cached.get("source", "none"),
            notes=cached.get("notes", []),
        )

    if file and line and column:
        file_path = Path(file) if Path(file).is_absolute() else root_path / file
        try:
            result = asyncio.run(_lsp_impact(root_path, file_path, symbol, line, column))
            if result is not None:
                cache.set(cache_key, result.to_dict())
                return result
        except Exception as exc:  # noqa: BLE001
            ts = _treesitter_impact(root_path, symbol)
            ts.notes.append(f"LSP error, used fallback: {exc}")
            cache.set(cache_key, ts.to_dict())
            return ts

    result = _treesitter_impact(root_path, symbol)
    cache.set(cache_key, result.to_dict())
    return result
"""Detect repo languages and ensure the matching language servers are present.

`sin doctor` uses this to tell users exactly what to install for accurate
impact analysis. We never silently install global tooling; we report and offer
the exact install command.
"""
from __future__ import annotations

import shutil
from collections import Counter
from pathlib import Path

# language -> (server binary, install hint)
SERVERS: dict[str, tuple[str, str]] = {
    "python": (
        "pyright-langserver",
        "npm i -g pyright (or: pip install pyright)",
    ),
    "typescript": (
        "typescript-language-server",
        "npm i -g typescript typescript-language-server",
    ),
    "javascript": (
        "typescript-language-server",
        "npm i -g typescript typescript-language-server",
    ),
    "go": (
        "gopls",
        "go install golang.org/x/tools/gopls@latest",
    ),
    "rust": (
        "rust-analyzer",
        "rustup component add rust-analyzer",
    ),
    "java": (
        "jdtls",
        "see: https://github.com/eclipse-jdtls/eclipse.jdt.ls",
    ),
}

_EXT_LANG: dict[str, str] = {
    ".py": "python",
    ".ts": "typescript",
    ".tsx": "typescript",
    ".js": "javascript",
    ".jsx": "javascript",
    ".go": "go",
    ".rs": "rust",
    ".java": "java",
}
# Directory names skipped during detection (matched below the scan root only).
_IGNORE = {".git", "node_modules", ".venv", "__pycache__", ".sin"}


def detect_languages(root: Path) -> list[tuple[str, int]]:
    """Return (language, file_count) pairs, most frequent first.

    Ties are ordered alphabetically by language so the output does not depend
    on filesystem traversal order.
    """
    counter: Counter[str] = Counter()
    for p in root.rglob("*"):
        lang = _EXT_LANG.get(p.suffix.lower())
        if lang is None:
            continue
        # Compare only components below `root`; the location of the checkout
        # itself must not cause every file to be ignored.
        if any(part in _IGNORE for part in p.relative_to(root).parts):
            continue
        if p.is_file():
            counter[lang] += 1
    return sorted(counter.items(), key=lambda item: (-item[1], item[0]))


def server_status(root: Path) -> list[dict]:
    """Return a list of dicts with language server availability info.

    Each dict has the keys `language`, `files`, `server`, `installed` and
    `install_hint`; `installed` is True when the server binary is on PATH.
    """
    rows: list[dict] = []
    for lang, count in detect_languages(root):
        entry = SERVERS.get(lang)
        binary, hint = entry if entry else (None, "no LSP integration yet")
        rows.append(
            {
                "language": lang,
                "files": count,
                "server": binary,
                "installed": bool(binary and shutil.which(binary)),
                "install_hint": hint,
            }
        )
    return rows
# Risk-gating, approval, and tamper-evident audit logging for SIN tools.
from __future__ import annotations

import hashlib
import json
import os
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Literal, Optional

try:
    import yaml
except ImportError:  # pragma: no cover
    yaml = None  # type: ignore

RiskClass = Literal["read", "write", "exec", "network"]
Decision = Literal["allow", "ask", "deny"]

TOOL_RISK: dict[str, RiskClass] = {
    "impact": "read",
    "semantic_diff": "read",
    "semantic_review": "read",
    "architectural_debt": "read",
    "prove": "read",
    "verify_tests": "exec",
    "mock_env": "network",
}

DEFAULT_POLICY: dict[RiskClass, Decision] = {
    "read": "allow",
    "write": "ask",
    "exec": "ask",
    "network": "ask",
}

# Hash used as `prev` for the first entry of the audit chain.
_GENESIS = "0" * 64


class PolicyError(RuntimeError):
    """Raised when a tool call is denied by policy."""


def _env_auto_approve() -> bool:
    """True when SIN_AUTO_APPROVE=1 is set in the environment."""
    return os.environ.get("SIN_AUTO_APPROVE") == "1"


def _risk_of(tool: str) -> RiskClass:
    """Risk class of `tool`; tools missing from TOOL_RISK are treated as exec."""
    return TOOL_RISK.get(tool, "exec")


@dataclass
class Policy:
    """Per-risk-class decisions plus the auto-approve switch."""

    rules: dict[RiskClass, Decision] = field(default_factory=lambda: dict(DEFAULT_POLICY))
    auto_approve: bool = field(default_factory=_env_auto_approve)

    @classmethod
    def load(cls, root: Path = Path(".")) -> "Policy":
        """Load `<root>/.sin/policy.yaml`, falling back to the defaults.

        The defaults are used when the file is absent or pyyaml is missing.
        SIN_AUTO_APPROVE=1 enables auto-approval even when the file exists,
        matching the hint printed by `guarded`.

        Raises:
            PolicyError: if the file or its `rules` entry is not a mapping.
        """
        path = root / ".sin" / "policy.yaml"
        if yaml is None or not path.exists():
            return cls()
        data = yaml.safe_load(path.read_text(encoding="utf-8")) or {}
        if not isinstance(data, dict):
            raise PolicyError(f"{path} must contain a mapping")
        overrides = data.get("rules") or {}
        if not isinstance(overrides, dict):
            raise PolicyError(f"'rules' in {path} must be a mapping")
        return cls(
            rules={**DEFAULT_POLICY, **overrides},
            auto_approve=bool(data.get("auto_approve", False)) or _env_auto_approve(),
        )

    def decide(self, tool: str) -> Decision:
        """Return the decision for `tool`; a risk class without a rule means "ask"."""
        return self.rules.get(_risk_of(tool), "ask")


# --------------------------------------------------------------------------- #
# Tamper-evident audit log (hash chain)
# --------------------------------------------------------------------------- #
class AuditLog:
    """Append-only JSONL log in which every entry hashes its predecessor."""

    def __init__(self, root: Path = Path(".")) -> None:
        """Point at `<root>/.sin/audit/log.jsonl`, creating the directory."""
        self.path = root / ".sin" / "audit" / "log.jsonl"
        self.path.parent.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _digest(prev: str, entry: dict) -> str:
        """SHA-256 over the previous hash and the key-sorted JSON of `entry`."""
        return hashlib.sha256(
            (prev + json.dumps(entry, sort_keys=True)).encode("utf-8")
        ).hexdigest()

    def _last_hash(self) -> str:
        """Hash of the last non-blank entry, or the genesis hash for an empty log."""
        if not self.path.exists():
            return _GENESIS
        last = ""
        for line in self.path.read_text(encoding="utf-8").splitlines():
            if line.strip():
                last = line
        if not last:
            return _GENESIS
        return json.loads(last).get("hash", _GENESIS)

    def record(self, tool: str, args: dict, decision: Decision, outcome: str) -> str:
        """Append one entry and return its hash.

        Only the sorted argument *names* are logged, never the values.
        """
        prev = self._last_hash()
        entry = {
            "ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "tool": tool,
            "risk": _risk_of(tool),
            "decision": decision,
            "outcome": outcome,
            "args_keys": sorted(args.keys()),
            "prev": prev,
        }
        digest = self._digest(prev, entry)
        entry["hash"] = digest
        with self.path.open("a", encoding="utf-8") as fh:
            fh.write(json.dumps(entry) + "\n")
        return digest

    def verify_chain(self) -> bool:
        """Return True if the hash chain is intact (no tampering).

        A missing log counts as intact. A line that is not a JSON object
        counts as tampering rather than raising.
        """
        if not self.path.exists():
            return True
        prev = _GENESIS
        for line in self.path.read_text(encoding="utf-8").splitlines():
            if not line.strip():
                continue
            try:
                entry = json.loads(line)
            except ValueError:
                return False
            if not isinstance(entry, dict):
                return False
            stored = entry.pop("hash", "")
            if entry.get("prev") != prev:
                return False
            if self._digest(prev, entry) != stored:
                return False
            prev = stored
        return True


# --------------------------------------------------------------------------- #
# Path sandboxing
# --------------------------------------------------------------------------- #
def ensure_within_root(target: str | Path, root: Optional[str | Path] = None) -> Path:
    """Resolve `target` and guarantee it stays inside the project root.

    The root defaults to $SIN_PROJECT_ROOT, then to the current directory.
    Relative targets are resolved against the root.

    Raises:
        PolicyError: if the resolved path is neither the root nor below it.
    """
    root_path = Path(root or os.environ.get("SIN_PROJECT_ROOT", ".")).resolve()
    candidate = Path(target)
    resolved = (
        candidate.resolve() if candidate.is_absolute() else (root_path / candidate).resolve()
    )
    if resolved != root_path and root_path not in resolved.parents:
        raise PolicyError(
            f"path '{resolved}' is outside project root '{root_path}'"
        )
    return resolved


# --------------------------------------------------------------------------- #
# Gate used by the MCP server to wrap a tool call
# --------------------------------------------------------------------------- #
def guarded(
    tool: str,
    args: dict,
    run: Callable[[], dict],
    root: Path = Path("."),
    approver: Optional[Callable[[str, dict], bool]] = None,
) -> dict:
    """Apply policy + audit around a tool execution.

    `approver` is called for 'ask' decisions; defaults to auto-deny unless
    SIN_AUTO_APPROVE=1 (so non-interactive runs are safe by default).

    Raises:
        PolicyError: if the call is denied or not approved. Exceptions from
            `run` are logged as `error:<type>` and re-raised unchanged.
    """
    policy = Policy.load(root)
    audit = AuditLog(root)
    decision = policy.decide(tool)
    # Report the risk class the decision was based on; unknown tools are
    # gated as "exec", so the message must not say "None".
    risk = _risk_of(tool)

    if decision == "deny":
        audit.record(tool, args, decision, "denied")
        raise PolicyError(f"tool '{tool}' denied by policy (risk={risk})")

    if decision == "ask":
        approved = policy.auto_approve or (approver(tool, args) if approver else False)
        if not approved:
            audit.record(tool, args, decision, "rejected")
            raise PolicyError(
                f"tool '{tool}' requires approval (risk={risk}). "
                "Set SIN_AUTO_APPROVE=1 or adjust .sin/policy.yaml."
            )

    try:
        result = run()
    except Exception as exc:  # noqa: BLE001
        audit.record(tool, args, decision, f"error:{type(exc).__name__}")
        raise
    audit.record(tool, args, decision, "ok")
    return result
"""Hardened subprocess + input-sanitization helpers shared by all subsystems."""
from __future__ import annotations

import subprocess
from pathlib import Path
from typing import Optional, Sequence

DEFAULT_TIMEOUT = 600  # seconds — never run unbounded

# Lower-cased line prefixes treated as injected instructions.
_SUSPICIOUS_PREFIXES = ("system:", "developer:", "ignore previous", "you are now")


class SafetyError(RuntimeError):
    """Raised when a command is malformed or exceeds its timeout."""


def run_checked(
    cmd: Sequence[str],
    cwd: Optional[Path] = None,
    timeout: int = DEFAULT_TIMEOUT,
    allow_shell: bool = False,
) -> subprocess.CompletedProcess:
    """Run a subprocess with a mandatory timeout and no shell by default.

    Output is captured as text and a non-zero exit status is returned to the
    caller, not raised.

    Raises:
        SafetyError: if `cmd` is not a list/tuple (without `allow_shell`), is
            an empty list/tuple, or the process exceeds `timeout` seconds.
    """
    if not allow_shell:
        if not isinstance(cmd, (list, tuple)):
            raise SafetyError("cmd must be a list/tuple unless allow_shell=True")
        if not cmd:
            # Without this, subprocess fails with an unrelated low-level error.
            raise SafetyError("cmd must not be empty")
    try:
        return subprocess.run(
            cmd,
            cwd=str(cwd) if cwd else None,
            shell=allow_shell,
            timeout=timeout,
            check=False,
            capture_output=True,
            text=True,
        )
    except subprocess.TimeoutExpired as exc:
        raise SafetyError(f"command timed out after {timeout}s: {cmd}") from exc


def sanitize_prompt(text: str, max_len: int = 8000) -> str:
    """Neutralize obvious prompt-injection markers in untrusted task text.

    Text longer than `max_len` is cut and suffixed with a truncation marker.
    Any line whose stripped, lower-cased form starts with a known marker is
    then replaced by a redaction placeholder. Lines are re-joined with "\\n".
    """
    if len(text) > max_len:
        text = text[:max_len] + "\n...[truncated]"
    return "\n".join(
        "[redacted suspicious instruction]"
        if line.strip().lower().startswith(_SUSPICIOUS_PREFIXES)
        else line
        for line in text.splitlines()
    )
"""Compile portable SIN skills into each agent's native command/skill format.

One source of truth: `skills/*.md` with YAML frontmatter (name, description,
arguments) + a prompt body. `compile_skills()` renders them into:

- opencode -> .opencode/command/<name>.md (frontmatter: description, agent)
- codex -> ~/.codex/prompts/<name>.md (plain prompt, $N positional args)
- claude -> .claude/skills/<name>/SKILL.md (frontmatter: name, description)
"""
from __future__ import annotations

import json
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Literal

try:
    import yaml
except ImportError:  # pragma: no cover
    yaml = None  # type: ignore

Target = Literal["opencode", "codex", "claude"]
SUPPORTED_TARGETS: tuple[Target, ...] = ("opencode", "codex", "claude")

_FRONTMATTER_RE = re.compile(r"^---\s*\n(.*?)\n---\s*\n(.*)$", re.DOTALL)
# Values that are safe to emit as an unquoted YAML scalar: start with a
# letter, single line, no ':' or '#'.
_PLAIN_SCALAR_RE = re.compile(r"[A-Za-z][^\n\r:#]*\Z")
# Words a YAML parser may read as a boolean or null rather than a string.
_YAML_KEYWORDS = frozenset({"true", "false", "null", "yes", "no", "on", "off"})


@dataclass
class Skill:
    """One portable skill: metadata from the frontmatter plus the prompt body."""

    name: str
    description: str
    body: str
    arguments: list[dict] = field(default_factory=list)

    @classmethod
    def parse(cls, path: Path) -> "Skill":
        """Parse a skill markdown file.

        Raises:
            ValueError: if the frontmatter is missing or is not a mapping.
            RuntimeError: if pyyaml is not installed.
        """
        text = path.read_text(encoding="utf-8")
        m = _FRONTMATTER_RE.match(text)
        if not m:
            raise ValueError(f"{path} is missing YAML frontmatter")
        if yaml is None:
            raise RuntimeError("pyyaml is required to parse skills")
        meta = yaml.safe_load(m.group(1)) or {}
        if not isinstance(meta, dict):
            raise ValueError(f"{path} frontmatter must be a mapping")
        return cls(
            name=meta.get("name", path.stem),
            description=meta.get("description", ""),
            body=m.group(2).strip(),
            arguments=meta.get("arguments", []) or [],
        )


def _yaml_scalar(value: object) -> str:
    """Render `value` for a one-line `key: value` frontmatter entry.

    Simple text is emitted unchanged. Anything that could change the YAML
    structure (':', '#', newlines, leading punctuation, surrounding spaces,
    boolean/null keywords) is emitted as a double-quoted JSON string.
    """
    text = str(value)
    if text == "":
        return text
    plain = (
        _PLAIN_SCALAR_RE.match(text) is not None
        and text == text.strip()
        and text.lower() not in _YAML_KEYWORDS
    )
    return text if plain else json.dumps(text, ensure_ascii=False)


def _safe_name(skill: Skill) -> str:
    """Return the skill name, rejecting values that would escape the output dir."""
    name = str(skill.name)
    if not name or name in {".", ".."} or "/" in name or "\\" in name:
        raise ValueError(f"invalid skill name: {skill.name!r}")
    return name


def _body_for_codex(skill: Skill) -> str:
    """Codex prompts use positional $1, $2 ... — map {{arg}} -> $N.

    Arguments may be given as `{"name": ...}` mappings or as bare strings;
    whitespace inside the braces (`{{ arg }}`) is tolerated. Entries without
    a usable name are skipped but still consume their position.
    """
    body = skill.body
    for position, arg in enumerate(skill.arguments, start=1):
        arg_name = arg.get("name") if isinstance(arg, dict) else arg
        if not isinstance(arg_name, str) or not arg_name:
            continue
        placeholder = re.compile(r"\{\{\s*" + re.escape(arg_name) + r"\s*\}\}")
        # A callable replacement keeps "$N" literal.
        body = placeholder.sub(lambda _m, n=position: f"${n}", body)
    return body


def render_skill(skill: Skill, target: Target) -> tuple[str, str]:
    """Return (relative_output_path, file_content) for a target agent.

    Raises:
        ValueError: for an unknown target or a skill name containing path
            separators.
    """
    name = _safe_name(skill)
    description = _yaml_scalar(skill.description)

    if target == "opencode":
        fm = f"---\ndescription: {description}\nagent: build\n---\n\n"
        return f".opencode/command/{name}.md", fm + skill.body + "\n"

    if target == "codex":
        return f"prompts/{name}.md", _body_for_codex(skill) + "\n"

    if target == "claude":
        fm = f"---\nname: {_yaml_scalar(name)}\ndescription: {description}\n---\n\n"
        return f".claude/skills/{name}/SKILL.md", fm + skill.body + "\n"

    raise ValueError(f"unknown target: {target}")


def load_skills(source_dir: Path = Path("skills")) -> list[Skill]:
    """Parse every `*.md` file in `source_dir` (sorted); [] if the dir is missing."""
    if not source_dir.exists():
        return []
    return [Skill.parse(p) for p in sorted(source_dir.glob("*.md"))]


def compile_skills(
    target: Target,
    source_dir: Path = Path("skills"),
    out_root: Path = Path("."),
    dry_run: bool = False,
) -> list[Path]:
    """Compile every source skill into `target`'s native format.

    For codex, paths are written under the user's ~/.codex/; for opencode and
    claude they are written relative to the repo (out_root). With `dry_run`
    the destination paths are returned but nothing is written.
    """
    written: list[Path] = []
    base = Path.home() / ".codex" if target == "codex" else out_root

    for skill in load_skills(source_dir):
        rel, content = render_skill(skill, target)
        dest = base / rel
        written.append(dest)
        if not dry_run:
            dest.parent.mkdir(parents=True, exist_ok=True)
            dest.write_text(content, encoding="utf-8")
    return written
+ """ + written: list[Path] = [] + base = Path.home() / ".codex" if target == "codex" else out_root + + for skill in load_skills(source_dir): + rel, content = render_skill(skill, target) + dest = base / rel + written.append(dest) + if not dry_run: + dest.parent.mkdir(parents=True, exist_ok=True) + dest.write_text(content, encoding="utf-8") + return written diff --git a/tests/test_bench.py b/tests/test_bench.py new file mode 100644 index 0000000..4a5df44 --- /dev/null +++ b/tests/test_bench.py @@ -0,0 +1,139 @@ +"""Tests for the SWE-bench harness — using DryRunRunner so no LLM or network needed.""" +import json +from pathlib import Path + + +from sin_code_bundle.bench import ( + ArmSummary, + BenchReport, + DryRunRunner, + Task, + TaskResult, + _summarize, + format_report, + load_tasks_jsonl, +) + + +SAMPLE_TASK = Task( + instance_id="test/repo__001", + repo="test/repo", + base_commit="abc123", + problem_statement="Fix the bug.", + fail_to_pass=["tests/test_bug.py::test_fix"], +) + + +def test_dry_runner_returns_empty_diff(): + runner = DryRunRunner() + diff = runner.run(SAMPLE_TASK, Path("."), sin_enabled=False) + assert diff == "" + + +def test_summarize_zero_resolved(): + results = [ + TaskResult( + instance_id="x", + arm="control", + resolved=False, + duration_s=1.0, + patch_applied=False, + fail_to_pass_passed=0, + fail_to_pass_total=1, + ) + ] + s = _summarize("control", results) + assert s.resolved == 0 + assert s.resolved_rate == 0.0 + + +def test_summarize_all_resolved(): + results = [ + TaskResult( + instance_id="x", + arm="sin", + resolved=True, + duration_s=2.5, + patch_applied=True, + fail_to_pass_passed=1, + fail_to_pass_total=1, + ) + ] + s = _summarize("sin", results) + assert s.resolved == 1 + assert s.resolved_rate == 1.0 + + +def test_format_report_positive_delta(): + arms = { + "control": ArmSummary("control", 5, 1, 0.2, 10.0), + "sin": ArmSummary("sin", 5, 3, 0.6, 12.0), + } + report = BenchReport( + arms=arms, + delta_resolved_rate=0.4, + 
per_task=[], + started_at="2026-01-01T00:00:00", + finished_at="2026-01-01T01:00:00", + ) + text = format_report(report) + assert "+40.0 pp" in text + assert "control" in text + assert "sin" in text + + +def test_report_to_json(): + arms = { + "control": ArmSummary("control", 1, 0, 0.0, 5.0), + "sin": ArmSummary("sin", 1, 1, 1.0, 6.0), + } + report = BenchReport( + arms=arms, + delta_resolved_rate=1.0, + per_task=[], + started_at="2026-01-01T00:00:00", + finished_at="2026-01-01T01:00:00", + ) + data = json.loads(report.to_json()) + assert data["delta_resolved_rate"] == 1.0 + assert "control" in data["arms"] + + +def test_load_tasks_jsonl(tmp_path: Path): + lines = [ + json.dumps( + { + "instance_id": "repo__1", + "repo": "org/repo", + "base_commit": "deadbeef", + "problem_statement": "Fix it.", + "FAIL_TO_PASS": ["tests/test_a.py"], + "PASS_TO_PASS": [], + } + ) + ] + f = tmp_path / "tasks.jsonl" + f.write_text("\n".join(lines), encoding="utf-8") + tasks = load_tasks_jsonl(f, limit=10) + assert len(tasks) == 1 + assert tasks[0].instance_id == "repo__1" + + +def test_load_tasks_jsonl_limit(tmp_path: Path): + lines = [ + json.dumps( + { + "instance_id": f"repo__{i}", + "repo": "org/repo", + "base_commit": "abc", + "problem_statement": "Fix.", + "FAIL_TO_PASS": [], + "PASS_TO_PASS": [], + } + ) + for i in range(10) + ] + f = tmp_path / "tasks.jsonl" + f.write_text("\n".join(lines), encoding="utf-8") + tasks = load_tasks_jsonl(f, limit=3) + assert len(tasks) == 3 diff --git a/tests/test_budget.py b/tests/test_budget.py new file mode 100644 index 0000000..6c29cd1 --- /dev/null +++ b/tests/test_budget.py @@ -0,0 +1,44 @@ +from sin_code_bundle.budget import trim + + +def test_trims_long_list(): + out = trim(list(range(100)), max_list=10) + assert len(out) == 11 # 10 items + truncation marker + assert out[-1]["_truncated"] is True + assert out[-1]["_omitted"] == 90 + + +def test_short_list_unchanged(): + out = trim([1, 2, 3], max_list=10) + assert out == [1, 2, 3] + + +def 
test_trims_long_string(): + out = trim("x" * 5000, max_str=100) + assert out.endswith("...[truncated]") + assert len(out) <= 120 + + +def test_short_string_unchanged(): + out = trim("hello", max_str=100) + assert out == "hello" + + +def test_nested_dict(): + out = trim({"items": list(range(50)), "name": "ok"}, max_list=5) + assert len(out["items"]) == 6 # 5 items + marker + assert out["name"] == "ok" + + +def test_passthrough_int(): + assert trim(42) == 42 + + +def test_passthrough_none(): + assert trim(None) is None + + +def test_nested_list_of_dicts(): + data = [{"a": "x" * 5000}] + out = trim(data, max_list=5, max_str=10) + assert out[0]["a"].endswith("...[truncated]") diff --git a/tests/test_lsp_backend.py b/tests/test_lsp_backend.py new file mode 100644 index 0000000..89ae12a --- /dev/null +++ b/tests/test_lsp_backend.py @@ -0,0 +1,94 @@ +"""Tests for lsp_backend — primarily the tree-sitter fallback path, +since LSP servers won't be available in CI. +""" +from pathlib import Path + + +from sin_code_bundle.lsp_backend import ( + ImpactResult, + Location, + _is_public_api_path, + _is_test_path, + _score_risk, + _treesitter_impact, + compute_impact, +) + + +def test_score_risk_low(): + assert _score_risk(0, False, False) == "low" + + +def test_score_risk_medium_callers(): + assert _score_risk(5, False, False) == "medium" + + +def test_score_risk_high_api(): + assert _score_risk(1, False, True) == "high" + + +def test_score_risk_high_many_callers(): + assert _score_risk(11, False, False) == "high" + + +def test_is_test_path(): + assert _is_test_path("tests/test_foo.py") + assert _is_test_path("foo_test.py") + assert not _is_test_path("src/foo.py") + + +def test_is_public_api_path(): + assert _is_public_api_path("__init__.py") + assert _is_public_api_path("api.py") + assert _is_public_api_path("index.ts") + assert not _is_public_api_path("utils.py") + + +def test_treesitter_finds_symbol(tmp_path: Path): + src = tmp_path / "mymod.py" + src.write_text( + "def 
compute(x):\n return x * 2\n\nresult = compute(5)\n", + encoding="utf-8", + ) + result = _treesitter_impact(tmp_path, "compute") + assert result.symbol == "compute" + assert result.defined_at is not None + assert result.fan_in >= 1 + assert result.source == "treesitter" + + +def test_treesitter_unknown_symbol_returns_empty(tmp_path: Path): + (tmp_path / "empty.py").write_text("x = 1\n", encoding="utf-8") + result = _treesitter_impact(tmp_path, "nonexistent_symbol_xyz") + assert result.fan_in == 0 + assert result.defined_at is None + + +def test_compute_impact_uses_cache(tmp_path: Path): + src = tmp_path / "mod.py" + src.write_text("def foo():\n pass\n\nfoo()\n", encoding="utf-8") + + r1 = compute_impact(tmp_path, "foo") + r2 = compute_impact(tmp_path, "foo") # should hit cache + assert r1.symbol == r2.symbol == "foo" + assert r1.source == r2.source + + +def test_impact_result_to_dict(): + loc = Location(file="a.py", line=1, column=1, snippet="def foo():") + result = ImpactResult( + symbol="foo", + defined_at=loc, + callers=[loc], + fan_in=1, + touches_tests=False, + touches_public_api=False, + risk="low", + source="treesitter", + notes=["test"], + ) + d = result.to_dict() + assert d["symbol"] == "foo" + assert d["fan_in"] == 1 + assert d["defined_at"]["file"] == "a.py" + assert d["callers"][0]["line"] == 1 diff --git a/tests/test_policy.py b/tests/test_policy.py new file mode 100644 index 0000000..4c27b7a --- /dev/null +++ b/tests/test_policy.py @@ -0,0 +1,87 @@ +from pathlib import Path + +import pytest + +from sin_code_bundle.policy import ( + AuditLog, + Policy, + PolicyError, + ensure_within_root, + guarded, +) + + +def test_default_read_allows(): + p = Policy() + assert p.decide("impact") == "allow" + assert p.decide("semantic_diff") == "allow" + + +def test_default_exec_asks(): + p = Policy() + assert p.decide("verify_tests") == "ask" + + +def test_default_network_asks(): + p = Policy() + assert p.decide("mock_env") == "ask" + + +def 
test_unknown_tool_treated_as_exec(): + p = Policy() + assert p.decide("some_unknown_tool") == "ask" + + +def test_guarded_allows_read(tmp_path: Path): + out = guarded("impact", {"symbol": "x"}, lambda: {"ok": True}, root=tmp_path) + assert out == {"ok": True} + + +def test_guarded_denies_without_approval(tmp_path: Path): + with pytest.raises(PolicyError, match="requires approval"): + guarded("verify_tests", {}, lambda: {"ok": True}, root=tmp_path) + + +def test_guarded_allows_exec_with_auto_approve(tmp_path: Path, monkeypatch): + monkeypatch.setenv("SIN_AUTO_APPROVE", "1") + p = Policy() + assert p.auto_approve is True + # guarded should succeed when auto_approve is on + out = guarded( + "verify_tests", + {}, + lambda: {"ok": True}, + root=tmp_path, + approver=None, + ) + assert out == {"ok": True} + + +def test_audit_chain_intact(tmp_path: Path): + log = AuditLog(tmp_path) + log.record("impact", {"symbol": "x"}, "allow", "ok") + log.record("verify_tests", {}, "ask", "ok") + assert log.verify_chain() is True + + +def test_audit_chain_empty(tmp_path: Path): + log = AuditLog(tmp_path) + assert log.verify_chain() is True + + +def test_audit_chain_detects_tampering(tmp_path: Path): + log = AuditLog(tmp_path) + log.record("impact", {"symbol": "x"}, "allow", "ok") + text = log.path.read_text(encoding="utf-8").replace('"ok"', '"HACKED"') + log.path.write_text(text, encoding="utf-8") + assert log.verify_chain() is False + + +def test_path_sandbox_inside(tmp_path: Path): + inside = ensure_within_root("sub/file.py", root=tmp_path) + assert str(inside).startswith(str(tmp_path.resolve())) + + +def test_path_sandbox_outside_raises(tmp_path: Path): + with pytest.raises(PolicyError, match="outside project root"): + ensure_within_root("/etc/passwd", root=tmp_path) diff --git a/tests/test_skills.py b/tests/test_skills.py new file mode 100644 index 0000000..4e90486 --- /dev/null +++ b/tests/test_skills.py @@ -0,0 +1,86 @@ +from pathlib import Path + +import pytest + +from 
sin_code_bundle.skills import Skill, compile_skills, render_skill + +SAMPLE = """--- +name: demo +description: A demo skill. +arguments: + - name: target + description: thing to act on + required: true +--- +Refactor {{target}} carefully and verify. +""" + + +@pytest.fixture +def skill(tmp_path: Path) -> Skill: + p = tmp_path / "demo.md" + p.write_text(SAMPLE, encoding="utf-8") + return Skill.parse(p) + + +def test_parse_frontmatter(skill: Skill): + assert skill.name == "demo" + assert skill.description == "A demo skill." + assert skill.arguments[0]["name"] == "target" + assert "{{target}}" in skill.body + + +def test_render_opencode(skill: Skill): + path, content = render_skill(skill, "opencode") + assert path == ".opencode/command/demo.md" + assert "description: A demo skill." in content + assert "agent: build" in content + assert "{{target}}" in content + + +def test_render_codex_maps_positional_args(skill: Skill): + _, content = render_skill(skill, "codex") + assert "$1" in content + assert "{{target}}" not in content + + +def test_render_claude(skill: Skill): + path, content = render_skill(skill, "claude") + assert path == ".claude/skills/demo/SKILL.md" + assert "name: demo" in content + assert "{{target}}" in content + + +def test_compile_writes_files(tmp_path: Path): + src = tmp_path / "skills" + src.mkdir() + (src / "demo.md").write_text(SAMPLE, encoding="utf-8") + out = tmp_path / "repo" + written = compile_skills("opencode", source_dir=src, out_root=out) + assert written + assert written[0].exists() + assert "demo" in written[0].read_text() + + +def test_compile_dry_run_does_not_write(tmp_path: Path): + src = tmp_path / "skills" + src.mkdir() + (src / "demo.md").write_text(SAMPLE, encoding="utf-8") + out = tmp_path / "repo" + written = compile_skills("opencode", source_dir=src, out_root=out, dry_run=True) + assert written + assert not written[0].exists() + + +def test_load_skills_empty_dir(tmp_path: Path): + from sin_code_bundle.skills import load_skills 
+ + result = load_skills(tmp_path / "no-such-dir") + assert result == [] + + +def test_missing_frontmatter_raises(tmp_path: Path): + p = tmp_path / "bad.md" + p.write_text("No frontmatter here.\n", encoding="utf-8") + with pytest.raises(ValueError, match="frontmatter"): + Skill.parse(p)