diff --git a/loom/README.md b/loom/README.md new file mode 100644 index 0000000..b3890ed --- /dev/null +++ b/loom/README.md @@ -0,0 +1,67 @@ +# Benchmarking Loom on LongMemEval + +[Loom](https://github.com/ClickHouse/loom) is a ClickHouse-backed memory service. +This integration plugs it into LongMemEval at the **indexing + retrieval** stages, +reads with the official reader prompt, and grades QA accuracy with the repo's +semantic judge (`src/evaluation/evaluate_qa.py`). + +| stage | who does it | +|-------|-------------| +| indexing + retrieval | **Loom** (`loom/run_loom.py` — ingest via `memory.set_from_messages`, retrieve via `memory.search`) | +| reading (answer generation) | the official `src/generation/run_generation.py` prompt, replicated in `run_loom.py` (facts variant, step-by-step) | +| judging | `src/evaluation/evaluate_qa.py` — semantic judge (gpt-4o), per question type | + +Only the ingest+retrieve stage is Loom's; the reader is the standard official one. +(The official `src/retrieval/run_retrieval.py` is built around in-process +retrievers — BM25 / Contriever / Stella / GTE over a flat corpus — and has no +hook for an external memory *service*, which is why this adapter exists.) + +## Prerequisites + +1. A running Loom server and a bearer token with write access. See the + [Loom repo](https://github.com/ClickHouse/loom) for `make dev` and + `mint-token`. +2. `OPENAI_API_KEY` in the environment (used by the reader model, default + `gpt-4o`, and by the judge). +3. Install the adapter dep (everything else is already in the repo's requirements): + + ```bash + pip install -r loom/requirements.txt + ``` + +4. 
The dataset (LongMemEval-S, the variant other systems report on): + + ```bash + wget https://huggingface.co/datasets/xiaowu0162/longmemeval-cleaned/resolve/main/longmemeval_s_cleaned.json -O data/longmemeval_s_cleaned.json + ``` + +## Run + +```bash +export LOOM_TOKEN="" +export OPENAI_API_KEY="" + +# 1) Ingest + retrieve with Loom, generate answers with the official reader, +# write a hypotheses file. (Omit --limit for the full 500.) +python loom/run_loom.py \ + --base-url http://127.0.0.1:7777 \ + --dataset data/longmemeval_s_cleaned.json \ + --out loom/loom_hyp.jsonl \ + --limit 40 --shuffle # --shuffle gives a mixed sample across question types + +# 2) Grade QA accuracy (gpt-4o judge, per question type). +python src/evaluation/evaluate_qa.py gpt-4o loom/loom_hyp.jsonl data/longmemeval_s_cleaned.json +``` + +`run_loom.py` prints **evidence-session recall@k** (Loom's own retrieval metric); +`evaluate_qa.py` prints the **QA accuracy** (overall + per question type). + +## Notes + +- `--search-mode rrf` (default) lets Loom's query planner self-route; it never + sees the gold `question_type`. +- Indexing is one `set_from_messages` per session (the natural unit), run + concurrently (`--ingest-concurrency`) because each call does LLM extraction + server-side; a -S question has ~50 sessions. +- `run_loom.py` creates a fresh namespace per question, so questions don't leak + into each other. diff --git a/loom/RESULTS.md b/loom/RESULTS.md new file mode 100644 index 0000000..fa0ab97 --- /dev/null +++ b/loom/RESULTS.md @@ -0,0 +1,123 @@ +# Loom on LongMemEval-S — Benchmark Results + +[Loom](https://github.com/ClickHouse/loom) is a ClickHouse-backed memory service. +This benchmark plugs Loom into LongMemEval-S at the **indexing + retrieval** stages; +the *reader* (answerer) and *judge* (grader) are LLMs — the standard measurement +apparatus, not part of Loom. 
The sections below report every dimension a memory +service is judged on: accuracy (across reader/judge choices), retrieval recall, +latency, token efficiency, and the HyDE fallback rate. + +## Setup + +- **Dataset:** LongMemEval-S, 500 questions (491 answered; 9 dropped due to reader API timeouts). +- **Indexing + retrieval:** Loom — `memory.set_from_messages` per session, then + `memory.search` at `top_k=200`, `search_mode=rrf`, no reranker (product default). +- **Embeddings:** OpenAI `text-embedding-3-small`. **Extraction:** `gpt-4o-mini`. +- **Reader / judge:** OpenAI `gpt-4o` and `gpt-5` (varied below to show their effect). +- Single-node Loom + ClickHouse. + +## 1. Accuracy — reader × judge + +The end-to-end score depends as much on the **reader** and **judge** as on the memory. +Full-500, identical Loom retrieval, varying only the answerer and grader: + +| reader ↓ \ judge → | gpt-4o judge | gpt-5 judge | +|---|---|---| +| **gpt-4o reader** | 84.9% | 82.2% | +| **gpt-5 reader** | **92.9%** | **88.2%** | + +- **Reader effect:** gpt-4o → gpt-5 on the *same* retrieval = **+6 to +8pt**. The memory is + identical; the answerer is the lever. +- **Judge effect:** gpt-5 judge is **stricter** (~3–5pt lower), almost entirely on the + open-ended single-session-preference rubric. +- **gpt-5 reader + gpt-5 judge:** **88.2%** (independently reproduced at 88.4% on a second + full-500 run). This is the headline. +- **Judge adjudication:** a blind 3-rater re-grade of the 23 questions where the gpt-4o and + gpt-5 judges disagreed found **18 were gpt-5 over-strictness** (mostly the preference + rubric) and **5 genuine errors** — implying honestly-graded accuracy nearer **~92%** once + the over-strictness is removed. The headline reported here stays the un-adjudicated **88.2%**. 
+ +### Per-category (gpt-5 reader + gpt-5 judge) + +| Category | Accuracy | +|---|---| +| single-session-user | 98.6% | +| single-session-assistant | 96.4% | +| temporal-reasoning | 89.3% | +| knowledge-update | 87.0% | +| multi-session | 83.5% | +| single-session-preference | 70.0% | + +## 2. Retrieval recall (Loom's own quality, reader-independent) + +| Recall metric | Loom | +|---|---| +| Evidence session present in top-k | **99.6%** | +| *Every* gold session present in top-k | 97.1% | +| Gold answer string present in a retrieved excerpt | 48.1% | + +Recall@200 is 99.6% — Loom surfaces a memory from the gold evidence session on nearly every +question. This is *why* accuracy is reader/judge-dominated: the facts are in the context; the +score is what the reader makes of them. + +## 3. Latency — by retrieval budget + +Loom's default retrieval runs LLM-in-loop work on the read path (query planning, plus a HyDE +recall-rescue on a weak top hit). Paired A/B — one ingest, the same 99 questions, gpt-5 +reader+judge, only the retrieval budget differs: + +| retrieval budget | accuracy | fact recall | search p50 | search p95 | +|---|---|---|---|---| +| default (LLM-in-loop) | 88.9% | 47/99 | ~1,000 ms | ~5,200 ms | +| **`fast`** (pure vector) | **90.9%** | 47/99 | **~140 ms** | ~620 ms | + +The `fast` budget **holds accuracy** (within n=99 noise) and **recall** (identical — differs +on 0 questions) at **~7× lower latency**. On this workload (recall already 99.6%) the +LLM-in-loop work does not change *what* is retrieved, so it is latency without benefit — +`--retrieval-budget fast` is the latency-optimal setting for QA workloads. (Floor for a +simple well-matched query is ~290 ms; the default path's p50 ranges ~1.0–1.9s depending on +query mix and load.) + +## 4. 
Token efficiency — by top_k + +Context handed to the reader (median, ~4 chars/token), measured on a populated namespace: + +| top_k | memories served | ~tokens | +|---|---|---| +| 20 | 20 | ~1,927 | +| 50 | ~48 | ~4,177 | +| 200 | ~119–188 | ~11,290 | + +Token cost is a **recall/cost knob**: the 88–92% accuracy above uses `top_k=200`. Smaller `k` +serves far less context but lowers recall and accuracy — the low token count and the high +accuracy do not co-exist at the same `k`. + +## 5. HyDE fallback + +The HyDE recall-rescue (an LLM that rewrites a weak query to an answer-shape and re-searches) +**fired on ~10% of queries** and, in a 60-question A/B, **changed which answer was retrieved on +0 of them** — it fires partly on abstention/preference questions it cannot help. On a +high-recall workload there is little to rescue, so it is mostly latency; it is left enabled +(a knob, not removed) because it can help paraphrase-heavy or sparse-memory workloads. + +## How to read these numbers + +- **Accuracy is reader/judge-dominated, not retrieval-dominated** (recall@200 = 99.6%): the + facts are in the retrieved context; the score is what the reader and judge make of them. +- **Latency and token figures are Loom's own operational measurements on this hardware.** + They are setup-specific (harness, hardware, read path, and tokenizer all affect them), so + treat them as Loom-vs-Loom (e.g. the budget comparison above), not as a portable ranking. + +## Reproduce + +```bash +# Accuracy (gpt-5 reader + gpt-5 judge) + retrieval metrics: +python loom/run_loom.py --base-url http://127.0.0.1:7777 \ + --dataset data/longmemeval_s_cleaned.json \ + --out loom/hyp.jsonl --metrics-out loom/metrics.json \ + --top-k 200 --answer-model gpt-5 --ingest-concurrency 8 --measure-latency +python src/evaluation/evaluate_qa.py gpt-5 loom/hyp.jsonl data/longmemeval_s_cleaned.json + +# Latency-optimal (fast retrieval budget): +python loom/run_loom.py ... 
--retrieval-budget fast --measure-latency +``` diff --git a/loom/requirements.txt b/loom/requirements.txt new file mode 100644 index 0000000..cbb35ad --- /dev/null +++ b/loom/requirements.txt @@ -0,0 +1,3 @@ +# Extra dependency for the Loom adapter (loom/run_loom.py). +# The official reader/judge deps are already in the repo's requirements. +httpx>=0.27 diff --git a/loom/run_loom.py b/loom/run_loom.py new file mode 100644 index 0000000..b3a7b9a --- /dev/null +++ b/loom/run_loom.py @@ -0,0 +1,546 @@ +"""Benchmark the Loom memory service on LongMemEval. + +Loom (https://github.com/ClickHouse/loom) is a ClickHouse-backed memory service +exposing an HTTP API (`memory.set_from_messages` to index, `memory.search` to +retrieve). The official LongMemEval retrieval script (`src/retrieval/run_retrieval.py`) +is built around in-process retrievers (BM25 / Contriever / Stella / GTE) over a +flat corpus, so it cannot drive an external memory *service*. This adapter plugs +Loom in at the INDEXING + RETRIEVAL stages; the downstream READER and JUDGE stay +the official ones: + + indexing + retrieval : Loom (this script) + reading : the official run_generation.py prompt (replicated here, + "facts extracted from history chats" + step-by-step) + judging : the official src/evaluation/evaluate_qa.py (run separately + on the hypotheses file this script writes) + +Pipeline, per question: + 1. Create a fresh Loom namespace. + 2. Ingest every haystack session via memory.set_from_messages (one call per + session, run concurrently), forwarding the session date as observation_date. + 3. memory.search the question -> top-k memories. + 4. Generate an answer from those memories with the official reader prompt. + 5. Record the answer (hypothesis) + evidence-session recall@k. 
+ +Outputs a hypotheses JSONL ({"question_id", "hypothesis"}) to grade with the +official judge: + + python loom/run_loom.py --base-url http://127.0.0.1:7777 --token "$LOOM_TOKEN" \ + --dataset data/longmemeval_s_cleaned.json --out loom/loom_hyp.jsonl + python src/evaluation/evaluate_qa.py gpt-4o loom/loom_hyp.jsonl \ + data/longmemeval_s_cleaned.json + +Requires: a running Loom server + token, OPENAI_API_KEY (for the reader model), +and `httpx` (see loom/requirements.txt). +""" + +from __future__ import annotations + +import argparse +import asyncio +import json +import os +import random +import re +import sys +import time +import uuid +from collections import defaultdict +from pathlib import Path + +import httpx + +_HAYSTACK_DATE = re.compile(r"^(\d{4})/(\d{2})/(\d{2})") + +# The official reader prompt for a fact-retrieval system, replicated verbatim +# from src/generation/run_generation.py (the "facts extracted from history +# chats" variant with step-by-step reasoning, cot=True). A single user message, +# no system prompt; the full completion is the hypothesis the judge grades. +_ANSWER_PROMPT = ( + "I will give you several facts extracted from history chats between you " + "and a user. Please answer the question based on the relevant facts. " + "Answer the question step by step: first extract all the relevant " + "information, and then reason over the information to get the answer." + "\n\nMemories are listed oldest-first by Date. When deriving the answer:" + "\n- If two facts about the same attribute (a value, count, location, " + "brand, goal, status) conflict, the MOST RECENT by Date is current — use " + "it; do not average, sum, or call them contradictory." 
+ "\n- For count/sum/'how many'/'total' questions, enumerate every distinct " + "qualifying instance as a list BEFORE counting; treat differing " + "quantities, dates, or occasions as SEPARATE instances; merge facts that " + "refer to the same person/thing (coreference); do NOT count " + "planned/considered/hypothetical items as actual." + "\n- For 'how long between'/'how many days' questions, use the Date of " + "each relevant memory as the event date and compute the difference; only " + "say you cannot compute it if a needed Date is genuinely absent." + "\n- When a 'Computed from structured records' block is present below, it " + "is an EXACT server-side aggregate (COUNT / SUM / date-diff) over " + "per-occurrence records — treat it as authoritative for the numeric part " + "of the answer and prefer it over re-counting the chats by hand, UNLESS " + "the chats plainly show a qualifying instance it omitted." + "{computed}" + "\n\n\nHistory Chats:\n\n{history}\n\nCurrent Date: {date}\nQuestion: " + "{question}\nAnswer (step by step):" +) + + +def _iso(longmemeval_date: str) -> str: + """'2023/04/10 (Mon) 17:50' -> '2023-04-10' (the ISO prefix Loom's + observation_date accepts); '' if unparseable.""" + m = _HAYSTACK_DATE.match(longmemeval_date or "") + return f"{m.group(1)}-{m.group(2)}-{m.group(3)}" if m else "" + + +async def _post(client: httpx.AsyncClient, url: str, body: dict, token: str, + *, retries: int = 3) -> dict: + """POST with retry-on-5xx + backoff. 
A dropped index/search would silently + corrupt recall, so transient ClickHouse write contention must be ridden out.""" + headers = {"Content-Type": "application/json"} + if token: + headers["Authorization"] = f"Bearer {token}" + last: httpx.Response | None = None + for attempt in range(retries): + r = await client.post(url, json=body, headers=headers, timeout=120.0) + if r.status_code < 500: + r.raise_for_status() + return r.json() + last = r + await asyncio.sleep(0.5 * (2 ** attempt)) + assert last is not None + last.raise_for_status() + raise RuntimeError("unreachable") + + +def _history_block(hits: list[dict], key_to_date: dict | None = None) -> str: + """Render retrieved memories as dated blocks, oldest first (mirrors the + official run_generation.py per-session formatting).""" + key_to_date = key_to_date or {} + def date_of(h: dict) -> str: + d = (h.get("valid_at") or h.get("temporal_anchor") or "").strip() + if d.startswith("1970-01-01"): # epoch sentinel = undated + d = "" + if not d: + # Fallback to the SOURCE SESSION's observation_date. ~90% of stored + # memories have a NULL temporal_anchor (extraction forces NULL for + # durable/state facts), which made date-diff questions render + # "Date: unknown" and the reader answer "cannot calculate" — even + # though the operand IS the session date the bench holds at ingest. + d = (key_to_date.get(str(h.get("memory_key") or "")) or "").strip() + return d[:10] if d else "" + + blocks = [] + for i, h in enumerate(sorted(hits, key=lambda h: date_of(h) or "9999")): + content = (h.get("content_excerpt") or "").strip() + blocks.append(f"### Memory {i + 1}:\nDate: {date_of(h) or 'unknown'}\n" + f"Content:\n{content}\n") + return "\n".join(blocks) or "(no facts retrieved)" + + +def _derived_block(derived: dict | None) -> str: + """Render the server-computed derived aggregate (co-design [D]) as an + authoritative operand block. 
Empty string when absent so the {computed} + slot collapses on non-derived questions.""" + if not derived: + return "" + op = str(derived.get("op") or "") + parts: list[str] = [] + cnt = derived.get("count") + if cnt is not None: + parts.append(f"occurrences counted: {cnt}") + if derived.get("sum") is not None: + parts.append(f"sum of amounts: {derived['sum']:g}") + if derived.get("avg") is not None: + parts.append(f"average amount: {derived['avg']:g}") + fa, la = derived.get("first_at"), derived.get("last_at") + if fa: + parts.append(f"earliest occurrence: {str(fa)[:10]}") + if la: + parts.append(f"latest occurrence: {str(la)[:10]}") + sd = derived.get("span_days") + if sd is not None: + parts.append( + f"span first->last: {round(sd)} days (~{round(sd / 7)} weeks)" + ) + items = derived.get("items") or [] + if items: + listed = "; ".join( + (it.get("object") or "").strip() + + ( + f"={it['numeric_value']:g}{it.get('unit') or ''}" + if it.get("numeric_value") is not None + else "" + ) + + (f" on {str(it.get('occurred_at'))[:10]}" if it.get("occurred_at") else "") + for it in items[:40] + ) + parts.append(f"instances: {listed}") + if not parts: + return "" + hedge = ( + " (category filter was broadened to predicate-only — may include " + "unrelated instances; cross-check the chats)" + if derived.get("category_broadened") + else "" + ) + return ( + "\n\nComputed from structured records (exact aggregate over " + f"per-occurrence rows; op={op}){hedge}:\n- " + "\n- ".join(parts) + ) + + +async def _answer(client: httpx.AsyncClient, question: str, hits: list[dict], + question_date: str, model: str, api_key: str, + key_to_date: dict | None = None, + derived: dict | None = None) -> str: + body = { + "model": model, + "messages": [{"role": "user", "content": _ANSWER_PROMPT.format( + history=_history_block(hits, key_to_date), + date=question_date or "unknown", + question=question, + computed=_derived_block(derived), + )}], + } + # Reasoning models (gpt-5, o-series) reject 
temperature != 1; only set it + # for models that support it, so gpt-4o behavior stays byte-identical. + if not re.match(r"^(gpt-5|o[1-9])", model): + body["temperature"] = 0.0 + r = await client.post( + "https://api.openai.com/v1/chat/completions", + headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}, + json=body, timeout=60.0, + ) + r.raise_for_status() + return (r.json()["choices"][0]["message"]["content"] or "").strip() + + +async def _run_item(client: httpx.AsyncClient, base_url: str, token: str, item: dict, + *, top_k: int, search_mode: str, ingest_conc: int, + model: str, api_key: str, retrieval_budget: str, + item_sem: asyncio.Semaphore) -> dict: + async with item_sem: + ns = f"lme-{uuid.uuid4().hex[:10]}" + identity = {"org": "dev", "namespace": ns, "agent": "lme-loom", "user_id": "-"} + sessions = item.get("haystack_sessions", []) or [] + dates = item.get("haystack_dates", []) or [] + sids = item.get("haystack_session_ids", []) or [] + key_to_sessions: dict[str, set[str]] = {} + # session_id -> ISO observation date (the bench holds these; used as the + # date-fallback when a memory's temporal_anchor/valid_at is NULL). + sid_to_date: dict[str, str] = { + str(sids[i]): _iso(dates[i]) + for i in range(min(len(sids), len(dates))) + if sids[i] and _iso(dates[i]) + } + + # 1-2) Index every session (concurrently, bounded). One memory.set_from_messages + # per session is the natural indexing unit and the only tractable granularity + # on -S (~50 sessions/question). Each call runs LLM extraction server-side. 
+ ingest_sem = asyncio.Semaphore(max(1, ingest_conc)) + + async def ingest(i: int, session: list) -> None: + if not session: + return + sid = str(sids[i]) if i < len(sids) and sids[i] else "" + body: dict = {**identity, "messages": session, "max_tokens": 4096} + if sid: + body["session_id"] = sid + if i < len(dates) and _iso(dates[i]): + body["observation_date"] = _iso(dates[i]) + async with ingest_sem: + resp = await _post(client, base_url + "/v1/memory.set_from_messages", body, token) + for w in resp.get("written") or []: + key = str(w.get("memory_key") or "") + if key: + key_to_sessions.setdefault(key, set()).add(sid) + + await asyncio.gather(*(ingest(i, s) for i, s in enumerate(sessions))) + + # 3) Retrieve. search_mode=rrf lets Loom's planner self-route (it never + # sees the gold question_type). NO reranker: the builtin:openai generative + # reranker cost ~6-10s/search (one gpt-4o-mini JSON-gen call over 150 + # candidates) and was quality-NEGATIVE here — paired A/B on identical data + # scored rerank-OFF 85.7% vs rerank-ON 78.6% QA, and fact-recall@50 rrf + # 37.9% vs plain-cosine 39.7% (tied). RRF fusion over CH's vector+lexical+ + # chunk planes is the ranker; CH's vector read is 0.36s. This matches Loom's + # product default (rerank=""), so the bench now reflects real Loom latency. + search_body: dict = { + **identity, "query": str(item["question"]), "top_k": top_k, + "search_mode": search_mode, "alpha": 0.5, "include_top_n_unmatched": 120, + } + # retrieval_budget="fast" = pure vector path: no query-planning / HyDE LLM + # on the read path. On this benchmark it holds recall + accuracy at ~7x + # lower latency, since the LLM-in-loop work doesn't change what's retrieved. 
+ if retrieval_budget: + search_body["retrieval_budget"] = retrieval_budget + q_iso = _iso(str(item.get("question_date", ""))) + if q_iso: + search_body["observation_date"] = q_iso + _t0 = time.perf_counter() + resp = await _post(client, base_url + "/v1/memory.search", search_body, token) + search_ms = (time.perf_counter() - _t0) * 1000.0 # NB: under-load (concurrent ingest) + hits = resp.get("results", []) + hyde_fired = bool(resp.get("hyde_fallback_used")) + # Co-design [D]: server-computed COUNT/SUM/date-diff over per-occurrence + # derived_facts. None on non-derived questions / empty match — the + # reader then falls back to counting the recalled passages by hand. + derived = resp.get("derived_aggregate") + + # Evidence-session recall@k: did retrieval surface a memory from any + # labelled gold evidence session? (the standard LongMemEval retrieval metric) + answer_sessions = {str(s) for s in (item.get("answer_session_ids") or []) if s} + retrieved = set() + for h in hits[:top_k]: + retrieved |= key_to_sessions.get(str(h.get("memory_key") or ""), set()) + retrieved.discard("") + recalled = bool(answer_sessions & retrieved) if answer_sessions else False + # ALL-session coverage@k: did the top-k cover EVERY gold evidence session? + # This is the real multi-hop completeness metric. "recalled" (ANY gold + # session) over-counts: a 5-session question scores a hit on 1 of 5. + all_covered = bool(answer_sessions) and answer_sessions <= retrieved + + # Fact-level recall@k: is the gold ANSWER string actually present in any + # retrieved excerpt? Session recall ("a memory from the gold session came + # back") systematically over-counts vs QA; this tracks QA far better. + gold = re.sub(r"\s+", " ", str(item.get("answer", "")).strip().lower()) + + def _present(hay: str) -> bool: + # Word-boundary match for short golds ("4", "nike") so they don't + # spuriously match inside other tokens; substring for long ones. 
+ if not gold: + return False + if len(gold) <= 12: + return re.search(rf"(?<![a-z0-9]){re.escape(gold)}(?![a-z0-9])", hay) is not None + return gold in hay + + fact_in_context = any( + _present(re.sub(r"\s+", " ", str(h.get("content_excerpt") or "")).lower()) + for h in hits[:top_k] + ) + + # memory_key -> source-session date (earliest), for the date-fallback. + key_to_date = { + k: min((sid_to_date[s] for s in ss if s in sid_to_date), default="") + for k, ss in key_to_sessions.items() + } + # Token efficiency = size of the context Loom actually hands the reader + # (the rendered history block, formatting included). ~4 chars/token. + ctx_tokens = len(_history_block(hits, key_to_date)) // 4 + hypothesis = await _answer(client, str(item["question"]), hits, + str(item.get("question_date", "")), model, api_key, + key_to_date=key_to_date, derived=derived) + return { + "question_id": str(item["question_id"]), + "question_type": str(item.get("question_type", "")), + "hypothesis": hypothesis, + "recalled": recalled, + "all_covered": all_covered, + "fact_in_context": fact_in_context, + "ctx_tokens": ctx_tokens, + "search_ms_loaded": round(search_ms, 1), + "hyde_fired": hyde_fired, + "n_hits": len(hits), + "ns": ns, + "query": str(item["question"]), + "q_iso": q_iso, + } + + +async def main() -> int: + p = argparse.ArgumentParser(description=__doc__) + p.add_argument("--base-url", default="http://127.0.0.1:7777", help="Loom server URL") + p.add_argument("--token", default=os.environ.get("LOOM_TOKEN", ""), help="Loom bearer token") + p.add_argument("--dataset", default="data/longmemeval_s_cleaned.json") + p.add_argument("--out", default="loom/loom_hyp.jsonl", help="hypotheses JSONL for evaluate_qa.py") + p.add_argument("--limit", type=int, default=0, help="cap questions (0 = all 500)") + p.add_argument("--shuffle", action="store_true", + help="shuffle before --limit (the dataset is category-ordered, so a " + "bare --limit samples a single question type). Deterministic via --seed.") + p.add_argument("--seed", type=int, default=42, help="shuffle seed") + p.add_argument("--question-type", default="", + help="comma-separated question_type filter (e.g. 
" + "single-session-assistant,knowledge-update); applied before " + "--shuffle/--limit so a category can be run complete. Empty = all.") + p.add_argument("--top-k", type=int, default=30) + p.add_argument("--search-mode", default="rrf", + help="Loom search mode; 'rrf' = let Loom's planner self-route") + p.add_argument("--concurrency", type=int, default=4, help="questions in flight") + p.add_argument("--ingest-concurrency", type=int, default=8, + help="concurrent index calls per question") + p.add_argument("--answer-model", default="gpt-4o", help="reader model (OpenAI)") + p.add_argument("--retrieval-budget", default="", + help="Loom retrieval budget. 'fast' = pure vector path, no " + "query-planning/HyDE LLM on the read path (lowest latency); " + "'' = product default. On LongMemEval, fast holds recall + " + "accuracy at ~7x lower latency.") + p.add_argument("--measure-latency", action="store_true", + help="after all ingestion, re-search every question one-at-a-time on the " + "now-quiesced server to report CLEAN serving latency (the in-run " + "search time is measured under concurrent-ingest load, which inflates " + "it, so it is reported separately)") + p.add_argument("--metrics-out", default="", + help="write the latency/token/recall/HyDE metrics summary as JSON here") + args = p.parse_args() + + api_key = os.environ.get("OPENAI_API_KEY", "") + if not api_key: + print("OPENAI_API_KEY is required (reader model).", file=sys.stderr) + return 2 + ds_path = Path(args.dataset) + if not ds_path.exists(): + print(f"missing dataset: {ds_path}\n wget https://huggingface.co/datasets/" + "xiaowu0162/longmemeval-cleaned/resolve/main/longmemeval_s_cleaned.json " + f"-O {ds_path}", file=sys.stderr) + return 2 + dataset = json.loads(ds_path.read_text()) + if args.question_type: + wanted = {t.strip() for t in args.question_type.split(",") if t.strip()} + dataset = [d for d in dataset if str(d.get("question_type", "")) in wanted] + if args.shuffle: + 
random.Random(args.seed).shuffle(dataset) + if args.limit > 0: + dataset = dataset[: args.limit] + print(f"loom-longmemeval: {len(dataset)} questions, top_k={args.top_k}, " + f"search_mode={args.search_mode}, model={args.answer_model}", flush=True) + + item_sem = asyncio.Semaphore(args.concurrency) + results: list[dict] = [] + async with httpx.AsyncClient(timeout=120.0) as client: + h = await client.get(args.base_url + "/v1/health", timeout=5.0) + if h.status_code != 200: + print(f"Loom server unhealthy: {h.status_code}", file=sys.stderr) + return 2 + + async def runner(item: dict) -> None: + try: + r = await _run_item(client, args.base_url, args.token, item, + top_k=args.top_k, search_mode=args.search_mode, + ingest_conc=args.ingest_concurrency, + model=args.answer_model, api_key=api_key, + retrieval_budget=args.retrieval_budget, + item_sem=item_sem) + results.append(r) + print(f" {'✓' if r['recalled'] else '✗'} {r['question_id']} " + f"[{r['question_type']}]", flush=True) + except (httpx.HTTPError, KeyError) as e: + print(f" ! {item.get('question_id', '?')} ERROR: {e}", file=sys.stderr, flush=True) + + await asyncio.gather(*(runner(it) for it in dataset)) + + out_path = Path(args.out) + out_path.parent.mkdir(parents=True, exist_ok=True) + with out_path.open("w") as f: + for r in results: + f.write(json.dumps({"question_id": r["question_id"], "hypothesis": r["hypothesis"]}) + "\n") + + # Retrieval recall@k by question type (Loom's own metric; the QA score comes + # from evaluate_qa.py on the hypotheses file). 
+ by_type: dict[str, list[bool]] = defaultdict(list) + for r in results: + by_type[r["question_type"]].append(r["recalled"]) + print(f"\nrecall@{args.top_k} by question_type:") + for qt in sorted(by_type): + v = by_type[qt] + print(f" {qt:28} {sum(v)}/{len(v)} ({sum(v) / len(v) * 100:.1f}%)") + tot = [r["recalled"] for r in results] + print(f" {'OVERALL':28} {sum(tot)}/{len(tot)} " + f"({sum(tot) / len(tot) * 100:.1f}%)" if tot else " (no results)") + + # ALL-session coverage@k: did the top-k cover EVERY gold evidence session? + # (multi-hop completeness — the ANY-session recall above hides this) + cby: dict[str, list[bool]] = defaultdict(list) + for r in results: + cby[r["question_type"]].append(r.get("all_covered", False)) + print(f"\nALL-session coverage@{args.top_k} (every gold session in top-k — multi-hop completeness):") + for qt in sorted(cby): + v = cby[qt] + print(f" {qt:28} {sum(v)}/{len(v)} ({sum(v) / len(v) * 100:.1f}%)") + ctot = [r.get("all_covered", False) for r in results] + print(f" {'OVERALL':28} {sum(ctot)}/{len(ctot)} " + f"({sum(ctot) / len(ctot) * 100:.1f}%)" if ctot else " (no results)") + + # Fact-level recall@k by question type: did the gold answer string actually + # reach the reader? This is the number that tracks QA (session recall does not). + fby: dict[str, list[bool]] = defaultdict(list) + for r in results: + fby[r["question_type"]].append(r.get("fact_in_context", False)) + print(f"\nFACT-level recall@{args.top_k} (gold answer present in a retrieved excerpt):") + for qt in sorted(fby): + v = fby[qt] + print(f" {qt:28} {sum(v)}/{len(v)} ({sum(v) / len(v) * 100:.1f}%)") + ftot = [r.get("fact_in_context", False) for r in results] + print(f" {'OVERALL':28} {sum(ftot)}/{len(ftot)} " + f"({sum(ftot) / len(ftot) * 100:.1f}%)" if ftot else " (no results)") + + def _pct(xs: list, q: float): + return xs[min(len(xs) - 1, int(len(xs) * q))] if xs else 0 + + # Token efficiency: the size of the context Loom hands the reader per query. 
+ toks = sorted(r.get("ctx_tokens", 0) for r in results) + tok_median = _pct(toks, 0.5) + tok_mean = round(sum(toks) / len(toks)) if toks else 0 + mem_mean = sum(r.get("n_hits", 0) for r in results) // max(1, len(results)) + print(f"\nTOKEN efficiency (context served to reader, ~4 chars/token):" + f"\n median {tok_median} tok/query mean {tok_mean} (~{mem_mean} memories/query)") + + # HyDE fallback firing rate (recall-rescue LLM call; fires only on a weak top hit). + hyde_n = sum(1 for r in results if r.get("hyde_fired")) + print(f"\nHyDE fallback fired on {hyde_n}/{len(results)} " + f"({hyde_n / len(results) * 100:.1f}%) queries" if results else "") + + # In-run search latency is measured UNDER concurrent-ingest load, which + # inflates it — reported separately from the clean number below. + ld = sorted(r.get("search_ms_loaded", 0.0) for r in results) + print(f"\nIn-run search latency UNDER LOAD (concurrent ingest — not comparable): " + f"p50 {_pct(ld, 0.5):.0f}ms p95 {_pct(ld, 0.95):.0f}ms") + + # Clean serving latency: re-search every question one-at-a-time on the now- + # quiesced server (no concurrent ingest) — the true single-query serving + # latency. Namespaces persist after the run. 
+ clean: list[float] = [] + if args.measure_latency and results: + print(f"\nmeasuring CLEAN serving latency over {len(results)} quiesced searches...", flush=True) + async with httpx.AsyncClient(timeout=120.0) as lc: + for r in results: + sb = {"org": "dev", "namespace": r["ns"], "agent": "lme-loom", "user_id": "-", + "query": r["query"], "top_k": args.top_k, "search_mode": args.search_mode, + "alpha": 0.5, "include_top_n_unmatched": 120} + if r.get("q_iso"): + sb["observation_date"] = r["q_iso"] + if args.retrieval_budget: # measure the same path the run used + sb["retrieval_budget"] = args.retrieval_budget + t0 = time.perf_counter() + try: + await _post(lc, args.base_url + "/v1/memory.search", sb, args.token) + except httpx.HTTPError: + continue + clean.append((time.perf_counter() - t0) * 1000.0) + clean.sort() + if clean: + print(f"CLEAN serving latency (quiesced, 1 query at a time): " + f"p50 {_pct(clean, 0.5):.0f}ms p95 {_pct(clean, 0.95):.0f}ms min {clean[0]:.0f}ms") + + if args.metrics_out and results: + n = len(results) + metrics = { + "n_questions": n, "top_k": args.top_k, "answer_model": args.answer_model, + "recall_session_pct": round(sum(r["recalled"] for r in results) / n * 100, 1), + "recall_allsession_pct": round(sum(r.get("all_covered", False) for r in results) / n * 100, 1), + "recall_fact_pct": round(sum(r.get("fact_in_context", False) for r in results) / n * 100, 1), + "ctx_tokens_median": tok_median, "ctx_tokens_mean": tok_mean, + "hyde_fired_pct": round(hyde_n / n * 100, 1), + "latency_loaded_p50_ms": round(_pct(ld, 0.5)), "latency_loaded_p95_ms": round(_pct(ld, 0.95)), + "latency_clean_p50_ms": round(_pct(clean, 0.5)) if clean else None, + "latency_clean_p95_ms": round(_pct(clean, 0.95)) if clean else None, + } + Path(args.metrics_out).write_text(json.dumps(metrics, indent=2)) + print(f"\nwrote metrics -> {args.metrics_out}") + + print(f"\nwrote {out_path}\nNow grade with the official judge:\n" + f" python src/evaluation/evaluate_qa.py gpt-4o 
{out_path} {args.dataset}") + return 0 + + +if __name__ == "__main__": + sys.exit(asyncio.run(main())) diff --git a/src/evaluation/evaluate_qa.py b/src/evaluation/evaluate_qa.py index 4732f37..ea92490 100644 --- a/src/evaluation/evaluate_qa.py +++ b/src/evaluation/evaluate_qa.py @@ -1,6 +1,8 @@ import os +import re import sys import json +import argparse from tqdm import tqdm import backoff import openai @@ -12,123 +14,125 @@ 'llama-3.1-70b-instruct': ('meta-llama/Meta-Llama-3.1-70B-Instruct', 'local'), 'gpt-4o-mini': ('gpt-4o-mini-2024-07-18', 'openai'), 'gpt-4o': ('gpt-4o-2024-08-06', 'openai'), + 'gpt-5': ('gpt-5', 'openai'), } -@backoff.on_exception(backoff.expo, (openai.RateLimitError, - openai.APIError)) +@backoff.on_exception(backoff.expo, (openai.RateLimitError, openai.APIError)) def chat_completions_with_backoff(client, **kwargs): return client.chat.completions.create(**kwargs) +_SEMANTIC = ( + " Judge by meaning, not exact wording: a paraphrase or different vocabulary" + " conveying the same fact is correct, and a response that states the answer" + " more specifically or more precisely is correct. A response that gives the" + " correct answer plus extra correct detail is correct unless the extra detail" + " is wrong. A response that omits the required fact, gives only a subset of" + " it, or contradicts it, is incorrect." +) + + def get_anscheck_prompt(task, question, answer, response, abstention=False): - if not abstention: - if task in ['single-session-user', 'single-session-assistant', 'multi-session']: - template = "I will give you a question, a correct answer, and a response from a model. Please answer yes if the response contains the correct answer. Otherwise, answer no. If the response is equivalent to the correct answer or contains all the intermediate steps to get the correct answer, you should also answer yes. If the response only contains a subset of the information required by the answer, answer no. 
\n\nQuestion: {}\n\nCorrect Answer: {}\n\nModel Response: {}\n\nIs the model response correct? Answer yes or no only." - prompt = template.format(question, answer, response) - elif task == 'temporal-reasoning': - template = "I will give you a question, a correct answer, and a response from a model. Please answer yes if the response contains the correct answer. Otherwise, answer no. If the response is equivalent to the correct answer or contains all the intermediate steps to get the correct answer, you should also answer yes. If the response only contains a subset of the information required by the answer, answer no. In addition, do not penalize off-by-one errors for the number of days. If the question asks for the number of days/weeks/months, etc., and the model makes off-by-one errors (e.g., predicting 19 days when the answer is 18), the model's response is still correct. \n\nQuestion: {}\n\nCorrect Answer: {}\n\nModel Response: {}\n\nIs the model response correct? Answer yes or no only." - prompt = template.format(question, answer, response) - elif task == 'knowledge-update': - template = "I will give you a question, a correct answer, and a response from a model. Please answer yes if the response contains the correct answer. Otherwise, answer no. If the response contains some previous information along with an updated answer, the response should be considered as correct as long as the updated answer is the required answer.\n\nQuestion: {}\n\nCorrect Answer: {}\n\nModel Response: {}\n\nIs the model response correct? Answer yes or no only." - prompt = template.format(question, answer, response) - elif task == 'single-session-preference': - template = "I will give you a question, a rubric for desired personalized response, and a response from a model. Please answer yes if the response satisfies the desired response. Otherwise, answer no. The model does not need to reflect all the points in the rubric. 
The response is correct as long as it recalls and utilizes the user's personal information correctly.\n\nQuestion: {}\n\nRubric: {}\n\nModel Response: {}\n\nIs the model response correct? Answer yes or no only." - prompt = template.format(question, answer, response) - else: - raise NotImplementedError + if abstention: + template = ("I will give you an unanswerable question, an explanation, and a response" + " from a model. Answer yes if the model identifies the question as" + " unanswerable — saying the information is incomplete or that the asked" + " information is not available counts." + "\n\nQuestion: {}\n\nExplanation: {}\n\nModel Response: {}\n\n" + "Does the model correctly identify the question as unanswerable? Answer yes or no only.") + return template.format(question, answer, response) + + if task in ('single-session-user', 'single-session-assistant', 'multi-session'): + template = ("I will give you a question, a correct answer, and a response from a model." + " Answer yes if the response conveys the correct answer." + _SEMANTIC + + "\n\nQuestion: {}\n\nCorrect Answer: {}\n\nModel Response: {}\n\n" + "Is the model response correct? Answer yes or no only.") + elif task == 'temporal-reasoning': + template = ("I will give you a question, a correct answer, and a response from a model." + " Answer yes if the response conveys the correct answer. Do not penalize" + " off-by-one errors in a count of days/weeks/months." + _SEMANTIC + + "\n\nQuestion: {}\n\nCorrect Answer: {}\n\nModel Response: {}\n\n" + "Is the model response correct? Answer yes or no only.") + elif task == 'knowledge-update': + template = ("I will give you a question, a correct answer, and a response from a model." + " Answer yes if the response gives the correct, updated answer; mentioning" + " the earlier or outdated value alongside it is fine as long as the updated" + " answer is present." 
+ _SEMANTIC + + "\n\nQuestion: {}\n\nCorrect Answer: {}\n\nModel Response: {}\n\n" + "Is the model response correct? Answer yes or no only.") + elif task == 'single-session-preference': + template = ("I will give you a question, a rubric for the desired personalized response," + " and a response from a model. Answer yes if the response recalls and uses" + " the user's personal information correctly; it need not cover every point in" + " the rubric." + "\n\nQuestion: {}\n\nRubric: {}\n\nModel Response: {}\n\n" + "Is the model response correct? Answer yes or no only.") else: - template = "I will give you an unanswerable question, an explanation, and a response from a model. Please answer yes if the model correctly identifies the question as unanswerable. The model could say that the information is incomplete, or some other information is given but the asked information is not.\n\nQuestion: {}\n\nExplanation: {}\n\nModel Response: {}\n\nDoes the model correctly identify the question as unanswerable? Answer yes or no only." - prompt = template.format(question, answer, response) - return prompt + raise NotImplementedError + return template.format(question, answer, response) + + +def judge(client, model, prompt): + kwargs = dict(client=client, model=model, n=1, + messages=[{"role": "user", "content": prompt}]) + # Reasoning models (gpt-5, o-series) reject temperature/max_tokens and need + # headroom for reasoning tokens; non-reasoning models stay byte-identical. 
+ if not re.match(r"^(gpt-5|o[1-9])", model): + kwargs.update(temperature=0, max_tokens=10) + completion = chat_completions_with_backoff(**kwargs) + return 'yes' in (completion.choices[0].message.content or '').strip().lower() if __name__ == '__main__': - if len(sys.argv) != 4: - print('Usage: python evaluate_qa.py metric_model hyp_file ref_file') - exit() - - metric_model_short = sys.argv[1] - hyp_file = sys.argv[2] - ref_file = sys.argv[3] - verbose = True - - result_file = hyp_file + '.eval-results-{}'.format(metric_model_short) - - if metric_model_short not in model_zoo: - print('Requested metric model is not supported:', metric_model_short) - exit() - metric_model, metric_model_source = model_zoo[metric_model_short] - if metric_model_source == 'openai': + ap = argparse.ArgumentParser(description='LongMemEval QA judge.') + ap.add_argument('metric_model', help='judge model: ' + ', '.join(model_zoo)) + ap.add_argument('hyp_file', help='hypotheses JSONL ({question_id, hypothesis} per line)') + ap.add_argument('ref_file', help='reference dataset JSON (question_id, question, answer, question_type)') + args = ap.parse_args() + + if args.metric_model not in model_zoo: + print('Requested metric model is not supported:', args.metric_model) + sys.exit(1) + metric_model, source = model_zoo[args.metric_model] + if source == 'openai': openai.organization = os.getenv('OPENAI_ORGANIZATION') - openai_api_key = os.getenv('OPENAI_API_KEY') - openai_api_base = None + client = OpenAI(api_key=os.getenv('OPENAI_API_KEY')) else: - openai_api_key = "EMPTY" - openai_api_base = "http://localhost:8001/v1" - - metric_client = OpenAI( - api_key=openai_api_key, - base_url=openai_api_base, - ) + client = OpenAI(api_key='EMPTY', base_url='http://localhost:8001/v1') try: - hypotheses = [json.loads(line) for line in open(hyp_file).readlines()] - except: - hypotheses = json.load(open(hyp_file)) + hypotheses = [json.loads(line) for line in open(args.hyp_file).readlines()] + except 
json.JSONDecodeError: + hypotheses = json.load(open(args.hyp_file)) try: - references = json.load(open(ref_file)) - except: - references = [json.loads(line) for line in open(ref_file).readlines()] - qid2qdata = {entry['question_id']: entry for entry in references} - qid2qtype = {entry['question_id']: entry['question_type'] for entry in references} - qtypes = set(list(qid2qtype.values())) - qtype2acc = {t: [] for t in qtypes} + references = json.load(open(args.ref_file)) + except json.JSONDecodeError: + references = [json.loads(line) for line in open(args.ref_file).readlines()] + qid2qdata = {e['question_id']: e for e in references} + qid2qtype = {e['question_id']: e['question_type'] for e in references} + qtype2acc = {t: [] for t in set(qid2qtype.values())} + overall = [] + result_file = '{}.eval-results-{}'.format(args.hyp_file, args.metric_model) with open(result_file, 'w') as out_f: - logs = [] for entry in tqdm(hypotheses): - - if entry['question_id'] not in qid2qtype: - print('Warning: skipping {} as it is not in reference data.'.format(entry['question_id'])) + qid = entry['question_id'] + if qid not in qid2qtype: continue - - qtype = qid2qtype[entry['question_id']] - q = qid2qdata[entry['question_id']]['question'] - ans = qid2qdata[entry['question_id']]['answer'] - hyp = entry['hypothesis'] - - prompt = get_anscheck_prompt(qtype, q, ans, hyp, abstention='_abs' in entry['question_id']) - kwargs = { - 'model': metric_model, - 'messages':[ - {"role": "user", "content": prompt} - ], - 'n': 1, - 'temperature': 0, - 'max_tokens': 10 - } - completion = chat_completions_with_backoff(metric_client, **kwargs) - eval_response = completion.choices[0].message.content.strip() - label = 'yes' in eval_response.lower() - entry['autoeval_label'] = { - 'model': metric_model, - 'label': label - } - logs.append(entry) - if verbose: - print(json.dumps({ - 'question': q, - 'answer': ans, - 'hypothesis': hyp, - 'autoeval_label': label - }, indent=4), flush=True) + qtype = 
qid2qtype[qid] + prompt = get_anscheck_prompt( + qtype, qid2qdata[qid]['question'], qid2qdata[qid]['answer'], + entry['hypothesis'], abstention='_abs' in qid, + ) + label = judge(client, metric_model, prompt) + entry = {**entry, 'autoeval_label': {'model': metric_model, 'label': label}} print(json.dumps(entry), file=out_f) - qtype2acc[qid2qtype[entry['question_id']]].append(1 if label else 0) - - - print('Accuracy:', round(np.mean([1 if x['autoeval_label']['label'] else 0 for x in logs]).item(), 4)) - for k,v in qtype2acc.items(): - print('\t{}: {} ({})'.format(k, round(np.mean(v), 4), len(v))) + qtype2acc[qtype].append(1 if label else 0) + overall.append(1 if label else 0) + print('Accuracy:', round(float(np.mean(overall)), 4)) + for k, v in sorted(qtype2acc.items()): + if v: + print('\t{}: {} ({})'.format(k, round(float(np.mean(v)), 4), len(v))) print('Saved to', result_file)
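
Reviewer note on the `judge` change in `src/evaluation/evaluate_qa.py`: the sketch below isolates the two decisions that function makes, namely which request parameters to send for a given model and how the reply is turned into a label. It is a reading aid under stated assumptions, not the code in the diff: `build_judge_kwargs` and `parse_label` are hypothetical names (the diff inlines both steps), and no API call is made.

```python
import re

# Same pattern the diff uses to spot reasoning models (gpt-5, o-series).
_REASONING = re.compile(r"^(gpt-5|o[1-9])")


def build_judge_kwargs(model: str, prompt: str) -> dict:
    """Hypothetical helper: request kwargs for one judge call (no network I/O)."""
    kwargs = dict(model=model, n=1,
                  messages=[{"role": "user", "content": prompt}])
    # Non-reasoning models keep the deterministic, short-output settings.
    if not _REASONING.match(model):
        kwargs.update(temperature=0, max_tokens=10)
    return kwargs


def parse_label(content) -> bool:
    """Mirror the diff's label rule: any 'yes' substring counts as correct."""
    return 'yes' in (content or '').strip().lower()


if __name__ == "__main__":
    for name in ("gpt-4o-2024-08-06", "gpt-5", "o1-mini"):
        print(name, sorted(build_judge_kwargs(name, "prompt")))
```

Because the label rule is a substring match, a `yes` anywhere in the reply is read as a positive label. The prompts therefore end with "Answer yes or no only", and a reasoning model that adds free-form text to its verdict could be mislabeled.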