diff --git a/README.md b/README.md
index be8faa9e..286dda3c 100644
--- a/README.md
+++ b/README.md
@@ -146,6 +146,10 @@ bash brain-bar/build-app.sh # Build, sign, install LaunchAgent
 
 Requires the BrainLayer MCP server. The build script refuses non-canonical checkouts and dirty trees by default ([#265](https://github.com/EtanHey/brainlayer/pull/265)) and stamps each bundle with `GitCommit`, `GitDescribe`, and `BuildTimeUTC` in `Info.plist` ([#264](https://github.com/EtanHey/brainlayer/pull/264)) so a stale install is diagnosable in seconds.
 
+## Writer Arbitration
+
+Background producers run with `BRAINLAYER_ARBITRATED=1` and append writes to `~/.brainlayer/queue/`; `com.brainlayer.drain.plist` drains that queue every 500ms as the single writer. Trigram FTS maintenance is explicit via `brainlayer repair-fts` and the weekly `com.brainlayer.repair-fts.plist`, not synchronous startup work. See [docs/arbitration.md](docs/arbitration.md).
+
 ## Recent Hardening (2026-04-15 → 2026-05-02)
 
 Two-week stability sprint behind the next presentation. Every line below traces to a merged PR.
diff --git a/docs/arbitration.md b/docs/arbitration.md
new file mode 100644
index 00000000..c441d9c6
--- /dev/null
+++ b/docs/arbitration.md
@@ -0,0 +1,19 @@
+# BrainLayer Writer Arbitration
+
+BrainLayer uses a single-writer arbitration path for background producers that can otherwise fight over SQLite's write lock.
+
+## Operator Contract
+
+- `BRAINLAYER_ARBITRATED=1` makes producers enqueue writes instead of writing directly to the database. The current launchd templates set this for watch and enrichment.
+- `BRAINLAYER_DRAIN_EMBED=0` disables post-drain embedding, mainly for tests or emergency operator debugging. Production should leave it enabled so queued `brain_store` chunks reach semantic search.
+- The unified durable queue lives at `~/.brainlayer/queue/`. Each event is one JSONL file named by source, timestamp, and UUID.
+- `com.brainlayer.drain.plist` runs `scripts/drain_daemon.py`, which drains the queue every 500ms under `BEGIN IMMEDIATE`.
+- The drain daemon opens SQLite with APSW, loads `sqlite-vec`, writes chunks/enrichment updates, and embeds queued `brain_store` chunks into `chunk_vectors`/`chunk_vectors_binary` before removing their queue files.
+- Legacy `pending-stores.jsonl` is migrated by `brainlayer flush`; migration assigns stable chunk IDs so rerunning after a crash is `INSERT OR IGNORE` safe.
+
+## FTS Repair
+
+- Startup no longer performs large synchronous trigram repairs.
+- Run `brainlayer repair-fts` for an explicit `chunks_fts_trigram` rebuild.
+- `scripts/launchd/com.brainlayer.repair-fts.plist` schedules that repair weekly.
+- Set `BRAINLAYER_REPAIR=1` only for an operator-controlled process that should run the repair during VectorStore initialization.
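The operator contract above is easiest to see end to end with the `queue_io` helpers this patch introduces. The sketch below is illustrative only and is not part of the patch: it points `BRAINLAYER_QUEUE_DIR` at a throwaway directory, enqueues one arbitrated `brain_store`-style write, and inspects the resulting one-line JSONL file. The sample memory content, tags, and temp paths are assumptions, not values from the diff.

```python
# Hedged sketch, not part of the patch: one arbitrated write landing in the durable queue.
# Assumes the brainlayer package from this diff is importable; the temp queue dir and the
# sample memory content below are illustrative stand-ins.
import json
import os
import tempfile

os.environ["BRAINLAYER_QUEUE_DIR"] = tempfile.mkdtemp()  # keep the real ~/.brainlayer/queue untouched

from brainlayer.queue_io import enqueue_store

# Producers never touch SQLite directly; they drop a single-line JSONL file per event.
event_path = enqueue_store(
    content="prefer BEGIN IMMEDIATE over long busy timeouts for writers",
    memory_type="note",
    project="brainlayer",
    tags=["sqlite", "arbitration"],
    importance=7,
    source="mcp",
)

print(event_path.name)   # e.g. mcp-1714650000000-<uuid hex>.jsonl (source-timestamp-UUID)
event = json.loads(event_path.read_text(encoding="utf-8"))
print(event["kind"])      # store_memory
print(event["chunk_id"])  # manual-<16 hex chars>; stable, so the drain's INSERT OR IGNORE is idempotent
print(event["queued_at"]) # unix timestamp stamped by enqueue_jsonl

# The single writer later applies this file inside one BEGIN IMMEDIATE transaction and
# deletes it on commit, e.g.:
#   from brainlayer.drain import drain_once
#   drain_once()   # or let com.brainlayer.drain.plist run it every 500ms
```

If the drain daemon is not running, `brainlayer flush` drains the same directory on demand, which is also the recovery path the operator contract describes for legacy `pending-stores.jsonl`.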
diff --git a/scripts/drain_daemon.py b/scripts/drain_daemon.py new file mode 100644 index 00000000..82a1af32 --- /dev/null +++ b/scripts/drain_daemon.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python3 +"""Launchd wrapper for BrainLayer's single-writer drain daemon.""" + +from __future__ import annotations + +import sys +from pathlib import Path + +REPO_ROOT = Path(__file__).resolve().parents[1] +SRC_DIR = REPO_ROOT / "src" +if str(SRC_DIR) not in sys.path: + sys.path.insert(0, str(SRC_DIR)) + +from brainlayer.drain import drain_once, main, run_daemon # noqa: E402,F401 + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/launchd/com.brainlayer.drain.plist b/scripts/launchd/com.brainlayer.drain.plist new file mode 100644 index 00000000..d1152d73 --- /dev/null +++ b/scripts/launchd/com.brainlayer.drain.plist @@ -0,0 +1,33 @@ + + + + + Label + com.brainlayer.drain + + ProgramArguments + + __PYTHON_BIN__ + __REPO_ROOT__/scripts/drain_daemon.py + --interval + 0.5 + + + StandardOutPath + __HOME__/.brainlayer/logs/drain.log + StandardErrorPath + __HOME__/.brainlayer/logs/drain.err + + KeepAlive + + RunAtLoad + + ThrottleInterval + 5 + Nice + 10 + ProcessType + Background + + + diff --git a/scripts/launchd/com.brainlayer.enrichment.plist b/scripts/launchd/com.brainlayer.enrichment.plist index f8ac4d34..836e97a9 100644 --- a/scripts/launchd/com.brainlayer.enrichment.plist +++ b/scripts/launchd/com.brainlayer.enrichment.plist @@ -36,6 +36,8 @@ __BRAINLAYER_DIR__/src BRAINLAYER_STALL_TIMEOUT 300 + BRAINLAYER_ARBITRATED + 1 BRAINLAYER_ENRICH_RATE 15 BRAINLAYER_ENRICH_CONCURRENCY diff --git a/scripts/launchd/com.brainlayer.repair-fts.plist b/scripts/launchd/com.brainlayer.repair-fts.plist new file mode 100644 index 00000000..10c5057f --- /dev/null +++ b/scripts/launchd/com.brainlayer.repair-fts.plist @@ -0,0 +1,35 @@ + + + + + Label + com.brainlayer.repair-fts + + ProgramArguments + + __BRAINLAYER_BIN__ + repair-fts + + + + StartCalendarInterval + + Weekday + 0 + Hour + 4 + Minute + 30 + + + StandardOutPath + __HOME__/.local/share/brainlayer/logs/repair-fts.log + StandardErrorPath + __HOME__/.local/share/brainlayer/logs/repair-fts.err + + Nice + 15 + ProcessType + Background + + diff --git a/scripts/launchd/com.brainlayer.watch.plist b/scripts/launchd/com.brainlayer.watch.plist new file mode 100644 index 00000000..2fe0c98f --- /dev/null +++ b/scripts/launchd/com.brainlayer.watch.plist @@ -0,0 +1,48 @@ + + + + + Label + com.brainlayer.watch + + ProgramArguments + + __BRAINLAYER_BIN__ + watch + --poll + 1.0 + --batch-size + 10 + --flush-ms + 500 + + + EnvironmentVariables + + PATH + /usr/local/bin:/usr/bin:/bin:__HOME__/.local/bin + PYTHONUNBUFFERED + 1 + PYTHONPATH + __BRAINLAYER_DIR__/src + BRAINLAYER_ARBITRATED + 1 + + + StandardOutPath + __HOME__/.local/share/brainlayer/logs/watch.log + StandardErrorPath + __HOME__/.local/share/brainlayer/logs/watch.err + + KeepAlive + + RunAtLoad + + ThrottleInterval + 5 + Nice + 10 + ProcessType + Background + + diff --git a/scripts/launchd/install.sh b/scripts/launchd/install.sh index 1ec6ffa4..cc67f889 100755 --- a/scripts/launchd/install.sh +++ b/scripts/launchd/install.sh @@ -4,11 +4,14 @@ # Usage: # ./scripts/launchd/install.sh # Install all # ./scripts/launchd/install.sh index # Install indexing only +# ./scripts/launchd/install.sh watch # Install watcher only # ./scripts/launchd/install.sh enrich # Install enrichment only +# ./scripts/launchd/install.sh drain # Install queue drain only # ./scripts/launchd/install.sh decay # Install decay 
only # ./scripts/launchd/install.sh load enrichment # ./scripts/launchd/install.sh unload enrichment # ./scripts/launchd/install.sh checkpoint # Install WAL checkpoint only +# ./scripts/launchd/install.sh repair-fts # Install weekly explicit FTS repair # ./scripts/launchd/install.sh backup # Install daily DB backup only # ./scripts/launchd/install.sh remove # Unload and remove all set -euo pipefail @@ -20,8 +23,15 @@ BRAINLAYER_LOG_DIR="$HOME/.local/share/brainlayer/logs" BRAINLAYER_LIB_DIR="$HOME/.local/lib/brainlayer" BRAINLAYER_DIR="$(cd "$SCRIPT_DIR/../.." && pwd)" BRAINLAYER_BIN="${BRAINLAYER_BIN:-$(which brainlayer 2>/dev/null || echo "$HOME/.local/bin/brainlayer")}" +PYTHON_BIN="${PYTHON_BIN:-$(command -v python3)}" GOOGLE_API_KEY="${GOOGLE_API_KEY:-}" +if [ -z "$PYTHON_BIN" ]; then + echo "ERROR: python3 not found in PATH" + echo "Install Python 3 or set PYTHON_BIN=/path/to/python3" + exit 1 +fi + if [ ! -x "$BRAINLAYER_BIN" ]; then echo "ERROR: brainlayer binary not found at $BRAINLAYER_BIN" echo "Install with: pip install -e . (from brainlayer repo)" @@ -30,6 +40,7 @@ if [ ! -x "$BRAINLAYER_BIN" ]; then fi mkdir -p "$LAUNCH_DIR" "$LOG_DIR" "$BRAINLAYER_LOG_DIR" "$BRAINLAYER_LIB_DIR" +mkdir -p "$HOME/.brainlayer/logs" "$HOME/.brainlayer/queue" resolve_google_api_key() { if [ -n "${GOOGLE_API_KEY:-}" ]; then @@ -88,6 +99,8 @@ install_plist() { -e "s|__HOME__|$HOME|g" \ -e "s|__BRAINLAYER_BIN__|$BRAINLAYER_BIN|g" \ -e "s|__BRAINLAYER_DIR__|$BRAINLAYER_DIR|g" \ + -e "s|__PYTHON_BIN__|$PYTHON_BIN|g" \ + -e "s|__REPO_ROOT__|$BRAINLAYER_DIR|g" \ -e "s|__GOOGLE_API_KEY__|$google_api_key|g" \ "$src" > "$dst" @@ -135,6 +148,9 @@ case "${1:-all}" in enrichment) install_plist enrichment ;; + watch) + install_plist watch + ;; decay) install_plist decay ;; @@ -147,15 +163,24 @@ case "${1:-all}" in checkpoint) install_plist wal-checkpoint ;; + drain) + install_plist drain + ;; + repair-fts) + install_plist repair-fts + ;; backup) install_backup_script install_plist backup-daily ;; all) install_plist index + install_plist drain + install_plist watch install_plist enrichment install_plist decay install_plist wal-checkpoint + install_plist repair-fts install_backup_script install_plist backup-daily # Remove old enrich plist if present @@ -165,13 +190,16 @@ case "${1:-all}" in remove_plist index remove_plist enrich 2>/dev/null || true remove_plist enrichment 2>/dev/null || true + remove_plist watch 2>/dev/null || true remove_plist decay 2>/dev/null || true + remove_plist drain 2>/dev/null || true remove_plist wal-checkpoint + remove_plist repair-fts 2>/dev/null || true remove_plist backup-daily 2>/dev/null || true rm -f "$BRAINLAYER_LIB_DIR/backup-daily.sh" ;; *) - echo "Usage: $0 [index|enrich|enrichment|decay|load [name]|unload [name]|checkpoint|backup|all|remove]" + echo "Usage: $0 [index|watch|enrich|enrichment|decay|drain|repair-fts|load [name]|unload [name]|checkpoint|backup|all|remove]" exit 1 ;; esac diff --git a/src/brainlayer/cli/__init__.py b/src/brainlayer/cli/__init__.py index d612c8fa..e4a643ff 100644 --- a/src/brainlayer/cli/__init__.py +++ b/src/brainlayer/cli/__init__.py @@ -1,5 +1,7 @@ """BrainLayer CLI - Command line interface for the knowledge pipeline.""" +import hashlib +import json import re as _re import sys import time @@ -1053,6 +1055,20 @@ def wal_checkpoint( raise typer.Exit(1) +@app.command("repair-fts") +def repair_fts() -> None: + """Explicitly rebuild FTS maintenance tables outside normal startup.""" + from ..paths import get_db_path + from ..vector_store import 
VectorStore + + store = VectorStore(get_db_path()) + try: + result = store.repair_fts(rebuild_trigram=True) + finally: + store.close() + console.print_json(data=result) + + @app.command("enrich-sessions") def enrich_sessions( project: str = typer.Option(None, "--project", "-p", help="Only enrich sessions from this project"), @@ -2077,36 +2093,82 @@ def hooks( @app.command("flush") def flush() -> None: - """Flush pending-stores.jsonl (queued items from DB lock errors).""" - from ..paths import DEFAULT_DB_PATH - - queue_path = DEFAULT_DB_PATH.parent / "pending-stores.jsonl" - if not queue_path.exists(): - rprint("[dim]No pending stores to flush.[/]") - return - - lines = queue_path.read_text().strip().splitlines() - if not lines: - rprint("[dim]Queue is empty.[/]") - queue_path.unlink(missing_ok=True) - return - - rprint(f"[bold]Flushing {len(lines)} queued store(s)...[/]") - - from ..embeddings import get_embedding_model - from ..mcp import _flush_pending_stores - from ..vector_store import VectorStore - - store = VectorStore(DEFAULT_DB_PATH) - model = get_embedding_model() + """Flush pending store queues and the unified arbitration queue.""" + from ..drain import drain_once + from ..paths import get_db_path + from ..queue_io import enqueue_store, get_queue_dir - flushed = _flush_pending_stores(store, model.embed_query) - store.close() + db_path = get_db_path() + queue_path = db_path.parent / "pending-stores.jsonl" + migrated = 0 + skipped = 0 + try: + if queue_path.exists(): + lines = queue_path.read_text().strip().splitlines() + for line in lines: + if not line.strip(): + continue + try: + item = json.loads(line) + except json.JSONDecodeError: + skipped += 1 + continue + content = item.get("content") + if not content: + skipped += 1 + continue + stable_payload = { + "content": content, + "memory_type": item.get("memory_type", "note"), + "project": item.get("project"), + "tags": item.get("tags"), + "importance": item.get("importance"), + } + chunk_id = ( + item.get("chunk_id") + or "pending-" + + hashlib.sha256( + json.dumps(stable_payload, sort_keys=True, separators=(",", ":")).encode("utf-8") + ).hexdigest()[:16] + ) + enqueue_store( + content=content, + memory_type=item.get("memory_type", "note"), + project=item.get("project"), + tags=item.get("tags"), + importance=item.get("importance"), + source="pending", + confidence_score=item.get("confidence_score"), + outcome=item.get("outcome"), + reversibility=item.get("reversibility"), + files_changed=item.get("files_changed"), + entity_id=item.get("entity_id"), + status=item.get("status"), + severity=item.get("severity"), + file_path=item.get("file_path"), + function_name=item.get("function_name"), + line_number=item.get("line_number"), + supersedes=item.get("supersedes"), + chunk_id=chunk_id, + ) + migrated += 1 + queue_path.unlink(missing_ok=True) + + queue_dir = get_queue_dir() + drained = 0 + while True: + count = drain_once(db_path=db_path, queue_dir=queue_dir) + if count == 0: + break + drained += count + except Exception as exc: + rprint(f"[bold red]Error flushing queues:[/] {exc}") + raise typer.Exit(1) - rprint(f"[green]Flushed {flushed} item(s).[/]") - remaining = len(lines) - flushed - if remaining > 0: - rprint(f"[yellow]{remaining} item(s) still pending (DB may still be locked).[/]") + rprint( + f"[green]Migrated {migrated} pending store(s); drained {drained} unified queue item(s); " + f"skipped {skipped} malformed item(s).[/]" + ) @app.command("code-intel") diff --git a/src/brainlayer/drain.py b/src/brainlayer/drain.py new file 
mode 100644 index 00000000..561a7b00 --- /dev/null +++ b/src/brainlayer/drain.py @@ -0,0 +1,470 @@ +"""Single-writer drain loop for BrainLayer's durable JSONL queue.""" + +from __future__ import annotations + +import argparse +import fcntl +import hashlib +import json +import logging +import os +import time +import uuid +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Callable + +import apsw +import sqlite_vec + +from ._helpers import serialize_f32 +from .paths import get_db_path + +logger = logging.getLogger(__name__) + + +@dataclass +class ApplyResult: + chunk_id: str | None = None + collision_chunk_id: str | None = None + + +def _default_db_path() -> Path: + return get_db_path() + + +def _default_queue_dir() -> Path: + from .queue_io import get_queue_dir + + return get_queue_dir() + + +def _default_log_path() -> Path: + return Path.home() / ".brainlayer" / "logs" / "drain.log" + + +def _log(path: Path, message: str) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + stamp = datetime.now(timezone.utc).isoformat() + with path.open("a", encoding="utf-8") as handle: + handle.write(f"{stamp} {message}\n") + + +def _open_connection(db_path: Path) -> apsw.Connection: + conn = apsw.Connection(str(db_path)) + conn.setbusytimeout(200) + conn.enableloadextension(True) + conn.loadextension(sqlite_vec.loadable_path()) + conn.enableloadextension(False) + return conn + + +def _acquire_queue_lock(queue_dir: Path) -> int: + fd = os.open(queue_dir, os.O_RDONLY) + try: + fcntl.flock(fd, fcntl.LOCK_EX) + except Exception: + os.close(fd) + raise + return fd + + +def _columns(conn: apsw.Connection, table: str) -> set[str]: + return {row[1] for row in conn.execute(f"PRAGMA table_info({table})")} + + +def _insert_chunk(conn: apsw.Connection, values: dict[str, Any]) -> None: + cols = _columns(conn, "chunks") + row = {key: value for key, value in values.items() if key in cols} + if "id" not in row and "chunk_id" in cols: + row["chunk_id"] = values["id"] + if "chunk_id" not in row and "id" in cols: + row["id"] = values["id"] + names = list(row) + placeholders = ", ".join("?" 
for _ in names) + sql = f"INSERT OR IGNORE INTO chunks ({', '.join(names)}) VALUES ({placeholders})" + conn.execute(sql, [row[name] for name in names]) + + +def _event_payload(event: dict[str, Any]) -> dict[str, Any]: + if "kind" in event: + return event + if "content" in event and "memory_type" in event: + return {"kind": "store_memory", **event} + if "session_id" in event and "content" in event: + return {"kind": "hook_chunk", **event} + return {"kind": "unknown", **event} + + +def _apply_store(conn: apsw.Connection, event: dict[str, Any]) -> ApplyResult: + raw_content = event.get("content") + if raw_content is None: + logger.warning("Skipping malformed store event without content") + return ApplyResult() + content = str(raw_content).strip() + if not content: + logger.warning("Skipping malformed store event with empty content") + return ApplyResult() + now = datetime.now(timezone.utc).isoformat() + metadata = {"memory_type": event.get("memory_type", "note")} + raw_metadata = event.get("metadata") + if isinstance(raw_metadata, dict): + metadata.update(raw_metadata) + elif raw_metadata: + logger.warning("Skipping non-object store metadata for chunk_id=%s", event.get("chunk_id")) + chunk_id = event.get("chunk_id") or f"manual-{uuid.uuid4().hex[:16]}" + existing = conn.execute("SELECT content FROM chunks WHERE id = ?", (chunk_id,)).fetchone() + if existing: + if str(existing[0]).strip() == content: + return ApplyResult(chunk_id=chunk_id) + return ApplyResult(collision_chunk_id=chunk_id) + tags = event.get("tags") + _insert_chunk( + conn, + { + "id": chunk_id, + "content": content, + "metadata": json.dumps(metadata), + "source_file": "brainlayer-queue", + "project": event.get("project"), + "content_type": event.get("memory_type", "note"), + "value_type": "HIGH", + "char_count": len(content), + "source": event.get("source") or "manual", + "created_at": now, + "enriched_at": now, + "summary": content[:200], + "tags": json.dumps(tags) if tags else None, + "importance": float(event["importance"]) if event.get("importance") is not None else None, + }, + ) + supersedes = event.get("supersedes") or metadata.get("supersedes") + cols = _columns(conn, "chunks") + if supersedes and "superseded_by" in cols: + conn.execute("UPDATE chunks SET superseded_by = ? WHERE id = ?", (chunk_id, supersedes)) + for vector_table in ("chunk_vectors", "chunk_vectors_binary"): + if conn.execute( + "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?", (vector_table,) + ).fetchone(): + conn.execute(f"DELETE FROM {vector_table} WHERE chunk_id = ?", (supersedes,)) + entity_id = event.get("entity_id") or metadata.get("entity_id") + if entity_id and {"kg_entities", "kg_entity_chunks"}.issubset(_table_names(conn)): + if conn.execute("SELECT id FROM kg_entities WHERE id = ?", (entity_id,)).fetchone(): + conn.execute( + """ + INSERT OR REPLACE INTO kg_entity_chunks(entity_id, chunk_id, relevance, context) + VALUES (?, ?, ?, ?) 
+ """, + (entity_id, chunk_id, 1.0, f"Stored via brain_store: {event.get('memory_type', 'note')}"), + ) + else: + logger.warning("Skipping entity link for unknown entity_id=%s chunk_id=%s", entity_id, chunk_id) + return ApplyResult(chunk_id=chunk_id) + + +def _apply_watcher(conn: apsw.Connection, event: dict[str, Any]) -> None: + chunk_id = event.get("chunk_id") + raw_content = event.get("content") + if not chunk_id or raw_content is None: + logger.warning("Skipping malformed watcher event without chunk_id/content") + return + content = str(raw_content).strip() + if not content: + logger.warning("Skipping malformed watcher event with empty content") + return + tags = event.get("tags") + _insert_chunk( + conn, + { + "id": chunk_id, + "content": content, + "metadata": json.dumps(event.get("metadata") or {}), + "source_file": event.get("source_file") or "realtime-watcher", + "project": event.get("project"), + "content_type": event.get("content_type") or "assistant_text", + "value_type": event.get("value_type") or "HIGH", + "char_count": len(content), + "source": "realtime_watcher", + "created_at": event.get("created_at") or datetime.now(timezone.utc).isoformat(), + "conversation_id": event.get("conversation_id"), + "sender": event.get("sender"), + "tags": json.dumps(tags) if tags else None, + }, + ) + + +def _apply_hook(conn: apsw.Connection, event: dict[str, Any]) -> None: + raw_content = event.get("content") + if raw_content is None: + logger.warning("Skipping malformed hook event without content") + return + content = str(raw_content).strip() + if not content: + logger.warning("Skipping malformed hook event with empty content") + return + content_hash = event.get("content_hash") or hashlib.sha256(content.encode()).hexdigest()[:16] + session_id = event.get("session_id") or "unknown" + chunk_id = event.get("chunk_id") or f"rt-{str(session_id)[:8]}-{content_hash}" + ts_raw = event.get("timestamp") + try: + timestamp = float(ts_raw) if ts_raw is not None else time.time() + except (TypeError, ValueError): + logger.warning("Invalid hook timestamp %r; using current time", ts_raw) + timestamp = time.time() + _insert_chunk( + conn, + { + "id": chunk_id, + "content": content, + "metadata": json.dumps({"session_id": session_id, "content_hash": content_hash}), + "source_file": "realtime-hook", + "project": event.get("project"), + "content_type": "assistant_text", + "value_type": "HIGH", + "char_count": len(content), + "source": "realtime", + "created_at": datetime.fromtimestamp(timestamp, timezone.utc).isoformat(), + "conversation_id": session_id, + "importance": 5, + }, + ) + + +def _apply_enrichment(conn: apsw.Connection, event: dict[str, Any]) -> None: + chunk_id = event.get("chunk_id") + if not chunk_id: + logger.warning("Skipping malformed enrichment event without chunk_id") + return + enrichment = event.get("enrichment") or {} + cols = _columns(conn, "chunks") + updates: dict[str, Any] = {} + mappings = { + "summary": "summary", + "importance": "importance", + "intent": "intent", + "epistemic_level": "epistemic_level", + "version_scope": "version_scope", + "debt_impact": "debt_impact", + "sentiment_label": "sentiment_label", + "sentiment_score": "sentiment_score", + } + for key, col in mappings.items(): + if col in cols and key in enrichment: + updates[col] = enrichment[key] + for key in ("tags", "primary_symbols", "external_deps", "key_facts", "resolved_queries", "sentiment_signals"): + if key in cols and enrichment.get(key) is not None: + updates[key] = json.dumps(enrichment[key]) + if 
"resolved_query" in cols: + resolved_query = enrichment.get("resolved_query") + if ( + not resolved_query + and isinstance(enrichment.get("resolved_queries"), list) + and enrichment["resolved_queries"] + ): + resolved_query = enrichment["resolved_queries"][0] + if resolved_query: + updates["resolved_query"] = resolved_query + if "raw_entities_json" in cols and event.get("entities") is not None: + updates["raw_entities_json"] = json.dumps(event["entities"]) + if "content_hash" in cols and event.get("content_hash"): + updates["content_hash"] = event["content_hash"] + if "enriched_at" in cols: + updates["enriched_at"] = datetime.now(timezone.utc).isoformat() + if not updates: + return + assignments = ", ".join(f"{col} = ?" for col in updates) + conn.execute(f"UPDATE chunks SET {assignments} WHERE id = ?", [*updates.values(), chunk_id]) + + +def _apply_event(conn: apsw.Connection, event: dict[str, Any]) -> ApplyResult: + event = _event_payload(event) + kind = event.get("kind") + if kind == "store_memory": + return _apply_store(conn, event) + elif kind == "watcher_chunk": + _apply_watcher(conn, event) + elif kind == "hook_chunk": + _apply_hook(conn, event) + elif kind == "enrichment_update": + _apply_enrichment(conn, event) + return ApplyResult() + + +def _read_events(path: Path) -> list[dict[str, Any]]: + events = [] + for lineno, line in enumerate(path.read_text(encoding="utf-8").splitlines(), start=1): + if line.strip(): + try: + event = json.loads(line) + except json.JSONDecodeError as exc: + logger.warning("Skipping malformed JSON in %s:%d: %s", path, lineno, exc) + continue + if isinstance(event, dict): + events.append(event) + else: + logger.warning("Skipping non-object JSON in %s:%d", path, lineno) + return events + + +def _table_names(conn: apsw.Connection) -> set[str]: + return {row[0] for row in conn.execute("SELECT name FROM sqlite_master WHERE type IN ('table', 'virtual')")} + + +def _is_busy_error(exc: BaseException) -> bool: + return isinstance(exc, apsw.BusyError) + + +def _default_embed_fn() -> Callable[[str], list[float]]: + from .embeddings import get_embedding_model + + return get_embedding_model().embed_query + + +def _embedding_enabled() -> bool: + return os.environ.get("BRAINLAYER_DRAIN_EMBED", "1").lower() not in {"0", "false", "no"} + + +def _embed_store_chunks( + conn: apsw.Connection, + chunk_ids: list[str], + embed_fn: Callable[[str], list[float]] | None, +) -> None: + if not chunk_ids or "chunk_vectors" not in _table_names(conn): + return + resolved_embed_fn = embed_fn or _default_embed_fn() + unique_chunk_ids = list(dict.fromkeys(chunk_ids)) + for chunk_id in unique_chunk_ids: + try: + row = conn.execute("SELECT content FROM chunks WHERE id = ?", (chunk_id,)).fetchone() + if not row: + continue + embedding_bytes = serialize_f32(resolved_embed_fn(row[0])) + conn.execute("DELETE FROM chunk_vectors WHERE chunk_id = ?", (chunk_id,)) + conn.execute("INSERT INTO chunk_vectors (chunk_id, embedding) VALUES (?, ?)", (chunk_id, embedding_bytes)) + if "chunk_vectors_binary" in _table_names(conn): + conn.execute("DELETE FROM chunk_vectors_binary WHERE chunk_id = ?", (chunk_id,)) + conn.execute( + "INSERT INTO chunk_vectors_binary (chunk_id, embedding) VALUES (?, vec_quantize_binary(?))", + (chunk_id, embedding_bytes), + ) + except Exception as exc: + logger.warning("Failed to embed drained chunk %s: %s", chunk_id, exc) + + +def _quarantine_file(path: Path, log_path: Path, reason: BaseException) -> None: + target = path.with_name(f"{path.name}.bad") + if target.exists(): + 
target = path.with_name(f"{path.name}.bad-{int(time.time() * 1000)}-{uuid.uuid4().hex[:8]}") + try: + path.replace(target) + _log(log_path, f"skipped poison queue file {path.name}: {reason}; moved_to={target.name}") + except OSError as exc: + _log(log_path, f"skipped poison queue file {path.name}: {reason}; quarantine_failed={exc}") + + +def _unlink_processed_file(path: Path, log_path: Path) -> None: + try: + path.unlink() + except FileNotFoundError: + pass + except OSError as exc: + _log(log_path, f"drain committed but could not unlink {path}: {exc}") + + +def drain_once( + *, + db_path: Path | None = None, + queue_dir: Path | None = None, + batch_size: int = 250, + log_path: Path | None = None, + embed_fn: Callable[[str], list[float]] | None = None, +) -> int: + db_path = db_path or _default_db_path() + queue_dir = queue_dir or _default_queue_dir() + log_path = log_path or _default_log_path() + queue_dir.mkdir(parents=True, exist_ok=True) + + lock_fd = _acquire_queue_lock(queue_dir) + try: + files = sorted(queue_dir.glob("*.jsonl"))[:batch_size] + if not files: + return 0 + _log(log_path, f"queue_depth={len(files)}") + + drained = 0 + collisions_dropped = 0 + should_embed = _embedding_enabled() + for path in files: + try: + events = _read_events(path) + except (UnicodeDecodeError, OSError) as exc: + _quarantine_file(path, log_path, exc) + continue + if not events: + _unlink_processed_file(path, log_path) + continue + + for attempt in range(5): + conn = _open_connection(db_path) + attempt_drained = 0 + collision_ids: list[str] = [] + store_chunk_ids: list[str] = [] + try: + conn.execute("BEGIN IMMEDIATE") + for event in events: + result = _apply_event(conn, event) + if result.chunk_id: + store_chunk_ids.append(result.chunk_id) + if result.collision_chunk_id: + collision_ids.append(result.collision_chunk_id) + attempt_drained += 1 + if should_embed: + _embed_store_chunks(conn, store_chunk_ids, embed_fn) + conn.execute("COMMIT") + drained += attempt_drained + collisions_dropped += len(collision_ids) + for chunk_id in collision_ids: + _log(log_path, f"WARN: queued chunk_id {chunk_id} collided with existing row, dropped") + _unlink_processed_file(path, log_path) + break + except Exception as exc: + try: + conn.execute("ROLLBACK") + except Exception: + pass + if _is_busy_error(exc) and attempt < 4: + delay = 0.05 * (2**attempt) + _log(log_path, f"drain busy; retrying in {delay:.2f}s") + time.sleep(delay) + continue + _log(log_path, f"drain failed for {path.name}: {exc}") + break + finally: + conn.close() + finally: + fcntl.flock(lock_fd, fcntl.LOCK_UN) + os.close(lock_fd) + + if drained: + _log(log_path, f"drained={drained} collisions_dropped={collisions_dropped}") + return drained + + +def run_daemon(interval: float, batch_size: int) -> None: + while True: + drain_once(batch_size=batch_size) + time.sleep(interval) + + +def main() -> int: + parser = argparse.ArgumentParser() + parser.add_argument("--once", action="store_true") + parser.add_argument("--interval", type=float, default=0.5) + parser.add_argument("--batch-size", type=int, default=250) + args = parser.parse_args() + if args.once: + print(drain_once(batch_size=args.batch_size)) + return 0 + run_daemon(args.interval, args.batch_size) + return 0 diff --git a/src/brainlayer/enrichment_controller.py b/src/brainlayer/enrichment_controller.py index 4226acc5..11530809 100644 --- a/src/brainlayer/enrichment_controller.py +++ b/src/brainlayer/enrichment_controller.py @@ -164,6 +164,39 @@ def _submit_write(store, name: str, callback) -> Any: 
return _get_store_write_queue(store).submit(name, callback).result() +def _arbitrated_writes_enabled() -> bool: + return os.environ.get("BRAINLAYER_ARBITRATED") == "1" + + +def _enqueue_enrichment_write(chunk: dict[str, Any], enrichment: dict[str, Any]) -> None: + from .queue_io import enqueue_enrichment_update + + content = chunk.get("content", "") + try: + enqueue_enrichment_update( + chunk_id=chunk["id"], + enrichment=enrichment, + content_hash=_content_hash(content) if content else None, + entities=enrichment.get("entities", []), + ) + except Exception: + logger.exception("Failed to enqueue enrichment update for chunk %s", chunk.get("id")) + raise + + +def _enqueue_meta_research_write(chunk: dict[str, Any]) -> None: + tags = _normalize_chunk_tags(chunk.get("tags")) + if "meta-research" not in tags: + tags.append("meta-research") + _enqueue_enrichment_write( + chunk, + { + "summary": None, + "tags": tags, + }, + ) + + def _ensure_enrichment_columns(store) -> None: key = _store_queue_key(store) with _ENRICHMENT_COLUMN_LOCK: @@ -639,7 +672,10 @@ def enrich_single(store, chunk_id: str, max_retries: int = 2) -> dict[str, Any] return None if is_meta_research(chunk.get("content", "")): - _submit_write(store, f"mark-meta:{chunk_id}", lambda: _mark_meta_research(store, chunk)) + if _arbitrated_writes_enabled(): + _enqueue_meta_research_write(chunk) + else: + _submit_write(store, f"mark-meta:{chunk_id}", lambda: _mark_meta_research(store, chunk)) logger.info("enrich_single: tagged %s as meta-research without Gemini", chunk_id) return None @@ -680,7 +716,12 @@ def _call(): return None try: - _submit_write(store, f"apply-enrichment:{chunk_id}", lambda: _apply_enrichment(store, chunk, enrichment)) + if _arbitrated_writes_enabled(): + _enqueue_enrichment_write(chunk, enrichment) + else: + _submit_write( + store, f"apply-enrichment:{chunk_id}", lambda: _apply_enrichment(store, chunk, enrichment) + ) except Exception as exc: logger.warning("enrich_single: apply failed for %s: %s", chunk_id, exc) return None @@ -811,9 +852,12 @@ def is_duplicate(content: str) -> bool: result.skipped += 1 continue if status == "meta": - _submit_write( - store, f"mark-meta:{chunk['id']}", lambda chunk=chunk: _mark_meta_research(store, chunk) - ) + if _arbitrated_writes_enabled(): + _enqueue_meta_research_write(chunk) + else: + _submit_write( + store, f"mark-meta:{chunk['id']}", lambda chunk=chunk: _mark_meta_research(store, chunk) + ) result.skipped += 1 continue if status == "error": @@ -822,11 +866,14 @@ def is_duplicate(content: str) -> bool: _emit_enrichment_error("realtime", chunk["id"], str(data)) continue - _submit_write( - store, - f"apply-enrichment:{chunk['id']}", - lambda chunk=chunk, data=data: _apply_enrichment(store, chunk, data), - ) + if _arbitrated_writes_enabled(): + _enqueue_enrichment_write(chunk, data) + else: + _submit_write( + store, + f"apply-enrichment:{chunk['id']}", + lambda chunk=chunk, data=data: _apply_enrichment(store, chunk, data), + ) result.enriched += 1 duration_ms = (time.monotonic() - start_time) * 1000 @@ -877,7 +924,12 @@ def enrich_batch( for chunk in candidates: if is_meta_research(chunk.get("content", "")): - _submit_write(store, f"mark-meta:{chunk['id']}", lambda chunk=chunk: _mark_meta_research(store, chunk)) + if _arbitrated_writes_enabled(): + _enqueue_meta_research_write(chunk) + else: + _submit_write( + store, f"mark-meta:{chunk['id']}", lambda chunk=chunk: _mark_meta_research(store, chunk) + ) result.skipped += 1 continue if _is_duplicate_content(store, 
chunk.get("content", "")): @@ -908,11 +960,14 @@ def enrich_batch( result.errors.append(f"{chunk['id']}: invalid_enrichment") _emit_enrichment_error("batch", chunk["id"], "invalid_enrichment") continue - _submit_write( - store, - f"apply-enrichment:{chunk['id']}", - lambda chunk=chunk, enrichment=enrichment: _apply_enrichment(store, chunk, enrichment), - ) + if _arbitrated_writes_enabled(): + _enqueue_enrichment_write(chunk, enrichment) + else: + _submit_write( + store, + f"apply-enrichment:{chunk['id']}", + lambda chunk=chunk, enrichment=enrichment: _apply_enrichment(store, chunk, enrichment), + ) result.enriched += 1 except Exception as exc: result.failed += 1 diff --git a/src/brainlayer/hooks/indexer.py b/src/brainlayer/hooks/indexer.py index b4026843..82522673 100644 --- a/src/brainlayer/hooks/indexer.py +++ b/src/brainlayer/hooks/indexer.py @@ -13,12 +13,13 @@ """ import hashlib -import json import logging import sqlite3 import time from pathlib import Path +from ..queue_io import enqueue_hook_chunk, get_queue_dir + logger = logging.getLogger(__name__) @@ -27,7 +28,7 @@ class RealtimeIndexer: def __init__(self, db_path: str | None = None, queue_dir: str | None = None): self.db_path = db_path - self.queue_dir = queue_dir + self.queue_dir = queue_dir or str(get_queue_dir()) self.pending_prompts: dict[str, dict] = {} self._db: sqlite3.Connection | None = None self._init_db() @@ -204,18 +205,14 @@ def _write_to_queue(self, session_id: str, content: str, content_hash: str, proj """Fallback: write to queue directory when DB is unavailable.""" if not self.queue_dir: return - queue_path = Path(self.queue_dir) - queue_path.mkdir(parents=True, exist_ok=True) - entry = { - "session_id": session_id, - "content": content, - "content_hash": content_hash, - "project": project, - "timestamp": time.time(), - } - queue_file = queue_path / f"{session_id}.jsonl" - with open(queue_file, "a") as f: - f.write(json.dumps(entry) + "\n") + enqueue_hook_chunk( + session_id=session_id, + content=content, + content_hash=content_hash, + project=project, + timestamp=time.time(), + queue_dir=Path(self.queue_dir), + ) def close(self): """Close the database connection.""" diff --git a/src/brainlayer/mcp/store_handler.py b/src/brainlayer/mcp/store_handler.py index bfcea10a..062852ef 100644 --- a/src/brainlayer/mcp/store_handler.py +++ b/src/brainlayer/mcp/store_handler.py @@ -2,6 +2,7 @@ import asyncio import json +import os import threading import apsw @@ -388,6 +389,14 @@ def _queue_store(item: dict) -> None: Enforces _QUEUE_MAX_SIZE: if the file exceeds the limit, oldest lines are dropped to make room. """ + try: + from ..queue_io import enqueue_store + + enqueue_store(**item, source="mcp") + return + except Exception: + logger.debug("Unified queue write failed; falling back to pending-stores.jsonl", exc_info=True) + path = _get_pending_store_path() path.parent.mkdir(parents=True, exist_ok=True) @@ -491,6 +500,42 @@ async def _store( A background task embeds pending chunks after the response is sent. 
""" try: + if os.environ.get("BRAINLAYER_ARBITRATED") == "1": + from ..pipeline.classify import looks_like_system_prompt + from ..search_repo import clear_hybrid_search_cache + from ..store import VALID_MEMORY_TYPES + + if not content or not content.strip(): + raise ValueError("content must be non-empty") + content = content.strip() + if memory_type not in VALID_MEMORY_TYPES: + raise ValueError(f"type must be one of: {', '.join(VALID_MEMORY_TYPES)}") + if looks_like_system_prompt(content): + raise ValueError("system prompt content is not stored in BrainLayer") + _queue_store( + { + "content": content, + "memory_type": memory_type, + "project": _normalize_project_name(project), + "tags": tags, + "importance": importance, + "confidence_score": confidence_score, + "outcome": outcome, + "reversibility": reversibility, + "files_changed": files_changed, + "entity_id": entity_id, + "status": status, + "severity": severity, + "file_path": file_path, + "function_name": function_name, + "line_number": line_number, + "supersedes": supersedes, + } + ) + clear_hybrid_search_cache() + structured = {"chunk_id": "queued", "related": []} + return ([TextContent(type="text", text=format_store_result("queued", queued=True))], structured) + from ..store import embed_pending_chunks, store_memory store = _get_vector_store() diff --git a/src/brainlayer/queue_io.py b/src/brainlayer/queue_io.py new file mode 100644 index 00000000..15b173d9 --- /dev/null +++ b/src/brainlayer/queue_io.py @@ -0,0 +1,149 @@ +"""Durable JSONL queue for BrainLayer write arbitration.""" + +from __future__ import annotations + +import json +import os +import time +import uuid +from pathlib import Path +from typing import Any + + +def _safe_source_name(source: str) -> str: + safe = "".join(char if char.isalnum() or char in {".", "_", "-"} else "_" for char in source) + return safe.strip("._-") or "queue" + + +def get_queue_dir() -> Path: + env = os.environ.get("BRAINLAYER_QUEUE_DIR") + if env: + return Path(env).expanduser() + return Path.home() / ".brainlayer" / "queue" + + +def enqueue_jsonl(event: dict[str, Any], *, source: str, queue_dir: Path | None = None) -> Path: + """Atomically append a write intent as a one-line JSONL file.""" + resolved_dir = queue_dir or get_queue_dir() + resolved_dir.mkdir(parents=True, exist_ok=True) + now_ms = int(time.time() * 1000) + safe_source = _safe_source_name(source) + event = { + **event, + "source": source, + "queued_at": time.time(), + } + final_path = resolved_dir / f"{safe_source}-{now_ms}-{uuid.uuid4().hex}.jsonl" + tmp_path = final_path.with_suffix(".tmp") + tmp_path.write_text(json.dumps(event, ensure_ascii=True) + "\n", encoding="utf-8") + tmp_path.replace(final_path) + return final_path + + +def enqueue_store( + *, + content: str, + memory_type: str = "note", + project: str | None = None, + tags: list[str] | None = None, + importance: int | None = None, + source: str = "mcp", + queue_dir: Path | None = None, + **metadata: Any, +) -> Path: + supersedes = metadata.pop("supersedes", None) + chunk_id = metadata.pop("chunk_id", None) or f"manual-{uuid.uuid4().hex[:16]}" + return enqueue_jsonl( + { + "kind": "store_memory", + "chunk_id": chunk_id, + "content": content, + "memory_type": memory_type, + "project": project, + "tags": tags, + "importance": importance, + "supersedes": supersedes, + "metadata": {key: value for key, value in metadata.items() if value is not None}, + }, + source=source, + queue_dir=queue_dir, + ) + + +def enqueue_watcher_chunk( + *, + chunk_id: str, + content: str, + 
metadata: dict[str, Any], + source_file: str, + project: str | None, + content_type: str, + value_type: str, + created_at: str, + conversation_id: str, + sender: str | None = None, + tags: list[str] | None = None, + queue_dir: Path | None = None, +) -> Path: + return enqueue_jsonl( + { + "kind": "watcher_chunk", + "chunk_id": chunk_id, + "content": content, + "metadata": metadata, + "source_file": source_file, + "project": project, + "content_type": content_type, + "value_type": value_type, + "created_at": created_at, + "conversation_id": conversation_id, + "sender": sender, + "tags": tags, + }, + source="watcher", + queue_dir=queue_dir, + ) + + +def enqueue_hook_chunk( + *, + session_id: str, + content: str, + content_hash: str | None = None, + project: str | None = None, + timestamp: float | None = None, + queue_dir: Path | None = None, +) -> Path: + return enqueue_jsonl( + { + "kind": "hook_chunk", + "session_id": session_id, + "content": content, + "content_hash": content_hash, + "project": project, + "timestamp": timestamp or time.time(), + }, + source="hook", + queue_dir=queue_dir, + ) + + +def enqueue_enrichment_update( + *, + chunk_id: str, + enrichment: dict[str, Any], + content_hash: str | None = None, + entities: list[Any] | None = None, + queue_dir: Path | None = None, +) -> Path: + return enqueue_jsonl( + { + "kind": "enrichment_update", + "chunk_id": chunk_id, + "enrichment": enrichment, + "content_hash": content_hash, + "entities": entities, + }, + source="enrichment", + queue_dir=queue_dir, + ) diff --git a/src/brainlayer/vector_store.py b/src/brainlayer/vector_store.py index a7a98c23..3aaf55cd 100644 --- a/src/brainlayer/vector_store.py +++ b/src/brainlayer/vector_store.py @@ -363,14 +363,9 @@ def _init_db(self) -> None: END """) - trigram_count = cursor.execute("SELECT COUNT(*) FROM chunks_fts_trigram").fetchone()[0] - chunk_count = cursor.execute("SELECT COUNT(*) FROM chunks").fetchone()[0] - if trigram_count != chunk_count: - cursor.execute("DELETE FROM chunks_fts_trigram") - cursor.execute(""" - INSERT INTO chunks_fts_trigram(content, summary, tags, resolved_query, key_facts, resolved_queries, chunk_id) - SELECT content, summary, tags, resolved_query, key_facts, resolved_queries, id FROM chunks - """) + self._schema_user_version = cursor.execute("PRAGMA user_version").fetchone()[0] + if os.environ.get("BRAINLAYER_REPAIR") == "1": + self.repair_fts(rebuild_trigram=True) # ── Tag junction table (replaces json_each scanning) ────────── cursor.execute(""" @@ -941,6 +936,41 @@ def _init_db(self) -> None: # MCP tool calls (e.g., brain_search) hit the same VectorStore. 
self._local = threading.local() + def repair_fts(self, *, rebuild_trigram: bool = True) -> dict[str, int]: + """Run explicit FTS repair work outside normal startup.""" + for attempt in range(5): + cursor = self.conn.cursor() + repaired: dict[str, int] = {} + transaction_started = False + try: + cursor.execute("PRAGMA wal_checkpoint(FULL)") + cursor.execute("BEGIN IMMEDIATE") + transaction_started = True + if rebuild_trigram: + cursor.execute("DELETE FROM chunks_fts_trigram") + cursor.execute(""" + INSERT INTO chunks_fts_trigram(content, summary, tags, resolved_query, key_facts, resolved_queries, chunk_id) + SELECT content, summary, tags, resolved_query, key_facts, resolved_queries, id FROM chunks + """) + repaired["chunks_fts_trigram"] = cursor.execute( + "SELECT COUNT(*) FROM chunks_fts_trigram" + ).fetchone()[0] + cursor.execute("COMMIT") + transaction_started = False + cursor.execute("PRAGMA wal_checkpoint(FULL)") + return repaired + except apsw.BusyError: + if transaction_started: + cursor.execute("ROLLBACK") + if attempt == 4: + raise + time.sleep(0.2 * (2**attempt)) + except Exception: + if transaction_started: + cursor.execute("ROLLBACK") + raise + return {} + def _get_read_conn(self) -> apsw.Connection: """Get or create a per-thread readonly connection.""" conn = getattr(self._local, "read_conn", None) diff --git a/src/brainlayer/watcher_bridge.py b/src/brainlayer/watcher_bridge.py index 8dabe761..5703ebd5 100644 --- a/src/brainlayer/watcher_bridge.py +++ b/src/brainlayer/watcher_bridge.py @@ -13,6 +13,7 @@ import hashlib import json import logging +import os import re from datetime import datetime, timezone from pathlib import Path @@ -22,6 +23,7 @@ from .pipeline.chunk import chunk_content from .pipeline.classify import classify_content from .pipeline.correction_detection import build_correction_tags +from .queue_io import enqueue_watcher_chunk from .vector_store import VectorStore logger = logging.getLogger(__name__) @@ -202,15 +204,15 @@ def create_flush_callback(db_path: Path | None = None) -> callable: Returns a callable that takes a list[dict] of raw JSONL entries and inserts them as chunks into the database (deferred embedding). 
""" - resolved_db = db_path or get_db_path() - store = VectorStore(resolved_db) + arbitrated = os.environ.get("BRAINLAYER_ARBITRATED") == "1" + store = None if arbitrated else VectorStore(db_path or get_db_path()) def flush_to_db(entries: list[dict[str, Any]]) -> None: """Process raw JSONL entries through pipeline and insert into DB.""" import time as _time flush_start = _time.monotonic() - cursor = store.conn.cursor() + cursor = None if store is None else store.conn.cursor() inserted = 0 skipped = 0 source_files_seen: set[str] = set() @@ -268,34 +270,51 @@ def flush_to_db(entries: list[dict[str, Any]]) -> None: tags = json.dumps(correction_tags) try: - cursor.execute( - """INSERT OR IGNORE INTO chunks - (id, content, metadata, source_file, project, - content_type, value_type, char_count, source, - created_at, conversation_id, sender, tags) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", - ( - chunk_id, - clean_content, - json.dumps(chunk.metadata), - source_file, - project, - chunk.content_type.value, - chunk.value.value, - len(clean_content), - "realtime_watcher", - created_at, - conversation_id, - chunk.metadata.get("sender"), - tags, - ), - ) - if store.conn.changes() > 0: + if arbitrated: + enqueue_watcher_chunk( + chunk_id=chunk_id, + content=clean_content, + metadata=chunk.metadata, + source_file=source_file, + project=project, + content_type=chunk.content_type.value, + value_type=chunk.value.value, + created_at=created_at, + conversation_id=conversation_id, + sender=chunk.metadata.get("sender"), + tags=json.loads(tags) if tags else None, + ) inserted += 1 else: - skipped += 1 + assert cursor is not None and store is not None + cursor.execute( + """INSERT OR IGNORE INTO chunks + (id, content, metadata, source_file, project, + content_type, value_type, char_count, source, + created_at, conversation_id, sender, tags) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", + ( + chunk_id, + clean_content, + json.dumps(chunk.metadata), + source_file, + project, + chunk.content_type.value, + chunk.value.value, + len(clean_content), + "realtime_watcher", + created_at, + conversation_id, + chunk.metadata.get("sender"), + tags, + ), + ) + if store.conn.changes() > 0: + inserted += 1 + else: + skipped += 1 except Exception as e: - logger.warning("Insert failed for %s: %s", chunk_id, e) + logger.warning("Queue/write failed for %s: %s", chunk_id, e) skipped += 1 latency_ms = (_time.monotonic() - flush_start) * 1000 diff --git a/tests/test_arbitration.py b/tests/test_arbitration.py new file mode 100644 index 00000000..8bd4f7b1 --- /dev/null +++ b/tests/test_arbitration.py @@ -0,0 +1,418 @@ +import json +import multiprocessing as mp +import re +import sqlite3 +import time +from pathlib import Path + +import apsw +import sqlite_vec + + +def _producer(queue_dir: str, worker_id: int, count: int) -> None: + from brainlayer.queue_io import enqueue_store + + for index in range(count): + enqueue_store( + content=f"arbitration worker={worker_id} item={index}", + memory_type="note", + project="arbitration-test", + tags=["arbitration", f"worker-{worker_id}"], + importance=7, + queue_dir=Path(queue_dir), + source="test", + ) + + +def _create_minimal_db(path: Path) -> None: + conn = sqlite3.connect(path) + try: + conn.executescript( + """ + PRAGMA journal_mode=WAL; + CREATE TABLE chunks ( + id TEXT PRIMARY KEY, + content TEXT NOT NULL, + metadata TEXT NOT NULL, + source_file TEXT NOT NULL, + project TEXT, + content_type TEXT, + value_type TEXT, + char_count INTEGER, + source TEXT, + created_at TEXT, + 
enriched_at TEXT, + summary TEXT, + tags TEXT, + importance REAL, + superseded_by TEXT + ); + CREATE TABLE kg_entities ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL + ); + CREATE TABLE kg_entity_chunks ( + entity_id TEXT NOT NULL, + chunk_id TEXT NOT NULL, + relevance REAL DEFAULT 1.0, + context TEXT, + PRIMARY KEY (entity_id, chunk_id) + ); + CREATE TABLE chunk_vectors ( + chunk_id TEXT PRIMARY KEY, + embedding BLOB + ); + CREATE TABLE chunk_vectors_binary ( + chunk_id TEXT PRIMARY KEY, + embedding BLOB + ); + CREATE VIRTUAL TABLE chunks_fts USING fts5( + content, summary, tags, resolved_query, key_facts, resolved_queries, chunk_id UNINDEXED + ); + CREATE TRIGGER chunks_fts_insert AFTER INSERT ON chunks BEGIN + INSERT INTO chunks_fts(content, summary, tags, chunk_id) + VALUES (new.content, new.summary, new.tags, new.id); + END; + """ + ) + conn.commit() + finally: + conn.close() + + +def _connect_apsw(path: Path) -> apsw.Connection: + conn = apsw.Connection(str(path)) + conn.enableloadextension(True) + conn.loadextension(sqlite_vec.loadable_path()) + conn.enableloadextension(False) + return conn + + +def _create_vec_db(path: Path) -> None: + conn = _connect_apsw(path) + try: + conn.execute(""" + CREATE TABLE chunks ( + id TEXT PRIMARY KEY, + content TEXT NOT NULL, + metadata TEXT NOT NULL, + source_file TEXT NOT NULL, + project TEXT, + content_type TEXT, + value_type TEXT, + char_count INTEGER, + source TEXT, + created_at TEXT, + enriched_at TEXT, + summary TEXT, + tags TEXT, + importance REAL + ) + """) + conn.execute(""" + CREATE VIRTUAL TABLE chunk_vectors USING vec0( + chunk_id TEXT PRIMARY KEY, + embedding FLOAT[1024] + ) + """) + conn.execute(""" + CREATE VIRTUAL TABLE chunk_vectors_binary USING vec0( + chunk_id TEXT PRIMARY KEY, + embedding BIT[1024] + ) + """) + finally: + conn.close() + + +def test_drain_default_queue_dir_expands_env_tilde(monkeypatch): + from brainlayer.drain import _default_queue_dir + + monkeypatch.setenv("BRAINLAYER_QUEUE_DIR", "~/brainlayer-arbitration-test") + + queue_dir = _default_queue_dir() + + assert "~" not in str(queue_dir) + assert queue_dir == Path.home() / "brainlayer-arbitration-test" + + +def test_drain_daemon_serializes_three_concurrent_producers(tmp_path, monkeypatch): + from brainlayer.drain import drain_once + + db_path = tmp_path / "brainlayer.db" + queue_dir = tmp_path / "queue" + log_path = tmp_path / "drain.log" + _create_minimal_db(db_path) + monkeypatch.setenv("BRAINLAYER_DRAIN_EMBED", "0") + + workers = [mp.Process(target=_producer, args=(str(queue_dir), worker, 1000)) for worker in range(3)] + for worker in workers: + worker.start() + for worker in workers: + worker.join(timeout=20) + assert worker.exitcode == 0 + + deadline = time.monotonic() + 15 + total_drained = 0 + while time.monotonic() < deadline: + total_drained += drain_once(db_path=db_path, queue_dir=queue_dir, batch_size=250, log_path=log_path) + with _connect_apsw(db_path) as conn: + count = conn.execute("SELECT COUNT(*) FROM chunks WHERE project = 'arbitration-test'").fetchone()[0] + fts_count = conn.execute("SELECT COUNT(*) FROM chunks_fts WHERE chunks_fts MATCH 'arbitration'").fetchone()[ + 0 + ] + if count == 3000 and fts_count == 3000: + break + time.sleep(0.05) + + assert total_drained == 3000 + assert count == 3000 + assert fts_count == 3000 + assert not list(queue_dir.glob("*.jsonl")) + assert "database is locked" not in log_path.read_text(encoding="utf-8").lower() + + +def test_queue_sanitizes_source_and_drain_preserves_supersedes(tmp_path): + from 
brainlayer.drain import drain_once + from brainlayer.queue_io import enqueue_store + + db_path = tmp_path / "brainlayer.db" + queue_dir = tmp_path / "queue" + _create_minimal_db(db_path) + with sqlite3.connect(db_path) as conn: + conn.execute( + """ + INSERT INTO chunks (id, content, metadata, source_file) + VALUES ('old-id', 'old content', '{}', 'seed') + """ + ) + conn.execute("INSERT INTO kg_entities (id, name) VALUES ('person-1', 'Person One')") + conn.commit() + + queued_path = enqueue_store( + content="replacement content", + project="arbitration-test", + source="../unsafe/source", + supersedes="old-id", + entity_id="person-1", + queue_dir=queue_dir, + ) + + assert queued_path.parent == queue_dir + assert ".." not in queued_path.name + assert "/" not in queued_path.name + assert re.fullmatch(r"[A-Za-z0-9_.-]+", queued_path.name) + assert drain_once(db_path=db_path, queue_dir=queue_dir, embed_fn=lambda text: [0.1] * 1024) == 1 + + with _connect_apsw(db_path) as conn: + replacement_id = conn.execute("SELECT id FROM chunks WHERE content = 'replacement content'").fetchone()[0] + superseded_by = conn.execute("SELECT superseded_by FROM chunks WHERE id = 'old-id'").fetchone()[0] + entity_link = conn.execute("SELECT chunk_id FROM kg_entity_chunks WHERE entity_id = 'person-1'").fetchone()[0] + + assert superseded_by == replacement_id + assert entity_link == replacement_id + + +def test_drain_embeds_every_queued_store(tmp_path): + from brainlayer.drain import drain_once + from brainlayer.queue_io import enqueue_store + + db_path = tmp_path / "brainlayer.db" + queue_dir = tmp_path / "queue" + _create_minimal_db(db_path) + + for index in range(3): + enqueue_store( + content=f"queued semantic memory {index}", + project="arbitration-test", + source="mcp", + queue_dir=queue_dir, + ) + + assert drain_once(db_path=db_path, queue_dir=queue_dir, embed_fn=lambda text: [0.1] * 1024) == 3 + + deadline = time.monotonic() + 2 + vector_count = 0 + binary_count = 0 + while time.monotonic() < deadline: + with _connect_apsw(db_path) as conn: + vector_count = conn.execute("SELECT COUNT(*) FROM chunk_vectors").fetchone()[0] + binary_count = conn.execute("SELECT COUNT(*) FROM chunk_vectors_binary").fetchone()[0] + if vector_count == 3 and binary_count == 3: + break + time.sleep(0.05) + + assert vector_count == 3 + assert binary_count == 3 + + +def test_drain_ignores_non_object_store_metadata(tmp_path): + from brainlayer.drain import drain_once + + db_path = tmp_path / "brainlayer.db" + queue_dir = tmp_path / "queue" + queue_dir.mkdir() + _create_minimal_db(db_path) + (queue_dir / "store.jsonl").write_text( + json.dumps( + { + "kind": "store_memory", + "chunk_id": "bad-meta", + "content": "queued memory with bad metadata", + "memory_type": "note", + "metadata": "not-a-dict", + } + ) + + "\n", + encoding="utf-8", + ) + + assert drain_once(db_path=db_path, queue_dir=queue_dir, embed_fn=lambda text: [0.1] * 1024) == 1 + + with _connect_apsw(db_path) as conn: + row = conn.execute("SELECT metadata FROM chunks WHERE id = 'bad-meta'").fetchone() + + assert json.loads(row[0]) == {"memory_type": "note"} + + +def test_drain_loads_sqlite_vec_for_vec0_tables(tmp_path): + from brainlayer.drain import drain_once + from brainlayer.queue_io import enqueue_store + + db_path = tmp_path / "brainlayer.db" + queue_dir = tmp_path / "queue" + _create_vec_db(db_path) + + enqueue_store( + content="queued memory requiring sqlite vec", + project="arbitration-test", + source="mcp", + queue_dir=queue_dir, + ) + + assert 
drain_once(db_path=db_path, queue_dir=queue_dir, embed_fn=lambda text: [0.1] * 1024) == 1 + + with _connect_apsw(db_path) as conn: + assert conn.execute("SELECT COUNT(*) FROM chunk_vectors").fetchone()[0] == 1 + assert conn.execute("SELECT COUNT(*) FROM chunk_vectors_binary").fetchone()[0] == 1 + + +def test_poison_queue_file_does_not_rollback_good_file(tmp_path): + from brainlayer.drain import drain_once + from brainlayer.queue_io import enqueue_store + + db_path = tmp_path / "brainlayer.db" + queue_dir = tmp_path / "queue" + log_path = tmp_path / "drain.log" + _create_minimal_db(db_path) + + good_path = enqueue_store( + content="good memory survives poison", + project="arbitration-test", + source="mcp", + queue_dir=queue_dir, + ) + poison_path = queue_dir / "poison.jsonl" + poison_path.write_bytes(b"\xff\xff") + + assert drain_once(db_path=db_path, queue_dir=queue_dir, log_path=log_path, embed_fn=lambda text: [0.1] * 1024) == 1 + + with _connect_apsw(db_path) as conn: + stored = conn.execute("SELECT COUNT(*) FROM chunks WHERE content = 'good memory survives poison'").fetchone()[0] + + assert stored == 1 + assert not good_path.exists() + assert not poison_path.exists() + assert list(queue_dir.glob("poison.jsonl.bad*")) + assert "poison" in log_path.read_text(encoding="utf-8").lower() + + +def test_chunk_id_collision_is_logged_and_dropped(tmp_path): + from brainlayer.drain import drain_once + from brainlayer.queue_io import enqueue_store + + db_path = tmp_path / "brainlayer.db" + queue_dir = tmp_path / "queue" + log_path = tmp_path / "drain.log" + _create_minimal_db(db_path) + with _connect_apsw(db_path) as conn: + conn.execute( + """ + INSERT INTO chunks (id, content, metadata, source_file) + VALUES ('coll', 'ORIGINAL', '{}', 'seed') + """ + ) + + enqueue_store( + content="QUEUED REPLACEMENT", + project="arbitration-test", + source="mcp", + queue_dir=queue_dir, + chunk_id="coll", + ) + + assert drain_once(db_path=db_path, queue_dir=queue_dir, log_path=log_path, embed_fn=lambda text: [0.1] * 1024) == 1 + + with _connect_apsw(db_path) as conn: + content = conn.execute("SELECT content FROM chunks WHERE id = 'coll'").fetchone()[0] + count = conn.execute("SELECT COUNT(*) FROM chunks WHERE id = 'coll'").fetchone()[0] + + log_text = log_path.read_text(encoding="utf-8").lower() + assert content == "ORIGINAL" + assert count == 1 + assert not list(queue_dir.glob("*.jsonl")) + assert "collided" in log_text + assert "collisions_dropped=1" in log_text + + +def test_drain_lock_does_not_depend_on_deletable_lock_file(tmp_path, monkeypatch): + from brainlayer.drain import drain_once + from brainlayer.queue_io import enqueue_store + + db_path = tmp_path / "brainlayer.db" + queue_dir = tmp_path / "queue" + _create_minimal_db(db_path) + monkeypatch.setenv("BRAINLAYER_DRAIN_EMBED", "0") + + enqueue_store( + content="lock sentinel memory", + project="arbitration-test", + source="mcp", + queue_dir=queue_dir, + ) + + stale_lock = queue_dir / ".drain.lock" + stale_lock.write_text("stale", encoding="utf-8") + stale_lock.unlink() + + assert drain_once(db_path=db_path, queue_dir=queue_dir) == 1 + assert not stale_lock.exists() + + with _connect_apsw(db_path) as conn: + count = conn.execute("SELECT COUNT(*) FROM chunks WHERE content = 'lock sentinel memory'").fetchone()[0] + assert count == 1 + + +def test_flush_migrates_legacy_pending_stores_idempotently(tmp_path, monkeypatch): + from brainlayer.cli import flush + from brainlayer.paths import get_db_path + + db_path = tmp_path / "brainlayer.db" + queue_dir = 
+    queue_dir = tmp_path / "queue"
+    _create_minimal_db(db_path)
+    pending_path = db_path.parent / "pending-stores.jsonl"
+    pending_path.write_text('{"content":"legacy pending memory","memory_type":"note","project":"arbitration-test"}\n')
+
+    monkeypatch.setenv("BRAINLAYER_DB", str(db_path))
+    monkeypatch.setenv("BRAINLAYER_QUEUE_DIR", str(queue_dir))
+    monkeypatch.setenv("BRAINLAYER_DRAIN_EMBED", "0")
+    assert get_db_path() == db_path
+
+    flush()
+    pending_path.write_text('{"content":"legacy pending memory","memory_type":"note","project":"arbitration-test"}\n')
+    flush()
+
+    with _connect_apsw(db_path) as conn:
+        rows = conn.execute("SELECT id FROM chunks WHERE content = 'legacy pending memory'").fetchall()
+
+    assert len(rows) == 1
diff --git a/tests/test_search_trigram_fts.py b/tests/test_search_trigram_fts.py
index 9275e3c4..e9251668 100644
--- a/tests/test_search_trigram_fts.py
+++ b/tests/test_search_trigram_fts.py
@@ -61,7 +61,7 @@ def test_hybrid_search_uses_trigram_fts_for_identifier_substrings(tmp_path):
         store.close()
 
 
-def test_vector_store_repairs_partial_trigram_backfill_on_startup(tmp_path):
+def test_vector_store_repairs_partial_trigram_backfill_only_on_explicit_repair(tmp_path):
     db_path = tmp_path / "trigram-repair.db"
     store = VectorStore(db_path)
     try:
@@ -71,10 +71,15 @@ def test_vector_store_repairs_partial_trigram_backfill_on_startup(tmp_path):
     finally:
         store.close()
 
-    repaired = VectorStore(db_path)
+    opened = VectorStore(db_path)
     try:
-        trigram_count = repaired.conn.cursor().execute("SELECT COUNT(*) FROM chunks_fts_trigram").fetchone()[0]
-        chunk_count = repaired.conn.cursor().execute("SELECT COUNT(*) FROM chunks").fetchone()[0]
-        assert trigram_count == chunk_count == 2
+        trigram_count = opened.conn.cursor().execute("SELECT COUNT(*) FROM chunks_fts_trigram").fetchone()[0]
+        chunk_count = opened.conn.cursor().execute("SELECT COUNT(*) FROM chunks").fetchone()[0]
+        assert trigram_count == 1
+        assert chunk_count == 2
+
+        repaired = opened.repair_fts()
+        trigram_count = opened.conn.cursor().execute("SELECT COUNT(*) FROM chunks_fts_trigram").fetchone()[0]
+        assert repaired["chunks_fts_trigram"] == trigram_count == chunk_count == 2
     finally:
-        repaired.close()
+        opened.close()
diff --git a/tests/test_write_queue.py b/tests/test_write_queue.py
index 18ad3621..290c45c0 100644
--- a/tests/test_write_queue.py
+++ b/tests/test_write_queue.py
@@ -27,53 +27,37 @@ class TestQueueStore:
     """JSONL queue for buffering writes when DB is locked."""
 
     def test_queue_store_writes_jsonl(self, tmp_path):
-        """_queue_store writes a JSONL line to the pending file."""
-        with patch(
-            "brainlayer.mcp.store_handler._get_pending_store_path",
-            return_value=tmp_path / "pending-stores.jsonl",
-        ):
+        """_queue_store writes to the unified arbitration queue."""
+        with patch("brainlayer.queue_io.get_queue_dir", return_value=tmp_path):
             _queue_store({"content": "test memory", "memory_type": "note"})
 
-        path = tmp_path / "pending-stores.jsonl"
-        assert path.exists()
-        lines = path.read_text().strip().splitlines()
+        files = list(tmp_path.glob("mcp-*.jsonl"))
+        assert len(files) == 1
+        lines = files[0].read_text().strip().splitlines()
         assert len(lines) == 1
         item = json.loads(lines[0])
+        assert item["kind"] == "store_memory"
         assert item["content"] == "test memory"
         assert item["memory_type"] == "note"
 
     def test_queue_store_appends(self, tmp_path):
-        """Multiple queue calls append, not overwrite."""
-        with patch(
-            "brainlayer.mcp.store_handler._get_pending_store_path",
-            return_value=tmp_path / "pending-stores.jsonl",
-        ):
+        """Multiple queue calls create independent durable files."""
+        with patch("brainlayer.queue_io.get_queue_dir", return_value=tmp_path):
             _queue_store({"content": "first", "memory_type": "note"})
             _queue_store({"content": "second", "memory_type": "learning"})
 
-        lines = (tmp_path / "pending-stores.jsonl").read_text().strip().splitlines()
-        assert len(lines) == 2
+        files = sorted(tmp_path.glob("mcp-*.jsonl"))
+        assert len(files) == 2
+        contents = {json.loads(path.read_text())["content"] for path in files}
+        assert contents == {"first", "second"}
 
-    def test_queue_max_size_drops_oldest(self, tmp_path):
-        """Queue respects max size limit and drops oldest items."""
-        pending_path = tmp_path / "pending-stores.jsonl"
-        with patch(
-            "brainlayer.mcp.store_handler._get_pending_store_path",
-            return_value=pending_path,
-        ):
-            # Queue 105 items (over the 100 limit)
+    def test_queue_store_keeps_burst_items(self, tmp_path):
+        """Unified queue keeps burst items as separate files."""
+        with patch("brainlayer.queue_io.get_queue_dir", return_value=tmp_path):
             for i in range(105):
                 _queue_store({"content": f"item-{i}", "memory_type": "note"})
 
-        lines = pending_path.read_text().strip().splitlines()
-        # Should be exactly 100, with oldest 5 dropped
-        assert len(lines) == 100
-        # First retained item should be item-5 (oldest 5 dropped)
-        first_item = json.loads(lines[0])
-        assert first_item["content"] == "item-5"
-        # Last item should be the newest
-        last_item = json.loads(lines[-1])
-        assert last_item["content"] == "item-104"
+        assert len(list(tmp_path.glob("mcp-*.jsonl"))) == 105
 
 
 class TestSingleWriterQueue:
@@ -230,7 +214,7 @@ async def test_store_queues_on_busy_error(self, tmp_path):
         """When store_memory raises BusyError, the item is queued to JSONL."""
         from brainlayer.mcp.store_handler import _store
 
-        pending_path = tmp_path / "pending-stores.jsonl"
+        queue_dir = tmp_path / "queue"
 
         with (
             patch("brainlayer.mcp.store_handler._get_vector_store") as mock_vs,
@@ -238,10 +222,7 @@ async def test_store_queues_on_busy_error(self, tmp_path):
             patch("brainlayer.mcp.store_handler._normalize_project_name", return_value="test"),
             # store_memory is imported inside _store via `from ..store import store_memory`
             patch("brainlayer.store.store_memory", side_effect=apsw.BusyError("locked")),
-            patch(
-                "brainlayer.mcp.store_handler._get_pending_store_path",
-                return_value=pending_path,
-            ),
+            patch("brainlayer.queue_io.get_queue_dir", return_value=queue_dir),
         ):
             mock_em.return_value.embed_query.return_value = [0.1] * 1024
@@ -256,11 +237,42 @@ async def test_store_queues_on_busy_error(self, tmp_path):
         assert structured["chunk_id"] == "queued"
         assert any("queued" in t.text.lower() for t in texts)
 
-        # Should have written to JSONL
-        assert pending_path.exists()
-        item = json.loads(pending_path.read_text().strip())
+        # Should have written to the unified queue
+        files = list(queue_dir.glob("mcp-*.jsonl"))
+        assert len(files) == 1
+        item = json.loads(files[0].read_text().strip())
+        assert item["kind"] == "store_memory"
         assert item["content"] == "test memory"
 
+    @pytest.mark.asyncio
+    async def test_arbitrated_store_validates_before_queueing(self, tmp_path, monkeypatch):
+        """Arbitrated store should not report queued success for invalid content."""
+        from brainlayer.mcp.store_handler import _store
+
+        monkeypatch.setenv("BRAINLAYER_ARBITRATED", "1")
+        with patch("brainlayer.queue_io.get_queue_dir", return_value=tmp_path):
+            result = await _store(content=" ", memory_type="note", project="test")
+
+        assert result.isError is True
+        assert "content must be non-empty" in result.content[0].text
+        assert not list(tmp_path.glob("mcp-*.jsonl"))
+
+    @pytest.mark.asyncio
+    async def test_arbitrated_store_clears_search_cache(self, tmp_path, monkeypatch):
+        """Queued stores invalidate local search cache even before the drain writes."""
+        from brainlayer.mcp.store_handler import _store
+
+        monkeypatch.setenv("BRAINLAYER_ARBITRATED", "1")
+        with (
+            patch("brainlayer.queue_io.get_queue_dir", return_value=tmp_path),
+            patch("brainlayer.search_repo.clear_hybrid_search_cache") as clear_cache,
+        ):
+            texts, structured = await _store(content="queued cache invalidation", memory_type="note", project="test")
+
+        assert structured["chunk_id"] == "queued"
+        assert any("queued" in item.text.lower() for item in texts)
+        clear_cache.assert_called_once_with()
+
 
 class TestBrainUpdateRetryOnLock:
     """brain_update should retry on BusyError before failing."""