diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..561b9fb --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,56 @@ +name: CI + +on: + push: + branches: [main] + pull_request: + +jobs: + lint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: '3.12' + - name: Install ruff + run: pip install ruff + - name: Ruff check + run: ruff check afterburn/ tests/ + - name: Ruff format check + run: ruff format --check afterburn/ tests/ + + test: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ['3.10', '3.11', '3.12'] + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + - name: Install + run: | + pip install -e . + pip install pytest + - name: Test + run: pytest tests/ -v + + leak-guard: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Scan for accidental leaks + shell: bash + run: | + set -e + # Patterns: editable-install pseudo-pins, host paths, API keys, private keys. + # Excludes node_modules, .git, .venv, the workflows dir itself. + PATTERN='UNKNOWN @ file://|/home/[a-z][a-z0-9_-]+/(orchestrator|models)/|sk-ant-api03-[A-Za-z0-9_-]{20,}|sk-proj-[A-Za-z0-9_-]{20,}|-----BEGIN (RSA |EC |OPENSSH |DSA )?PRIVATE KEY-----' + if git ls-files | grep -vE '^\.github/workflows/' | xargs -r grep -EHn "$PATTERN" 2>/dev/null; then + echo "::error::Leak guard found suspicious patterns above" + exit 1 + fi + echo "Leak guard clean" diff --git a/afterburn/archive.py b/afterburn/archive.py index deaaeb5..ba3a880 100644 --- a/afterburn/archive.py +++ b/afterburn/archive.py @@ -100,7 +100,11 @@ def run_archive(args) -> None: """Archive old sessions and clean history.""" cwd = args.cwd if hasattr(args, "cwd") and args.cwd else os.getcwd() slug = _current_project_slug(cwd) - sessions_base = Path(args.sessions_dir) if hasattr(args, "sessions_dir") and args.sessions_dir else default_sessions_dir() + sessions_base = ( + Path(args.sessions_dir) + if hasattr(args, "sessions_dir") and args.sessions_dir + else default_sessions_dir() + ) project_dir = sessions_base / slug if not project_dir.exists(): @@ -126,20 +130,26 @@ def run_archive(args) -> None: sys.exit(0) total_size = sum(f.stat().st_size for f in stale) - print(f"Found {len(stale)} sessions older than {max_age} days ({total_size / 1024 / 1024:.1f}MB)") + print( + f"Found {len(stale)} sessions older than {max_age} days ({total_size / 1024 / 1024:.1f}MB)" + ) if args.dry_run: print("\n[dry-run] Would archive:") for f in stale: age_days = (time.time() - f.stat().st_mtime) / 86400 - print(f" {f.name} ({f.stat().st_size / 1024:.0f}KB, {age_days:.0f} days old)") + print( + f" {f.name} ({f.stat().st_size / 1024:.0f}KB, {age_days:.0f} days old)" + ) return # Archive archive_path = _archive_sessions(stale, project_dir) archive_size = archive_path.stat().st_size print(f"\nArchived to: {archive_path}") - print(f" {total_size / 1024 / 1024:.1f}MB → {archive_size / 1024 / 1024:.1f}MB ({archive_size / total_size * 100:.0f}%)") + print( + f" {total_size / 1024 / 1024:.1f}MB → {archive_size / 1024 / 1024:.1f}MB ({archive_size / total_size * 100:.0f}%)" + ) # Collect session IDs before deleting session_ids = {f.stem for f in stale} diff --git a/afterburn/cli.py b/afterburn/cli.py index 76d93f9..6d9e1aa 100644 --- a/afterburn/cli.py +++ b/afterburn/cli.py @@ -19,7 +19,9 @@ def main(): choices=["friction", "patterns", "gaps", "releases"], help="Run a single analysis pass (default: all three)", ) - discover_parser.add_argument("--since", help="Only analyze sessions after this date (YYYY-MM-DD)") + discover_parser.add_argument( + "--since", help="Only analyze sessions after this date (YYYY-MM-DD)" + ) discover_parser.add_argument("--project", help="Filter to a specific project slug") discover_parser.add_argument( "--sessions-dir", @@ -57,7 +59,9 @@ def main(): ) # evolve - evolve_parser = subparsers.add_parser("evolve", help="Evolve a skill via experiment loop") + evolve_parser = subparsers.add_parser( + "evolve", help="Evolve a skill via experiment loop" + ) evolve_parser.add_argument("--skill", required=True, help="Skill name to evolve") evolve_parser.add_argument( "--max-iterations", @@ -65,13 +69,17 @@ def main(): default=10, help="Maximum experiment iterations (default: 10)", ) - evolve_parser.add_argument("--dry-run", action="store_true", help="Show plan without executing") + evolve_parser.add_argument( + "--dry-run", action="store_true", help="Show plan without executing" + ) # status subparsers.add_parser("status", help="Show last run summary") # archive - archive_parser = subparsers.add_parser("archive", help="Archive old sessions and clean history") + archive_parser = subparsers.add_parser( + "archive", help="Archive old sessions and clean history" + ) archive_parser.add_argument( "--days", type=int, @@ -86,11 +94,17 @@ def main(): "--cwd", help="Working directory to derive project slug from (default: current directory)", ) - archive_parser.add_argument("--dry-run", action="store_true", help="Show what would be archived") + archive_parser.add_argument( + "--dry-run", action="store_true", help="Show what would be archived" + ) # narrative - narrative_parser = subparsers.add_parser("narrative", help="Generate a narrative session report") - narrative_parser.add_argument("--today", action="store_true", help="Today's sessions only") + narrative_parser = subparsers.add_parser( + "narrative", help="Generate a narrative session report" + ) + narrative_parser.add_argument( + "--today", action="store_true", help="Today's sessions only" + ) narrative_parser.add_argument("--week", action="store_true", help="Last 7 days") narrative_parser.add_argument("--month", action="store_true", help="Last 30 days") narrative_parser.add_argument("--since", help="Sessions since date (YYYY-MM-DD)") @@ -104,10 +118,14 @@ def main(): help="Directory path — include all projects under this path", ) narrative_parser.add_argument("--sessions-dir", help="Custom session directory") - narrative_parser.add_argument("--no-llm", action="store_true", help="Stats only, no LLM narrative generation") + narrative_parser.add_argument( + "--no-llm", action="store_true", help="Stats only, no LLM narrative generation" + ) # install - install_parser = subparsers.add_parser("install", help="Install Claude Code slash commands") + install_parser = subparsers.add_parser( + "install", help="Install Claude Code slash commands" + ) install_parser.add_argument( "--global", dest="global_install", diff --git a/afterburn/dead_releases.py b/afterburn/dead_releases.py index 4f5dccc..02e1914 100644 --- a/afterburn/dead_releases.py +++ b/afterburn/dead_releases.py @@ -3,8 +3,6 @@ import json import re import subprocess -import sys -from pathlib import Path from afterburn.findings import Finding from afterburn.scanner import SessionInfo @@ -111,11 +109,17 @@ def find_new_symbols_in_range( Returns a list of dicts with keys: name, file, kind (function/class). """ if previous_tag: - diff_output = _run_git(repo_path, "diff", f"{previous_tag}..{tag}", "--unified=0") + diff_output = _run_git( + repo_path, "diff", f"{previous_tag}..{tag}", "--unified=0" + ) else: # First tag — diff against empty tree diff_output = _run_git( - repo_path, "diff", "4b825dc642cb6eb9a060e54bf899d15f7b422b10", tag, "--unified=0" + repo_path, + "diff", + "4b825dc642cb6eb9a060e54bf899d15f7b422b10", + tag, + "--unified=0", ) if not diff_output: @@ -147,7 +151,12 @@ def find_new_symbols_in_range( # Skip private/dunder if name.startswith("_"): continue - kind = "class" if line.strip().startswith("+class") or line.strip().startswith("+ class") else "function" + kind = ( + "class" + if line.strip().startswith("+class") + or line.strip().startswith("+ class") + else "function" + ) symbols.append({"name": name, "file": current_file, "kind": kind}) # TypeScript/JavaScript patterns @@ -155,7 +164,17 @@ def find_new_symbols_in_range( m = TS_DEF_PATTERN.match(line) if m: name = m.group(1) - if name in ("if", "else", "for", "while", "return", "switch", "case", "try", "catch"): + if name in ( + "if", + "else", + "for", + "while", + "return", + "switch", + "case", + "try", + "catch", + ): continue kind = "class" if "class " in line else "function" symbols.append({"name": name, "file": current_file, "kind": kind}) @@ -188,9 +207,7 @@ def detect_dead_releases(repo_path: str) -> list[Finding]: return Findings for any dead code at release time. """ # Get all version tags sorted by creation date - tag_output = _run_git( - repo_path, "tag", "--list", "--sort=-version:refname" - ) + tag_output = _run_git(repo_path, "tag", "--list", "--sort=-version:refname") if not tag_output: return [] @@ -220,15 +237,17 @@ def detect_dead_releases(repo_path: str) -> list[Finding]: if dead_symbols: names = [f"{s['name']} ({s['kind']} in {s['file']})" for s in dead_symbols] - findings.append(Finding( - type="friction", - description=f"Dead code shipped in {tag}: {len(dead_symbols)} unwired symbol(s)", - confidence=min(1.0, len(dead_symbols) / max(len(symbols), 1)), - frequency=len(dead_symbols), - sessions=[], - evidence=f"Symbols with zero callers at release: {', '.join(names[:10])}", - verification=f"git diff {previous_tag + '..' + tag if previous_tag else tag} --stat", - theme="dead_release", - )) + findings.append( + Finding( + type="friction", + description=f"Dead code shipped in {tag}: {len(dead_symbols)} unwired symbol(s)", + confidence=min(1.0, len(dead_symbols) / max(len(symbols), 1)), + frequency=len(dead_symbols), + sessions=[], + evidence=f"Symbols with zero callers at release: {', '.join(names[:10])}", + verification=f"git diff {previous_tag + '..' + tag if previous_tag else tag} --stat", + theme="dead_release", + ) + ) return findings diff --git a/afterburn/discover.py b/afterburn/discover.py index 54e2591..148f3a2 100644 --- a/afterburn/discover.py +++ b/afterburn/discover.py @@ -38,7 +38,11 @@ def run_discover(args) -> None: print(f"Found {len(sessions)} sessions ({total_size / 1024 / 1024:.1f}MB)") output_dir = Path(".afterburn") - passes = [args.analysis_pass] if args.analysis_pass else ["friction", "patterns", "gaps", "releases"] + passes = ( + [args.analysis_pass] + if args.analysis_pass + else ["friction", "patterns", "gaps", "releases"] + ) all_findings: list[Finding] = [] for pass_name in passes: @@ -50,20 +54,27 @@ def run_discover(args) -> None: write_findings(all_findings, output_dir, fmt=args.format) # Generate skill candidates from correction taxonomy - from afterburn.passes import extract_taxonomy_from_findings, generate_skill_candidates + from afterburn.passes import ( + extract_taxonomy_from_findings, + generate_skill_candidates, + ) taxonomy_counts = extract_taxonomy_from_findings(all_findings) skill_candidates = generate_skill_candidates(taxonomy_counts) if skill_candidates: write_skill_candidates(skill_candidates, output_dir) - print(f" Generated {len(skill_candidates)} skill candidates from correction taxonomy") + print( + f" Generated {len(skill_candidates)} skill candidates from correction taxonomy" + ) _write_provenance(output_dir, sessions, passes) print(f"\nResults written to {output_dir}/") -def _run_pass(pass_name: str, sessions: list[SessionInfo], max_calls: int = 1000) -> list[Finding]: +def _run_pass( + pass_name: str, sessions: list[SessionInfo], max_calls: int = 1000 +) -> list[Finding]: """Run a single analysis pass.""" from afterburn.passes import run_friction_pass, run_gaps_pass, run_patterns_pass @@ -86,7 +97,9 @@ def _run_pass(pass_name: str, sessions: list[SessionInfo], max_calls: int = 1000 return [] -def _write_provenance(output_dir: Path, sessions: list[SessionInfo], passes: list[str]) -> None: +def _write_provenance( + output_dir: Path, sessions: list[SessionInfo], passes: list[str] +) -> None: """Write provenance metadata.""" from datetime import datetime, timezone @@ -113,7 +126,9 @@ def show_status() -> None: prov = json.load(f) print(f"Last run: {prov['analyzed_at']}") - print(f"Sessions: {prov['sessions_count']} ({prov['total_bytes'] / 1024 / 1024:.1f}MB)") + print( + f"Sessions: {prov['sessions_count']} ({prov['total_bytes'] / 1024 / 1024:.1f}MB)" + ) print(f"Passes: {', '.join(prov['passes'])}") for name in ["fix-list.md", "pattern-catalog.md", "skill-gaps.md"]: diff --git a/afterburn/findings.py b/afterburn/findings.py index f32932f..e91dc54 100644 --- a/afterburn/findings.py +++ b/afterburn/findings.py @@ -42,7 +42,9 @@ def to_json(self) -> str: def to_markdown(self) -> str: lines = [f"### {self.description}"] lines.append("") - lines.append(f"**Type**: {self.type} | **Confidence**: {self.confidence:.2f} | **Frequency**: {self.frequency}") + lines.append( + f"**Type**: {self.type} | **Confidence**: {self.confidence:.2f} | **Frequency**: {self.frequency}" + ) lines.append(f"**Theme**: {self.theme}") lines.append(f"**Sessions**: {len(self.sessions)} sessions") if self.evidence: @@ -69,7 +71,9 @@ def to_dict(self) -> dict: return asdict(self) -def write_skill_candidates(candidates: list["SkillCandidate"], output_dir: Path) -> None: +def write_skill_candidates( + candidates: list["SkillCandidate"], output_dir: Path +) -> None: """Write skill candidates to output directory. Generates a markdown summary and individual skill drafts. @@ -108,7 +112,9 @@ def write_skill_candidates(candidates: list["SkillCandidate"], output_dir: Path) skill_path.write_text(c.draft_skill_md) -def write_findings(findings: list[Finding], output_dir: Path, fmt: str = "markdown") -> None: +def write_findings( + findings: list[Finding], output_dir: Path, fmt: str = "markdown" +) -> None: """Write findings to output files.""" output_dir.mkdir(parents=True, exist_ok=True) @@ -133,7 +139,9 @@ def write_findings(findings: list[Finding], output_dir: Path, fmt: str = "markdo path = output_dir / f"{basename}.md" with open(path, "w") as fh: fh.write(f"# {finding_type.title()} Findings\n\n") - fh.write(f"*{len(items)} findings across {len(set(s for i in items for s in i.sessions))} sessions*\n\n") + fh.write( + f"*{len(items)} findings across {len(set(s for i in items for s in i.sessions))} sessions*\n\n" + ) for item in sorted(items, key=lambda x: x.frequency, reverse=True): fh.write(item.to_markdown()) fh.write("---\n\n") diff --git a/afterburn/narrative.py b/afterburn/narrative.py index ae53acd..0d06d60 100644 --- a/afterburn/narrative.py +++ b/afterburn/narrative.py @@ -1,20 +1,16 @@ """Narrative session report — enhanced insights with timeframe and project filtering.""" -import json -import os import sys -from collections import Counter, defaultdict +from collections import Counter from datetime import datetime, timezone from pathlib import Path from afterburn.passes import ( - CORRECTION_PATTERNS, _extract_messages, - _is_false_positive, classify_correction, suggest_remediation, ) -from afterburn.scanner import SessionInfo, discover_sessions, group_sessions_by_parent +from afterburn.scanner import SessionInfo, discover_sessions def _extract_facets(session: SessionInfo) -> dict: @@ -49,12 +45,14 @@ def _extract_facets(session: SessionInfo) -> dict: text = msg.get("content", "") if "" in text: import re + match = re.search(r"(\w+)", text) if match: skills_used.append(match.group(1)) # Corrections, confirmations, and correction taxonomy import re + corrections = 0 confirmations = 0 correction_taxonomy = Counter() @@ -63,14 +61,22 @@ def _extract_facets(session: SessionInfo) -> dict: if len(text) > 1000 or not text: continue # Skip system injections - if "" in text or "base directory for this skill" in text.lower(): + if ( + "" in text + or "base directory for this skill" in text.lower() + ): continue if any(w in text for w in ["no ", "stop", "wrong", "don't", "undo", "revert"]): - if not any(w in text for w in ["no problem", "no worries", "don't worry", "no need"]): + if not any( + w in text + for w in ["no problem", "no worries", "don't worry", "no need"] + ): corrections += 1 taxonomy = classify_correction(text) correction_taxonomy[taxonomy] += 1 - if any(w in text for w in ["yes", "perfect", "great", "exactly", "awesome", "nice"]): + if any( + w in text for w in ["yes", "perfect", "great", "exactly", "awesome", "nice"] + ): if len(text) < 200: confirmations += 1 @@ -93,10 +99,21 @@ def _extract_facets(session: SessionInfo) -> dict: for msg in messages: text = msg.get("content", "") # Detect dispatch/swarm patterns in orchestrator sessions - if any(kw in text.lower() for kw in ["dispatching agent", "agent completed", "/dispatch", "/swarm"]): + if any( + kw in text.lower() + for kw in [ + "dispatching agent", + "agent completed", + "/dispatch", + "/swarm", + ] + ): if "dispatch" in text.lower(): agents_dispatched += 1 - if any(kw in text.lower() for kw in ["agent completed", "succeeded", "completed successfully"]): + if any( + kw in text.lower() + for kw in ["agent completed", "succeeded", "completed successfully"] + ): agents_succeeded += 1 return { @@ -121,9 +138,15 @@ def _extract_facets(session: SessionInfo) -> dict: } -def _generate_narrative_llm(facets: list[dict], timeframe: str, project: str | None) -> str: +def _generate_narrative_llm( + facets: list[dict], timeframe: str, project: str | None +) -> str: """Use LLM to generate the narrative report from facets.""" - from afterburn.vendor.rlm_repl.llm_client import ClaudeCLIClient, LLMClient, _detect_backend + from afterburn.vendor.rlm_repl.llm_client import ( + ClaudeCLIClient, + LLMClient, + _detect_backend, + ) backend = _detect_backend() if backend == "claude": @@ -161,7 +184,8 @@ def _generate_narrative_llm(facets: list[dict], timeframe: str, project: str | N classified_taxonomy = {k: v for k, v in all_taxonomy.items() if k != "unclassified"} if classified_taxonomy: taxonomy_lines = "\n".join( - f"- {t}: {c}" for t, c in sorted(classified_taxonomy.items(), key=lambda x: -x[1]) + f"- {t}: {c}" + for t, c in sorted(classified_taxonomy.items(), key=lambda x: -x[1]) ) unclassified_count = all_taxonomy.get("unclassified", 0) if unclassified_count: @@ -171,7 +195,9 @@ def _generate_narrative_llm(facets: list[dict], timeframe: str, project: str | N # Generate remediation suggestions remediations = suggest_remediation(all_taxonomy) - remediation_lines = "\n".join(f"- {s}" for s in remediations) if remediations else "- (none)" + remediation_lines = ( + "\n".join(f"- {s}" for s in remediations) if remediations else "- (none)" + ) stats_block = f"""## Session Statistics - Sessions: {total_sessions} @@ -186,10 +212,10 @@ def _generate_narrative_llm(facets: list[dict], timeframe: str, project: str | N {taxonomy_lines} ## Top Tools -{chr(10).join(f'- {name}: {count}x' for name, count in all_tools.most_common(10))} +{chr(10).join(f"- {name}: {count}x" for name, count in all_tools.most_common(10))} ## Skills Used -{chr(10).join(f'- /{name}: {count}x' for name, count in all_skills.most_common(10)) if all_skills else '- (none detected)'} +{chr(10).join(f"- /{name}: {count}x" for name, count in all_skills.most_common(10)) if all_skills else "- (none detected)"} ## Remediation Suggestions {remediation_lines} @@ -197,7 +223,7 @@ def _generate_narrative_llm(facets: list[dict], timeframe: str, project: str | N prompt = f"""You are writing a development activity narrative report. Write in second person ("you"). The timeframe is: {timeframe} -{f'Project: {project}' if project else 'All projects'} +{f"Project: {project}" if project else "All projects"} Here are the aggregated statistics: @@ -216,7 +242,10 @@ def _generate_narrative_llm(facets: list[dict], timeframe: str, project: str | N Keep it under 500 words. Be specific, reference actual numbers. No fluff.""" messages = [ - {"role": "system", "content": "You write concise, data-driven development reports. Second person. No emojis."}, + { + "role": "system", + "content": "You write concise, data-driven development reports. Second person. No emojis.", + }, {"role": "user", "content": prompt}, ] @@ -226,7 +255,11 @@ def _generate_narrative_llm(facets: list[dict], timeframe: str, project: str | N def run_narrative(args) -> None: """Generate a narrative session report.""" - sessions_dir = Path(args.sessions_dir) if hasattr(args, "sessions_dir") and args.sessions_dir else None + sessions_dir = ( + Path(args.sessions_dir) + if hasattr(args, "sessions_dir") and args.sessions_dir + else None + ) project = args.project if hasattr(args, "project") else None # Multi-project support @@ -243,10 +276,12 @@ def run_narrative(args) -> None: timeframe_label = f"today ({since})" elif hasattr(args, "week") and args.week: from datetime import timedelta + since = (datetime.now(timezone.utc) - timedelta(days=7)).strftime("%Y-%m-%d") timeframe_label = f"last 7 days (since {since})" elif hasattr(args, "month") and args.month: from datetime import timedelta + since = (datetime.now(timezone.utc) - timedelta(days=30)).strftime("%Y-%m-%d") timeframe_label = f"last 30 days (since {since})" elif hasattr(args, "since") and args.since: @@ -270,7 +305,9 @@ def run_narrative(args) -> None: sys.exit(0) total_size = sum(s.size_bytes for s in sessions) - print(f"Analyzing {len(sessions)} sessions ({total_size / 1024 / 1024:.1f}MB) — {timeframe_label}") + print( + f"Analyzing {len(sessions)} sessions ({total_size / 1024 / 1024:.1f}MB) — {timeframe_label}" + ) # Extract facets from all sessions (no LLM needed for this phase) facets = [] @@ -317,7 +354,7 @@ def _stats_only_report(facets: list[dict], timeframe: str, project: str | None) total_messages = sum(f.get("message_count", 0) for f in facets) total_tools = sum(f.get("total_tool_calls", 0) for f in facets) total_errors = sum(f.get("tool_errors", 0) for f in facets) - total_corrections = sum(f.get("corrections", 0) for f in facets) + sum(f.get("corrections", 0) for f in facets) total_confirmations = sum(f.get("confirmations", 0) for f in facets) # Orchestrator / agent breakdown diff --git a/afterburn/passes.py b/afterburn/passes.py index 1c12f76..635e6a6 100644 --- a/afterburn/passes.py +++ b/afterburn/passes.py @@ -4,7 +4,6 @@ import re import sys from collections import Counter, defaultdict -from pathlib import Path from afterburn.findings import Finding, SkillCandidate from afterburn.scanner import SessionInfo @@ -110,6 +109,7 @@ def suggest_remediation(taxonomy_counts: Counter) -> list[str]: return suggestions + def generate_skill_candidates(taxonomy_counts: Counter) -> list[SkillCandidate]: """Generate targeted skill candidates from correction taxonomy distribution. @@ -236,13 +236,15 @@ def generate_skill_candidates(taxonomy_counts: Counter) -> list[SkillCandidate]: if taxonomy_type not in templates: continue tmpl = templates[taxonomy_type] - candidates.append(SkillCandidate( - name=tmpl["name"], - description=tmpl["description"], - steps=tmpl["steps"], - evidence_sessions=[], # Populated by caller if needed - draft_skill_md=tmpl["draft_skill_md"], - )) + candidates.append( + SkillCandidate( + name=tmpl["name"], + description=tmpl["description"], + steps=tmpl["steps"], + evidence_sessions=[], # Populated by caller if needed + draft_skill_md=tmpl["draft_skill_md"], + ) + ) return candidates @@ -260,7 +262,10 @@ def extract_taxonomy_from_findings(findings: list[Finding]) -> Counter: if f.theme == "correction:summary" and f.evidence: # Parse "Breakdown: process: 5, accuracy: 3. Remediations: ..." import re - breakdown_match = re.search(r"Breakdown:\s*(.+?)\.(?:\s*Remediations:|$)", f.evidence) + + breakdown_match = re.search( + r"Breakdown:\s*(.+?)\.(?:\s*Remediations:|$)", f.evidence + ) if breakdown_match: pairs = breakdown_match.group(1).split(",") for pair in pairs: @@ -296,7 +301,10 @@ def _extract_messages(session: SessionInfo) -> list[dict]: continue try: record = json.loads(line) - if record.get("type") in ("user", "assistant") and "message" in record: + if ( + record.get("type") in ("user", "assistant") + and "message" in record + ): msg = record["message"] content = msg.get("content", "") # Flatten content blocks to text @@ -309,16 +317,20 @@ def _extract_messages(session: SessionInfo) -> list[dict]: elif block.get("type") == "tool_result": # Check for errors in tool results if block.get("is_error"): - text_parts.append(f"[TOOL_ERROR: {block.get('content', '')[:200]}]") + text_parts.append( + f"[TOOL_ERROR: {block.get('content', '')[:200]}]" + ) elif isinstance(block, str): text_parts.append(block) content = "\n".join(text_parts) - messages.append({ - "role": msg.get("role", record["type"]), - "content": content[:5000], # Cap per-message size - "type": record["type"], - "raw_content": msg.get("content"), - }) + messages.append( + { + "role": msg.get("role", record["type"]), + "content": content[:5000], # Cap per-message size + "type": record["type"], + "raw_content": msg.get("content"), + } + ) except json.JSONDecodeError: continue except (OSError, PermissionError): @@ -343,12 +355,17 @@ def _extract_tool_denials(messages: list[dict]) -> list[dict]: continue for block in raw: if isinstance(block, dict) and block.get("type") == "tool_result": - if block.get("is_error") and "denied" in str(block.get("content", "")).lower(): - denials.append({ - "index": i, - "tool": block.get("tool_use_id", "unknown"), - "reason": str(block.get("content", ""))[:200], - }) + if ( + block.get("is_error") + and "denied" in str(block.get("content", "")).lower() + ): + denials.append( + { + "index": i, + "tool": block.get("tool_use_id", "unknown"), + "reason": str(block.get("content", ""))[:200], + } + ) return denials @@ -358,14 +375,18 @@ def _extract_tool_errors(messages: list[dict]) -> list[dict]: for i, msg in enumerate(messages): content = msg.get("content", "") if "[TOOL_ERROR:" in content: - errors.append({ - "index": i, - "error": content[:300], - }) + errors.append( + { + "index": i, + "error": content[:300], + } + ) return errors -def run_friction_pass(sessions: list[SessionInfo], max_sessions: int = 200) -> list[Finding]: +def run_friction_pass( + sessions: list[SessionInfo], max_sessions: int = 200 +) -> list[Finding]: """Extract friction signals from sessions.""" correction_themes: dict[str, list] = defaultdict(list) correction_taxonomy_counts: Counter = Counter() @@ -377,7 +398,9 @@ def run_friction_pass(sessions: list[SessionInfo], max_sessions: int = 200) -> l sorted_sessions = sorted(sessions, key=lambda s: s.size_bytes, reverse=True) # Split into direct-parse (<10MB) and RLM-required (>=10MB) - direct = [s for s in sorted_sessions if s.size_bytes < 10 * 1024 * 1024][:max_sessions] + direct = [s for s in sorted_sessions if s.size_bytes < 10 * 1024 * 1024][ + :max_sessions + ] large = [s for s in sorted_sessions if s.size_bytes >= 10 * 1024 * 1024] # Analyze large sessions via RLM REPL @@ -385,7 +408,10 @@ def run_friction_pass(sessions: list[SessionInfo], max_sessions: int = 200) -> l if large: rlm_findings = _rlm_friction_analysis(large) if rlm_findings: - print(f" RLM analyzed {len(large)} large sessions → {len(rlm_findings)} findings", file=sys.stderr) + print( + f" RLM analyzed {len(large)} large sessions → {len(rlm_findings)} findings", + file=sys.stderr, + ) analyzable = direct @@ -411,11 +437,21 @@ def run_friction_pass(sessions: list[SessionInfo], max_sessions: int = 200) -> l continue # Skip system injections, skill content, and command messages - if any(marker in text for marker in [ - "", "", "Base directory for this skill:", - "# /", "## Steps", "## Usage", "---\nname:", "SKILL.md", - "", "system-reminder", - ]): + if any( + marker in text + for marker in [ + "", + "", + "Base directory for this skill:", + "# /", + "## Steps", + "## Usage", + "---\nname:", + "SKILL.md", + "", + "system-reminder", + ] + ): continue # Skip very long messages (>1000 chars) — likely pasted content, not corrections @@ -434,13 +470,15 @@ def run_friction_pass(sessions: list[SessionInfo], max_sessions: int = 200) -> l taxonomy = classify_correction(text) correction_taxonomy_counts[taxonomy] += 1 - correction_themes[theme].append({ - "session_id": session.session_id, - "text": text[:500], - "context": context, - "project": session.project_slug, - "taxonomy": taxonomy, - }) + correction_themes[theme].append( + { + "session_id": session.session_id, + "text": text[:500], + "context": context, + "project": session.project_slug, + "taxonomy": taxonomy, + } + ) break # One match per message # Find tool denials @@ -477,62 +515,84 @@ def run_friction_pass(sessions: list[SessionInfo], max_sessions: int = 200) -> l # Determine dominant taxonomy for this theme theme_taxonomies = Counter(e.get("taxonomy", "unclassified") for e in events) dominant_taxonomy = theme_taxonomies.most_common(1)[0][0] - taxonomy_theme = f"correction:{dominant_taxonomy}" if dominant_taxonomy != "unclassified" else theme - findings.append(Finding( - type="friction", - description=f"User correction: {theme.replace('_', ' ')}", - confidence=min(1.0, len(events) / total_analyzed) if total_analyzed > 0 else 0, - frequency=len(events), - sessions=unique_sessions[:20], - evidence=f"Example: \"{example['text'][:300]}\"", - verification=None, - theme=taxonomy_theme, - )) + taxonomy_theme = ( + f"correction:{dominant_taxonomy}" + if dominant_taxonomy != "unclassified" + else theme + ) + findings.append( + Finding( + type="friction", + description=f"User correction: {theme.replace('_', ' ')}", + confidence=min(1.0, len(events) / total_analyzed) + if total_analyzed > 0 + else 0, + frequency=len(events), + sessions=unique_sessions[:20], + evidence=f'Example: "{example["text"][:300]}"', + verification=None, + theme=taxonomy_theme, + ) + ) # Store taxonomy counts and remediation on the findings list as metadata # by adding a summary finding when there are classified corrections - classified = {k: v for k, v in correction_taxonomy_counts.items() if k != "unclassified"} + classified = { + k: v for k, v in correction_taxonomy_counts.items() if k != "unclassified" + } if classified: - taxonomy_summary = ", ".join(f"{t}: {c}" for t, c in sorted(classified.items(), key=lambda x: -x[1])) + taxonomy_summary = ", ".join( + f"{t}: {c}" for t, c in sorted(classified.items(), key=lambda x: -x[1]) + ) remediations = suggest_remediation(correction_taxonomy_counts) remediation_text = "; ".join(remediations) if remediations else "None" - findings.append(Finding( - type="friction", - description="Correction taxonomy breakdown", - confidence=1.0, - frequency=sum(classified.values()), - sessions=[], - evidence=f"Breakdown: {taxonomy_summary}. Remediations: {remediation_text}", - theme="correction:summary", - )) + findings.append( + Finding( + type="friction", + description="Correction taxonomy breakdown", + confidence=1.0, + frequency=sum(classified.values()), + sessions=[], + evidence=f"Breakdown: {taxonomy_summary}. Remediations: {remediation_text}", + theme="correction:summary", + ) + ) # Convert tool denials to findings for reason, count in tool_denial_counts.most_common(10): if count < 2: continue - findings.append(Finding( - type="friction", - description=f"Tool denial: {reason[:100]}", - confidence=min(1.0, count / total_analyzed) if total_analyzed > 0 else 0, - frequency=count, - sessions=[], - evidence=f"Denied {count} times across sessions", - theme="tool_denial", - )) + findings.append( + Finding( + type="friction", + description=f"Tool denial: {reason[:100]}", + confidence=min(1.0, count / total_analyzed) + if total_analyzed > 0 + else 0, + frequency=count, + sessions=[], + evidence=f"Denied {count} times across sessions", + theme="tool_denial", + ) + ) # Convert error patterns to findings for error_type, count in error_patterns.most_common(10): if count < 3: continue - findings.append(Finding( - type="friction", - description=f"Recurring error: {error_type}", - confidence=min(1.0, count / total_analyzed) if total_analyzed > 0 else 0, - frequency=count, - sessions=[], - evidence=f"Occurred {count} times", - theme="tool_error", - )) + findings.append( + Finding( + type="friction", + description=f"Recurring error: {error_type}", + confidence=min(1.0, count / total_analyzed) + if total_analyzed > 0 + else 0, + frequency=count, + sessions=[], + evidence=f"Occurred {count} times", + theme="tool_error", + ) + ) # Merge RLM findings for large sessions if rlm_findings: @@ -553,7 +613,10 @@ def _rlm_friction_analysis(large_sessions: list[SessionInfo]) -> list[Finding]: rlm = RLM_REPL(verbose=True) for session in large_sessions: - print(f" [RLM] Analyzing large session {session.session_id[:12]}... ({session.size_bytes / 1024 / 1024:.1f}MB)", file=sys.stderr) + print( + f" [RLM] Analyzing large session {session.session_id[:12]}... ({session.size_bytes / 1024 / 1024:.1f}MB)", + file=sys.stderr, + ) # Load session as list of message dicts messages = [] @@ -565,22 +628,30 @@ def _rlm_friction_analysis(large_sessions: list[SessionInfo]) -> list[Finding]: continue try: record = json.loads(line) - if record.get("type") in ("user", "assistant") and "message" in record: + if ( + record.get("type") in ("user", "assistant") + and "message" in record + ): msg = record["message"] content = msg.get("content", "") if isinstance(content, list): text_parts = [] for block in content: - if isinstance(block, dict) and block.get("type") == "text": + if ( + isinstance(block, dict) + and block.get("type") == "text" + ): text_parts.append(block.get("text", "")[:2000]) content = "\n".join(text_parts) elif isinstance(content, str): content = content[:2000] - messages.append({ - "role": msg.get("role", "unknown"), - "content": content, - "index": len(messages), - }) + messages.append( + { + "role": msg.get("role", "unknown"), + "content": content, + "index": len(messages), + } + ) except json.JSONDecodeError: continue except (OSError, PermissionError): @@ -609,44 +680,56 @@ def _rlm_friction_analysis(large_sessions: list[SessionInfo]) -> list[Finding]: try: parsed = json.loads(result) for item in parsed: - findings.append(Finding( - type="friction", - description=item.get("correction", "")[:200], - confidence=0.8, - frequency=1, - sessions=[session.session_id], - evidence=item.get("what_went_wrong", "")[:300], - theme=item.get("theme", "rlm_detected"), - )) + findings.append( + Finding( + type="friction", + description=item.get("correction", "")[:200], + confidence=0.8, + frequency=1, + sessions=[session.session_id], + evidence=item.get("what_went_wrong", "")[:300], + theme=item.get("theme", "rlm_detected"), + ) + ) except json.JSONDecodeError: pass elif isinstance(result, list): for item in result: if isinstance(item, dict): - findings.append(Finding( - type="friction", - description=str(item.get("correction", ""))[:200], - confidence=0.8, - frequency=1, - sessions=[session.session_id], - evidence=str(item.get("what_went_wrong", ""))[:300], - theme=str(item.get("theme", "rlm_detected")), - )) + findings.append( + Finding( + type="friction", + description=str(item.get("correction", ""))[:200], + confidence=0.8, + frequency=1, + sessions=[session.session_id], + evidence=str(item.get("what_went_wrong", ""))[:300], + theme=str(item.get("theme", "rlm_detected")), + ) + ) except Exception as e: - print(f" [RLM] Error on session {session.session_id[:12]}: {e}", file=sys.stderr) + print( + f" [RLM] Error on session {session.session_id[:12]}: {e}", + file=sys.stderr, + ) continue return findings -def run_patterns_pass(sessions: list[SessionInfo], max_sessions: int = 200) -> list[Finding]: +def run_patterns_pass( + sessions: list[SessionInfo], max_sessions: int = 200 +) -> list[Finding]: """Extract successful patterns from sessions.""" confirmations: dict[str, list] = defaultdict(list) total_analyzed = 0 CONFIRM_PATTERNS = [ - (r"\b(yes|yeah|yep|exactly|perfect|great|awesome|nice)\b[.!]*$", "explicit_confirmation"), + ( + r"\b(yes|yeah|yep|exactly|perfect|great|awesome|nice)\b[.!]*$", + "explicit_confirmation", + ), (r"\bthat'?s (right|correct|it|what i wanted)\b", "explicit_confirmation"), (r"\bgood (job|work|call|approach)\b", "praise"), (r"\bkeep (doing|going|that)\b", "continuation"), @@ -683,11 +766,13 @@ def run_patterns_pass(sessions: list[SessionInfo], max_sessions: int = 200) -> l if j > 0 and messages[j - 1]["role"] == "assistant": context = messages[j - 1]["content"][:500] - confirmations[theme].append({ - "session_id": session.session_id, - "text": text[:200], - "context": context, - }) + confirmations[theme].append( + { + "session_id": session.session_id, + "text": text[:200], + "context": context, + } + ) break findings: list[Finding] = [] @@ -695,25 +780,31 @@ def run_patterns_pass(sessions: list[SessionInfo], max_sessions: int = 200) -> l if len(events) < 3: continue unique_sessions = list(set(e["session_id"] for e in events)) - findings.append(Finding( - type="pattern", - description=f"Confirmed approach: {theme.replace('_', ' ')}", - confidence=len(unique_sessions) / total_analyzed if total_analyzed > 0 else 0, - frequency=len(events), - sessions=unique_sessions[:20], - evidence=f"Example confirmation: \"{events[0]['text'][:200]}\" after: \"{events[0]['context'][:200]}\"", - theme=theme, - )) + findings.append( + Finding( + type="pattern", + description=f"Confirmed approach: {theme.replace('_', ' ')}", + confidence=len(unique_sessions) / total_analyzed + if total_analyzed > 0 + else 0, + frequency=len(events), + sessions=unique_sessions[:20], + evidence=f'Example confirmation: "{events[0]["text"][:200]}" after: "{events[0]["context"][:200]}"', + theme=theme, + ) + ) return findings -def run_gaps_pass(sessions: list[SessionInfo], max_sessions: int = 100) -> list[Finding]: +def run_gaps_pass( + sessions: list[SessionInfo], max_sessions: int = 100 +) -> list[Finding]: """Detect repeated manual workflows that could be skills.""" # For gaps, we look for command-message patterns (slash command invocations) # and repeated multi-step bash sequences skill_invocations: Counter = Counter() - command_sequences: Counter = Counter() + Counter() analyzable = [s for s in sessions if s.size_bytes < 50 * 1024 * 1024][:max_sessions] @@ -745,14 +836,16 @@ def run_gaps_pass(sessions: list[SessionInfo], max_sessions: int = 100) -> list[ if skill_invocations: top_skills = skill_invocations.most_common(20) skills_summary = ", ".join(f"{name} ({count}x)" for name, count in top_skills) - findings.append(Finding( - type="gap", - description="Skill usage frequency distribution", - confidence=1.0, - frequency=sum(skill_invocations.values()), - sessions=[], - evidence=f"Top skills: {skills_summary}", - theme="skill_usage", - )) + findings.append( + Finding( + type="gap", + description="Skill usage frequency distribution", + confidence=1.0, + frequency=sum(skill_invocations.values()), + sessions=[], + evidence=f"Top skills: {skills_summary}", + theme="skill_usage", + ) + ) return findings diff --git a/afterburn/scanner.py b/afterburn/scanner.py index c876d6d..6fed4b3 100644 --- a/afterburn/scanner.py +++ b/afterburn/scanner.py @@ -1,7 +1,6 @@ """Session file discovery and filtering.""" import json -import os import re from dataclasses import dataclass from datetime import datetime @@ -163,11 +162,15 @@ def group_sessions_by_parent( if parent_slug is not None: # This is a worktree / child session - bucket = groups.setdefault(parent_slug, {"parent_sessions": [], "child_sessions": []}) + bucket = groups.setdefault( + parent_slug, {"parent_sessions": [], "child_sessions": []} + ) bucket["child_sessions"].append(session) else: # This is a parent (non-worktree) session - bucket = groups.setdefault(session.project_slug, {"parent_sessions": [], "child_sessions": []}) + bucket = groups.setdefault( + session.project_slug, {"parent_sessions": [], "child_sessions": []} + ) bucket["parent_sessions"].append(session) return groups diff --git a/afterburn/vendor/rlm_repl/engine.py b/afterburn/vendor/rlm_repl/engine.py index 04c2907..e9c2dfb 100644 --- a/afterburn/vendor/rlm_repl/engine.py +++ b/afterburn/vendor/rlm_repl/engine.py @@ -7,7 +7,11 @@ import sys from dataclasses import dataclass, field -from afterburn.vendor.rlm_repl.llm_client import ClaudeCLIClient, LLMClient, _detect_backend +from afterburn.vendor.rlm_repl.llm_client import ( + ClaudeCLIClient, + LLMClient, + _detect_backend, +) from afterburn.vendor.rlm_repl.sandbox import REPLSandbox @@ -79,12 +83,18 @@ def __post_init__(self): if backend == "claude": # Use Claude CLI — model names map to claude models root_model = self.root_model if self.root_model != "auto" else "haiku" - rec_model = self.recursive_model if self.recursive_model != "auto" else "haiku" + rec_model = ( + self.recursive_model if self.recursive_model != "auto" else "haiku" + ) self._root_client = ClaudeCLIClient(model=root_model) self._recursive_client = ClaudeCLIClient(model=rec_model) if self.verbose: import sys - print(f" [RLM] Using Claude CLI (root={root_model}, recursive={rec_model})", file=sys.stderr) + + print( + f" [RLM] Using Claude CLI (root={root_model}, recursive={rec_model})", + file=sys.stderr, + ) else: # Use OpenAI-compatible API self._root_client = LLMClient( @@ -113,7 +123,10 @@ def completion(self, context, query: str, system_prompt: str = "") -> str: # Create sandbox with recursive LLM wired in def llm_query(prompt: str) -> str: messages = [ - {"role": "system", "content": "You are a helpful analyst. Answer concisely."}, + { + "role": "system", + "content": "You are a helpful analyst. Answer concisely.", + }, {"role": "user", "content": prompt}, ] return self._recursive_client.chat(messages, max_tokens=4096) @@ -124,16 +137,22 @@ def llm_query(prompt: str) -> str: # Build initial messages messages = [ {"role": "system", "content": sys_prompt}, - {"role": "user", "content": ( - f"Analyze the data in `context` to answer this question:\n\n{query}\n\n" - f"Context info: {context_desc}\n\n" - "Start by inspecting the context structure." - )}, + { + "role": "user", + "content": ( + f"Analyze the data in `context` to answer this question:\n\n{query}\n\n" + f"Context info: {context_desc}\n\n" + "Start by inspecting the context structure." + ), + }, ] for iteration in range(self.max_iterations): if self.verbose: - print(f" [RLM] iteration {iteration + 1}/{self.max_iterations}", file=sys.stderr) + print( + f" [RLM] iteration {iteration + 1}/{self.max_iterations}", + file=sys.stderr, + ) # Get LLM response response = self._root_client.chat(messages, max_tokens=8192) @@ -143,16 +162,23 @@ def llm_query(prompt: str) -> str: if not code_blocks: # Check if response contains FINAL() directly in text - final_match = re.search(r'FINAL\(["\'](.+?)["\']\)', response, re.DOTALL) + final_match = re.search( + r'FINAL\(["\'](.+?)["\']\)', response, re.DOTALL + ) if final_match: return final_match.group(1) # No code and no FINAL — ask LLM to continue messages.append({"role": "assistant", "content": response}) - messages.append({"role": "user", "content": ( - "Please write Python code in a ```repl block to continue analysis. " - "When done, call FINAL(answer) with your answer." - )}) + messages.append( + { + "role": "user", + "content": ( + "Please write Python code in a ```repl block to continue analysis. " + "When done, call FINAL(answer) with your answer." + ), + } + ) continue # Execute each code block @@ -181,7 +207,9 @@ def llm_query(prompt: str) -> str: if all_stdout: combined_stdout = "\n".join(all_stdout) if len(combined_stdout) > self.max_output_length: - combined_stdout = combined_stdout[:self.max_output_length] + "\n[TRUNCATED]" + combined_stdout = ( + combined_stdout[: self.max_output_length] + "\n[TRUNCATED]" + ) result_parts.append(f"stdout:\n{combined_stdout}") if all_stderr: combined_stderr = "\n".join(all_stderr) diff --git a/afterburn/vendor/rlm_repl/llm_client.py b/afterburn/vendor/rlm_repl/llm_client.py index 93accc2..f3576a6 100644 --- a/afterburn/vendor/rlm_repl/llm_client.py +++ b/afterburn/vendor/rlm_repl/llm_client.py @@ -1,9 +1,8 @@ """LLM client — supports OpenAI-compatible API or Claude CLI.""" -import json import os import subprocess -from dataclasses import dataclass, field +from dataclasses import dataclass import requests @@ -19,7 +18,9 @@ def _detect_backend() -> str: try: result = subprocess.run( ["claude", "--version"], - capture_output=True, text=True, timeout=5, + capture_output=True, + text=True, + timeout=5, ) if result.returncode == 0: return "claude" diff --git a/afterburn/vendor/rlm_repl/sandbox.py b/afterburn/vendor/rlm_repl/sandbox.py index 71e4402..cf44311 100644 --- a/afterburn/vendor/rlm_repl/sandbox.py +++ b/afterburn/vendor/rlm_repl/sandbox.py @@ -1,7 +1,6 @@ """REPL sandbox — executes Python code with injected tools.""" import io -import sys import traceback from contextlib import redirect_stderr, redirect_stdout @@ -82,7 +81,9 @@ def _handle_final(self, answer): def _handle_final_var(self, variable_name: str): self._final_var_name = variable_name - val = self._globals.get(variable_name, f"[variable '{variable_name}' not found]") + val = self._globals.get( + variable_name, f"[variable '{variable_name}' not found]" + ) self._final_answer = val return val @@ -90,4 +91,8 @@ def _handle_final_var(self, variable_name: str): def locals(self) -> dict: """Get current sandbox variables (excluding builtins and tools).""" skip = {"__builtins__", "llm_query", "FINAL", "FINAL_VAR", "context"} - return {k: v for k, v in self._globals.items() if k not in skip and not k.startswith("_")} + return { + k: v + for k, v in self._globals.items() + if k not in skip and not k.startswith("_") + } diff --git a/tests/test_archive.py b/tests/test_archive.py index 7b66cf4..faa3536 100644 --- a/tests/test_archive.py +++ b/tests/test_archive.py @@ -43,8 +43,12 @@ def test_archive_sessions(): project_dir = Path(tmpdir) s1 = project_dir / "session1.jsonl" s2 = project_dir / "session2.jsonl" - s1.write_text('{"type":"user","message":{"role":"user","content":"hello"}}\n' * 100) - s2.write_text('{"type":"assistant","message":{"role":"assistant","content":"hi"}}\n' * 50) + s1.write_text( + '{"type":"user","message":{"role":"user","content":"hello"}}\n' * 100 + ) + s2.write_text( + '{"type":"assistant","message":{"role":"assistant","content":"hi"}}\n' * 50 + ) archive_path = _archive_sessions([s1, s2], project_dir) assert archive_path.exists() diff --git a/tests/test_correlation.py b/tests/test_correlation.py index 67a8ae8..4e81bc7 100644 --- a/tests/test_correlation.py +++ b/tests/test_correlation.py @@ -11,12 +11,16 @@ ) -def _create_session(base_dir: Path, slug: str, session_id: str, size_kb: int = 20) -> Path: +def _create_session( + base_dir: Path, slug: str, session_id: str, size_kb: int = 20 +) -> Path: """Create a fake session JSONL file.""" project_dir = base_dir / slug project_dir.mkdir(parents=True, exist_ok=True) session_file = project_dir / f"{session_id}.jsonl" - content = json.dumps({"type": "user", "message": {"role": "user", "content": "test"}}) + content = json.dumps( + {"type": "user", "message": {"role": "user", "content": "test"}} + ) # Pad to desired size lines = [content] * max(1, (size_kb * 1024) // len(content)) session_file.write_text("\n".join(lines)) @@ -54,8 +58,12 @@ def test_group_sessions_parent_and_children(): with tempfile.TemporaryDirectory() as tmpdir: base = Path(tmpdir) _create_session(base, "-home-user-project", "parent1", size_kb=20) - _create_session(base, "-home-user-project--claude-worktrees-agent-aaa", "child1", size_kb=20) - _create_session(base, "-home-user-project--claude-worktrees-agent-bbb", "child2", size_kb=20) + _create_session( + base, "-home-user-project--claude-worktrees-agent-aaa", "child1", size_kb=20 + ) + _create_session( + base, "-home-user-project--claude-worktrees-agent-bbb", "child2", size_kb=20 + ) sessions = discover_sessions(sessions_dir=base, include_worktrees=True) groups = group_sessions_by_parent(sessions) @@ -73,7 +81,9 @@ def test_group_sessions_orphan_children(): """Worktree sessions whose parent has no sessions still form a group.""" with tempfile.TemporaryDirectory() as tmpdir: base = Path(tmpdir) - _create_session(base, "-home-user-proj--claude-worktrees-agent-x", "orphan1", size_kb=20) + _create_session( + base, "-home-user-proj--claude-worktrees-agent-x", "orphan1", size_kb=20 + ) sessions = discover_sessions(sessions_dir=base, include_worktrees=True) groups = group_sessions_by_parent(sessions) @@ -104,9 +114,13 @@ def test_group_sessions_multiple_parents(): with tempfile.TemporaryDirectory() as tmpdir: base = Path(tmpdir) _create_session(base, "-home-user-alpha", "a1", size_kb=20) - _create_session(base, "-home-user-alpha--claude-worktrees-S-001", "ac1", size_kb=20) + _create_session( + base, "-home-user-alpha--claude-worktrees-S-001", "ac1", size_kb=20 + ) _create_session(base, "-home-user-beta", "b1", size_kb=20) - _create_session(base, "-home-user-beta--claude-worktrees-S-002", "bc1", size_kb=20) + _create_session( + base, "-home-user-beta--claude-worktrees-S-002", "bc1", size_kb=20 + ) sessions = discover_sessions(sessions_dir=base, include_worktrees=True) groups = group_sessions_by_parent(sessions) @@ -173,8 +187,12 @@ def test_worktree_sessions_identified_as_children(): with tempfile.TemporaryDirectory() as tmpdir: base = Path(tmpdir) _create_session(base, "-home-user-project", "orchestrator", size_kb=20) - _create_session(base, "-home-user-project--claude-worktrees-agent-abc", "agent1", size_kb=20) - _create_session(base, "-home-user-project--claude-worktrees-agent-def", "agent2", size_kb=20) + _create_session( + base, "-home-user-project--claude-worktrees-agent-abc", "agent1", size_kb=20 + ) + _create_session( + base, "-home-user-project--claude-worktrees-agent-def", "agent2", size_kb=20 + ) sessions = discover_sessions(sessions_dir=base, include_worktrees=True) groups = group_sessions_by_parent(sessions) diff --git a/tests/test_dead_releases.py b/tests/test_dead_releases.py index 461c3a1..d89c8ee 100644 --- a/tests/test_dead_releases.py +++ b/tests/test_dead_releases.py @@ -2,8 +2,6 @@ import os import subprocess -import tempfile -from pathlib import Path import pytest @@ -27,8 +25,13 @@ def run_git(*args): capture_output=True, text=True, check=True, - env={**os.environ, "GIT_AUTHOR_NAME": "Test", "GIT_AUTHOR_EMAIL": "t@t.com", - "GIT_COMMITTER_NAME": "Test", "GIT_COMMITTER_EMAIL": "t@t.com"}, + env={ + **os.environ, + "GIT_AUTHOR_NAME": "Test", + "GIT_AUTHOR_EMAIL": "t@t.com", + "GIT_COMMITTER_NAME": "Test", + "GIT_COMMITTER_EMAIL": "t@t.com", + }, ) run_git("init") @@ -49,9 +52,7 @@ def run_git(*args): "def dead_func():\n return 0\n" ) (repo / "main.py").write_text( - "from lib import helper, used_func\n\n" - "result = helper()\n" - "other = used_func()\n" + "from lib import helper, used_func\n\nresult = helper()\nother = used_func()\n" ) run_git("add", ".") run_git("commit", "-m", "add functions") @@ -117,8 +118,13 @@ def test_no_tags_returns_empty(tmp_path): ["git", "commit", "-m", "init"], cwd=str(repo), capture_output=True, - env={**os.environ, "GIT_AUTHOR_NAME": "Test", "GIT_AUTHOR_EMAIL": "t@t.com", - "GIT_COMMITTER_NAME": "Test", "GIT_COMMITTER_EMAIL": "t@t.com"}, + env={ + **os.environ, + "GIT_AUTHOR_NAME": "Test", + "GIT_AUTHOR_EMAIL": "t@t.com", + "GIT_COMMITTER_NAME": "Test", + "GIT_COMMITTER_EMAIL": "t@t.com", + }, ) findings = detect_dead_releases(str(repo)) assert findings == [] diff --git a/tests/test_scanner.py b/tests/test_scanner.py index 5ecc9ab..dc27675 100644 --- a/tests/test_scanner.py +++ b/tests/test_scanner.py @@ -7,12 +7,16 @@ from afterburn.scanner import discover_sessions -def _create_session(base_dir: Path, slug: str, session_id: str, size_kb: int = 20) -> Path: +def _create_session( + base_dir: Path, slug: str, session_id: str, size_kb: int = 20 +) -> Path: """Create a fake session JSONL file.""" project_dir = base_dir / slug project_dir.mkdir(parents=True, exist_ok=True) session_file = project_dir / f"{session_id}.jsonl" - content = json.dumps({"type": "user", "message": {"role": "user", "content": "test"}}) + content = json.dumps( + {"type": "user", "message": {"role": "user", "content": "test"}} + ) # Pad to desired size lines = [content] * max(1, (size_kb * 1024) // len(content)) session_file.write_text("\n".join(lines)) @@ -33,7 +37,9 @@ def test_discover_excludes_worktrees(): with tempfile.TemporaryDirectory() as tmpdir: base = Path(tmpdir) _create_session(base, "-home-user-project", "abc123", size_kb=20) - _create_session(base, "-home-user-project--claude-worktrees-S-001", "wt001", size_kb=20) + _create_session( + base, "-home-user-project--claude-worktrees-S-001", "wt001", size_kb=20 + ) sessions = discover_sessions(sessions_dir=base, include_worktrees=False) assert len(sessions) == 1 @@ -44,7 +50,9 @@ def test_discover_includes_worktrees_when_flag_set(): with tempfile.TemporaryDirectory() as tmpdir: base = Path(tmpdir) _create_session(base, "-home-user-project", "abc123", size_kb=20) - _create_session(base, "-home-user-project--claude-worktrees-S-001", "wt001", size_kb=20) + _create_session( + base, "-home-user-project--claude-worktrees-S-001", "wt001", size_kb=20 + ) sessions = discover_sessions(sessions_dir=base, include_worktrees=True) assert len(sessions) == 2