diff --git a/codec_audit.py b/codec_audit.py index 95625ef..e255c92 100644 --- a/codec_audit.py +++ b/codec_audit.py @@ -58,7 +58,7 @@ import secrets import threading import time -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Optional @@ -818,3 +818,273 @@ def verify_audit_log(path: "Path | str | None" = None) -> dict: summary["integrity_ok"] = summary["broken_lines"] == 0 return summary + + +# ── Query API for the /audit dashboard (codec_audit.html) ───────────────────── +# +# codec_audit.html and routes/audit.py (`read_events` / `get_stats`) predate +# the unified schema:1 envelope — they were written against an older, +# pre-979edab log format that had explicit `cat`/`lvl`/`sum` fields on every +# line. That format is gone; the unified envelope (§6 of AGENTS.md) has no +# `cat` field by design. Rather than resurrect the old write format (which +# would be an envelope-schema change — a §10 don't-touch-zone), these +# functions derive `cat`/`lvl`/`sum` at READ time from the fields the unified +# envelope already carries (`event`, `source`, `tool`, `transport`, `outcome`, +# `level`). The write path (audit()/log_event()) is untouched. +# +# The category mapping is a best-effort UI convenience, not a stable +# contract — safe to extend as new event/source names are added elsewhere. + +_ROTATED_LOG_RE = re.compile(r"^audit\.log\.(\d{4}-\d{2}-\d{2})$") + +# Exact `event` name → category. Checked before source/keyword matching so +# specific security/scheduling events aren't miscategorized by a keyword +# collision (e.g. "trigger_fired" contains no "scheduled"-ish substring). 
_EVENT_CATEGORY_MAP = {
    "hook_error": "error",
    "tool_vetoed": "security",
    "skill_load_blocked": "security",
    "plugin_load_blocked": "security",
    "plugin_hook_timeout": "security",
    "permission_gate_blocked": "security",
    "file_write_blocked": "security",
    "python_exec_blocked": "security",
    "internal_token_mismatch": "security",
    "trigger_fired": "scheduled",
    "trigger_evaluated": "scheduled",
    "trigger_blocked": "scheduled",
    "trigger_muted": "scheduled",
    "shift_report_started": "scheduled",
    "shift_report_completed": "scheduled",
    "voice_mode_changed": "voice",
}

# Emitter `source` → category, for infra-level events that don't carry a
# `tool` name (heartbeat ticks, voice session lifecycle, auth, etc.).
_SOURCE_CATEGORY_MAP = {
    "codec-voice": "voice",
    "codec-auth": "auth",
    "codec-oauth-provider": "auth",
    "codec-keychain": "auth",
    "codec-plugin-trust": "security",
    "codec-scheduler": "scheduled",
    "codec-autopilot": "scheduled",
    "codec-hotkey": "hotkey",
    "codec-dictate": "draft",
    "codec-self-improve": "draft",
    "codec-heartbeat": "system",
}

# Substring match over f"{event} {tool}" (both lowercased), checked in order;
# the first keyword found wins.
_KEYWORD_CATEGORY_MAP = (
    ("tts", "tts"),
    ("stt", "stt"),
    ("whisper", "stt"),
    ("vision", "vision"),
    ("screenshot", "screenshot"),
    ("oauth", "auth"),
    ("auth", "auth"),
    ("keychain", "auth"),
    ("config", "config"),
)


def _str_field(record: dict, key: str) -> str:
    """Return ``record[key]`` when it is a string, otherwise ``""``.

    Log lines are only guaranteed to be valid JSON objects, not to carry
    well-typed fields; treating a non-string value as absent keeps one odd
    record from raising inside a dashboard query.
    """
    value = record.get(key)
    return value if isinstance(value, str) else ""


def _categorize_event(record: dict) -> str:
    """Return a best-effort category name for a single audit record.

    Precedence, first hit wins: exact ``event`` name, emitter ``source``,
    ``transport == "voice"``, keyword substring in f"{event} {tool}",
    tool_call/tool_result with a tool name ("skill"), dashboard/dispatch/
    session sources ("command"), and finally "system".

    The mapping is a UI convenience, not a stable contract. Non-string
    field values are treated as missing.
    """
    event = _str_field(record, "event").lower()
    source = _str_field(record, "source")
    tool = _str_field(record, "tool").lower()
    transport = _str_field(record, "transport")

    if event in _EVENT_CATEGORY_MAP:
        return _EVENT_CATEGORY_MAP[event]
    if source in _SOURCE_CATEGORY_MAP:
        return _SOURCE_CATEGORY_MAP[source]
    if transport == "voice":
        return "voice"

    haystack = f"{event} {tool}"
    for keyword, cat in _KEYWORD_CATEGORY_MAP:
        if keyword in haystack:
            return cat

    if tool and event in ("tool_call", "tool_result"):
        return "skill"
    if source in ("codec-dashboard", "codec-dispatch", "codec-session"):
        return "command"
    return "system"


def _level_for(record: dict) -> str:
    """Derive a display level for a record.

    An explicit, non-empty string ``level`` is returned as-is. Otherwise the
    level is derived from ``outcome``: "error"/"timeout" → "error",
    "denied"/"validation" → "warning", anything else → "info".
    """
    lvl = record.get("level")
    # Only trust a string level: the result is used as a dict key by
    # get_stats(), so an unhashable value (list/dict) would raise there.
    if lvl and isinstance(lvl, str):
        return lvl
    outcome = record.get("outcome")
    if outcome in ("error", "timeout"):
        return "error"
    if outcome in ("denied", "validation"):
        return "warning"
    return "info"


def _summary_for(record: dict) -> str:
    """Return a one-line summary for a record.

    Prefers ``message``; otherwise synthesizes f"{event}: {tool}" when both
    are present, or whichever of the two exists (possibly "").
    """
    msg = record.get("message")
    if msg:
        # Honour the declared `-> str` even if the writer logged a non-string.
        return msg if isinstance(msg, str) else str(msg)
    tool = _str_field(record, "tool")
    event = _str_field(record, "event")
    if tool and event:
        return f"{event}: {tool}"
    return tool or event


def _norm_ts(ts: Optional[str]) -> str:
    """Normalize a UTC ISO-8601 timestamp's offset suffix to 'Z'.

    Makes '+00:00' / '+0000' and 'Z' spellings compare equal under plain
    string comparison. Returns "" for a missing, empty, or non-string value
    so callers can compare the result without type checks.
    """
    if not ts or not isinstance(ts, str):
        return ""
    return ts.replace("+00:00", "Z").replace("+0000", "Z")


def _log_files_desc() -> list[tuple[Optional[str], Path]]:
    """List audit log files, newest first.

    Returns ``(date, path)`` pairs: the live log (date ``None``) if it
    exists, followed by rotated ``audit.log.YYYY-MM-DD`` files in descending
    date order. Names that do not match the rotated-file pattern (e.g. the
    ``audit.log.lock`` sidecar) are ignored.
    """
    files: list[tuple[Optional[str], Path]] = []
    if _AUDIT_LOG.exists():
        files.append((None, _AUDIT_LOG))
    rotated = []
    for p in _AUDIT_DIR.glob("audit.log.*"):
        m = _ROTATED_LOG_RE.match(p.name)
        if m:
            rotated.append((m.group(1), p))
    # YYYY-MM-DD strings sort chronologically as plain text.
    rotated.sort(key=lambda t: t[0], reverse=True)
    files.extend(rotated)
    return files


def _iter_json_lines(path: Path, reverse: bool = False):
    """Yield JSON-object records from `path`, one per newline-delimited line.

    path    : log file to read; an unreadable file yields nothing.
    reverse : when True, yield from the last line to the first.

    Blank lines, lines that fail to parse, and lines whose top-level value
    is not a JSON object are skipped.
    """
    try:
        text = path.read_text(encoding="utf-8", errors="replace")
    except OSError:
        return
    # Split on "\n" only. str.splitlines() also breaks on U+2028, U+2029 and
    # U+0085, which may appear unescaped inside a JSON string; splitting
    # there would cut a valid record into unparseable fragments. A trailing
    # "\r" from CRLF files is removed by strip() below.
    lines = text.split("\n")
    for raw in (reversed(lines) if reverse else lines):
        raw = raw.strip()
        if not raw:
            continue
        try:
            obj = json.loads(raw)
        except (ValueError, RecursionError):
            # ValueError covers json.JSONDecodeError; RecursionError is what
            # the decoder raises on pathologically nested input.
            continue
        if isinstance(obj, dict):
            yield obj


def read_events(
    categories: Optional[list[str]] = None,
    level: Optional[str] = None,
    search: Optional[str] = None,
    since: Optional[str] = None,
    until: Optional[str] = None,
    limit: int = 500,
) -> list[dict]:
    """Return matching events from the audit log(s), newest first.

    Each returned dict is a copy of the raw record with four derived keys
    added for the dashboard: `cat`, `lvl`, `src` and `sum`.

    categories   : keep only these derived categories (e.g. ["skill", "error"])
    level        : exact match against the derived level
    search       : case-insensitive substring match against
                   summary/event/tool/source
    since, until : inclusive timestamp bounds, compared as normalized strings
                   (either 'Z' or '+00:00' offset style)
    limit        : maximum events to return; a value <= 0 returns []
    """
    # The limit check below runs after an append, so without this guard a
    # non-positive limit would still hand back one event.
    if limit <= 0:
        return []

    cats_set = set(categories) if categories else None
    search_lower = search.lower() if search else None
    since_n = _norm_ts(since) if since else None
    until_n = _norm_ts(until) if until else None

    results: list[dict] = []
    for date_str, path in _log_files_desc():
        # A rotated file is assumed to cover one UTC calendar day. If its
        # last possible timestamp is still before `since`, this file and
        # every older one (the list is date-descending) can be skipped.
        if since_n and date_str is not None and f"{date_str}T23:59:59.999Z" < since_n:
            break
        for rec in _iter_json_lines(path, reverse=True):
            ts_n = _norm_ts(rec.get("ts"))
            if since_n and ts_n < since_n:
                continue
            if until_n and ts_n > until_n:
                continue

            cat = _categorize_event(rec)
            if cats_set and cat not in cats_set:
                continue
            lvl = _level_for(rec)
            if level and lvl != level:
                continue
            summary = _summary_for(rec)
            if search_lower:
                fields = (summary, rec.get("event"), rec.get("tool"), rec.get("source"))
                # A JSON null must contribute nothing, not the text "None".
                haystack = " ".join(
                    "" if v is None else str(v) for v in fields
                ).lower()
                if search_lower not in haystack:
                    continue

            ev = dict(rec)
            ev["cat"] = cat
            ev["lvl"] = lvl
            ev["src"] = rec.get("source", "")
            ev["sum"] = summary
            results.append(ev)
            if len(results) >= limit:
                return results
    return results


def get_stats(hours: int = 24) -> dict:
    """Aggregate event counts over the last `hours` hours.

    Returns ``{"total_24h", "errors_24h", "by_category", "by_level"}``. The
    key names keep the '_24h' suffix whatever `hours` is, so the shape the
    stats panel reads never changes.
    """
    cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
    cutoff_n = _norm_ts(cutoff.isoformat(timespec="milliseconds"))

    total = 0
    errors = 0
    by_category: dict[str, int] = {}
    by_level: dict[str, int] = {}

    for date_str, path in _log_files_desc():
        # Same day-granularity pruning as read_events(): once a rotated file
        # ends before the cutoff, all older files do too.
        if date_str is not None and f"{date_str}T23:59:59.999Z" < cutoff_n:
            break
        for rec in _iter_json_lines(path):
            # Records without a usable timestamp normalize to "" and so fall
            # before any cutoff; they are not counted.
            if _norm_ts(rec.get("ts")) < cutoff_n:
                continue
            total += 1
            cat = _categorize_event(rec)
            lvl = _level_for(rec)
            by_category[cat] = by_category.get(cat, 0) + 1
            by_level[lvl] = by_level.get(lvl, 0) + 1
            if lvl == "error":
                errors += 1

    return {
        "total_24h": total,
        "errors_24h": errors,
        "by_category": by_category,
        "by_level": by_level,
    }
+""" +from __future__ import annotations + +import json +import sys +from pathlib import Path + +import pytest + +_REPO = Path(__file__).resolve().parents[1] +if str(_REPO) not in sys.path: + sys.path.insert(0, str(_REPO)) + +import codec_audit + + +@pytest.fixture +def isolated_audit(tmp_path, monkeypatch): + """Redirect both the live log and the rotation directory to tmp_path.""" + test_log = tmp_path / "audit.log" + monkeypatch.setattr(codec_audit, "_AUDIT_LOG", test_log) + monkeypatch.setattr(codec_audit, "_AUDIT_DIR", tmp_path) + return tmp_path + + +def _write_raw(path: Path, records: list[dict]) -> None: + """Write raw JSON-line records directly (bypasses audit() for + deterministic timestamps in tests).""" + with open(path, "a", encoding="utf-8") as f: + for r in records: + f.write(json.dumps(r) + "\n") + + +def _rec(**overrides) -> dict: + base = { + "ts": "2026-07-02T12:00:00.000+00:00", + "schema": 1, + "event": "tool_result", + "source": "codec-mcp-http", + "tool": "weather", + "outcome": "ok", + "transport": "http", + } + base.update(overrides) + return base + + +# ── read_events: empty / basic shape ──────────────────────────────────────── + +def test_read_events_empty_when_no_log(isolated_audit): + assert codec_audit.read_events() == [] + + +def test_read_events_returns_ui_facing_fields(isolated_audit): + codec_audit.audit("weather", event="tool_result", source="codec-mcp-http", + outcome="ok", message="fetched forecast") + events = codec_audit.read_events() + assert len(events) == 1 + ev = events[0] + for key in ("ts", "cat", "lvl", "src", "sum"): + assert key in ev, f"missing UI field: {key}" + assert ev["src"] == "codec-mcp-http" + assert ev["sum"] == "fetched forecast" + assert ev["lvl"] == "info" + + +def test_read_events_newest_first(isolated_audit): + codec_audit.audit("weather", event="tool_result", source="codec-mcp-http") + codec_audit.audit("translate", event="tool_result", source="codec-mcp-http") + events = codec_audit.read_events() + 
assert [e["tool"] for e in events] == ["translate", "weather"] + + +def test_read_events_skips_malformed_lines(isolated_audit, tmp_path): + codec_audit.audit("weather", event="tool_result", source="codec-mcp-http") + with open(codec_audit._AUDIT_LOG, "a", encoding="utf-8") as f: + f.write("not json\n") + f.write("\n") # blank line + events = codec_audit.read_events() + assert len(events) == 1 + + +def test_read_events_ignores_lock_sidecar_file(isolated_audit, tmp_path): + (tmp_path / "audit.log.lock").write_text("irrelevant flock sidecar") + codec_audit.audit("weather", event="tool_result", source="codec-mcp-http") + events = codec_audit.read_events() + assert len(events) == 1 + + +# ── read_events: filters ──────────────────────────────────────────────────── + +def test_read_events_filters_by_category(isolated_audit): + codec_audit.audit("weather", event="tool_result", source="codec-mcp-http") + codec_audit.log_event("voice_session_start", "codec-voice", "voice up") + events = codec_audit.read_events(categories=["voice"]) + assert len(events) == 1 + assert events[0]["cat"] == "voice" + + +def test_read_events_filters_by_level(isolated_audit): + codec_audit.log_event("service_down", "codec-heartbeat", "down", level="error") + codec_audit.log_event("heartbeat_tick", "codec-heartbeat", "tick", level="info") + events = codec_audit.read_events(level="error") + assert len(events) == 1 + assert events[0]["lvl"] == "error" + + +def test_read_events_filters_by_search_case_insensitive(isolated_audit): + codec_audit.audit("weather", event="tool_result", source="codec-mcp-http", message="Forecast OK") + codec_audit.audit("translate", event="tool_result", source="codec-mcp-http", message="Translated text") + events = codec_audit.read_events(search="forecast") + assert len(events) == 1 + assert events[0]["tool"] == "weather" + + +def test_read_events_respects_limit(isolated_audit): + for i in range(5): + codec_audit.audit(f"tool{i}", event="tool_result", 
source="codec-mcp-http") + events = codec_audit.read_events(limit=2) + assert len(events) == 2 + + +def test_read_events_filters_by_since_until(isolated_audit): + _write_raw(codec_audit._AUDIT_LOG, [ + _rec(ts="2026-07-01T00:00:00.000+00:00", tool="old"), + _rec(ts="2026-07-02T12:00:00.000+00:00", tool="mid"), + _rec(ts="2026-07-03T00:00:00.000+00:00", tool="new"), + ]) + events = codec_audit.read_events(since="2026-07-02T00:00:00.000Z", + until="2026-07-02T23:59:59.999Z") + assert [e["tool"] for e in events] == ["mid"] + + +# ── read_events: spans rotated files ──────────────────────────────────────── + +def test_read_events_spans_rotated_files(isolated_audit, tmp_path): + rotated = tmp_path / "audit.log.2026-07-01" + _write_raw(rotated, [_rec(ts="2026-07-01T10:00:00.000+00:00", tool="yesterday")]) + _write_raw(codec_audit._AUDIT_LOG, [_rec(ts="2026-07-02T10:00:00.000+00:00", tool="today")]) + events = codec_audit.read_events() + assert [e["tool"] for e in events] == ["today", "yesterday"] + + +# ── get_stats ──────────────────────────────────────────────────────────────── + +def test_get_stats_empty_when_no_log(isolated_audit): + stats = codec_audit.get_stats(hours=24) + assert stats == { + "total_24h": 0, + "errors_24h": 0, + "by_category": {}, + "by_level": {}, + } + + +def test_get_stats_counts_totals_and_errors(isolated_audit): + codec_audit.audit("weather", event="tool_result", source="codec-mcp-http", outcome="ok") + codec_audit.audit("weather", event="tool_result", source="codec-mcp-http", outcome="error", + error="boom", error_type="RuntimeError") + stats = codec_audit.get_stats(hours=24) + assert stats["total_24h"] == 2 + assert stats["errors_24h"] == 1 + assert stats["by_level"]["error"] == 1 + assert stats["by_level"]["info"] == 1 + + +def test_get_stats_by_category_has_expected_keys(isolated_audit): + codec_audit.audit("weather", event="tool_result", source="codec-mcp-http") + codec_audit.log_event("voice_session_start", "codec-voice", "voice up") + 
stats = codec_audit.get_stats(hours=24) + assert stats["by_category"]["skill"] == 1 + assert stats["by_category"]["voice"] == 1 + + +def test_get_stats_excludes_events_outside_window(isolated_audit): + _write_raw(codec_audit._AUDIT_LOG, [ + _rec(ts="2020-01-01T00:00:00.000+00:00", tool="ancient"), + ]) + codec_audit.audit("weather", event="tool_result", source="codec-mcp-http") + stats = codec_audit.get_stats(hours=24) + assert stats["total_24h"] == 1 + + +# ── category / level derivation ───────────────────────────────────────────── + +@pytest.mark.parametrize("record,expected_cat", [ + (_rec(event="tool_result", source="codec-mcp-http", tool="weather"), "skill"), + (_rec(event="voice_mode_changed", source="codec-voice", tool=""), "voice"), + (_rec(event="hook_error", source="codec-hooks", tool=""), "error"), + (_rec(event="tool_vetoed", source="codec-hooks", tool="terminal"), "security"), + (_rec(event="skill_load_blocked", source="codec-skill-registry", tool=""), "security"), + (_rec(event="trigger_fired", source="codec-triggers", tool=""), "scheduled"), + (_rec(event="tool_result", source="codec-mcp", tool="tts_say"), "tts"), + (_rec(event="ask_user_question_emit", source="codec-ask-user", tool="", agent="Writer"), "system"), +]) +def test_categorize_event(record, expected_cat): + assert codec_audit._categorize_event(record) == expected_cat + + +def test_level_for_prefers_explicit_level(): + assert codec_audit._level_for(_rec(level="warning", outcome="ok")) == "warning" + + +def test_level_for_derives_from_outcome_error(): + rec = _rec(outcome="error") + rec.pop("level", None) + assert codec_audit._level_for(rec) == "error" + + +def test_level_for_defaults_to_info(): + rec = _rec(outcome="ok") + rec.pop("level", None) + assert codec_audit._level_for(rec) == "info"