Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
272 changes: 271 additions & 1 deletion codec_audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import secrets
import threading
import time
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional

Expand Down Expand Up @@ -818,3 +818,273 @@ def verify_audit_log(path: "Path | str | None" = None) -> dict:

summary["integrity_ok"] = summary["broken_lines"] == 0
return summary


# ── Query API for the /audit dashboard (codec_audit.html) ─────────────────────
#
# codec_audit.html and routes/audit.py (`read_events` / `get_stats`) predate
# the unified schema:1 envelope — they were written against an older,
# pre-979edab log format that had explicit `cat`/`lvl`/`sum` fields on every
# line. That format is gone; the unified envelope (§6 of AGENTS.md) has no
# `cat` field by design. Rather than resurrect the old write format (which
# would be an envelope-schema change — a §10 don't-touch-zone), these
# functions derive `cat`/`lvl`/`sum` at READ time from the fields the unified
# envelope already carries (`event`, `source`, `tool`, `transport`, `outcome`,
# `level`). The write path (audit()/log_event()) is untouched.
#
# The category mapping is a best-effort UI convenience, not a stable
# contract — safe to extend as new event/source names are added elsewhere.

_ROTATED_LOG_RE = re.compile(r"^audit\.log\.(\d{4}-\d{2}-\d{2})$")

# Exact `event` name → category. Checked before source/keyword matching so
# specific security/scheduling events aren't miscategorized by a keyword
# collision (e.g. "trigger_fired" contains no "scheduled"-ish substring).
_EVENT_CATEGORY_MAP = {
"hook_error": "error",
"tool_vetoed": "security",
"skill_load_blocked": "security",
"plugin_load_blocked": "security",
"plugin_hook_timeout": "security",
"permission_gate_blocked": "security",
"file_write_blocked": "security",
"python_exec_blocked": "security",
"internal_token_mismatch": "security",
"trigger_fired": "scheduled",
"trigger_evaluated": "scheduled",
"trigger_blocked": "scheduled",
"trigger_muted": "scheduled",
"shift_report_started": "scheduled",
"shift_report_completed": "scheduled",
"voice_mode_changed": "voice",
}

# Emitter `source` → category, for infra-level events that don't carry a
# `tool` name (heartbeat ticks, voice session lifecycle, auth, etc.).
_SOURCE_CATEGORY_MAP = {
"codec-voice": "voice",
"codec-auth": "auth",
"codec-oauth-provider": "auth",
"codec-keychain": "auth",
"codec-plugin-trust": "security",
"codec-scheduler": "scheduled",
"codec-autopilot": "scheduled",
"codec-hotkey": "hotkey",
"codec-dictate": "draft",
"codec-self-improve": "draft",
"codec-heartbeat": "system",
}

# Substring match over "<event> <tool>" (lowercased), checked in order.
_KEYWORD_CATEGORY_MAP = (
("tts", "tts"),
("stt", "stt"),
("whisper", "stt"),
("vision", "vision"),
("screenshot", "screenshot"),
("oauth", "auth"),
("auth", "auth"),
("keychain", "auth"),
("config", "config"),
)


def _categorize_event(record: dict) -> str:
"""Best-effort category bucket (one of codec_audit.html's 16 filter
pills) for a single audit record. See module note above — not a
stable contract, safe to extend."""
event = (record.get("event") or "").lower()
source = record.get("source") or ""
tool = (record.get("tool") or "").lower()
transport = record.get("transport") or ""

if event in _EVENT_CATEGORY_MAP:
return _EVENT_CATEGORY_MAP[event]
if source in _SOURCE_CATEGORY_MAP:
return _SOURCE_CATEGORY_MAP[source]
if transport == "voice":
return "voice"

haystack = f"{event} {tool}"
for keyword, cat in _KEYWORD_CATEGORY_MAP:
if keyword in haystack:
return cat

if tool and event in ("tool_call", "tool_result"):
return "skill"
if source in ("codec-dashboard", "codec-dispatch", "codec-session"):
return "command"
return "system"


def _level_for(record: dict) -> str:
"""Derive a display level. Prefers the explicit `level` field (set by
most Step 2+ events); falls back to `outcome` for base tool_call /
tool_result records that never set `level`."""
lvl = record.get("level")
if lvl:
return lvl
outcome = record.get("outcome")
if outcome in ("error", "timeout"):
return "error"
if outcome in ("denied", "validation"):
return "warning"
return "info"


def _summary_for(record: dict) -> str:
"""One-line summary for the timeline row. Prefers `message`; falls
back to a synthesized "<event>: <tool>" for records with no message."""
msg = record.get("message")
if msg:
return msg
tool = record.get("tool") or ""
event = record.get("event") or ""
if tool and event:
return f"{event}: {tool}"
return tool or event


def _norm_ts(ts: str | None) -> str:
"""Normalize a UTC ISO8601 timestamp's offset suffix to 'Z' so
lexicographic string comparison works regardless of whether it came
from Python's isoformat() ('+00:00') or JS's toISOString() ('Z')."""
if not ts:
return ""
return ts.replace("+00:00", "Z").replace("+0000", "Z")


def _log_files_desc() -> list[tuple[Optional[str], Path]]:
"""All audit log files, newest first: the live log (date=None), then
rotated audit.log.YYYY-MM-DD files in descending date order. Ignores
non-date suffixes (notably the audit.log.lock flock sidecar)."""
files: list[tuple[Optional[str], Path]] = []
if _AUDIT_LOG.exists():
files.append((None, _AUDIT_LOG))
rotated = []
for p in _AUDIT_DIR.glob("audit.log.*"):
m = _ROTATED_LOG_RE.match(p.name)
if m:
rotated.append((m.group(1), p))
rotated.sort(key=lambda t: t[0], reverse=True)
files.extend(rotated)
return files


def _iter_json_lines(path: Path, reverse: bool = False):
"""Yield parsed JSON records from `path`. Skips blank/malformed lines."""
try:
text = path.read_text(encoding="utf-8", errors="replace")
except OSError:
return
lines = text.splitlines()
if reverse:
lines = list(reversed(lines))
for raw in lines:
raw = raw.strip()
if not raw:
continue
try:
obj = json.loads(raw)
except json.JSONDecodeError:
continue
if isinstance(obj, dict):
yield obj


def read_events(
categories: list[str] | None = None,
level: str | None = None,
search: str | None = None,
since: str | None = None,
until: str | None = None,
limit: int = 500,
) -> list[dict]:
"""Return matching events from the audit log(s), newest first, shaped
for codec_audit.html (adds `cat`/`lvl`/`src`/`sum` on top of the raw
unified-envelope fields).

categories : filter to these derived categories (e.g. ["skill", "error"])
level : exact match against the derived level ("info"/"warning"/"error")
search : case-insensitive substring match against summary/event/tool/source
since, until : ISO-8601 timestamp bounds (inclusive), either offset style
limit : maximum events to return
"""
cats_set = set(categories) if categories else None
search_lower = search.lower() if search else None
since_n = _norm_ts(since) if since else None
until_n = _norm_ts(until) if until else None

results: list[dict] = []
for date_str, path in _log_files_desc():
# A rotated file covers exactly one UTC calendar day. If its last
# possible timestamp is still before `since`, this file and every
# older one (list is date-descending) can be skipped entirely.
if since_n and date_str is not None and f"{date_str}T23:59:59.999Z" < since_n:
break
for rec in _iter_json_lines(path, reverse=True):
ts_n = _norm_ts(rec.get("ts"))
if since_n and ts_n < since_n:
continue
if until_n and ts_n > until_n:
continue

cat = _categorize_event(rec)
if cats_set and cat not in cats_set:
continue
lvl = _level_for(rec)
if level and lvl != level:
continue
summary = _summary_for(rec)
if search_lower:
haystack = " ".join(str(v) for v in (
summary, rec.get("event", ""), rec.get("tool", ""), rec.get("source", "")
)).lower()
if search_lower not in haystack:
continue

ev = dict(rec)
ev["cat"] = cat
ev["lvl"] = lvl
ev["src"] = rec.get("source", "")
ev["sum"] = summary
results.append(ev)
if len(results) >= limit:
return results
return results


def get_stats(hours: int = 24) -> dict:
"""Aggregate event counts over the last `hours` hours, shaped for
codec_audit.html's stats panel: {total_24h, errors_24h, by_category,
by_level}. Field names stay '_24h' regardless of `hours` to match the
UI's fixed contract (routes/audit.py always calls this with hours=24)."""
cutoff_n = _norm_ts((datetime.now(timezone.utc) - timedelta(hours=hours))
.isoformat(timespec="milliseconds"))

total = 0
errors = 0
by_category: dict[str, int] = {}
by_level: dict[str, int] = {}

for date_str, path in _log_files_desc():
if date_str is not None and f"{date_str}T23:59:59.999Z" < cutoff_n:
break
for rec in _iter_json_lines(path):
ts_n = _norm_ts(rec.get("ts"))
if ts_n < cutoff_n:
continue
total += 1
cat = _categorize_event(rec)
lvl = _level_for(rec)
by_category[cat] = by_category.get(cat, 0) + 1
by_level[lvl] = by_level.get(lvl, 0) + 1
if lvl == "error":
errors += 1

return {
"total_24h": total,
"errors_24h": errors,
"by_category": by_category,
"by_level": by_level,
}
Loading