
fix: arbitrate brainlayer writes through queue drain#282

Merged
EtanHey merged 5 commits into main from fix/brainlayer-writer-arbitration on May 14, 2026

Conversation


@EtanHey EtanHey commented May 14, 2026

Summary

  • Adds the hybrid writer-arbitration path: non-BrainBar Python producers can enqueue durable JSONL write intents under BRAINLAYER_ARBITRATED=1, while a single drain daemon drains the unified queue with BEGIN IMMEDIATE batches.
  • Adds packaged drain logic plus launchd plists for com.brainlayer.drain, com.brainlayer.watch, and weekly com.brainlayer.repair-fts; installer now loads drain before watch/enrichment and creates required queue/log dirs.
  • Removes startup-time trigram FTS count-check/rebuild from VectorStore._init_db; explicit repair now lives behind BRAINLAYER_REPAIR=1 or brainlayer repair-fts.
  • Extends brainlayer flush to migrate legacy pending-stores.jsonl into the unified queue and drain unified queue files.
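The hybrid path above can be sketched end to end. This is an illustrative simplification, not the actual queue_io.py/drain.py code: the chunks schema, the file naming, and the omitted retry, quarantine, supersedes, and embedding handling are all assumptions of the sketch.

```python
import json
import os
import sqlite3
import tempfile
import time
import uuid
from pathlib import Path

QUEUE_DIR = Path(tempfile.mkdtemp()) / "queue"  # stands in for ~/.brainlayer/queue
QUEUE_DIR.mkdir(parents=True)

def enqueue_store(content: str, source: str) -> Path:
    """Producer side: write one event as its own JSONL file, atomically."""
    event = {"kind": "store", "content": content, "source": source, "ts": time.time()}
    tmp = QUEUE_DIR / f".{uuid.uuid4().hex}.tmp"
    tmp.write_text(json.dumps(event) + "\n")
    final = QUEUE_DIR / f"{uuid.uuid4().hex}.jsonl"
    os.replace(tmp, final)  # rename is atomic, so the drain never sees partial files
    return final

def drain_once(db_path: Path) -> int:
    """Drain side: a single writer applies queued events in one BEGIN IMMEDIATE batch."""
    conn = sqlite3.connect(db_path, isolation_level=None)  # autocommit; we manage txns
    conn.execute("CREATE TABLE IF NOT EXISTS chunks (content TEXT, source TEXT)")
    files = sorted(QUEUE_DIR.glob("*.jsonl"))
    applied = 0
    conn.execute("BEGIN IMMEDIATE")  # take the write lock up front
    try:
        for path in files:
            for line in path.read_text().splitlines():
                event = json.loads(line)
                conn.execute(
                    "INSERT INTO chunks VALUES (?, ?)", (event["content"], event["source"])
                )
                applied += 1
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise
    for path in files:  # delete queue files only after a successful commit
        path.unlink()
    return applied

db = QUEUE_DIR.parent / "brain.db"
enqueue_store("queued under arbitration", "mcp")
enqueue_store("second event", "watcher")
print(drain_once(db))  # → 2
```

Because producers only ever rename complete files into the queue directory, the drain can batch everything it sees under one exclusive transaction without coordinating with writers.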

Scope notes / deviations

  • The watcher write path in this repo is implemented in watcher_bridge.py, not watcher.py; launchd sets BRAINLAYER_ARBITRATED=1 for the new watch agent.
  • Enrichment still uses VectorStore for read-side candidate selection, but enrichment/meta-research writes enqueue when arbitration is enabled.
  • Hook fallback had already been using ~/.brainlayer/queue; this PR moves hooks/indexer.py to the shared helper and makes the drain support both the legacy hook shape and the new typed queue shape.
  • LOC is larger than the rough estimate because the drain now handles store, watcher, hook, enrichment, legacy pending-store migration, malformed queue records, supersedes, source filename sanitization, launchd setup, and tests.

Tests

  • Initial TDD failure before implementation: pytest tests/test_arbitration.py -q failed with ModuleNotFoundError: No module named 'scripts.drain_daemon'.
  • Ran the required 3 consecutive 3-writer load-test runs after the final fixes:
    • 1 passed in 1.89s
    • 1 passed in 1.72s
    • 1 passed in 1.78s
  • Focused regression: pytest tests/test_arbitration.py tests/test_write_queue.py tests/test_search_trigram_fts.py -q -> 18 passed in 6.04s.
  • Lint on touched files: ruff check ... -> All checks passed!.
  • Full gate before commit: ./scripts/run_tests.sh -> BrainLayer test gate passed with 1829 passed, 9 skipped, 75 deselected, 1 xfailed, 102 warnings, plus MCP registration, isolated eval/hook routing, bun, and FTS regression passed.
  • Pre-push gate reran: ./scripts/run_tests.sh -> BrainLayer test gate passed with 1829 passed, 9 skipped, 75 deselected, 1 xfailed, 102 warnings, plus MCP registration, isolated eval/hook routing, bun, and FTS regression passed.

Review

  • Local CodeRabbit completed once and reported actionable issues; this PR fixes source filename sanitization, arbitrated supersedes, log directory creation, PYTHON_BIN validation, enqueue error propagation, malformed queue handling, and the drain file-deletion lock race.
  • A second local CodeRabbit run stalled during setup for >6 minutes after those fixes; GitHub CodeRabbit should run fresh on this PR.
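Of the fixes above, the source filename sanitization is the most self-contained to illustrate: producer-supplied source labels must be reduced to filename-safe characters before they name queue files. A minimal sketch, assuming a [A-Za-z0-9_.-] whitelist and a 64-character cap (the real rules may differ):

```python
import re

def sanitize_source(source: str) -> str:
    """Restrict a producer-supplied source label to filename-safe characters."""
    cleaned = re.sub(r"[^A-Za-z0-9_.-]", "_", source)  # whitelist, replace the rest
    return cleaned[:64] or "unknown"                   # cap length; never empty

print(sanitize_source("mcp/store handler!"))  # → mcp_store_handler_
```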

Note

Medium Risk
Changes core write paths to optionally enqueue and drain SQLite writes; misconfiguration (e.g., drain daemon not running) could cause write backlog or delayed embeddings/FTS repair.

Overview
Introduces a unified durable per-file JSONL write queue (via queue_io.py) and a new single-writer drain implementation (drain.py) that applies queued store/watcher/hook/enrichment events under BEGIN IMMEDIATE, with retries, poison-file quarantine, collision logging, and optional post-drain embedding.

Updates background producers to support arbitration mode (BRAINLAYER_ARBITRATED=1): MCP brain_store can validate then enqueue (and still falls back to legacy pending-stores.jsonl on queue failure), the watcher and enrichment paths enqueue writes instead of writing directly, and brainlayer flush migrates legacy pending stores into the new queue and drains it.

Removes automatic trigram FTS backfill/repair from VectorStore startup, replacing it with an explicit brainlayer repair-fts command and a VectorStore.repair_fts() method (also gated by BRAINLAYER_REPAIR=1), plus new/updated launchd plists and installer support for watch, drain, and weekly repair-fts jobs; adds documentation and tests covering concurrency, embedding, collisions, and migration behavior.

Reviewed by Cursor Bugbot for commit 64110e7. Bugbot is set up for automated code reviews on this repo.

Note

Arbitrate BrainLayer DB writes through a durable single-writer queue and drain daemon

  • Introduces a per-event JSONL queue (queue_io.py) and a drain daemon (drain.py) so all DB writes (MCP store, watcher, hooks, enrichment) are serialized through a single writer instead of racing directly against SQLite.
  • The drain daemon processes queued events transactionally with exponential backoff on busy errors, handles collisions and supersedes, and optionally embeds vectors post-commit.
  • A launchd plist (com.brainlayer.drain.plist) runs the drain loop every 500 ms; producers gate on BRAINLAYER_ARBITRATED=1 to switch from direct writes to enqueuing.
  • Trigram FTS rebuild is removed from startup and moved to an explicit brainlayer repair-fts CLI command and a weekly launchd job (com.brainlayer.repair-fts.plist).
  • brainlayer flush now migrates legacy pending-stores.jsonl entries into the unified queue and drains in-process.
  • Risk: any process with BRAINLAYER_ARBITRATED=1 no longer writes to the DB synchronously; writes are lost if the queue directory is unavailable and enqueue throws.

Macroscope summarized 64110e7.

Summary by CodeRabbit

  • New Features

    • Durable JSONL write queue and queued-mode for background writers (opt-in via env).
    • Background drain daemon to process queued operations and optional embedding.
    • New CLI command to run explicit FTS repair and expanded flush migration for legacy pending stores.
    • Enhanced install script with new service subcommands and multiple launchd service configs.
  • Bug Fixes / Reliability

    • Improved DB-lock resilience and safer processing (quarantining/bad-file handling, retries).
  • Tests

    • Expanded tests for concurrency, queue integrity, embedding, and repair workflows.
  • Docs

    • Added detailed arbitration documentation and README updates.



Owner Author

EtanHey commented May 14, 2026

@codex review
@cursor review
@BugBot review


coderabbitai Bot commented May 14, 2026

📝 Walkthrough

This PR adds a durable JSONL write-arbitration queue and a single-writer drain daemon that applies queued events into SQLite under exclusive locks; arbitration is enabled via BRAINLAYER_ARBITRATED and integrated into enrichment, MCP, watcher, and hook paths, plus launchd and CLI support.

Changes

Write Arbitration Queue and Drain Integration

Layer / File(s) / Summary

  • Queue I/O Foundation (src/brainlayer/queue_io.py): Atomic JSONL write module with source sanitization, a configurable queue directory, and event-type wrappers (enqueue_store, enqueue_watcher_chunk, enqueue_hook_chunk, enqueue_enrichment_update).
  • Drain Loop and Queue Processing (src/brainlayer/drain.py): Single-writer daemon processes batched JSONL files under an exclusive lock, normalizes events by kind, applies them to SQLite inside IMMEDIATE transactions, handles supersession and vector-table cleanup, and deletes drained files on success.
  • CLI Commands and Flush Migration (src/brainlayer/cli/__init__.py): New repair-fts command rebuilds the trigram index; flush refactored to migrate pending-stores.jsonl entries to the unified queue, then drain until empty.
  • Vector Store FTS Repair (src/brainlayer/vector_store.py): Removes automatic trigram repair on startup; adds an explicit repair_fts() method and optional BRAINLAYER_REPAIR env-gated repair during init.
  • Enrichment Controller Arbitration (src/brainlayer/enrichment_controller.py): Routes enrichment/meta-research updates through the queue when BRAINLAYER_ARBITRATED=1 via internal _enqueue_enrichment_write and _enqueue_meta_research_write helpers; skips column setup when arbitrated; affects enrich_single, enrich_realtime, enrich_batch.
  • MCP Store Handler Arbitration (src/brainlayer/mcp/store_handler.py): Unified-queue fast path in _queue_store with fallback to pending-stores.jsonl; early exit in _store for BRAINLAYER_ARBITRATED=1 enqueues and returns a "queued" response.
  • Watcher and Hook Chunk Arbitration (src/brainlayer/watcher_bridge.py, src/brainlayer/hooks/indexer.py): Watcher bridge conditionally enqueues chunks via enqueue_watcher_chunk when arbitrated; hook indexer defaults the queue directory and replaces manual JSONL writes with enqueue_hook_chunk calls.
  • launchd Daemon Integration (scripts/drain_daemon.py, scripts/launchd/*.plist, scripts/launchd/install.sh): New drain daemon entrypoint, plist configs for drain/watch/repair-fts jobs, and an updated install script with Python binary resolution, queue/log directory setup, and CLI subcommands.
  • Queue and Drain Testing (tests/test_arbitration.py, tests/test_search_trigram_fts.py, tests/test_write_queue.py): New concurrent producer/drain integration tests; trigram test updated to assert explicit repair only; MCP queue tests rewritten for unified-queue validation.
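A single-writer drain like the one summarized above has to cope with SQLITE_BUSY from other readers holding the lock. The PR describes "exponential backoff on busy errors"; generically, that retry shape looks like the following sketch. The attempt count, delays, and error-string check here are illustrative, not the real drain.py values.

```python
import sqlite3
import time

def with_busy_retry(conn: sqlite3.Connection, sql: str, params=(), attempts: int = 5):
    """Run one statement, retrying only lock contention with exponential backoff."""
    delay = 0.05
    for attempt in range(attempts):
        try:
            return conn.execute(sql, params)
        except sqlite3.OperationalError as exc:
            message = str(exc)
            if "locked" not in message and "busy" not in message:
                raise  # real errors propagate immediately
            if attempt == attempts - 1:
                raise  # give up after the final attempt
            time.sleep(delay)
            delay *= 2  # 50 ms, 100 ms, 200 ms, ...

conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE chunks (id TEXT)")
with_busy_retry(conn, "INSERT INTO chunks VALUES (?)", ("c1",))
print(conn.execute("SELECT COUNT(*) FROM chunks").fetchone()[0])  # → 1
```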

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • EtanHey/brainlayer#234: Related work on routing enrichment/meta-research write operations through a queued single-writer mechanism.
  • EtanHey/brainlayer#65: Related MCP store-handler changes for queuing/lock-resilient store behavior and tests.
  • EtanHey/brainlayer#86: Related hook/indexer fallback-to-queue behavior for realtime hook writes.

Poem

🐰 I nibble lines and stamp them true,
I queue each note the whole night through,
A drain at dawn writes all anew,
With tidy hashes, IDs too —
The warren hums, the data grew.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning: Docstring coverage is 28.38%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.

✅ Passed checks (4 passed)

  • Title check: The title accurately describes the main change, introducing write arbitration through a queue drain mechanism, which is the primary objective of this PR.
  • Linked Issues check: Skipped because no linked issues were found for this pull request.
  • Out of Scope Changes check: Skipped because no linked issues were found for this pull request.
  • Description check: Skipped; CodeRabbit's high-level summary is enabled.



@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 6d13e57008

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

"tags": tags,
"importance": importance,
"supersedes": supersedes,
"metadata": {key: value for key, value in metadata.items() if value is not None},

P1 Badge Preserve entity links when draining stores

When BRAINLAYER_ARBITRATED=1, brain_store(entity_id=...) now goes through this queue path, but entity_id is only serialized into the chunk metadata and _apply_store never writes kg_entity_chunks or validates that the entity exists. That regresses the documented per-person memory contract: brain_get_person/entity-scoped searches read kg_entity_chunks, so memories stored with an entity in arbitrated mode become invisible to those flows even though the tool reports success.


Comment thread src/brainlayer/drain.py Outdated
Comment on lines +104 to +106
for vector_table in ("chunk_vectors_dense", "chunk_vectors_binary"):
if conn.execute("SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?", (vector_table,)).fetchone():
conn.execute(f"DELETE FROM {vector_table} WHERE chunk_id = ?", (supersedes,))

P2 Badge Delete superseded vectors from the actual vector table

For queued brain_store(..., supersedes=...) writes, this loop never removes the old dense vector because the schema uses chunk_vectors, not chunk_vectors_dense (the existing VectorStore._delete_chunk_vector deletes chunk_vectors and chunk_vectors_binary). The old superseded chunk can therefore remain in the KNN table and consume vector candidates before the superseded_by IS NULL filter removes it, degrading retrieval after supersedes in arbitrated mode.

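A hedged sketch of the fix this comment implies, using the table names it cites (chunk_vectors plus chunk_vectors_binary) and keeping the existence check so a missing table is simply skipped; the real schema and cleanup order may differ:

```python
import sqlite3

def delete_superseded_vectors(conn: sqlite3.Connection, chunk_id: str) -> None:
    """Clear both real vector tables (not chunk_vectors_dense) for a superseded chunk."""
    for table in ("chunk_vectors", "chunk_vectors_binary"):
        exists = conn.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?", (table,)
        ).fetchone()
        if exists:  # table name comes from a fixed tuple, so the f-string is safe
            conn.execute(f"DELETE FROM {table} WHERE chunk_id = ?", (chunk_id,))

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE chunk_vectors (chunk_id TEXT)")
conn.execute("INSERT INTO chunk_vectors VALUES ('old')")
delete_superseded_vectors(conn, "old")
print(conn.execute("SELECT COUNT(*) FROM chunk_vectors").fetchone()[0])  # → 0
```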


🟢 Low

fts_count = list(cursor.execute("SELECT COUNT(*) FROM chunks_fts"))[0][0]

When chunks_fts_trigram is first created on an existing database (lines 280–285), existing chunks are never indexed because the backfill at lines 925–931 only populates chunks_fts. Trigram searches will silently return empty results for all pre-existing data unless the user manually sets BRAINLAYER_REPAIR=1. Consider adding chunks_fts_trigram to the same backfill block, or creating it with the same IF NOT EXISTS pattern that prevents empty tables from going unnoticed.


Evidence trail:
src/brainlayer/vector_store.py lines 280-285: `chunks_fts_trigram` created with `IF NOT EXISTS`.
src/brainlayer/vector_store.py lines 925-931: backfill only populates `chunks_fts`, no backfill for `chunks_fts_trigram`.
src/brainlayer/vector_store.py lines 306-318: triggers for `chunks_fts_trigram_insert` only fire on new inserts.
src/brainlayer/vector_store.py lines 940-950: `repair_fts` method rebuilds `chunks_fts_trigram` but is only called when `BRAINLAYER_REPAIR=1` (line 367).
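The suggested remedy, extending the existing backfill to also populate the trigram table, could look roughly like this. The schema is a stand-in: a plain fts5 table is used here instead of the real tokenize='trigram' definition, and the column names are assumptions.

```python
import sqlite3

def backfill_trigram(conn: sqlite3.Connection) -> int:
    """Rebuild the trigram FTS table from chunks (explicit repair, not at startup)."""
    conn.execute("DELETE FROM chunks_fts_trigram")
    conn.execute(
        "INSERT INTO chunks_fts_trigram (rowid, content) "
        "SELECT rowid, content FROM chunks"
    )
    return conn.execute("SELECT COUNT(*) FROM chunks_fts_trigram").fetchone()[0]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE chunks (content TEXT)")
conn.executemany("INSERT INTO chunks VALUES (?)", [("alpha",), ("beta",)])
# The real table would use fts5(content, tokenize='trigram'); plain fts5 keeps this runnable.
conn.execute("CREATE VIRTUAL TABLE chunks_fts_trigram USING fts5(content)")
print(backfill_trigram(conn))  # → 2
```

Running the same block inside the existing chunks_fts backfill would keep trigram results consistent with pre-existing data without requiring BRAINLAYER_REPAIR=1.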



@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 10

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/brainlayer/cli/__init__.py`:
- Around line 1057-1064: The repair_fts CLI command currently instantiates
VectorStore with DEFAULT_DB_PATH which bypasses runtime DB path resolution;
change it to call get_db_path() from paths (import get_db_path) and pass that
result into VectorStore so the command uses the configured runtime database path
instead of the hardcoded DEFAULT_DB_PATH; update the import and the
VectorStore(DEFAULT_DB_PATH) usage in repair_fts accordingly.
- Around line 2097-2140: The code is hardcoding DEFAULT_DB_PATH for
migration/drain; change path resolution to use get_db_path() from ..paths
everywhere: replace queue_path = DEFAULT_DB_PATH.parent / "pending-stores.jsonl"
with queue_path = get_db_path().parent / "pending-stores.jsonl", and pass
db_path=get_db_path() into drain_once (and any other calls that currently use
DEFAULT_DB_PATH); ensure get_db_path is imported from ..paths at the top of the
file and leave enqueue_store and get_queue_dir usage unchanged.
- Around line 2109-2113: Wrap the json.loads(line) call in a try/except that
catches JSONDecodeError (json.decoder.JSONDecodeError) so a single malformed
JSONL row does not abort the whole flush migration; on decode error increment
the existing skipped counter (and optionally log/debug the bad line) and
continue to the next line, then proceed with the existing content =
item.get("content") / if not content handling as before.

In `@src/brainlayer/drain.py`:
- Around line 21-22: Replace the duplicated DB path resolution in
_default_db_path with a call to the shared helper: import and return
get_db_path() from paths.py instead of reconstructing Path(os.environ.get(...)).
Update the top of the file to import get_db_path and change the _default_db_path
function to simply return get_db_path() so all DB path logic is centralized in
get_db_path().
- Around line 140-168: In _apply_hook, the timestamp parsing uses
float(event.get("timestamp") or time.time()) which can raise ValueError for
non-numeric strings; instead, first extract ts_raw = event.get("timestamp"), try
to convert ts = float(ts_raw) in a try/except (catch ValueError/TypeError) and
fall back to ts = time.time() on error or None, optionally logging a warning
about the invalid timestamp, then use datetime.fromtimestamp(ts,
timezone.utc).isoformat() when building the chunk payload.
- Around line 237-282: The drain_once function currently aborts on any exception
(including sqlite3.OperationalError(SQLITE_BUSY)) during BEGIN IMMEDIATE and
returns 0; change it to detect SQLITE_BUSY (sqlite3.OperationalError with
"database is locked" or sqlite3.busy_error) and retry the transaction with
exponential backoff (e.g., backoff loop with sleep increasing and a max
attempts) when that specific error occurs while still rolling back on other
exceptions; keep using a fresh sqlite3.connect per drain_once invocation,
perform BEGIN IMMEDIATE and the read/apply loop (_read_events, _apply_event)
inside the retryable block, log retry attempts and final failures via
_log(log_path, ...), and only return 0 after exhausting retries or on
non-SQLITE_BUSY exceptions.

In `@src/brainlayer/enrichment_controller.py`:
- Around line 667-668: The code currently skips calling
_ensure_enrichment_columns(store) when _arbitrated_writes_enabled() is true,
which allows _is_duplicate_content() to read chunks.content_hash on DBs that
lack the column and silently disable deduping; fix this by guaranteeing the
content_hash schema is bootstrapped before any dedup reads: either call
_ensure_enrichment_columns(store) unconditionally at process startup or move the
call into the single-writer path (the same place where arbitration is
initialized) so that _ensure_enrichment_columns(store) runs before any
_is_duplicate_content() calls; make the change around the sites referencing
_arbitrated_writes_enabled() (the checks that currently skip
_ensure_enrichment_columns) and add an explicit comment/flag describing the
DB/schema-risk and that this is intentionally run in the single-writer/migration
path.

In `@src/brainlayer/mcp/store_handler.py`:
- Around line 503-526: The arbitrated fast path currently calls
enqueue_store(...) directly which can fail and bypass the durable fallback in
_queue_store; replace the direct call with a call to _queue_store(...) using the
same arguments (include supersedes, source="mcp", and all other kwargs like
content, memory_type, project, tags, importance, confidence_score, outcome,
reversibility, files_changed, entity_id, status, severity, file_path,
function_name, line_number) so the arbitrated branch reuses the durable
queuing/lock behavior; keep the existing return path that constructs structured
= {"chunk_id": "queued", "related": []} and returns ([TextContent(type="text",
text=format_store_result("queued", queued=True))], structured) after
_queue_store completes.

In `@src/brainlayer/vector_store.py`:
- Around line 939-950: The repair_fts function performs large delete/insert
writes and must be made contention-safe: wrap the delete/insert in an explicit
transaction (e.g., BEGIN IMMEDIATE / COMMIT) using the connection on the current
worker (ensure each worker uses its own sqlite3 connection), run PRAGMA
wal_checkpoint(FULL) before starting and after finishing the bulk writes, and
add a SQLITE_BUSY retry/backoff loop around the entire transaction (retry on
sqlite3.OperationalError with "database is locked" or SQLITE_BUSY, with a short
exponential backoff and a bounded number of attempts) so the cursor.execute
calls inside repair_fts (the DELETE, INSERT, and COUNT statements) are retried
atomically until success or max attempts. Ensure you call conn.commit() on
success and rollback on failure.

In `@tests/test_arbitration.py`:
- Around line 116-118: Add an assertion that the sanitized filename matches the
expected safe format: use queued_path.name and assert it fully matches a
whitelist regex (e.g., import re and assert re.fullmatch(r'^[A-Za-z0-9_.-]+$',
queued_path.name)) to ensure unsafe characters are replaced with allowed
characters; optionally also assert an exact sanitized string if you know the
replacement rules.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 0c32d0d9-9e8b-42c4-97d4-df734afa2085

📥 Commits

Reviewing files that changed from the base of the PR and between 353ab3d and 6d13e57.

📒 Files selected for processing (17)
  • scripts/drain_daemon.py
  • scripts/launchd/com.brainlayer.drain.plist
  • scripts/launchd/com.brainlayer.enrichment.plist
  • scripts/launchd/com.brainlayer.repair-fts.plist
  • scripts/launchd/com.brainlayer.watch.plist
  • scripts/launchd/install.sh
  • src/brainlayer/cli/__init__.py
  • src/brainlayer/drain.py
  • src/brainlayer/enrichment_controller.py
  • src/brainlayer/hooks/indexer.py
  • src/brainlayer/mcp/store_handler.py
  • src/brainlayer/queue_io.py
  • src/brainlayer/vector_store.py
  • src/brainlayer/watcher_bridge.py
  • tests/test_arbitration.py
  • tests/test_search_trigram_fts.py
  • tests/test_write_queue.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: Cursor Bugbot
  • GitHub Check: Macroscope - Correctness Check
  • GitHub Check: test (3.11)
  • GitHub Check: test (3.13)
  • GitHub Check: test (3.12)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

**/*.py: Flag risky DB or concurrency changes explicitly and do not hand-wave lock behavior
Enforce one-write-at-a-time concurrency constraint; reads are safe but brain_digest is write-heavy and must not run in parallel with other MCP work
Run pytest before claiming behavior changed safely; current test suite has 929 tests

**/*.py: Use paths.py:get_db_path() for all database path resolution; all scripts and CLI must use this function rather than hardcoding paths
When performing bulk database operations: stop enrichment workers first, checkpoint WAL before and after, drop FTS triggers before bulk deletes, batch deletes in 5-10K chunks, and checkpoint every 3 batches

Files:

  • tests/test_arbitration.py
  • scripts/drain_daemon.py
  • tests/test_write_queue.py
  • src/brainlayer/watcher_bridge.py
  • src/brainlayer/hooks/indexer.py
  • src/brainlayer/mcp/store_handler.py
  • src/brainlayer/queue_io.py
  • tests/test_search_trigram_fts.py
  • src/brainlayer/enrichment_controller.py
  • src/brainlayer/drain.py
  • src/brainlayer/cli/__init__.py
  • src/brainlayer/vector_store.py
src/brainlayer/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

src/brainlayer/**/*.py: Use retry logic on SQLITE_BUSY errors; each worker must use its own database connection to handle concurrency safely
Classification must preserve ai_code, stack_trace, and user_message verbatim; skip noise entries entirely and summarize build_log and dir_listing entries (structure only)
Use AST-aware chunking via tree-sitter; never split stack traces; mask large tool output
For enrichment backend selection: use Groq as primary backend (cloud, configured in launchd plist), Gemini as fallback via enrichment_controller.py, and Ollama as offline last-resort; allow override via BRAINLAYER_ENRICH_BACKEND env var
Configure enrichment rate via BRAINLAYER_ENRICH_RATE environment variable (default 0.2 = 12 RPM)
Implement chunk lifecycle columns: superseded_by, aggregated_into, archived_at on chunks table; exclude lifecycle-managed chunks from default search; allow include_archived=True to show history
Implement brain_supersede with safety gate for personal data (journals, notes, health/finance); use soft-delete for brain_archive with timestamp
Add supersedes parameter to brain_store for atomic store-and-replace operations
Run linting and formatting with: ruff check src/ && ruff format src/
Run tests with pytest
Use PRAGMA wal_checkpoint(FULL) before and after bulk database operations to prevent WAL bloat

Files:

  • src/brainlayer/watcher_bridge.py
  • src/brainlayer/hooks/indexer.py
  • src/brainlayer/mcp/store_handler.py
  • src/brainlayer/queue_io.py
  • src/brainlayer/enrichment_controller.py
  • src/brainlayer/drain.py
  • src/brainlayer/cli/__init__.py
  • src/brainlayer/vector_store.py
🪛 OpenGrep (1.20.0)
src/brainlayer/drain.py

[ERROR] 41-41: SQL query built via f-string passed to execute()/executemany(). Use parameterized queries with placeholders instead.

(coderabbit.sql-injection.python-fstring-execute)


[ERROR] 106-106: SQL query built via f-string passed to execute()/executemany(). Use parameterized queries with placeholders instead.

(coderabbit.sql-injection.python-fstring-execute)


[ERROR] 210-210: SQL query built via f-string passed to execute()/executemany(). Use parameterized queries with placeholders instead.

(coderabbit.sql-injection.python-fstring-execute)

🔇 Additional comments (17)
tests/test_arbitration.py (1)

59-90: LGTM!

tests/test_search_trigram_fts.py (1)

64-85: LGTM!

tests/test_write_queue.py (1)

29-61: LGTM!

Also applies to: 213-246

src/brainlayer/enrichment_controller.py (1)

171-197: LGTM!

Also applies to: 676-679, 720-723, 855-860, 869-876, 928-933, 964-971

src/brainlayer/mcp/store_handler.py (1)

392-399: LGTM!

src/brainlayer/watcher_bridge.py (1)

16-16: LGTM!

Also applies to: 26-26, 207-208, 215-317

src/brainlayer/hooks/indexer.py (1)

21-22: LGTM!

Also applies to: 31-31, 208-215

src/brainlayer/queue_io.py (4)

13-22: LGTM!


25-40: LGTM!


43-103: LGTM!


106-147: LGTM!

src/brainlayer/drain.py (6)

33-41: LGTM!


44-54: LGTM!


57-107: LGTM!


171-223: LGTM!


226-234: LGTM!


285-302: LGTM!


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 41917c2975


source="mcp",
)
structured = {"chunk_id": "queued", "related": []}
return ([TextContent(type="text", text=format_store_result("queued", queued=True))], structured)

P1 Badge Run the post-store pipeline for queued memories

When BRAINLAYER_ARBITRATED=1, this early return reports success before the non-arbitrated path starts the background worker that calls embed_pending_chunks and enrich_single. The new drain path only inserts the row and sets enriched_at immediately, so queued brain_store memories never receive chunk_vectors and are also excluded from the normal enrichment candidate query (enriched_at IS NULL), leaving them permanently missing from semantic recall and enrichment-derived tags/entities unless an operator runs a separate manual backfill.




@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 139ff034af


Comment thread src/brainlayer/drain.py
if conn.execute(
"SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?", (vector_table,)
).fetchone():
conn.execute(f"DELETE FROM {vector_table} WHERE chunk_id = ?", (supersedes,))

P1: Load sqlite-vec before deleting superseded vectors

When an arbitrated brain_store(..., supersedes=...) is drained against a normal BrainLayer DB, chunk_vectors is the vec0 virtual table created by VectorStore, but this drain path opens the DB with stdlib sqlite3 and never loads the sqlite-vec extension. This DELETE FROM chunk_vectors raises OperationalError: no such module: vec0, causing drain_once to roll back, return 0, and leave the same queue file to poison subsequent drain batches.

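A defensive sketch of the drain-side delete: the sqlite-vec extension must be loaded into the stdlib sqlite3 connection before any vec0 virtual table is touched. The injectable load_vec parameter below is an assumption for illustration; it stands in for the real loader (e.g. sqlite_vec.load after enable_load_extension(True)):

```python
import sqlite3
from typing import Callable

def delete_superseded_vectors(
    conn: sqlite3.Connection,
    supersedes: str,
    load_vec: Callable[[sqlite3.Connection], None],
) -> None:
    # Load the extension first; without it, DELETE on a vec0 virtual table
    # raises OperationalError: no such module: vec0 and rolls back the batch.
    load_vec(conn)
    for table in ("chunk_vectors", "chunk_vectors_binary"):
        exists = conn.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?",
            (table,),
        ).fetchone()
        if exists:
            # Table name comes from the fixed allowlist above, not user input.
            conn.execute(f"DELETE FROM {table} WHERE chunk_id = ?", (supersedes,))
```

Loading once per drain connection (rather than per event) would avoid paying the extension-load cost on every supersede.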

Comment thread src/brainlayer/drain.py Outdated
)


def _apply_hook(conn: sqlite3.Connection, event: dict[str, Any]) -> None:

🟢 Low brainlayer/drain.py:156

When event["timestamp"] is a string that parses as float("inf") or an extremely large number (e.g., 1e50), the float() conversion on line 170 succeeds, but datetime.fromtimestamp() on line 186 raises OSError or OverflowError — neither of which are caught. This crashes the entire batch instead of falling back to the current time.


Evidence trail:
src/brainlayer/drain.py lines 156-190: `_apply_hook` function with try/except catching only TypeError/ValueError at float() conversion (lines ~170-173), then calling `datetime.fromtimestamp(timestamp, timezone.utc)` unprotected (line ~186). src/brainlayer/drain.py lines 296-327: outer drain loop where uncaught exceptions cause `conn.rollback()` and batch failure.

Comment thread src/brainlayer/drain.py Outdated



@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 922aafd18d


Comment on lines +503 to +525
if os.environ.get("BRAINLAYER_ARBITRATED") == "1":
_queue_store(
{
"content": content,
"memory_type": memory_type,
"project": _normalize_project_name(project),
"tags": tags,
"importance": importance,
"confidence_score": confidence_score,
"outcome": outcome,
"reversibility": reversibility,
"files_changed": files_changed,
"entity_id": entity_id,
"status": status,
"severity": severity,
"file_path": file_path,
"function_name": function_name,
"line_number": line_number,
"supersedes": supersedes,
}
)
structured = {"chunk_id": "queued", "related": []}
return ([TextContent(type="text", text=format_store_result("queued", queued=True))], structured)

P2: Validate content before enqueueing stores

When BRAINLAYER_ARBITRATED=1, this branch bypasses the store_memory validation that rejects blank content and system-prompt-looking content. The drain path only skips blank content after the tool has already returned a queued success, and it does not run the system-prompt filter at all, so arbitrated brain_store can either report a successful write that is silently dropped or persist content the normal path intentionally blocks.

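Running the same validation before enqueueing would close the gap. A minimal sketch; the real store_memory patterns for system-prompt-looking content are not shown in this PR, so the regex below is an illustrative stand-in, not the repo's actual filter:

```python
import re

# Hypothetical heuristic; the actual filter in store_memory is not reproduced here.
_SYSTEM_PROMPT_RE = re.compile(r"^\s*(you are|system:)", re.IGNORECASE)

def validate_store_content(content: str) -> str:
    """Reject blank or system-prompt-looking content BEFORE the arbitrated
    branch reports a queued success, mirroring the non-arbitrated path."""
    if not content or not content.strip():
        raise ValueError("content must be non-empty")
    if _SYSTEM_PROMPT_RE.search(content):
        raise ValueError("content looks like a system prompt")
    return content.strip()
```

Calling this at the top of the arbitrated branch means a rejected store fails loudly at enqueue time instead of being silently dropped by the drain.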

Comment on lines +524 to +525
structured = {"chunk_id": "queued", "related": []}
return ([TextContent(type="text", text=format_store_result("queued", queued=True))], structured)

P2: Clear cached search results for queued stores

In arbitrated mode the MCP process returns after enqueueing without invalidating the in-process hybrid search cache, unlike the normal store_memory path which clears it after writes. If a user searches for a query, then stores a matching memory and the drain writes it, repeating the same brain_search in the same MCP process can keep returning the pre-store cached results for up to the cache TTL, making the newly stored memory appear missing.



@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 9

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/brainlayer/mcp/store_handler.py (1)

392-422: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Serialize the legacy fallback queue writes.

If enqueue_store(...) fails, this falls back to pending-stores.jsonl, but the append/trim/rewrite path is still unlocked. Under concurrent MCP callers, one fallback writer can rename a trimmed file over another writer’s append and drop queued stores. The fallback needs a file lock around the full write+trim sequence, or it is not durable under the same failure that triggered the fallback.

As per coding guidelines, "Flag risky DB or concurrency changes explicitly and do not hand-wave lock behavior".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/brainlayer/mcp/store_handler.py` around lines 392 - 422, The fallback
writing to pending-stores.jsonl must be serialized: wrap the entire append +
read/trim/rewrite sequence with a file lock to prevent concurrent writers from
losing entries; after computing path = _get_pending_store_path() and ensuring
path.parent exists, obtain an exclusive lock (e.g., on path or
path.with_suffix(".lock")) before opening for append, writing json.dumps(item) +
"\n", reading path.read_text(), trimming to _QUEUE_MAX_SIZE, writing to a
temporary file and renaming, and always release the lock in a finally block;
update the code around enqueue_store, path, and _QUEUE_MAX_SIZE and preserve
existing logging (logger.debug/warning) for errors.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/brainlayer/cli/__init__.py`:
- Around line 2120-2133: The current chunk_id generation uses a truncated SHA256
of stable_payload built from only content, memory_type, project, tags, and
importance, causing distinct legacy rows to collide; update the stable_payload
used for the hash (the object built just before chunk_id) to include all unique
legacy identifying fields from item (e.g., severity, file_path, line_number,
entity_id and any other provenance fields present on item) so the hash covers
those attributes before computing chunk_id for the chunk_id construction code
that references stable_payload and item.

In `@src/brainlayer/drain.py`:
- Around line 416-418: Currently the code calls _embed_store_chunks(conn,
store_chunk_ids, embed_fn) while the transaction is still open, holding the
SQLite write lock; change the order so the transaction is committed before
invoking the embedder: move or duplicate conn.execute("COMMIT") to occur before
calling _embed_store_chunks (i.e., call conn.execute("COMMIT") when should_embed
is true and only then call _embed_store_chunks), ensuring the BEGIN
IMMEDIATE/transaction is closed before any slow/remote embedding work; update
any surrounding logic that assumes conn is usable after commit and add a brief
comment flagging this as a risky DB/concurrency change.
- Around line 107-145: The code currently does float(event["importance"]) during
chunk insertion which will raise on malformed input and abort the drain; update
the logic around the "importance" field (in the block that builds the dict for
_insert_chunk in drain.py) to coerce defensively: attempt to parse
event.get("importance") inside a try/except catching ValueError/TypeError and on
failure set importance to None (or simply omit the key) so a bad value won't
throw and roll back the transaction; ensure this change is applied where
_insert_chunk is called and does not re-raise so only the bad event is skipped
or inserted with no importance instead of poisoning the queue.

In `@src/brainlayer/enrichment_controller.py`:
- Around line 171-181: The queued enrichment helper _enqueue_enrichment_write
currently drops the chunk's storage context so the drain cannot pick the correct
SQLite DB; modify the call to enqueue_enrichment_update to pass through the
store/db_path from the chunk (e.g., chunk.get("store") or chunk.get("db_path"))
alongside chunk_id, enrichment, content_hash and entities so the queued intent
includes the target DB; update any callers or the enqueue_enrichment_update
signature if needed to accept and forward this store/db_path field to the drain
to ensure writes go to the originating store.

In `@src/brainlayer/mcp/store_handler.py`:
- Around line 503-525: The queued-item path must preserve the supersedes field
so a fallback replay preserves atomic replace; update the replay path in
_flush_pending_stores to read the stored record's "supersedes" and pass it
through to store_memory (and any intermediate call like brain_store/store_memory
wrappers) when replaying pending-stores.jsonl entries; ensure function
signatures used in replay accept and forward supersedes (e.g., store_memory,
brain_store) so the atomic store-and-replace semantics are retained on failure
paths.

In `@src/brainlayer/queue_io.py`:
- Around line 117-125: The code currently sets the "timestamp" field using
"timestamp or time.time()", which treats 0 (and other falsy but valid values) as
missing; change this to use an explicit None check so falsy timestamps like 0
are preserved (e.g., set "timestamp" to timestamp if timestamp is not None else
time.time()); update the dict passed to enqueue_jsonl (the "timestamp" key)
accordingly so enqueue_jsonl receives the correct timestamp value.
- Around line 36-39: The current write uses tmp_path.write_text(...) then
tmp_path.replace(final_path) which can lose data on crash because neither the
file contents nor the containing directory are fsynced; modify the sequence
around tmp_path and final_path so you open the temp file (e.g.
tmp_path.open("wb")), write bytes, call file.flush() and
os.fsync(file.fileno()), then atomically move the temp to final (os.replace or
tmp_path.replace), and afterward open the parent directory
(os.open(final_path.parent, os.O_DIRECTORY)) and call os.fsync(dir_fd) before
closing it; update the code paths that currently call tmp_path.write_text and
tmp_path.replace to use this safe write+fsync+dir fsync pattern.

In `@tests/test_arbitration.py`:
- Around line 343-348: The test currently creates stale_lock then immediately
unlinks it so drain_once never sees a sentinel; remove the premature deletion
and let drain_once handle the stale file: create stale_lock = queue_dir /
".drain.lock", write the sentinel, do not call stale_lock.unlink() before
invoking drain_once(db_path=db_path, queue_dir=queue_dir), then assert
drain_once(...) == 1 and assert not stale_lock.exists() afterwards so the test
verifies that drain_once detects and removes a stale .drain.lock; reference the
stale_lock variable and the drain_once(...) call to locate the change.
- Around line 82-87: The helper _connect_apsw currently returns a raw
apsw.Connection which used in tests as "with _connect_apsw(...) as conn:" only
manages transactions and does not close the connection, leaking SQLite handles;
change _connect_apsw into a proper closing context manager (e.g., use
contextlib.contextmanager or return an object with __enter__/__exit__) that
creates the apsw.Connection, enables/loads extensions as before, yields the
connection, and always calls conn.close() in the exit/finally path; update any
helper callers like _create_vec_db (and all tests already using "with
_connect_apsw(...) as conn:") to rely on the new context manager so connections
are closed after each with-block.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 03fa3947-adb4-44a8-accd-dad769e65d7d

📥 Commits

Reviewing files that changed from the base of the PR and between 6d13e57 and 922aafd.

📒 Files selected for processing (9)
  • README.md
  • docs/arbitration.md
  • src/brainlayer/cli/__init__.py
  • src/brainlayer/drain.py
  • src/brainlayer/enrichment_controller.py
  • src/brainlayer/mcp/store_handler.py
  • src/brainlayer/queue_io.py
  • src/brainlayer/vector_store.py
  • tests/test_arbitration.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: test (3.13)
  • GitHub Check: test (3.12)
  • GitHub Check: test (3.11)
  • GitHub Check: Macroscope - Correctness Check
🧰 Additional context used
📓 Path-based instructions (2)
**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

**/*.py: Flag risky DB or concurrency changes explicitly and do not hand-wave lock behavior
Enforce one-write-at-a-time concurrency constraint; reads are safe but brain_digest is write-heavy and must not run in parallel with other MCP work
Run pytest before claiming behavior changed safely; current test suite has 929 tests

**/*.py: Use paths.py:get_db_path() for all database path resolution; all scripts and CLI must use this function rather than hardcoding paths
When performing bulk database operations: stop enrichment workers first, checkpoint WAL before and after, drop FTS triggers before bulk deletes, batch deletes in 5-10K chunks, and checkpoint every 3 batches

Files:

  • src/brainlayer/vector_store.py
  • src/brainlayer/mcp/store_handler.py
  • src/brainlayer/enrichment_controller.py
  • tests/test_arbitration.py
  • src/brainlayer/queue_io.py
  • src/brainlayer/cli/__init__.py
  • src/brainlayer/drain.py
src/brainlayer/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

src/brainlayer/**/*.py: Use retry logic on SQLITE_BUSY errors; each worker must use its own database connection to handle concurrency safely
Classification must preserve ai_code, stack_trace, and user_message verbatim; skip noise entries entirely and summarize build_log and dir_listing entries (structure only)
Use AST-aware chunking via tree-sitter; never split stack traces; mask large tool output
For enrichment backend selection: use Groq as primary backend (cloud, configured in launchd plist), Gemini as fallback via enrichment_controller.py, and Ollama as offline last-resort; allow override via BRAINLAYER_ENRICH_BACKEND env var
Configure enrichment rate via BRAINLAYER_ENRICH_RATE environment variable (default 0.2 = 12 RPM)
Implement chunk lifecycle columns: superseded_by, aggregated_into, archived_at on chunks table; exclude lifecycle-managed chunks from default search; allow include_archived=True to show history
Implement brain_supersede with safety gate for personal data (journals, notes, health/finance); use soft-delete for brain_archive with timestamp
Add supersedes parameter to brain_store for atomic store-and-replace operations
Run linting and formatting with: ruff check src/ && ruff format src/
Run tests with pytest
Use PRAGMA wal_checkpoint(FULL) before and after bulk database operations to prevent WAL bloat

Files:

  • src/brainlayer/vector_store.py
  • src/brainlayer/mcp/store_handler.py
  • src/brainlayer/enrichment_controller.py
  • src/brainlayer/queue_io.py
  • src/brainlayer/cli/__init__.py
  • src/brainlayer/drain.py
🪛 OpenGrep (1.20.0)
src/brainlayer/drain.py

[ERROR] 72-72: SQL query built via f-string passed to execute()/executemany(). Use parameterized queries with placeholders instead.

(coderabbit.sql-injection.python-fstring-execute)


[ERROR] 144-144: SQL query built via f-string passed to execute()/executemany(). Use parameterized queries with placeholders instead.

(coderabbit.sql-injection.python-fstring-execute)


[ERROR] 271-271: SQL query built via f-string passed to execute()/executemany(). Use parameterized queries with placeholders instead.

(coderabbit.sql-injection.python-fstring-execute)

🔇 Additional comments (1)
tests/test_arbitration.py (1)

220-234: ⚡ Quick win

The polling loop after drain_once is necessary due to WAL visibility timing, not async embedding writes.

drain_once commits embeddings synchronously within its transaction (line 417–418 of drain.py) before returning. The _embed_store_chunks() call executes INSERT statements directly on the connection before COMMIT, so embeddings are part of the committed WAL transaction, not fire-and-forget or asynchronous.

The 2-second polling loop is likely defensive code accounting for WAL checkpoint visibility behavior when mixing sqlite3.connect() (used in test setup) with APSW connections (used for queries). Similar polling appears in other tests like test_drain_daemon_serializes_three_concurrent_producers. If the polling is consistently unnecessary in CI, consider removing it; if it occasionally times out on slow machines, document the WAL visibility behavior instead.

Comment on lines +2120 to +2133
stable_payload = {
"content": content,
"memory_type": item.get("memory_type", "note"),
"project": item.get("project"),
"tags": item.get("tags"),
"importance": item.get("importance"),
}
chunk_id = (
item.get("chunk_id")
or "pending-"
+ hashlib.sha256(
json.dumps(stable_payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
).hexdigest()[:16]
)

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Legacy migration ID hash can collapse distinct records.

Line 2120–Line 2133 builds chunk_id from only five fields. Distinct legacy rows (e.g., different severity, file_path, line_number, entity_id) can map to the same ID and be merged/dropped downstream.

Suggested fix
-                stable_payload = {
-                    "content": content,
-                    "memory_type": item.get("memory_type", "note"),
-                    "project": item.get("project"),
-                    "tags": item.get("tags"),
-                    "importance": item.get("importance"),
-                }
+                stable_payload = {
+                    "content": content,
+                    "memory_type": item.get("memory_type", "note"),
+                    "project": item.get("project"),
+                    "tags": item.get("tags"),
+                    "importance": item.get("importance"),
+                    "confidence_score": item.get("confidence_score"),
+                    "outcome": item.get("outcome"),
+                    "reversibility": item.get("reversibility"),
+                    "files_changed": item.get("files_changed"),
+                    "entity_id": item.get("entity_id"),
+                    "status": item.get("status"),
+                    "severity": item.get("severity"),
+                    "file_path": item.get("file_path"),
+                    "function_name": item.get("function_name"),
+                    "line_number": item.get("line_number"),
+                    "supersedes": item.get("supersedes"),
+                }

Comment thread src/brainlayer/drain.py
Comment on lines +107 to +145
now = datetime.now(timezone.utc).isoformat()
metadata = {"memory_type": event.get("memory_type", "note")}
metadata.update(event.get("metadata") or {})
chunk_id = event.get("chunk_id") or f"manual-{uuid.uuid4().hex[:16]}"
existing = conn.execute("SELECT content FROM chunks WHERE id = ?", (chunk_id,)).fetchone()
if existing:
if str(existing[0]).strip() == content:
return ApplyResult(chunk_id=chunk_id)
return ApplyResult(collision_chunk_id=chunk_id)
tags = event.get("tags")
_insert_chunk(
conn,
{
"id": chunk_id,
"content": content,
"metadata": json.dumps(metadata),
"source_file": "brainlayer-queue",
"project": event.get("project"),
"content_type": event.get("memory_type", "note"),
"value_type": "HIGH",
"char_count": len(content),
"source": event.get("source") or "manual",
"created_at": now,
"enriched_at": now,
"summary": content[:200],
"tags": json.dumps(tags) if tags else None,
"importance": float(event["importance"]) if event.get("importance") is not None else None,
},
)
supersedes = event.get("supersedes") or metadata.get("supersedes")
cols = _columns(conn, "chunks")
if supersedes and "superseded_by" in cols:
conn.execute("UPDATE chunks SET superseded_by = ? WHERE id = ?", (chunk_id, supersedes))
for vector_table in ("chunk_vectors", "chunk_vectors_binary"):
if conn.execute(
"SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?", (vector_table,)
).fetchone():
conn.execute(f"DELETE FROM {vector_table} WHERE chunk_id = ?", (supersedes,))
entity_id = event.get("entity_id") or metadata.get("entity_id")

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Don't let a bad importance value poison the queue file.

float(event["importance"]) will raise on malformed producer input, roll back the transaction, and leave the same .jsonl on disk to fail again on every drain pass. This path should coerce defensively and either drop the field or skip just that event.

Suggested fix
     metadata = {"memory_type": event.get("memory_type", "note")}
     metadata.update(event.get("metadata") or {})
     chunk_id = event.get("chunk_id") or f"manual-{uuid.uuid4().hex[:16]}"
+    raw_importance = event.get("importance")
+    importance = None
+    if raw_importance is not None:
+        try:
+            importance = float(raw_importance)
+        except (TypeError, ValueError):
+            logger.warning("Ignoring invalid importance=%r for chunk_id=%s", raw_importance, chunk_id)
     existing = conn.execute("SELECT content FROM chunks WHERE id = ?", (chunk_id,)).fetchone()
@@
-            "importance": float(event["importance"]) if event.get("importance") is not None else None,
+            "importance": importance,


Comment thread src/brainlayer/drain.py
Comment on lines +416 to +418
if should_embed:
_embed_store_chunks(conn, store_chunk_ids, embed_fn)
conn.execute("COMMIT")

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Commit the batch before calling the embedder.

_embed_store_chunks() runs while BEGIN IMMEDIATE is still open, so any slow or remote embedding call holds the SQLite write lock for the full embedding latency. Under arbitration, that turns embed slowness into write contention for every producer.

Suggested fix
-                    if should_embed:
-                        _embed_store_chunks(conn, store_chunk_ids, embed_fn)
                     conn.execute("COMMIT")
+                    if should_embed and store_chunk_ids:
+                        embed_conn = _open_connection(db_path)
+                        try:
+                            _embed_store_chunks(embed_conn, store_chunk_ids, embed_fn)
+                        finally:
+                            embed_conn.close()

As per coding guidelines, "Flag risky DB or concurrency changes explicitly and do not hand-wave lock behavior."


Comment on lines +171 to +181
def _enqueue_enrichment_write(chunk: dict[str, Any], enrichment: dict[str, Any]) -> None:
from .queue_io import enqueue_enrichment_update

content = chunk.get("content", "")
try:
enqueue_enrichment_update(
chunk_id=chunk["id"],
enrichment=enrichment,
content_hash=_content_hash(content) if content else None,
entities=enrichment.get("entities", []),
)

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Preserve the target store in the queued enrichment intent.

This helper drops the store/db_path context that the direct write path had. In arbitrated mode, enrich_single, enrich_realtime, and enrich_batch can still read from an arbitrary store, but the queued payload here only contains chunk_id and enrichment fields, so the drain has no way to know which SQLite DB should be updated. That can silently route writes to the default DB instead of the store that produced the chunk.

As per coding guidelines, "Flag risky DB or concurrency changes explicitly and do not hand-wave lock behavior".

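A queued enrichment intent that preserves the originating store could be shaped as below. This is a sketch of the described fix, not the repo's queue_io schema; the "db_path" key and the assumption that the chunk dict carries its store path are both illustrative:

```python
from typing import Any, Optional

def build_enrichment_event(
    chunk: dict[str, Any],
    enrichment: dict[str, Any],
    content_hash: Optional[str] = None,
) -> dict[str, Any]:
    """Build a queued enrichment intent that keeps the originating store's
    DB path so the drain can route the write to the right SQLite file."""
    return {
        "type": "enrichment",
        "chunk_id": chunk["id"],
        "enrichment": enrichment,
        "content_hash": content_hash,
        "entities": enrichment.get("entities", []),
        # Preserve target-store context; the drain should fall back to the
        # default DB only when this field is absent.
        "db_path": str(chunk["db_path"]) if chunk.get("db_path") else None,
    }
```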

Comment on lines +503 to +525
if os.environ.get("BRAINLAYER_ARBITRATED") == "1":
_queue_store(
{
"content": content,
"memory_type": memory_type,
"project": _normalize_project_name(project),
"tags": tags,
"importance": importance,
"confidence_score": confidence_score,
"outcome": outcome,
"reversibility": reversibility,
"files_changed": files_changed,
"entity_id": entity_id,
"status": status,
"severity": severity,
"file_path": file_path,
"function_name": function_name,
"line_number": line_number,
"supersedes": supersedes,
}
)
structured = {"chunk_id": "queued", "related": []}
return ([TextContent(type="text", text=format_store_result("queued", queued=True))], structured)

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Keep supersedes intact when the arbitrated path falls back.

This branch now queues supersedes, but _queue_store(...) can still fall back to pending-stores.jsonl, and _flush_pending_stores() replays those items via store_memory(...) without forwarding supersedes. That means the new arbitrated fast path loses atomic store-and-replace semantics exactly on the failure path it is supposed to make durable.

As per coding guidelines, "Add supersedes parameter to brain_store for atomic store-and-replace operations".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/brainlayer/mcp/store_handler.py` around lines 503-525: the queued-item
path must preserve the supersedes field so a fallback replay preserves atomic
replace; update the replay path in _flush_pending_stores to read the stored
record's "supersedes" and pass it through to store_memory (and any intermediate
call like brain_store/store_memory wrappers) when replaying pending-stores.jsonl
entries; ensure function signatures used in replay accept and forward supersedes
(e.g., store_memory, brain_store) so the atomic store-and-replace semantics are
retained on failure paths.

Comment on lines +36 to +39
final_path = resolved_dir / f"{safe_source}-{now_ms}-{uuid.uuid4().hex}.jsonl"
tmp_path = final_path.with_suffix(".tmp")
tmp_path.write_text(json.dumps(event, ensure_ascii=True) + "\n", encoding="utf-8")
tmp_path.replace(final_path)

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

replace() alone does not make this queue durable.

A crash after the rename can still lose the just-enqueued event because neither the temp file nor the parent directory is fsynced. Since this queue is now the write buffer for arbitrated writes, that breaks the “durable JSONL” guarantee.

Suggested fix
-    tmp_path.write_text(json.dumps(event, ensure_ascii=True) + "\n", encoding="utf-8")
-    tmp_path.replace(final_path)
+    payload = json.dumps(event, ensure_ascii=True) + "\n"
+    with tmp_path.open("w", encoding="utf-8") as handle:
+        handle.write(payload)
+        handle.flush()
+        os.fsync(handle.fileno())
+    tmp_path.replace(final_path)
+    dir_fd = os.open(resolved_dir, os.O_RDONLY)
+    try:
+        os.fsync(dir_fd)
+    finally:
+        os.close(dir_fd)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/brainlayer/queue_io.py` around lines 36-39: the current write uses
tmp_path.write_text(...) then tmp_path.replace(final_path) which can lose data
on crash because neither the file contents nor the containing directory are
fsynced; modify the sequence around tmp_path and final_path so you open the temp
file (e.g. tmp_path.open("wb")), write bytes, call file.flush() and
os.fsync(file.fileno()), then atomically move the temp to final (os.replace or
tmp_path.replace), and afterward open the parent directory
(os.open(final_path.parent, os.O_DIRECTORY)) and call os.fsync(dir_fd) before
closing it; update the code paths that currently call tmp_path.write_text and
tmp_path.replace to use this safe write+fsync+dir fsync pattern.

Comment on lines +117 to +125
return enqueue_jsonl(
{
"kind": "hook_chunk",
"session_id": session_id,
"content": content,
"content_hash": content_hash,
"project": project,
"timestamp": timestamp or time.time(),
},

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Preserve explicit 0 timestamps.

timestamp or time.time() treats 0.0 as missing and rewrites the event time. Use an is None check here so valid falsy timestamps survive round-tripping.

Suggested fix
-            "timestamp": timestamp or time.time(),
+            "timestamp": time.time() if timestamp is None else timestamp,
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/brainlayer/queue_io.py` around lines 117-125: the code currently sets
the "timestamp" field using "timestamp or time.time()", which treats 0 (and
other falsy but valid values) as missing; change this to use an explicit None
check so falsy timestamps like 0 are preserved (e.g., set "timestamp" to
timestamp if timestamp is not None else time.time()); update the dict passed to
enqueue_jsonl (the "timestamp" key) accordingly so enqueue_jsonl receives the
correct timestamp value.
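The pitfall is easy to demonstrate in isolation — both helpers below are hypothetical, contrasting the `or` fallback with the explicit `None` check:

```python
import time


def stamp_or(ts, now=None):
    """`ts or default` silently rewrites valid falsy timestamps like 0.0."""
    return ts or (now if now is not None else time.time())


def stamp_is_none(ts, now=None):
    """An explicit None check preserves an intentional 0.0 epoch timestamp."""
    if ts is None:
        return now if now is not None else time.time()
    return ts
```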

Comment thread tests/test_arbitration.py
Comment on lines +82 to +87
def _connect_apsw(path: Path) -> apsw.Connection:
    conn = apsw.Connection(str(path))
    conn.enableloadextension(True)
    conn.loadextension(sqlite_vec.loadable_path())
    conn.enableloadextension(False)
    return conn

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🌐 Web query:

APSW Connection context manager __exit__ close connection or only commit transaction

💡 Result:

In APSW, using a Connection as a context manager (with conn:) does not close the database connection on exit; it manages the pending transaction only: if the with-block exits without an exception, the pending transaction is committed; if an exception occurred, the transaction is rolled back. [1][2] So, exit commits/rolls back but connection.close is not implied by the context-manager exit. You should close the connection yourself (or use contextlib.closing / an equivalent) if you need it closed deterministically. [3][2]


🏁 Script executed:

# First, verify the _connect_apsw function exists and matches the snippet
head -100 tests/test_arbitration.py | tail -30

Repository: EtanHey/brainlayer

Length of output: 970


🏁 Script executed:

# Find all usages of _connect_apsw to verify the with-statement pattern
rg "_connect_apsw" tests/test_arbitration.py -A 1 -B 1 -n

Repository: EtanHey/brainlayer

Length of output: 1808


🏁 Script executed:

# Check _create_vec_db to see how it uses _connect_apsw
rg "_create_vec_db" tests/test_arbitration.py -A 15 -B 2

Repository: EtanHey/brainlayer

Length of output: 1189


APSW with statement does not close the connection — leaks across every test.

_connect_apsw returns a raw apsw.Connection, and all call sites use it as with _connect_apsw(...) as conn: (lines 147, 195, 226, 254, 279, 297, 315, 350, 374). Per APSW documentation, the connection context manager only manages a transaction (commit on success, rollback on exception); it does not close the connection. Each invocation therefore leaks the underlying SQLite handle and any WAL/SHM mappings until GC eventually runs __del__. In WAL mode this can keep read locks and file descriptors live across tests, mask the very concurrency behavior under test, and pollute later tests (especially the 3-producer load test).

Recommend converting the helper into a proper closing context manager so all existing with ... as conn: usages do the right thing.

🛠️ Proposed fix — wrap as a closing context manager
+from contextlib import contextmanager
@@
-def _connect_apsw(path: Path) -> apsw.Connection:
-    conn = apsw.Connection(str(path))
-    conn.enableloadextension(True)
-    conn.loadextension(sqlite_vec.loadable_path())
-    conn.enableloadextension(False)
-    return conn
+@contextmanager
+def _connect_apsw(path: Path):
+    conn = apsw.Connection(str(path))
+    try:
+        conn.enableloadextension(True)
+        conn.loadextension(sqlite_vec.loadable_path())
+        conn.enableloadextension(False)
+        yield conn
+    finally:
+        conn.close()

_create_vec_db (line 91) will need updating to with _connect_apsw(path) as conn: as well.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/test_arbitration.py` around lines 82-87: the helper _connect_apsw
currently returns a raw apsw.Connection which used in tests as "with
_connect_apsw(...) as conn:" only manages transactions and does not close the
connection, leaking SQLite handles; change _connect_apsw into a proper closing
context manager (e.g., use contextlib.contextmanager or return an object with
__enter__/__exit__) that creates the apsw.Connection, enables/loads extensions
as before, yields the connection, and always calls conn.close() in the
exit/finally path; update any helper callers like _create_vec_db (and all tests
already using "with _connect_apsw(...) as conn:") to rely on the new context
manager so connections are closed after each with-block.

Comment thread tests/test_arbitration.py
Comment on lines +343 to +348
stale_lock = queue_dir / ".drain.lock"
stale_lock.write_text("stale", encoding="utf-8")
stale_lock.unlink()

assert drain_once(db_path=db_path, queue_dir=queue_dir) == 1
assert not stale_lock.exists()

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Test does not exercise the scenario its name describes.

stale_lock is written and then immediately unlink()ed before drain_once runs, so the drain never observes a lock sentinel at all. The subsequent assert not stale_lock.exists() is trivially true (the test itself just deleted it). As written, this is indistinguishable from a plain "drain processes one item" test and provides no signal about lock-file robustness.

If the intent is "drain still works when a previous stale .drain.lock file is present", leave the file in place and let drain handle it. If the intent is "drain does not depend on a file at that path for mutual exclusion" (i.e., uses flock/in-memory locking on a different artifact), the test needs to assert that drain succeeds even while another holder believes it owns the lock — which requires a second process/thread, not a write-then-delete dance.

🛠️ Suggested fix — actually leave the stale lock in place
     stale_lock = queue_dir / ".drain.lock"
     stale_lock.write_text("stale", encoding="utf-8")
-    stale_lock.unlink()
 
     assert drain_once(db_path=db_path, queue_dir=queue_dir) == 1
     assert not stale_lock.exists()
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/test_arbitration.py` around lines 343-348: the test currently creates
stale_lock then immediately unlinks it so drain_once never sees a sentinel;
remove the premature deletion and let drain_once handle the stale file: create
stale_lock = queue_dir / ".drain.lock", write the sentinel, do not call
stale_lock.unlink() before invoking drain_once(db_path=db_path,
queue_dir=queue_dir), then assert drain_once(...) == 1 and assert not
stale_lock.exists() afterwards so the test verifies that drain_once detects and
removes a stale .drain.lock; reference the stale_lock variable and the
drain_once(...) call to locate the change.

"tags": tags,
},
)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Meta-research arbitration incorrectly clears entity JSON data

Medium Severity

_enqueue_meta_research_write calls _enqueue_enrichment_write which always passes entities=enrichment.get("entities", []). For meta-research, the enrichment dict lacks an "entities" key, so an empty list is passed. The drain's _apply_enrichment then sets raw_entities_json = "[]" because the value is not None. The direct-write path (_mark_meta_research) never touches raw_entities_json, so the arbitrated path introduces unintended data loss of existing entity associations.
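The distinction the finding hinges on — `None` meaning "leave entities alone" versus `[]` meaning "clear them" — can be sketched like this (`apply_enrichment` and the `raw_entities_json` handling are illustrative, not the drain's actual code):

```python
import json


def apply_enrichment(row: dict, enrichment: dict, entities=None) -> dict:
    """Only overwrite raw_entities_json when entities were explicitly
    provided; None means the caller had no entity data, so existing
    associations are preserved."""
    row = dict(row)
    row["summary"] = enrichment.get("summary", row.get("summary"))
    if entities is not None:
        row["raw_entities_json"] = json.dumps(entities)
    return row
```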

Additional Locations (1)

Reviewed by Cursor Bugbot for commit 922aafd.



🟢 Low

When BRAINLAYER_ARBITRATED=1, the supersedes parameter is queued in the pending store dict but _flush_pending_stores never extracts it, so the supersede relationship is silently lost and the old chunk remains unmarked when the queue flushes.

AI Prompt for your agent:
In file src/brainlayer/mcp/store_handler.py around line 447:

When `BRAINLAYER_ARBITRATED=1`, the `supersedes` parameter is queued in the pending store dict but `_flush_pending_stores` never extracts it, so the supersede relationship is silently lost and the old chunk remains unmarked when the queue flushes.

Evidence trail:
src/brainlayer/mcp/store_handler.py lines 515-537 (queuing with supersedes at line 532), src/brainlayer/mcp/store_handler.py lines 425-470 (_flush_pending_stores calling store_memory without supersedes), src/brainlayer/store.py lines 47-64 (store_memory signature lacks supersedes param), src/brainlayer/mcp/store_handler.py lines 567-570 (supersede_chunk called separately in non-queued path)


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 64110e7e8d


Comment thread src/brainlayer/drain.py
Comment on lines +408 to +414
for attempt in range(5):
    conn = _open_connection(db_path)
    attempt_drained = 0
    collision_ids: list[str] = []
    store_chunk_ids: list[str] = []
    try:
        conn.execute("BEGIN IMMEDIATE")

P2: Batch queue files in one SQLite transaction

When the queue contains many normal enqueue_jsonl writes, each event is its own .jsonl file, but this loop opens a fresh APSW connection, loads sqlite-vec, and runs BEGIN IMMEDIATE/COMMIT once per file instead of once per batch_size. In bursty producer scenarios the drain becomes transaction/setup-bound and falls behind even with batch_size=250 (the concurrent producer test only drained 2000 of 3000 items before its deadline), leaving arbitrated writes queued much longer than intended.



@cursor cursor Bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.

There are 2 total unresolved issues (including 1 from previous review).


Reviewed by Cursor Bugbot for commit 64110e7.

Comment thread src/brainlayer/drain.py
time.sleep(delay)
continue
_log(log_path, f"drain failed for {path.name}: {exc}")
break

Non-BusyError drain failures leave queue files permanently stuck

Medium Severity

When a non-BusyError exception occurs during event processing inside drain_once (e.g., ValueError from a non-numeric importance value, or any data-level error), the except branch logs the failure and breaks without quarantining or deleting the file. The file stays as *.jsonl in the queue directory and is re-selected by sorted(queue_dir.glob("*.jsonl")) on every subsequent drain cycle, failing identically each time. Read errors (UnicodeDecodeError/OSError) are correctly quarantined via _quarantine_file, but processing-level exceptions are not, creating a permanently stuck file.


Reviewed by Cursor Bugbot for commit 64110e7.

@EtanHey EtanHey merged commit 2cc24cc into main May 14, 2026
7 checks passed
@EtanHey EtanHey deleted the fix/brainlayer-writer-arbitration branch May 14, 2026 13:30