diff --git a/README.md b/README.md
index be8faa9e..286dda3c 100644
--- a/README.md
+++ b/README.md
@@ -146,6 +146,10 @@ bash brain-bar/build-app.sh # Build, sign, install LaunchAgent
Requires the BrainLayer MCP server. The build script refuses non-canonical checkouts and dirty trees by default ([#265](https://github.com/EtanHey/brainlayer/pull/265)) and stamps each bundle with `GitCommit`, `GitDescribe`, and `BuildTimeUTC` in `Info.plist` ([#264](https://github.com/EtanHey/brainlayer/pull/264)) so a stale install is diagnosable in seconds.
+## Writer Arbitration
+
+Background producers run with `BRAINLAYER_ARBITRATED=1` and append writes to `~/.brainlayer/queue/`; `com.brainlayer.drain.plist` drains that queue every 500ms as the single writer. Trigram FTS maintenance is explicit via `brainlayer repair-fts` and the weekly `com.brainlayer.repair-fts.plist`, not synchronous startup work. See [docs/arbitration.md](docs/arbitration.md).
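+
+For a one-off drain pass outside launchd, a minimal in-process sketch (assumes the repo's `src/` is importable):
+
+```python
+from brainlayer.drain import drain_once
+
+# Applies queued events under BEGIN IMMEDIATE; returns the drained count.
+print(drain_once())
+```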
+
## Recent Hardening (2026-04-15 → 2026-05-02)
Two-week stability sprint behind the next presentation. Every line below traces to a merged PR.
diff --git a/docs/arbitration.md b/docs/arbitration.md
new file mode 100644
index 00000000..c441d9c6
--- /dev/null
+++ b/docs/arbitration.md
@@ -0,0 +1,19 @@
+# BrainLayer Writer Arbitration
+
+BrainLayer uses a single-writer arbitration path for background producers that would otherwise contend for SQLite's write lock.
+
+## Operator Contract
+
+- `BRAINLAYER_ARBITRATED=1` makes producers enqueue writes instead of writing directly to the database. The current launchd templates set this for watch and enrichment.
+- `BRAINLAYER_DRAIN_EMBED=0` disables post-drain embedding, mainly for tests or emergency operator debugging. Production should leave it enabled so queued `brain_store` chunks reach semantic search.
+- The unified durable queue lives at `~/.brainlayer/queue/`. Each event is one JSONL file named by source, timestamp, and UUID (see the sketch after this list).
+- `com.brainlayer.drain.plist` runs `scripts/drain_daemon.py`, which drains the queue every 500ms under `BEGIN IMMEDIATE`.
+- The drain daemon opens SQLite with APSW, loads `sqlite-vec`, writes chunks/enrichment updates, and embeds queued `brain_store` chunks into `chunk_vectors`/`chunk_vectors_binary` before removing their queue files.
+- Legacy `pending-stores.jsonl` is migrated by `brainlayer flush`; migration assigns stable chunk IDs so rerunning after a crash is `INSERT OR IGNORE` safe.
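+
+A minimal producer-to-drain round trip, as a sketch. It assumes the repo's `src/` directory is importable; the queue directory below is a hypothetical scratch location, not the production default:
+
+```python
+from pathlib import Path
+
+from brainlayer.drain import drain_once
+from brainlayer.queue_io import enqueue_store
+
+# Hypothetical scratch queue; production producers use ~/.brainlayer/queue/.
+queue_dir = Path("/tmp/brainlayer-queue-demo")
+
+# Producer side: writes one JSONL file named <source>-<millis>-<uuid>.jsonl.
+event_path = enqueue_store(
+    content="Chose APSW over sqlite3 for explicit BEGIN IMMEDIATE control",
+    memory_type="note",
+    project="brainlayer",
+    tags=["arbitration"],
+    queue_dir=queue_dir,
+)
+
+# Single-writer side: applies queued events and returns the drained count.
+# drain_once also accepts db_path=... to target a scratch database, and
+# BRAINLAYER_DRAIN_EMBED=0 skips embedding for a quick local test.
+print(event_path.name, drain_once(queue_dir=queue_dir))
+```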
+
+## FTS Repair
+
+- Startup no longer performs large synchronous trigram repairs.
+- Run `brainlayer repair-fts` for an explicit `chunks_fts_trigram` rebuild.
+- `scripts/launchd/com.brainlayer.repair-fts.plist` schedules that repair weekly.
+- Set `BRAINLAYER_REPAIR=1` only for an operator-controlled process that should run the repair during VectorStore initialization.
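+
+A sketch of what `brainlayer repair-fts` runs, mirroring the CLI command's implementation:
+
+```python
+from brainlayer.paths import get_db_path
+from brainlayer.vector_store import VectorStore
+
+store = VectorStore(get_db_path())
+try:
+    # Rebuilds chunks_fts_trigram under BEGIN IMMEDIATE (retrying on busy)
+    # and returns row counts, e.g. {"chunks_fts_trigram": 42}.
+    result = store.repair_fts(rebuild_trigram=True)
+finally:
+    store.close()
+print(result)
+```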
diff --git a/scripts/drain_daemon.py b/scripts/drain_daemon.py
new file mode 100644
index 00000000..82a1af32
--- /dev/null
+++ b/scripts/drain_daemon.py
@@ -0,0 +1,17 @@
+#!/usr/bin/env python3
+"""Launchd wrapper for BrainLayer's single-writer drain daemon."""
+
+from __future__ import annotations
+
+import sys
+from pathlib import Path
+
+REPO_ROOT = Path(__file__).resolve().parents[1]
+SRC_DIR = REPO_ROOT / "src"
+if str(SRC_DIR) not in sys.path:
+ sys.path.insert(0, str(SRC_DIR))
+
+from brainlayer.drain import drain_once, main, run_daemon # noqa: E402,F401
+
+if __name__ == "__main__":
+ raise SystemExit(main())
diff --git a/scripts/launchd/com.brainlayer.drain.plist b/scripts/launchd/com.brainlayer.drain.plist
new file mode 100644
index 00000000..d1152d73
--- /dev/null
+++ b/scripts/launchd/com.brainlayer.drain.plist
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
+<plist version="1.0">
+<dict>
+    <key>Label</key>
+    <string>com.brainlayer.drain</string>
+
+    <key>ProgramArguments</key>
+    <array>
+        <string>__PYTHON_BIN__</string>
+        <string>__REPO_ROOT__/scripts/drain_daemon.py</string>
+        <string>--interval</string>
+        <string>0.5</string>
+    </array>
+
+    <key>StandardOutPath</key>
+    <string>__HOME__/.brainlayer/logs/drain.log</string>
+    <key>StandardErrorPath</key>
+    <string>__HOME__/.brainlayer/logs/drain.err</string>
+
+    <key>KeepAlive</key>
+    <true/>
+    <key>RunAtLoad</key>
+    <true/>
+    <key>ThrottleInterval</key>
+    <integer>5</integer>
+    <key>Nice</key>
+    <integer>10</integer>
+    <key>ProcessType</key>
+    <string>Background</string>
+</dict>
+</plist>
diff --git a/scripts/launchd/com.brainlayer.enrichment.plist b/scripts/launchd/com.brainlayer.enrichment.plist
index f8ac4d34..836e97a9 100644
--- a/scripts/launchd/com.brainlayer.enrichment.plist
+++ b/scripts/launchd/com.brainlayer.enrichment.plist
@@ -36,6 +36,8 @@
        <string>__BRAINLAYER_DIR__/src</string>
        <key>BRAINLAYER_STALL_TIMEOUT</key>
        <string>300</string>
+        <key>BRAINLAYER_ARBITRATED</key>
+        <string>1</string>
        <key>BRAINLAYER_ENRICH_RATE</key>
        <string>15</string>
        <key>BRAINLAYER_ENRICH_CONCURRENCY</key>
diff --git a/scripts/launchd/com.brainlayer.repair-fts.plist b/scripts/launchd/com.brainlayer.repair-fts.plist
new file mode 100644
index 00000000..10c5057f
--- /dev/null
+++ b/scripts/launchd/com.brainlayer.repair-fts.plist
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
+<plist version="1.0">
+<dict>
+    <key>Label</key>
+    <string>com.brainlayer.repair-fts</string>
+
+    <key>ProgramArguments</key>
+    <array>
+        <string>__BRAINLAYER_BIN__</string>
+        <string>repair-fts</string>
+    </array>
+
+    <key>StartCalendarInterval</key>
+    <dict>
+        <key>Weekday</key>
+        <integer>0</integer>
+        <key>Hour</key>
+        <integer>4</integer>
+        <key>Minute</key>
+        <integer>30</integer>
+    </dict>
+
+    <key>StandardOutPath</key>
+    <string>__HOME__/.local/share/brainlayer/logs/repair-fts.log</string>
+    <key>StandardErrorPath</key>
+    <string>__HOME__/.local/share/brainlayer/logs/repair-fts.err</string>
+
+    <key>Nice</key>
+    <integer>15</integer>
+    <key>ProcessType</key>
+    <string>Background</string>
+</dict>
+</plist>
diff --git a/scripts/launchd/com.brainlayer.watch.plist b/scripts/launchd/com.brainlayer.watch.plist
new file mode 100644
index 00000000..2fe0c98f
--- /dev/null
+++ b/scripts/launchd/com.brainlayer.watch.plist
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
+<plist version="1.0">
+<dict>
+    <key>Label</key>
+    <string>com.brainlayer.watch</string>
+
+    <key>ProgramArguments</key>
+    <array>
+        <string>__BRAINLAYER_BIN__</string>
+        <string>watch</string>
+        <string>--poll</string>
+        <string>1.0</string>
+        <string>--batch-size</string>
+        <string>10</string>
+        <string>--flush-ms</string>
+        <string>500</string>
+    </array>
+
+    <key>EnvironmentVariables</key>
+    <dict>
+        <key>PATH</key>
+        <string>/usr/local/bin:/usr/bin:/bin:__HOME__/.local/bin</string>
+        <key>PYTHONUNBUFFERED</key>
+        <string>1</string>
+        <key>PYTHONPATH</key>
+        <string>__BRAINLAYER_DIR__/src</string>
+        <key>BRAINLAYER_ARBITRATED</key>
+        <string>1</string>
+    </dict>
+
+    <key>StandardOutPath</key>
+    <string>__HOME__/.local/share/brainlayer/logs/watch.log</string>
+    <key>StandardErrorPath</key>
+    <string>__HOME__/.local/share/brainlayer/logs/watch.err</string>
+
+    <key>KeepAlive</key>
+    <true/>
+    <key>RunAtLoad</key>
+    <true/>
+    <key>ThrottleInterval</key>
+    <integer>5</integer>
+    <key>Nice</key>
+    <integer>10</integer>
+    <key>ProcessType</key>
+    <string>Background</string>
+</dict>
+</plist>
diff --git a/scripts/launchd/install.sh b/scripts/launchd/install.sh
index 1ec6ffa4..cc67f889 100755
--- a/scripts/launchd/install.sh
+++ b/scripts/launchd/install.sh
@@ -4,11 +4,14 @@
# Usage:
# ./scripts/launchd/install.sh # Install all
# ./scripts/launchd/install.sh index # Install indexing only
+# ./scripts/launchd/install.sh watch # Install watcher only
# ./scripts/launchd/install.sh enrich # Install enrichment only
+# ./scripts/launchd/install.sh drain # Install queue drain only
# ./scripts/launchd/install.sh decay # Install decay only
# ./scripts/launchd/install.sh load enrichment
# ./scripts/launchd/install.sh unload enrichment
# ./scripts/launchd/install.sh checkpoint # Install WAL checkpoint only
+# ./scripts/launchd/install.sh repair-fts # Install weekly explicit FTS repair
# ./scripts/launchd/install.sh backup # Install daily DB backup only
# ./scripts/launchd/install.sh remove # Unload and remove all
set -euo pipefail
@@ -20,8 +23,15 @@ BRAINLAYER_LOG_DIR="$HOME/.local/share/brainlayer/logs"
BRAINLAYER_LIB_DIR="$HOME/.local/lib/brainlayer"
BRAINLAYER_DIR="$(cd "$SCRIPT_DIR/../.." && pwd)"
BRAINLAYER_BIN="${BRAINLAYER_BIN:-$(which brainlayer 2>/dev/null || echo "$HOME/.local/bin/brainlayer")}"
+PYTHON_BIN="${PYTHON_BIN:-$(command -v python3 2>/dev/null || true)}"
GOOGLE_API_KEY="${GOOGLE_API_KEY:-}"
+if [ -z "$PYTHON_BIN" ]; then
+ echo "ERROR: python3 not found in PATH"
+ echo "Install Python 3 or set PYTHON_BIN=/path/to/python3"
+ exit 1
+fi
+
if [ ! -x "$BRAINLAYER_BIN" ]; then
echo "ERROR: brainlayer binary not found at $BRAINLAYER_BIN"
echo "Install with: pip install -e . (from brainlayer repo)"
@@ -30,6 +40,7 @@ if [ ! -x "$BRAINLAYER_BIN" ]; then
fi
mkdir -p "$LAUNCH_DIR" "$LOG_DIR" "$BRAINLAYER_LOG_DIR" "$BRAINLAYER_LIB_DIR"
+mkdir -p "$HOME/.brainlayer/logs" "$HOME/.brainlayer/queue"
resolve_google_api_key() {
if [ -n "${GOOGLE_API_KEY:-}" ]; then
@@ -88,6 +99,8 @@ install_plist() {
-e "s|__HOME__|$HOME|g" \
-e "s|__BRAINLAYER_BIN__|$BRAINLAYER_BIN|g" \
-e "s|__BRAINLAYER_DIR__|$BRAINLAYER_DIR|g" \
+ -e "s|__PYTHON_BIN__|$PYTHON_BIN|g" \
+ -e "s|__REPO_ROOT__|$BRAINLAYER_DIR|g" \
-e "s|__GOOGLE_API_KEY__|$google_api_key|g" \
"$src" > "$dst"
@@ -135,6 +148,9 @@ case "${1:-all}" in
enrichment)
install_plist enrichment
;;
+ watch)
+ install_plist watch
+ ;;
decay)
install_plist decay
;;
@@ -147,15 +163,24 @@ case "${1:-all}" in
checkpoint)
install_plist wal-checkpoint
;;
+ drain)
+ install_plist drain
+ ;;
+ repair-fts)
+ install_plist repair-fts
+ ;;
backup)
install_backup_script
install_plist backup-daily
;;
all)
install_plist index
+ install_plist drain
+ install_plist watch
install_plist enrichment
install_plist decay
install_plist wal-checkpoint
+ install_plist repair-fts
install_backup_script
install_plist backup-daily
# Remove old enrich plist if present
@@ -165,13 +190,16 @@ case "${1:-all}" in
remove_plist index
remove_plist enrich 2>/dev/null || true
remove_plist enrichment 2>/dev/null || true
+ remove_plist watch 2>/dev/null || true
remove_plist decay 2>/dev/null || true
+ remove_plist drain 2>/dev/null || true
remove_plist wal-checkpoint
+ remove_plist repair-fts 2>/dev/null || true
remove_plist backup-daily 2>/dev/null || true
rm -f "$BRAINLAYER_LIB_DIR/backup-daily.sh"
;;
*)
- echo "Usage: $0 [index|enrich|enrichment|decay|load [name]|unload [name]|checkpoint|backup|all|remove]"
+ echo "Usage: $0 [index|watch|enrich|enrichment|decay|drain|repair-fts|load [name]|unload [name]|checkpoint|backup|all|remove]"
exit 1
;;
esac
diff --git a/src/brainlayer/cli/__init__.py b/src/brainlayer/cli/__init__.py
index d612c8fa..e4a643ff 100644
--- a/src/brainlayer/cli/__init__.py
+++ b/src/brainlayer/cli/__init__.py
@@ -1,5 +1,7 @@
"""BrainLayer CLI - Command line interface for the knowledge pipeline."""
+import hashlib
+import json
import re as _re
import sys
import time
@@ -1053,6 +1055,20 @@ def wal_checkpoint(
raise typer.Exit(1)
+@app.command("repair-fts")
+def repair_fts() -> None:
+ """Explicitly rebuild FTS maintenance tables outside normal startup."""
+ from ..paths import get_db_path
+ from ..vector_store import VectorStore
+
+ store = VectorStore(get_db_path())
+ try:
+ result = store.repair_fts(rebuild_trigram=True)
+ finally:
+ store.close()
+ console.print_json(data=result)
+
+
@app.command("enrich-sessions")
def enrich_sessions(
project: str = typer.Option(None, "--project", "-p", help="Only enrich sessions from this project"),
@@ -2077,36 +2093,82 @@ def hooks(
@app.command("flush")
def flush() -> None:
- """Flush pending-stores.jsonl (queued items from DB lock errors)."""
- from ..paths import DEFAULT_DB_PATH
-
- queue_path = DEFAULT_DB_PATH.parent / "pending-stores.jsonl"
- if not queue_path.exists():
- rprint("[dim]No pending stores to flush.[/]")
- return
-
- lines = queue_path.read_text().strip().splitlines()
- if not lines:
- rprint("[dim]Queue is empty.[/]")
- queue_path.unlink(missing_ok=True)
- return
-
- rprint(f"[bold]Flushing {len(lines)} queued store(s)...[/]")
-
- from ..embeddings import get_embedding_model
- from ..mcp import _flush_pending_stores
- from ..vector_store import VectorStore
-
- store = VectorStore(DEFAULT_DB_PATH)
- model = get_embedding_model()
+ """Flush pending store queues and the unified arbitration queue."""
+ from ..drain import drain_once
+ from ..paths import get_db_path
+ from ..queue_io import enqueue_store, get_queue_dir
- flushed = _flush_pending_stores(store, model.embed_query)
- store.close()
+ db_path = get_db_path()
+ queue_path = db_path.parent / "pending-stores.jsonl"
+ migrated = 0
+ skipped = 0
+ try:
+ if queue_path.exists():
+ lines = queue_path.read_text().strip().splitlines()
+ for line in lines:
+ if not line.strip():
+ continue
+ try:
+ item = json.loads(line)
+ except json.JSONDecodeError:
+ skipped += 1
+ continue
+ content = item.get("content")
+ if not content:
+ skipped += 1
+ continue
+ stable_payload = {
+ "content": content,
+ "memory_type": item.get("memory_type", "note"),
+ "project": item.get("project"),
+ "tags": item.get("tags"),
+ "importance": item.get("importance"),
+ }
+ chunk_id = (
+ item.get("chunk_id")
+ or "pending-"
+ + hashlib.sha256(
+ json.dumps(stable_payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
+ ).hexdigest()[:16]
+ )
+ enqueue_store(
+ content=content,
+ memory_type=item.get("memory_type", "note"),
+ project=item.get("project"),
+ tags=item.get("tags"),
+ importance=item.get("importance"),
+ source="pending",
+ confidence_score=item.get("confidence_score"),
+ outcome=item.get("outcome"),
+ reversibility=item.get("reversibility"),
+ files_changed=item.get("files_changed"),
+ entity_id=item.get("entity_id"),
+ status=item.get("status"),
+ severity=item.get("severity"),
+ file_path=item.get("file_path"),
+ function_name=item.get("function_name"),
+ line_number=item.get("line_number"),
+ supersedes=item.get("supersedes"),
+ chunk_id=chunk_id,
+ )
+ migrated += 1
+ queue_path.unlink(missing_ok=True)
+
+ queue_dir = get_queue_dir()
+ drained = 0
+ while True:
+ count = drain_once(db_path=db_path, queue_dir=queue_dir)
+ if count == 0:
+ break
+ drained += count
+ except Exception as exc:
+ rprint(f"[bold red]Error flushing queues:[/] {exc}")
+ raise typer.Exit(1)
- rprint(f"[green]Flushed {flushed} item(s).[/]")
- remaining = len(lines) - flushed
- if remaining > 0:
- rprint(f"[yellow]{remaining} item(s) still pending (DB may still be locked).[/]")
+ rprint(
+ f"[green]Migrated {migrated} pending store(s); drained {drained} unified queue item(s); "
+ f"skipped {skipped} malformed item(s).[/]"
+ )
@app.command("code-intel")
diff --git a/src/brainlayer/drain.py b/src/brainlayer/drain.py
new file mode 100644
index 00000000..561a7b00
--- /dev/null
+++ b/src/brainlayer/drain.py
@@ -0,0 +1,470 @@
+"""Single-writer drain loop for BrainLayer's durable JSONL queue."""
+
+from __future__ import annotations
+
+import argparse
+import fcntl
+import hashlib
+import json
+import logging
+import os
+import time
+import uuid
+from dataclasses import dataclass
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Any, Callable
+
+import apsw
+import sqlite_vec
+
+from ._helpers import serialize_f32
+from .paths import get_db_path
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class ApplyResult:
+ chunk_id: str | None = None
+ collision_chunk_id: str | None = None
+
+
+def _default_db_path() -> Path:
+ return get_db_path()
+
+
+def _default_queue_dir() -> Path:
+ from .queue_io import get_queue_dir
+
+ return get_queue_dir()
+
+
+def _default_log_path() -> Path:
+ return Path.home() / ".brainlayer" / "logs" / "drain.log"
+
+
+def _log(path: Path, message: str) -> None:
+ path.parent.mkdir(parents=True, exist_ok=True)
+ stamp = datetime.now(timezone.utc).isoformat()
+ with path.open("a", encoding="utf-8") as handle:
+ handle.write(f"{stamp} {message}\n")
+
+
+def _open_connection(db_path: Path) -> apsw.Connection:
+ conn = apsw.Connection(str(db_path))
+ conn.setbusytimeout(200)
+ conn.enableloadextension(True)
+ conn.loadextension(sqlite_vec.loadable_path())
+ conn.enableloadextension(False)
+ return conn
+
+
+def _acquire_queue_lock(queue_dir: Path) -> int:
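+    # flock the queue directory's own fd: unlike a separate lock file, the
+    # lock cannot be defeated by deleting a stale lock-file path.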
+ fd = os.open(queue_dir, os.O_RDONLY)
+ try:
+ fcntl.flock(fd, fcntl.LOCK_EX)
+ except Exception:
+ os.close(fd)
+ raise
+ return fd
+
+
+def _columns(conn: apsw.Connection, table: str) -> set[str]:
+ return {row[1] for row in conn.execute(f"PRAGMA table_info({table})")}
+
+
+def _insert_chunk(conn: apsw.Connection, values: dict[str, Any]) -> None:
+ cols = _columns(conn, "chunks")
+ row = {key: value for key, value in values.items() if key in cols}
+ if "id" not in row and "chunk_id" in cols:
+ row["chunk_id"] = values["id"]
+ if "chunk_id" not in row and "id" in cols:
+ row["id"] = values["id"]
+ names = list(row)
+ placeholders = ", ".join("?" for _ in names)
+ sql = f"INSERT OR IGNORE INTO chunks ({', '.join(names)}) VALUES ({placeholders})"
+ conn.execute(sql, [row[name] for name in names])
+
+
+def _event_payload(event: dict[str, Any]) -> dict[str, Any]:
+ if "kind" in event:
+ return event
+ if "content" in event and "memory_type" in event:
+ return {"kind": "store_memory", **event}
+ if "session_id" in event and "content" in event:
+ return {"kind": "hook_chunk", **event}
+ return {"kind": "unknown", **event}
+
+
+def _apply_store(conn: apsw.Connection, event: dict[str, Any]) -> ApplyResult:
+ raw_content = event.get("content")
+ if raw_content is None:
+ logger.warning("Skipping malformed store event without content")
+ return ApplyResult()
+ content = str(raw_content).strip()
+ if not content:
+ logger.warning("Skipping malformed store event with empty content")
+ return ApplyResult()
+ now = datetime.now(timezone.utc).isoformat()
+ metadata = {"memory_type": event.get("memory_type", "note")}
+ raw_metadata = event.get("metadata")
+ if isinstance(raw_metadata, dict):
+ metadata.update(raw_metadata)
+ elif raw_metadata:
+ logger.warning("Skipping non-object store metadata for chunk_id=%s", event.get("chunk_id"))
+ chunk_id = event.get("chunk_id") or f"manual-{uuid.uuid4().hex[:16]}"
+ existing = conn.execute("SELECT content FROM chunks WHERE id = ?", (chunk_id,)).fetchone()
+ if existing:
+ if str(existing[0]).strip() == content:
+ return ApplyResult(chunk_id=chunk_id)
+ return ApplyResult(collision_chunk_id=chunk_id)
+ tags = event.get("tags")
+ _insert_chunk(
+ conn,
+ {
+ "id": chunk_id,
+ "content": content,
+ "metadata": json.dumps(metadata),
+ "source_file": "brainlayer-queue",
+ "project": event.get("project"),
+ "content_type": event.get("memory_type", "note"),
+ "value_type": "HIGH",
+ "char_count": len(content),
+ "source": event.get("source") or "manual",
+ "created_at": now,
+ "enriched_at": now,
+ "summary": content[:200],
+ "tags": json.dumps(tags) if tags else None,
+ "importance": float(event["importance"]) if event.get("importance") is not None else None,
+ },
+ )
+ supersedes = event.get("supersedes") or metadata.get("supersedes")
+ cols = _columns(conn, "chunks")
+ if supersedes and "superseded_by" in cols:
+ conn.execute("UPDATE chunks SET superseded_by = ? WHERE id = ?", (chunk_id, supersedes))
+ for vector_table in ("chunk_vectors", "chunk_vectors_binary"):
+ if conn.execute(
+ "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?", (vector_table,)
+ ).fetchone():
+ conn.execute(f"DELETE FROM {vector_table} WHERE chunk_id = ?", (supersedes,))
+ entity_id = event.get("entity_id") or metadata.get("entity_id")
+ if entity_id and {"kg_entities", "kg_entity_chunks"}.issubset(_table_names(conn)):
+ if conn.execute("SELECT id FROM kg_entities WHERE id = ?", (entity_id,)).fetchone():
+ conn.execute(
+ """
+ INSERT OR REPLACE INTO kg_entity_chunks(entity_id, chunk_id, relevance, context)
+ VALUES (?, ?, ?, ?)
+ """,
+ (entity_id, chunk_id, 1.0, f"Stored via brain_store: {event.get('memory_type', 'note')}"),
+ )
+ else:
+ logger.warning("Skipping entity link for unknown entity_id=%s chunk_id=%s", entity_id, chunk_id)
+ return ApplyResult(chunk_id=chunk_id)
+
+
+def _apply_watcher(conn: apsw.Connection, event: dict[str, Any]) -> None:
+ chunk_id = event.get("chunk_id")
+ raw_content = event.get("content")
+ if not chunk_id or raw_content is None:
+ logger.warning("Skipping malformed watcher event without chunk_id/content")
+ return
+ content = str(raw_content).strip()
+ if not content:
+ logger.warning("Skipping malformed watcher event with empty content")
+ return
+ tags = event.get("tags")
+ _insert_chunk(
+ conn,
+ {
+ "id": chunk_id,
+ "content": content,
+ "metadata": json.dumps(event.get("metadata") or {}),
+ "source_file": event.get("source_file") or "realtime-watcher",
+ "project": event.get("project"),
+ "content_type": event.get("content_type") or "assistant_text",
+ "value_type": event.get("value_type") or "HIGH",
+ "char_count": len(content),
+ "source": "realtime_watcher",
+ "created_at": event.get("created_at") or datetime.now(timezone.utc).isoformat(),
+ "conversation_id": event.get("conversation_id"),
+ "sender": event.get("sender"),
+ "tags": json.dumps(tags) if tags else None,
+ },
+ )
+
+
+def _apply_hook(conn: apsw.Connection, event: dict[str, Any]) -> None:
+ raw_content = event.get("content")
+ if raw_content is None:
+ logger.warning("Skipping malformed hook event without content")
+ return
+ content = str(raw_content).strip()
+ if not content:
+ logger.warning("Skipping malformed hook event with empty content")
+ return
+ content_hash = event.get("content_hash") or hashlib.sha256(content.encode()).hexdigest()[:16]
+ session_id = event.get("session_id") or "unknown"
+ chunk_id = event.get("chunk_id") or f"rt-{str(session_id)[:8]}-{content_hash}"
+ ts_raw = event.get("timestamp")
+ try:
+ timestamp = float(ts_raw) if ts_raw is not None else time.time()
+ except (TypeError, ValueError):
+ logger.warning("Invalid hook timestamp %r; using current time", ts_raw)
+ timestamp = time.time()
+ _insert_chunk(
+ conn,
+ {
+ "id": chunk_id,
+ "content": content,
+ "metadata": json.dumps({"session_id": session_id, "content_hash": content_hash}),
+ "source_file": "realtime-hook",
+ "project": event.get("project"),
+ "content_type": "assistant_text",
+ "value_type": "HIGH",
+ "char_count": len(content),
+ "source": "realtime",
+ "created_at": datetime.fromtimestamp(timestamp, timezone.utc).isoformat(),
+ "conversation_id": session_id,
+ "importance": 5,
+ },
+ )
+
+
+def _apply_enrichment(conn: apsw.Connection, event: dict[str, Any]) -> None:
+ chunk_id = event.get("chunk_id")
+ if not chunk_id:
+ logger.warning("Skipping malformed enrichment event without chunk_id")
+ return
+ enrichment = event.get("enrichment") or {}
+ cols = _columns(conn, "chunks")
+ updates: dict[str, Any] = {}
+ mappings = {
+ "summary": "summary",
+ "importance": "importance",
+ "intent": "intent",
+ "epistemic_level": "epistemic_level",
+ "version_scope": "version_scope",
+ "debt_impact": "debt_impact",
+ "sentiment_label": "sentiment_label",
+ "sentiment_score": "sentiment_score",
+ }
+ for key, col in mappings.items():
+ if col in cols and key in enrichment:
+ updates[col] = enrichment[key]
+ for key in ("tags", "primary_symbols", "external_deps", "key_facts", "resolved_queries", "sentiment_signals"):
+ if key in cols and enrichment.get(key) is not None:
+ updates[key] = json.dumps(enrichment[key])
+ if "resolved_query" in cols:
+ resolved_query = enrichment.get("resolved_query")
+ if (
+ not resolved_query
+ and isinstance(enrichment.get("resolved_queries"), list)
+ and enrichment["resolved_queries"]
+ ):
+ resolved_query = enrichment["resolved_queries"][0]
+ if resolved_query:
+ updates["resolved_query"] = resolved_query
+ if "raw_entities_json" in cols and event.get("entities") is not None:
+ updates["raw_entities_json"] = json.dumps(event["entities"])
+ if "content_hash" in cols and event.get("content_hash"):
+ updates["content_hash"] = event["content_hash"]
+ if "enriched_at" in cols:
+ updates["enriched_at"] = datetime.now(timezone.utc).isoformat()
+ if not updates:
+ return
+ assignments = ", ".join(f"{col} = ?" for col in updates)
+ conn.execute(f"UPDATE chunks SET {assignments} WHERE id = ?", [*updates.values(), chunk_id])
+
+
+def _apply_event(conn: apsw.Connection, event: dict[str, Any]) -> ApplyResult:
+ event = _event_payload(event)
+ kind = event.get("kind")
+ if kind == "store_memory":
+ return _apply_store(conn, event)
+ elif kind == "watcher_chunk":
+ _apply_watcher(conn, event)
+ elif kind == "hook_chunk":
+ _apply_hook(conn, event)
+ elif kind == "enrichment_update":
+ _apply_enrichment(conn, event)
+ return ApplyResult()
+
+
+def _read_events(path: Path) -> list[dict[str, Any]]:
+ events = []
+ for lineno, line in enumerate(path.read_text(encoding="utf-8").splitlines(), start=1):
+ if line.strip():
+ try:
+ event = json.loads(line)
+ except json.JSONDecodeError as exc:
+ logger.warning("Skipping malformed JSON in %s:%d: %s", path, lineno, exc)
+ continue
+ if isinstance(event, dict):
+ events.append(event)
+ else:
+ logger.warning("Skipping non-object JSON in %s:%d", path, lineno)
+ return events
+
+
+def _table_names(conn: apsw.Connection) -> set[str]:
+ return {row[0] for row in conn.execute("SELECT name FROM sqlite_master WHERE type IN ('table', 'virtual')")}
+
+
+def _is_busy_error(exc: BaseException) -> bool:
+ return isinstance(exc, apsw.BusyError)
+
+
+def _default_embed_fn() -> Callable[[str], list[float]]:
+ from .embeddings import get_embedding_model
+
+ return get_embedding_model().embed_query
+
+
+def _embedding_enabled() -> bool:
+ return os.environ.get("BRAINLAYER_DRAIN_EMBED", "1").lower() not in {"0", "false", "no"}
+
+
+def _embed_store_chunks(
+ conn: apsw.Connection,
+ chunk_ids: list[str],
+ embed_fn: Callable[[str], list[float]] | None,
+) -> None:
+ if not chunk_ids or "chunk_vectors" not in _table_names(conn):
+ return
+ resolved_embed_fn = embed_fn or _default_embed_fn()
+ unique_chunk_ids = list(dict.fromkeys(chunk_ids))
+ for chunk_id in unique_chunk_ids:
+ try:
+ row = conn.execute("SELECT content FROM chunks WHERE id = ?", (chunk_id,)).fetchone()
+ if not row:
+ continue
+ embedding_bytes = serialize_f32(resolved_embed_fn(row[0]))
+ conn.execute("DELETE FROM chunk_vectors WHERE chunk_id = ?", (chunk_id,))
+ conn.execute("INSERT INTO chunk_vectors (chunk_id, embedding) VALUES (?, ?)", (chunk_id, embedding_bytes))
+ if "chunk_vectors_binary" in _table_names(conn):
+ conn.execute("DELETE FROM chunk_vectors_binary WHERE chunk_id = ?", (chunk_id,))
+ conn.execute(
+ "INSERT INTO chunk_vectors_binary (chunk_id, embedding) VALUES (?, vec_quantize_binary(?))",
+ (chunk_id, embedding_bytes),
+ )
+ except Exception as exc:
+ logger.warning("Failed to embed drained chunk %s: %s", chunk_id, exc)
+
+
+def _quarantine_file(path: Path, log_path: Path, reason: BaseException) -> None:
+ target = path.with_name(f"{path.name}.bad")
+ if target.exists():
+ target = path.with_name(f"{path.name}.bad-{int(time.time() * 1000)}-{uuid.uuid4().hex[:8]}")
+ try:
+ path.replace(target)
+ _log(log_path, f"skipped poison queue file {path.name}: {reason}; moved_to={target.name}")
+ except OSError as exc:
+ _log(log_path, f"skipped poison queue file {path.name}: {reason}; quarantine_failed={exc}")
+
+
+def _unlink_processed_file(path: Path, log_path: Path) -> None:
+ try:
+ path.unlink()
+ except FileNotFoundError:
+ pass
+ except OSError as exc:
+ _log(log_path, f"drain committed but could not unlink {path}: {exc}")
+
+
+def drain_once(
+ *,
+ db_path: Path | None = None,
+ queue_dir: Path | None = None,
+ batch_size: int = 250,
+ log_path: Path | None = None,
+ embed_fn: Callable[[str], list[float]] | None = None,
+) -> int:
+ db_path = db_path or _default_db_path()
+ queue_dir = queue_dir or _default_queue_dir()
+ log_path = log_path or _default_log_path()
+ queue_dir.mkdir(parents=True, exist_ok=True)
+
+ lock_fd = _acquire_queue_lock(queue_dir)
+ try:
+ files = sorted(queue_dir.glob("*.jsonl"))[:batch_size]
+ if not files:
+ return 0
+ _log(log_path, f"queue_depth={len(files)}")
+
+ drained = 0
+ collisions_dropped = 0
+ should_embed = _embedding_enabled()
+ for path in files:
+ try:
+ events = _read_events(path)
+ except (UnicodeDecodeError, OSError) as exc:
+ _quarantine_file(path, log_path, exc)
+ continue
+ if not events:
+ _unlink_processed_file(path, log_path)
+ continue
+
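+            # Apply this file's events in a single IMMEDIATE transaction;
+            # retry up to five times with exponential backoff when SQLite is busy.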
+ for attempt in range(5):
+ conn = _open_connection(db_path)
+ attempt_drained = 0
+ collision_ids: list[str] = []
+ store_chunk_ids: list[str] = []
+ try:
+ conn.execute("BEGIN IMMEDIATE")
+ for event in events:
+ result = _apply_event(conn, event)
+ if result.chunk_id:
+ store_chunk_ids.append(result.chunk_id)
+ if result.collision_chunk_id:
+ collision_ids.append(result.collision_chunk_id)
+ attempt_drained += 1
+ if should_embed:
+ _embed_store_chunks(conn, store_chunk_ids, embed_fn)
+ conn.execute("COMMIT")
+ drained += attempt_drained
+ collisions_dropped += len(collision_ids)
+ for chunk_id in collision_ids:
+ _log(log_path, f"WARN: queued chunk_id {chunk_id} collided with existing row, dropped")
+ _unlink_processed_file(path, log_path)
+ break
+ except Exception as exc:
+ try:
+ conn.execute("ROLLBACK")
+ except Exception:
+ pass
+ if _is_busy_error(exc) and attempt < 4:
+ delay = 0.05 * (2**attempt)
+ _log(log_path, f"drain busy; retrying in {delay:.2f}s")
+ time.sleep(delay)
+ continue
+ _log(log_path, f"drain failed for {path.name}: {exc}")
+ break
+ finally:
+ conn.close()
+ finally:
+ fcntl.flock(lock_fd, fcntl.LOCK_UN)
+ os.close(lock_fd)
+
+ if drained:
+ _log(log_path, f"drained={drained} collisions_dropped={collisions_dropped}")
+ return drained
+
+
+def run_daemon(interval: float, batch_size: int) -> None:
+ while True:
+ drain_once(batch_size=batch_size)
+ time.sleep(interval)
+
+
+def main() -> int:
+ parser = argparse.ArgumentParser()
+ parser.add_argument("--once", action="store_true")
+ parser.add_argument("--interval", type=float, default=0.5)
+ parser.add_argument("--batch-size", type=int, default=250)
+ args = parser.parse_args()
+ if args.once:
+ print(drain_once(batch_size=args.batch_size))
+ return 0
+ run_daemon(args.interval, args.batch_size)
+ return 0
diff --git a/src/brainlayer/enrichment_controller.py b/src/brainlayer/enrichment_controller.py
index 4226acc5..11530809 100644
--- a/src/brainlayer/enrichment_controller.py
+++ b/src/brainlayer/enrichment_controller.py
@@ -164,6 +164,39 @@ def _submit_write(store, name: str, callback) -> Any:
return _get_store_write_queue(store).submit(name, callback).result()
+def _arbitrated_writes_enabled() -> bool:
+ return os.environ.get("BRAINLAYER_ARBITRATED") == "1"
+
+
+def _enqueue_enrichment_write(chunk: dict[str, Any], enrichment: dict[str, Any]) -> None:
+ from .queue_io import enqueue_enrichment_update
+
+ content = chunk.get("content", "")
+ try:
+ enqueue_enrichment_update(
+ chunk_id=chunk["id"],
+ enrichment=enrichment,
+ content_hash=_content_hash(content) if content else None,
+ entities=enrichment.get("entities", []),
+ )
+ except Exception:
+ logger.exception("Failed to enqueue enrichment update for chunk %s", chunk.get("id"))
+ raise
+
+
+def _enqueue_meta_research_write(chunk: dict[str, Any]) -> None:
+ tags = _normalize_chunk_tags(chunk.get("tags"))
+ if "meta-research" not in tags:
+ tags.append("meta-research")
+ _enqueue_enrichment_write(
+ chunk,
+ {
+ "summary": None,
+ "tags": tags,
+ },
+ )
+
+
def _ensure_enrichment_columns(store) -> None:
key = _store_queue_key(store)
with _ENRICHMENT_COLUMN_LOCK:
@@ -639,7 +672,10 @@ def enrich_single(store, chunk_id: str, max_retries: int = 2) -> dict[str, Any]
return None
if is_meta_research(chunk.get("content", "")):
- _submit_write(store, f"mark-meta:{chunk_id}", lambda: _mark_meta_research(store, chunk))
+ if _arbitrated_writes_enabled():
+ _enqueue_meta_research_write(chunk)
+ else:
+ _submit_write(store, f"mark-meta:{chunk_id}", lambda: _mark_meta_research(store, chunk))
logger.info("enrich_single: tagged %s as meta-research without Gemini", chunk_id)
return None
@@ -680,7 +716,12 @@ def _call():
return None
try:
- _submit_write(store, f"apply-enrichment:{chunk_id}", lambda: _apply_enrichment(store, chunk, enrichment))
+ if _arbitrated_writes_enabled():
+ _enqueue_enrichment_write(chunk, enrichment)
+ else:
+ _submit_write(
+ store, f"apply-enrichment:{chunk_id}", lambda: _apply_enrichment(store, chunk, enrichment)
+ )
except Exception as exc:
logger.warning("enrich_single: apply failed for %s: %s", chunk_id, exc)
return None
@@ -811,9 +852,12 @@ def is_duplicate(content: str) -> bool:
result.skipped += 1
continue
if status == "meta":
- _submit_write(
- store, f"mark-meta:{chunk['id']}", lambda chunk=chunk: _mark_meta_research(store, chunk)
- )
+ if _arbitrated_writes_enabled():
+ _enqueue_meta_research_write(chunk)
+ else:
+ _submit_write(
+ store, f"mark-meta:{chunk['id']}", lambda chunk=chunk: _mark_meta_research(store, chunk)
+ )
result.skipped += 1
continue
if status == "error":
@@ -822,11 +866,14 @@ def is_duplicate(content: str) -> bool:
_emit_enrichment_error("realtime", chunk["id"], str(data))
continue
- _submit_write(
- store,
- f"apply-enrichment:{chunk['id']}",
- lambda chunk=chunk, data=data: _apply_enrichment(store, chunk, data),
- )
+ if _arbitrated_writes_enabled():
+ _enqueue_enrichment_write(chunk, data)
+ else:
+ _submit_write(
+ store,
+ f"apply-enrichment:{chunk['id']}",
+ lambda chunk=chunk, data=data: _apply_enrichment(store, chunk, data),
+ )
result.enriched += 1
duration_ms = (time.monotonic() - start_time) * 1000
@@ -877,7 +924,12 @@ def enrich_batch(
for chunk in candidates:
if is_meta_research(chunk.get("content", "")):
- _submit_write(store, f"mark-meta:{chunk['id']}", lambda chunk=chunk: _mark_meta_research(store, chunk))
+ if _arbitrated_writes_enabled():
+ _enqueue_meta_research_write(chunk)
+ else:
+ _submit_write(
+ store, f"mark-meta:{chunk['id']}", lambda chunk=chunk: _mark_meta_research(store, chunk)
+ )
result.skipped += 1
continue
if _is_duplicate_content(store, chunk.get("content", "")):
@@ -908,11 +960,14 @@ def enrich_batch(
result.errors.append(f"{chunk['id']}: invalid_enrichment")
_emit_enrichment_error("batch", chunk["id"], "invalid_enrichment")
continue
- _submit_write(
- store,
- f"apply-enrichment:{chunk['id']}",
- lambda chunk=chunk, enrichment=enrichment: _apply_enrichment(store, chunk, enrichment),
- )
+ if _arbitrated_writes_enabled():
+ _enqueue_enrichment_write(chunk, enrichment)
+ else:
+ _submit_write(
+ store,
+ f"apply-enrichment:{chunk['id']}",
+ lambda chunk=chunk, enrichment=enrichment: _apply_enrichment(store, chunk, enrichment),
+ )
result.enriched += 1
except Exception as exc:
result.failed += 1
diff --git a/src/brainlayer/hooks/indexer.py b/src/brainlayer/hooks/indexer.py
index b4026843..82522673 100644
--- a/src/brainlayer/hooks/indexer.py
+++ b/src/brainlayer/hooks/indexer.py
@@ -13,12 +13,13 @@
"""
import hashlib
-import json
import logging
import sqlite3
import time
from pathlib import Path
+from ..queue_io import enqueue_hook_chunk, get_queue_dir
+
logger = logging.getLogger(__name__)
@@ -27,7 +28,7 @@ class RealtimeIndexer:
def __init__(self, db_path: str | None = None, queue_dir: str | None = None):
self.db_path = db_path
- self.queue_dir = queue_dir
+ self.queue_dir = queue_dir or str(get_queue_dir())
self.pending_prompts: dict[str, dict] = {}
self._db: sqlite3.Connection | None = None
self._init_db()
@@ -204,18 +205,14 @@ def _write_to_queue(self, session_id: str, content: str, content_hash: str, proj
"""Fallback: write to queue directory when DB is unavailable."""
if not self.queue_dir:
return
- queue_path = Path(self.queue_dir)
- queue_path.mkdir(parents=True, exist_ok=True)
- entry = {
- "session_id": session_id,
- "content": content,
- "content_hash": content_hash,
- "project": project,
- "timestamp": time.time(),
- }
- queue_file = queue_path / f"{session_id}.jsonl"
- with open(queue_file, "a") as f:
- f.write(json.dumps(entry) + "\n")
+ enqueue_hook_chunk(
+ session_id=session_id,
+ content=content,
+ content_hash=content_hash,
+ project=project,
+ timestamp=time.time(),
+ queue_dir=Path(self.queue_dir),
+ )
def close(self):
"""Close the database connection."""
diff --git a/src/brainlayer/mcp/store_handler.py b/src/brainlayer/mcp/store_handler.py
index bfcea10a..062852ef 100644
--- a/src/brainlayer/mcp/store_handler.py
+++ b/src/brainlayer/mcp/store_handler.py
@@ -2,6 +2,7 @@
import asyncio
import json
+import os
import threading
import apsw
@@ -388,6 +389,14 @@ def _queue_store(item: dict) -> None:
Enforces _QUEUE_MAX_SIZE: if the file exceeds the limit, oldest lines
are dropped to make room.
"""
+ try:
+ from ..queue_io import enqueue_store
+
+ enqueue_store(**item, source="mcp")
+ return
+ except Exception:
+ logger.debug("Unified queue write failed; falling back to pending-stores.jsonl", exc_info=True)
+
path = _get_pending_store_path()
path.parent.mkdir(parents=True, exist_ok=True)
@@ -491,6 +500,42 @@ async def _store(
A background task embeds pending chunks after the response is sent.
"""
try:
+ if os.environ.get("BRAINLAYER_ARBITRATED") == "1":
+ from ..pipeline.classify import looks_like_system_prompt
+ from ..search_repo import clear_hybrid_search_cache
+ from ..store import VALID_MEMORY_TYPES
+
+ if not content or not content.strip():
+ raise ValueError("content must be non-empty")
+ content = content.strip()
+ if memory_type not in VALID_MEMORY_TYPES:
+ raise ValueError(f"type must be one of: {', '.join(VALID_MEMORY_TYPES)}")
+ if looks_like_system_prompt(content):
+ raise ValueError("system prompt content is not stored in BrainLayer")
+ _queue_store(
+ {
+ "content": content,
+ "memory_type": memory_type,
+ "project": _normalize_project_name(project),
+ "tags": tags,
+ "importance": importance,
+ "confidence_score": confidence_score,
+ "outcome": outcome,
+ "reversibility": reversibility,
+ "files_changed": files_changed,
+ "entity_id": entity_id,
+ "status": status,
+ "severity": severity,
+ "file_path": file_path,
+ "function_name": function_name,
+ "line_number": line_number,
+ "supersedes": supersedes,
+ }
+ )
+ clear_hybrid_search_cache()
+ structured = {"chunk_id": "queued", "related": []}
+ return ([TextContent(type="text", text=format_store_result("queued", queued=True))], structured)
+
from ..store import embed_pending_chunks, store_memory
store = _get_vector_store()
diff --git a/src/brainlayer/queue_io.py b/src/brainlayer/queue_io.py
new file mode 100644
index 00000000..15b173d9
--- /dev/null
+++ b/src/brainlayer/queue_io.py
@@ -0,0 +1,149 @@
+"""Durable JSONL queue for BrainLayer write arbitration."""
+
+from __future__ import annotations
+
+import json
+import os
+import time
+import uuid
+from pathlib import Path
+from typing import Any
+
+
+def _safe_source_name(source: str) -> str:
+ safe = "".join(char if char.isalnum() or char in {".", "_", "-"} else "_" for char in source)
+ return safe.strip("._-") or "queue"
+
+
+def get_queue_dir() -> Path:
+ env = os.environ.get("BRAINLAYER_QUEUE_DIR")
+ if env:
+ return Path(env).expanduser()
+ return Path.home() / ".brainlayer" / "queue"
+
+
+def enqueue_jsonl(event: dict[str, Any], *, source: str, queue_dir: Path | None = None) -> Path:
+ """Atomically append a write intent as a one-line JSONL file."""
+ resolved_dir = queue_dir or get_queue_dir()
+ resolved_dir.mkdir(parents=True, exist_ok=True)
+ now_ms = int(time.time() * 1000)
+ safe_source = _safe_source_name(source)
+ event = {
+ **event,
+ "source": source,
+ "queued_at": time.time(),
+ }
+ final_path = resolved_dir / f"{safe_source}-{now_ms}-{uuid.uuid4().hex}.jsonl"
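+    # Stage under a .tmp name, then atomically rename: the drain daemon only
+    # globs *.jsonl, so it never observes a partially written event.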
+ tmp_path = final_path.with_suffix(".tmp")
+ tmp_path.write_text(json.dumps(event, ensure_ascii=True) + "\n", encoding="utf-8")
+ tmp_path.replace(final_path)
+ return final_path
+
+
+def enqueue_store(
+ *,
+ content: str,
+ memory_type: str = "note",
+ project: str | None = None,
+ tags: list[str] | None = None,
+ importance: int | None = None,
+ source: str = "mcp",
+ queue_dir: Path | None = None,
+ **metadata: Any,
+) -> Path:
+ supersedes = metadata.pop("supersedes", None)
+ chunk_id = metadata.pop("chunk_id", None) or f"manual-{uuid.uuid4().hex[:16]}"
+ return enqueue_jsonl(
+ {
+ "kind": "store_memory",
+ "chunk_id": chunk_id,
+ "content": content,
+ "memory_type": memory_type,
+ "project": project,
+ "tags": tags,
+ "importance": importance,
+ "supersedes": supersedes,
+ "metadata": {key: value for key, value in metadata.items() if value is not None},
+ },
+ source=source,
+ queue_dir=queue_dir,
+ )
+
+
+def enqueue_watcher_chunk(
+ *,
+ chunk_id: str,
+ content: str,
+ metadata: dict[str, Any],
+ source_file: str,
+ project: str | None,
+ content_type: str,
+ value_type: str,
+ created_at: str,
+ conversation_id: str,
+ sender: str | None = None,
+ tags: list[str] | None = None,
+ queue_dir: Path | None = None,
+) -> Path:
+ return enqueue_jsonl(
+ {
+ "kind": "watcher_chunk",
+ "chunk_id": chunk_id,
+ "content": content,
+ "metadata": metadata,
+ "source_file": source_file,
+ "project": project,
+ "content_type": content_type,
+ "value_type": value_type,
+ "created_at": created_at,
+ "conversation_id": conversation_id,
+ "sender": sender,
+ "tags": tags,
+ },
+ source="watcher",
+ queue_dir=queue_dir,
+ )
+
+
+def enqueue_hook_chunk(
+ *,
+ session_id: str,
+ content: str,
+ content_hash: str | None = None,
+ project: str | None = None,
+ timestamp: float | None = None,
+ queue_dir: Path | None = None,
+) -> Path:
+ return enqueue_jsonl(
+ {
+ "kind": "hook_chunk",
+ "session_id": session_id,
+ "content": content,
+ "content_hash": content_hash,
+ "project": project,
+ "timestamp": timestamp or time.time(),
+ },
+ source="hook",
+ queue_dir=queue_dir,
+ )
+
+
+def enqueue_enrichment_update(
+ *,
+ chunk_id: str,
+ enrichment: dict[str, Any],
+ content_hash: str | None = None,
+ entities: list[Any] | None = None,
+ queue_dir: Path | None = None,
+) -> Path:
+ return enqueue_jsonl(
+ {
+ "kind": "enrichment_update",
+ "chunk_id": chunk_id,
+ "enrichment": enrichment,
+ "content_hash": content_hash,
+ "entities": entities,
+ },
+ source="enrichment",
+ queue_dir=queue_dir,
+ )
diff --git a/src/brainlayer/vector_store.py b/src/brainlayer/vector_store.py
index a7a98c23..3aaf55cd 100644
--- a/src/brainlayer/vector_store.py
+++ b/src/brainlayer/vector_store.py
@@ -363,14 +363,9 @@ def _init_db(self) -> None:
END
""")
- trigram_count = cursor.execute("SELECT COUNT(*) FROM chunks_fts_trigram").fetchone()[0]
- chunk_count = cursor.execute("SELECT COUNT(*) FROM chunks").fetchone()[0]
- if trigram_count != chunk_count:
- cursor.execute("DELETE FROM chunks_fts_trigram")
- cursor.execute("""
- INSERT INTO chunks_fts_trigram(content, summary, tags, resolved_query, key_facts, resolved_queries, chunk_id)
- SELECT content, summary, tags, resolved_query, key_facts, resolved_queries, id FROM chunks
- """)
+ self._schema_user_version = cursor.execute("PRAGMA user_version").fetchone()[0]
+ if os.environ.get("BRAINLAYER_REPAIR") == "1":
+ self.repair_fts(rebuild_trigram=True)
# ── Tag junction table (replaces json_each scanning) ──────────
cursor.execute("""
@@ -941,6 +936,41 @@ def _init_db(self) -> None:
# MCP tool calls (e.g., brain_search) hit the same VectorStore.
self._local = threading.local()
+ def repair_fts(self, *, rebuild_trigram: bool = True) -> dict[str, int]:
+ """Run explicit FTS repair work outside normal startup."""
+ for attempt in range(5):
+ cursor = self.conn.cursor()
+ repaired: dict[str, int] = {}
+ transaction_started = False
+ try:
+ cursor.execute("PRAGMA wal_checkpoint(FULL)")
+ cursor.execute("BEGIN IMMEDIATE")
+ transaction_started = True
+ if rebuild_trigram:
+ cursor.execute("DELETE FROM chunks_fts_trigram")
+ cursor.execute("""
+ INSERT INTO chunks_fts_trigram(content, summary, tags, resolved_query, key_facts, resolved_queries, chunk_id)
+ SELECT content, summary, tags, resolved_query, key_facts, resolved_queries, id FROM chunks
+ """)
+ repaired["chunks_fts_trigram"] = cursor.execute(
+ "SELECT COUNT(*) FROM chunks_fts_trigram"
+ ).fetchone()[0]
+ cursor.execute("COMMIT")
+ transaction_started = False
+ cursor.execute("PRAGMA wal_checkpoint(FULL)")
+ return repaired
+ except apsw.BusyError:
+ if transaction_started:
+ cursor.execute("ROLLBACK")
+ if attempt == 4:
+ raise
+ time.sleep(0.2 * (2**attempt))
+ except Exception:
+ if transaction_started:
+ cursor.execute("ROLLBACK")
+ raise
+ return {}
+
def _get_read_conn(self) -> apsw.Connection:
"""Get or create a per-thread readonly connection."""
conn = getattr(self._local, "read_conn", None)
diff --git a/src/brainlayer/watcher_bridge.py b/src/brainlayer/watcher_bridge.py
index 8dabe761..5703ebd5 100644
--- a/src/brainlayer/watcher_bridge.py
+++ b/src/brainlayer/watcher_bridge.py
@@ -13,6 +13,7 @@
import hashlib
import json
import logging
+import os
import re
from datetime import datetime, timezone
from pathlib import Path
@@ -22,6 +23,7 @@
from .pipeline.chunk import chunk_content
from .pipeline.classify import classify_content
from .pipeline.correction_detection import build_correction_tags
+from .queue_io import enqueue_watcher_chunk
from .vector_store import VectorStore
logger = logging.getLogger(__name__)
@@ -202,15 +204,15 @@ def create_flush_callback(db_path: Path | None = None) -> callable:
Returns a callable that takes a list[dict] of raw JSONL entries and
inserts them as chunks into the database (deferred embedding).
"""
- resolved_db = db_path or get_db_path()
- store = VectorStore(resolved_db)
+ arbitrated = os.environ.get("BRAINLAYER_ARBITRATED") == "1"
+ store = None if arbitrated else VectorStore(db_path or get_db_path())
def flush_to_db(entries: list[dict[str, Any]]) -> None:
"""Process raw JSONL entries through pipeline and insert into DB."""
import time as _time
flush_start = _time.monotonic()
- cursor = store.conn.cursor()
+ cursor = None if store is None else store.conn.cursor()
inserted = 0
skipped = 0
source_files_seen: set[str] = set()
@@ -268,34 +270,51 @@ def flush_to_db(entries: list[dict[str, Any]]) -> None:
tags = json.dumps(correction_tags)
try:
- cursor.execute(
- """INSERT OR IGNORE INTO chunks
- (id, content, metadata, source_file, project,
- content_type, value_type, char_count, source,
- created_at, conversation_id, sender, tags)
- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
- (
- chunk_id,
- clean_content,
- json.dumps(chunk.metadata),
- source_file,
- project,
- chunk.content_type.value,
- chunk.value.value,
- len(clean_content),
- "realtime_watcher",
- created_at,
- conversation_id,
- chunk.metadata.get("sender"),
- tags,
- ),
- )
- if store.conn.changes() > 0:
+ if arbitrated:
+ enqueue_watcher_chunk(
+ chunk_id=chunk_id,
+ content=clean_content,
+ metadata=chunk.metadata,
+ source_file=source_file,
+ project=project,
+ content_type=chunk.content_type.value,
+ value_type=chunk.value.value,
+ created_at=created_at,
+ conversation_id=conversation_id,
+ sender=chunk.metadata.get("sender"),
+ tags=json.loads(tags) if tags else None,
+ )
inserted += 1
else:
- skipped += 1
+ assert cursor is not None and store is not None
+ cursor.execute(
+ """INSERT OR IGNORE INTO chunks
+ (id, content, metadata, source_file, project,
+ content_type, value_type, char_count, source,
+ created_at, conversation_id, sender, tags)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
+ (
+ chunk_id,
+ clean_content,
+ json.dumps(chunk.metadata),
+ source_file,
+ project,
+ chunk.content_type.value,
+ chunk.value.value,
+ len(clean_content),
+ "realtime_watcher",
+ created_at,
+ conversation_id,
+ chunk.metadata.get("sender"),
+ tags,
+ ),
+ )
+ if store.conn.changes() > 0:
+ inserted += 1
+ else:
+ skipped += 1
except Exception as e:
- logger.warning("Insert failed for %s: %s", chunk_id, e)
+ logger.warning("Queue/write failed for %s: %s", chunk_id, e)
skipped += 1
latency_ms = (_time.monotonic() - flush_start) * 1000
diff --git a/tests/test_arbitration.py b/tests/test_arbitration.py
new file mode 100644
index 00000000..8bd4f7b1
--- /dev/null
+++ b/tests/test_arbitration.py
@@ -0,0 +1,418 @@
+import json
+import multiprocessing as mp
+import re
+import sqlite3
+import time
+from pathlib import Path
+
+import apsw
+import sqlite_vec
+
+
+def _producer(queue_dir: str, worker_id: int, count: int) -> None:
+ from brainlayer.queue_io import enqueue_store
+
+ for index in range(count):
+ enqueue_store(
+ content=f"arbitration worker={worker_id} item={index}",
+ memory_type="note",
+ project="arbitration-test",
+ tags=["arbitration", f"worker-{worker_id}"],
+ importance=7,
+ queue_dir=Path(queue_dir),
+ source="test",
+ )
+
+
+def _create_minimal_db(path: Path) -> None:
+ conn = sqlite3.connect(path)
+ try:
+ conn.executescript(
+ """
+ PRAGMA journal_mode=WAL;
+ CREATE TABLE chunks (
+ id TEXT PRIMARY KEY,
+ content TEXT NOT NULL,
+ metadata TEXT NOT NULL,
+ source_file TEXT NOT NULL,
+ project TEXT,
+ content_type TEXT,
+ value_type TEXT,
+ char_count INTEGER,
+ source TEXT,
+ created_at TEXT,
+ enriched_at TEXT,
+ summary TEXT,
+ tags TEXT,
+ importance REAL,
+ superseded_by TEXT
+ );
+ CREATE TABLE kg_entities (
+ id TEXT PRIMARY KEY,
+ name TEXT NOT NULL
+ );
+ CREATE TABLE kg_entity_chunks (
+ entity_id TEXT NOT NULL,
+ chunk_id TEXT NOT NULL,
+ relevance REAL DEFAULT 1.0,
+ context TEXT,
+ PRIMARY KEY (entity_id, chunk_id)
+ );
+ CREATE TABLE chunk_vectors (
+ chunk_id TEXT PRIMARY KEY,
+ embedding BLOB
+ );
+ CREATE TABLE chunk_vectors_binary (
+ chunk_id TEXT PRIMARY KEY,
+ embedding BLOB
+ );
+ CREATE VIRTUAL TABLE chunks_fts USING fts5(
+ content, summary, tags, resolved_query, key_facts, resolved_queries, chunk_id UNINDEXED
+ );
+ CREATE TRIGGER chunks_fts_insert AFTER INSERT ON chunks BEGIN
+ INSERT INTO chunks_fts(content, summary, tags, chunk_id)
+ VALUES (new.content, new.summary, new.tags, new.id);
+ END;
+ """
+ )
+ conn.commit()
+ finally:
+ conn.close()
+
+
+def _connect_apsw(path: Path) -> apsw.Connection:
+ conn = apsw.Connection(str(path))
+ conn.enableloadextension(True)
+ conn.loadextension(sqlite_vec.loadable_path())
+ conn.enableloadextension(False)
+ return conn
+
+
+def _create_vec_db(path: Path) -> None:
+ conn = _connect_apsw(path)
+ try:
+ conn.execute("""
+ CREATE TABLE chunks (
+ id TEXT PRIMARY KEY,
+ content TEXT NOT NULL,
+ metadata TEXT NOT NULL,
+ source_file TEXT NOT NULL,
+ project TEXT,
+ content_type TEXT,
+ value_type TEXT,
+ char_count INTEGER,
+ source TEXT,
+ created_at TEXT,
+ enriched_at TEXT,
+ summary TEXT,
+ tags TEXT,
+ importance REAL
+ )
+ """)
+ conn.execute("""
+ CREATE VIRTUAL TABLE chunk_vectors USING vec0(
+ chunk_id TEXT PRIMARY KEY,
+ embedding FLOAT[1024]
+ )
+ """)
+ conn.execute("""
+ CREATE VIRTUAL TABLE chunk_vectors_binary USING vec0(
+ chunk_id TEXT PRIMARY KEY,
+ embedding BIT[1024]
+ )
+ """)
+ finally:
+ conn.close()
+
+
+def test_drain_default_queue_dir_expands_env_tilde(monkeypatch):
+ from brainlayer.drain import _default_queue_dir
+
+ monkeypatch.setenv("BRAINLAYER_QUEUE_DIR", "~/brainlayer-arbitration-test")
+
+ queue_dir = _default_queue_dir()
+
+ assert "~" not in str(queue_dir)
+ assert queue_dir == Path.home() / "brainlayer-arbitration-test"
+
+
+def test_drain_daemon_serializes_three_concurrent_producers(tmp_path, monkeypatch):
+ from brainlayer.drain import drain_once
+
+ db_path = tmp_path / "brainlayer.db"
+ queue_dir = tmp_path / "queue"
+ log_path = tmp_path / "drain.log"
+ _create_minimal_db(db_path)
+ monkeypatch.setenv("BRAINLAYER_DRAIN_EMBED", "0")
+
+ workers = [mp.Process(target=_producer, args=(str(queue_dir), worker, 1000)) for worker in range(3)]
+ for worker in workers:
+ worker.start()
+ for worker in workers:
+ worker.join(timeout=20)
+ assert worker.exitcode == 0
+
+ deadline = time.monotonic() + 15
+ total_drained = 0
+ while time.monotonic() < deadline:
+ total_drained += drain_once(db_path=db_path, queue_dir=queue_dir, batch_size=250, log_path=log_path)
+ with _connect_apsw(db_path) as conn:
+ count = conn.execute("SELECT COUNT(*) FROM chunks WHERE project = 'arbitration-test'").fetchone()[0]
+ fts_count = conn.execute("SELECT COUNT(*) FROM chunks_fts WHERE chunks_fts MATCH 'arbitration'").fetchone()[
+ 0
+ ]
+ if count == 3000 and fts_count == 3000:
+ break
+ time.sleep(0.05)
+
+ assert total_drained == 3000
+ assert count == 3000
+ assert fts_count == 3000
+ assert not list(queue_dir.glob("*.jsonl"))
+ assert "database is locked" not in log_path.read_text(encoding="utf-8").lower()
+
+
+def test_queue_sanitizes_source_and_drain_preserves_supersedes(tmp_path):
+ from brainlayer.drain import drain_once
+ from brainlayer.queue_io import enqueue_store
+
+ db_path = tmp_path / "brainlayer.db"
+ queue_dir = tmp_path / "queue"
+ _create_minimal_db(db_path)
+ with sqlite3.connect(db_path) as conn:
+ conn.execute(
+ """
+ INSERT INTO chunks (id, content, metadata, source_file)
+ VALUES ('old-id', 'old content', '{}', 'seed')
+ """
+ )
+ conn.execute("INSERT INTO kg_entities (id, name) VALUES ('person-1', 'Person One')")
+ conn.commit()
+
+ queued_path = enqueue_store(
+ content="replacement content",
+ project="arbitration-test",
+ source="../unsafe/source",
+ supersedes="old-id",
+ entity_id="person-1",
+ queue_dir=queue_dir,
+ )
+
+ assert queued_path.parent == queue_dir
+ assert ".." not in queued_path.name
+ assert "/" not in queued_path.name
+ assert re.fullmatch(r"[A-Za-z0-9_.-]+", queued_path.name)
+ assert drain_once(db_path=db_path, queue_dir=queue_dir, embed_fn=lambda text: [0.1] * 1024) == 1
+
+ with _connect_apsw(db_path) as conn:
+ replacement_id = conn.execute("SELECT id FROM chunks WHERE content = 'replacement content'").fetchone()[0]
+ superseded_by = conn.execute("SELECT superseded_by FROM chunks WHERE id = 'old-id'").fetchone()[0]
+ entity_link = conn.execute("SELECT chunk_id FROM kg_entity_chunks WHERE entity_id = 'person-1'").fetchone()[0]
+
+ assert superseded_by == replacement_id
+ assert entity_link == replacement_id
+
+
+def test_drain_embeds_every_queued_store(tmp_path):
+ from brainlayer.drain import drain_once
+ from brainlayer.queue_io import enqueue_store
+
+ db_path = tmp_path / "brainlayer.db"
+ queue_dir = tmp_path / "queue"
+ _create_minimal_db(db_path)
+
+ for index in range(3):
+ enqueue_store(
+ content=f"queued semantic memory {index}",
+ project="arbitration-test",
+ source="mcp",
+ queue_dir=queue_dir,
+ )
+
+ assert drain_once(db_path=db_path, queue_dir=queue_dir, embed_fn=lambda text: [0.1] * 1024) == 3
+
+ deadline = time.monotonic() + 2
+ vector_count = 0
+ binary_count = 0
+ while time.monotonic() < deadline:
+ with _connect_apsw(db_path) as conn:
+ vector_count = conn.execute("SELECT COUNT(*) FROM chunk_vectors").fetchone()[0]
+ binary_count = conn.execute("SELECT COUNT(*) FROM chunk_vectors_binary").fetchone()[0]
+ if vector_count == 3 and binary_count == 3:
+ break
+ time.sleep(0.05)
+
+ assert vector_count == 3
+ assert binary_count == 3
+
+
+def test_drain_ignores_non_object_store_metadata(tmp_path):
+ from brainlayer.drain import drain_once
+
+ db_path = tmp_path / "brainlayer.db"
+ queue_dir = tmp_path / "queue"
+ queue_dir.mkdir()
+ _create_minimal_db(db_path)
+ (queue_dir / "store.jsonl").write_text(
+ json.dumps(
+ {
+ "kind": "store_memory",
+ "chunk_id": "bad-meta",
+ "content": "queued memory with bad metadata",
+ "memory_type": "note",
+ "metadata": "not-a-dict",
+ }
+ )
+ + "\n",
+ encoding="utf-8",
+ )
+
+ assert drain_once(db_path=db_path, queue_dir=queue_dir, embed_fn=lambda text: [0.1] * 1024) == 1
+
+ with _connect_apsw(db_path) as conn:
+ row = conn.execute("SELECT metadata FROM chunks WHERE id = 'bad-meta'").fetchone()
+
+ assert json.loads(row[0]) == {"memory_type": "note"}
+
+
+def test_drain_loads_sqlite_vec_for_vec0_tables(tmp_path):
+ from brainlayer.drain import drain_once
+ from brainlayer.queue_io import enqueue_store
+
+ db_path = tmp_path / "brainlayer.db"
+ queue_dir = tmp_path / "queue"
+ _create_vec_db(db_path)
+
+ enqueue_store(
+ content="queued memory requiring sqlite vec",
+ project="arbitration-test",
+ source="mcp",
+ queue_dir=queue_dir,
+ )
+
+ assert drain_once(db_path=db_path, queue_dir=queue_dir, embed_fn=lambda text: [0.1] * 1024) == 1
+
+ with _connect_apsw(db_path) as conn:
+ assert conn.execute("SELECT COUNT(*) FROM chunk_vectors").fetchone()[0] == 1
+ assert conn.execute("SELECT COUNT(*) FROM chunk_vectors_binary").fetchone()[0] == 1
+
+
+def test_poison_queue_file_does_not_rollback_good_file(tmp_path):
+ from brainlayer.drain import drain_once
+ from brainlayer.queue_io import enqueue_store
+
+ db_path = tmp_path / "brainlayer.db"
+ queue_dir = tmp_path / "queue"
+ log_path = tmp_path / "drain.log"
+ _create_minimal_db(db_path)
+
+ good_path = enqueue_store(
+ content="good memory survives poison",
+ project="arbitration-test",
+ source="mcp",
+ queue_dir=queue_dir,
+ )
+ poison_path = queue_dir / "poison.jsonl"
+ poison_path.write_bytes(b"\xff\xff")
+
+ assert drain_once(db_path=db_path, queue_dir=queue_dir, log_path=log_path, embed_fn=lambda text: [0.1] * 1024) == 1
+
+ with _connect_apsw(db_path) as conn:
+ stored = conn.execute("SELECT COUNT(*) FROM chunks WHERE content = 'good memory survives poison'").fetchone()[0]
+
+ assert stored == 1
+ assert not good_path.exists()
+ assert not poison_path.exists()
+ assert list(queue_dir.glob("poison.jsonl.bad*"))
+ assert "poison" in log_path.read_text(encoding="utf-8").lower()
+
+
+def test_chunk_id_collision_is_logged_and_dropped(tmp_path):
+ from brainlayer.drain import drain_once
+ from brainlayer.queue_io import enqueue_store
+
+ db_path = tmp_path / "brainlayer.db"
+ queue_dir = tmp_path / "queue"
+ log_path = tmp_path / "drain.log"
+ _create_minimal_db(db_path)
+ with _connect_apsw(db_path) as conn:
+ conn.execute(
+ """
+ INSERT INTO chunks (id, content, metadata, source_file)
+ VALUES ('coll', 'ORIGINAL', '{}', 'seed')
+ """
+ )
+
+ enqueue_store(
+ content="QUEUED REPLACEMENT",
+ project="arbitration-test",
+ source="mcp",
+ queue_dir=queue_dir,
+ chunk_id="coll",
+ )
+
+ assert drain_once(db_path=db_path, queue_dir=queue_dir, log_path=log_path, embed_fn=lambda text: [0.1] * 1024) == 1
+
+ with _connect_apsw(db_path) as conn:
+ content = conn.execute("SELECT content FROM chunks WHERE id = 'coll'").fetchone()[0]
+ count = conn.execute("SELECT COUNT(*) FROM chunks WHERE id = 'coll'").fetchone()[0]
+
+ log_text = log_path.read_text(encoding="utf-8").lower()
+ assert content == "ORIGINAL"
+ assert count == 1
+ assert not list(queue_dir.glob("*.jsonl"))
+ assert "collided" in log_text
+ assert "collisions_dropped=1" in log_text
+
+
+def test_drain_lock_does_not_depend_on_deletable_lock_file(tmp_path, monkeypatch):
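+    """Drain exclusion must keep working even when the lock sentinel is deleted out from under it."""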
+ from brainlayer.drain import drain_once
+ from brainlayer.queue_io import enqueue_store
+
+ db_path = tmp_path / "brainlayer.db"
+ queue_dir = tmp_path / "queue"
+ _create_minimal_db(db_path)
+ monkeypatch.setenv("BRAINLAYER_DRAIN_EMBED", "0")
+
+ enqueue_store(
+ content="lock sentinel memory",
+ project="arbitration-test",
+ source="mcp",
+ queue_dir=queue_dir,
+ )
+
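+    # Create and then delete the sentinel: a path any process can unlink cannot be the real lock.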
+ stale_lock = queue_dir / ".drain.lock"
+ stale_lock.write_text("stale", encoding="utf-8")
+ stale_lock.unlink()
+
+ assert drain_once(db_path=db_path, queue_dir=queue_dir) == 1
+ assert not stale_lock.exists()
+
+ with _connect_apsw(db_path) as conn:
+ count = conn.execute("SELECT COUNT(*) FROM chunks WHERE content = 'lock sentinel memory'").fetchone()[0]
+ assert count == 1
+
+
+def test_flush_migrates_legacy_pending_stores_idempotently(tmp_path, monkeypatch):
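+    """Re-running flush over a re-created legacy pending-stores.jsonl must not duplicate rows: chunk ids are stable."""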
+ from brainlayer.cli import flush
+ from brainlayer.paths import get_db_path
+
+ db_path = tmp_path / "brainlayer.db"
+ queue_dir = tmp_path / "queue"
+ _create_minimal_db(db_path)
+ pending_path = db_path.parent / "pending-stores.jsonl"
+    pending_path.write_text('{"content":"legacy pending memory","memory_type":"note","project":"arbitration-test"}\n', encoding="utf-8")
+
+ monkeypatch.setenv("BRAINLAYER_DB", str(db_path))
+ monkeypatch.setenv("BRAINLAYER_QUEUE_DIR", str(queue_dir))
+ monkeypatch.setenv("BRAINLAYER_DRAIN_EMBED", "0")
+ assert get_db_path() == db_path
+
+ flush()
+    pending_path.write_text('{"content":"legacy pending memory","memory_type":"note","project":"arbitration-test"}\n', encoding="utf-8")
+ flush()
+
+ with _connect_apsw(db_path) as conn:
+ rows = conn.execute("SELECT id FROM chunks WHERE content = 'legacy pending memory'").fetchall()
+
+ assert len(rows) == 1
diff --git a/tests/test_search_trigram_fts.py b/tests/test_search_trigram_fts.py
index 9275e3c4..e9251668 100644
--- a/tests/test_search_trigram_fts.py
+++ b/tests/test_search_trigram_fts.py
@@ -61,7 +61,7 @@ def test_hybrid_search_uses_trigram_fts_for_identifier_substrings(tmp_path):
store.close()
-def test_vector_store_repairs_partial_trigram_backfill_on_startup(tmp_path):
+def test_vector_store_repairs_partial_trigram_backfill_only_on_explicit_repair(tmp_path):
db_path = tmp_path / "trigram-repair.db"
store = VectorStore(db_path)
try:
@@ -71,10 +71,15 @@ def test_vector_store_repairs_partial_trigram_backfill_on_startup(tmp_path):
finally:
store.close()
- repaired = VectorStore(db_path)
+ opened = VectorStore(db_path)
try:
- trigram_count = repaired.conn.cursor().execute("SELECT COUNT(*) FROM chunks_fts_trigram").fetchone()[0]
- chunk_count = repaired.conn.cursor().execute("SELECT COUNT(*) FROM chunks").fetchone()[0]
- assert trigram_count == chunk_count == 2
+ trigram_count = opened.conn.cursor().execute("SELECT COUNT(*) FROM chunks_fts_trigram").fetchone()[0]
+ chunk_count = opened.conn.cursor().execute("SELECT COUNT(*) FROM chunks").fetchone()[0]
+ assert trigram_count == 1
+ assert chunk_count == 2
+
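+        # repair_fts() returns per-table row counts keyed by FTS table name.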
+ repaired = opened.repair_fts()
+ trigram_count = opened.conn.cursor().execute("SELECT COUNT(*) FROM chunks_fts_trigram").fetchone()[0]
+ assert repaired["chunks_fts_trigram"] == trigram_count == chunk_count == 2
finally:
- repaired.close()
+ opened.close()
diff --git a/tests/test_write_queue.py b/tests/test_write_queue.py
index 18ad3621..290c45c0 100644
--- a/tests/test_write_queue.py
+++ b/tests/test_write_queue.py
@@ -27,53 +27,37 @@ class TestQueueStore:
"""JSONL queue for buffering writes when DB is locked."""
def test_queue_store_writes_jsonl(self, tmp_path):
- """_queue_store writes a JSONL line to the pending file."""
- with patch(
- "brainlayer.mcp.store_handler._get_pending_store_path",
- return_value=tmp_path / "pending-stores.jsonl",
- ):
+ """_queue_store writes to the unified arbitration queue."""
+ with patch("brainlayer.queue_io.get_queue_dir", return_value=tmp_path):
_queue_store({"content": "test memory", "memory_type": "note"})
- path = tmp_path / "pending-stores.jsonl"
- assert path.exists()
- lines = path.read_text().strip().splitlines()
+ files = list(tmp_path.glob("mcp-*.jsonl"))
+ assert len(files) == 1
+ lines = files[0].read_text().strip().splitlines()
assert len(lines) == 1
item = json.loads(lines[0])
+ assert item["kind"] == "store_memory"
assert item["content"] == "test memory"
assert item["memory_type"] == "note"
def test_queue_store_appends(self, tmp_path):
- """Multiple queue calls append, not overwrite."""
- with patch(
- "brainlayer.mcp.store_handler._get_pending_store_path",
- return_value=tmp_path / "pending-stores.jsonl",
- ):
+ """Multiple queue calls create independent durable files."""
+ with patch("brainlayer.queue_io.get_queue_dir", return_value=tmp_path):
_queue_store({"content": "first", "memory_type": "note"})
_queue_store({"content": "second", "memory_type": "learning"})
- lines = (tmp_path / "pending-stores.jsonl").read_text().strip().splitlines()
- assert len(lines) == 2
+ files = sorted(tmp_path.glob("mcp-*.jsonl"))
+ assert len(files) == 2
+ contents = {json.loads(path.read_text())["content"] for path in files}
+ assert contents == {"first", "second"}
- def test_queue_max_size_drops_oldest(self, tmp_path):
- """Queue respects max size limit and drops oldest items."""
- pending_path = tmp_path / "pending-stores.jsonl"
- with patch(
- "brainlayer.mcp.store_handler._get_pending_store_path",
- return_value=pending_path,
- ):
- # Queue 105 items (over the 100 limit)
+ def test_queue_store_keeps_burst_items(self, tmp_path):
+ """Unified queue keeps burst items as separate files."""
+ with patch("brainlayer.queue_io.get_queue_dir", return_value=tmp_path):
for i in range(105):
_queue_store({"content": f"item-{i}", "memory_type": "note"})
- lines = pending_path.read_text().strip().splitlines()
- # Should be exactly 100, with oldest 5 dropped
- assert len(lines) == 100
- # First retained item should be item-5 (oldest 5 dropped)
- first_item = json.loads(lines[0])
- assert first_item["content"] == "item-5"
- # Last item should be the newest
- last_item = json.loads(lines[-1])
- assert last_item["content"] == "item-104"
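+            # The durable queue keeps every burst item as its own file; the old 100-item cap is gone.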
+ assert len(list(tmp_path.glob("mcp-*.jsonl"))) == 105
class TestSingleWriterQueue:
@@ -230,7 +214,7 @@ async def test_store_queues_on_busy_error(self, tmp_path):
"""When store_memory raises BusyError, the item is queued to JSONL."""
from brainlayer.mcp.store_handler import _store
- pending_path = tmp_path / "pending-stores.jsonl"
+ queue_dir = tmp_path / "queue"
with (
patch("brainlayer.mcp.store_handler._get_vector_store") as mock_vs,
@@ -238,10 +222,7 @@ async def test_store_queues_on_busy_error(self, tmp_path):
patch("brainlayer.mcp.store_handler._normalize_project_name", return_value="test"),
# store_memory is imported inside _store via `from ..store import store_memory`
patch("brainlayer.store.store_memory", side_effect=apsw.BusyError("locked")),
- patch(
- "brainlayer.mcp.store_handler._get_pending_store_path",
- return_value=pending_path,
- ),
+ patch("brainlayer.queue_io.get_queue_dir", return_value=queue_dir),
):
mock_em.return_value.embed_query.return_value = [0.1] * 1024
@@ -256,11 +237,42 @@ async def test_store_queues_on_busy_error(self, tmp_path):
assert structured["chunk_id"] == "queued"
assert any("queued" in t.text.lower() for t in texts)
- # Should have written to JSONL
- assert pending_path.exists()
- item = json.loads(pending_path.read_text().strip())
+ # Should have written to the unified queue
+ files = list(queue_dir.glob("mcp-*.jsonl"))
+ assert len(files) == 1
+ item = json.loads(files[0].read_text().strip())
+ assert item["kind"] == "store_memory"
assert item["content"] == "test memory"
+ @pytest.mark.asyncio
+ async def test_arbitrated_store_validates_before_queueing(self, tmp_path, monkeypatch):
+ """Arbitrated store should not report queued success for invalid content."""
+ from brainlayer.mcp.store_handler import _store
+
+ monkeypatch.setenv("BRAINLAYER_ARBITRATED", "1")
+ with patch("brainlayer.queue_io.get_queue_dir", return_value=tmp_path):
+ result = await _store(content=" ", memory_type="note", project="test")
+
+ assert result.isError is True
+ assert "content must be non-empty" in result.content[0].text
+ assert not list(tmp_path.glob("mcp-*.jsonl"))
+
+ @pytest.mark.asyncio
+ async def test_arbitrated_store_clears_search_cache(self, tmp_path, monkeypatch):
+ """Queued stores invalidate local search cache even before the drain writes."""
+ from brainlayer.mcp.store_handler import _store
+
+ monkeypatch.setenv("BRAINLAYER_ARBITRATED", "1")
+ with (
+ patch("brainlayer.queue_io.get_queue_dir", return_value=tmp_path),
+ patch("brainlayer.search_repo.clear_hybrid_search_cache") as clear_cache,
+ ):
+ texts, structured = await _store(content="queued cache invalidation", memory_type="note", project="test")
+
+ assert structured["chunk_id"] == "queued"
+ assert any("queued" in item.text.lower() for item in texts)
+ clear_cache.assert_called_once_with()
+
class TestBrainUpdateRetryOnLock:
"""brain_update should retry on BusyError before failing."""