From c1536ab1cac0cfd45df839d2f906497199f8faf0 Mon Sep 17 00:00:00 2001 From: Jacob Ioffe Date: Wed, 4 Mar 2026 20:34:26 +0000 Subject: [PATCH 1/3] Add dataset recall adapters for earnings and FinanceBench - normalize page-based CSVs into pdf_page recall keys - add financebench JSON adapter with pdf_only matching mode - recurse input globs so nested dataset directories ingest cleanly Signed-off-by: Jacob Ioffe Made-with: Cursor --- nemo_retriever/README.md | 7 +- nemo_retriever/harness/HANDOFF.md | 6 ++ nemo_retriever/harness/nightly_config.yaml | 9 ++ nemo_retriever/harness/test_configs.yaml | 32 +++++++ .../nemo_retriever/examples/batch_pipeline.py | 64 ++++++++++--- .../src/nemo_retriever/harness/config.py | 11 +++ .../nemo_retriever/harness/recall_adapters.py | 89 +++++++++++++++++++ .../src/nemo_retriever/harness/run.py | 18 +++- .../src/nemo_retriever/recall/core.py | 74 ++++++++++++--- nemo_retriever/tests/test_harness_config.py | 60 +++++++++++++ .../tests/test_harness_recall_adapters.py | 36 ++++++++ nemo_retriever/tests/test_harness_run.py | 32 ++++++- nemo_retriever/tests/test_recall_core.py | 21 +++++ 13 files changed, 427 insertions(+), 32 deletions(-) create mode 100644 nemo_retriever/src/nemo_retriever/harness/recall_adapters.py create mode 100644 nemo_retriever/tests/test_harness_recall_adapters.py create mode 100644 nemo_retriever/tests/test_recall_core.py diff --git a/nemo_retriever/README.md b/nemo_retriever/README.md index a6af12cf3..0e47962b6 100644 --- a/nemo_retriever/README.md +++ b/nemo_retriever/README.md @@ -50,7 +50,7 @@ uv pip install -e ./nemo_retriever uv run python nemo_retriever/src/nemo_retriever/examples/batch_pipeline.py /path/to/pdfs ``` -Pass the directory that contains your PDFs as the first argument (`input-dir`). For recall evaluation, the pipeline uses `bo767_query_gt.csv` in the current directory by default; override with `--query-csv `. Recall is skipped if the query CSV file does not exist. By default, per-query details (query, gold, hits) are printed; use `--no-recall-details` to print only the missed-gold summary and recall metrics. To use an existing Ray cluster, pass `--ray-address auto`. If OCR fails with a missing `libcudart.so.13`, install the CUDA 13 runtime and set `LD_LIBRARY_PATH` as shown above. +Pass the directory that contains your PDFs as the first argument (`input-dir`). For recall evaluation, the pipeline uses `bo767_query_gt.csv` in the current directory by default; override with `--query-csv `. For document-level recall, use `--recall-match-mode pdf_only` with `query,expected_pdf` data. Recall is skipped if the query file does not exist. By default, per-query details (query, gold, hits) are printed; use `--no-recall-details` to print only the missed-gold summary and recall metrics. To use an existing Ray cluster, pass `--ray-address auto`. If OCR fails with a missing `libcudart.so.13`, install the CUDA 13 runtime and set `LD_LIBRARY_PATH` as shown above. For **HTML** or **text** ingestion, use `--input-type html` or `--input-type txt` with the same examples (e.g. `batch_pipeline.py --input-type html`). HTML files are converted to markdown via markitdown, then chunked with the same tokenizer as .txt. Staged CLI: `retriever html run --input-dir ` writes `*.html_extraction.json`; then `retriever local stage5 run --input-dir --pattern "*.html_extraction.json"` and `retriever local stage6 run --input-dir `. 
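To make the new `pdf_only` mode concrete: a gold `expected_pdf` counts as a hit when any of the top-k retrieved `pdf_page` keys belongs to that document. A minimal sketch of that check follows; the helper names here are illustrative only, and the shipped logic lives in the `recall/core.py` hunks later in this patch.

```python
# Illustrative sketch of document-level ("pdf_only") matching; names are
# hypothetical, see nemo_retriever/src/nemo_retriever/recall/core.py for
# the actual implementation.
def doc_of(key: str) -> str:
    # Retrieved keys look like "<pdf_stem>_<page>"; stems may themselves
    # contain underscores, so strip only the final "_<page>" suffix.
    stem, sep, _page = key.rpartition("_")
    return stem if sep else key


def pdf_only_hit(expected_pdf: str, retrieved_keys: list[str], k: int) -> bool:
    gold = expected_pdf.removesuffix(".pdf")
    return gold in (doc_of(key) for key in retrieved_keys[:k])


# "AAPL_2023" is retrieved via page 7, so the document-level query hits.
assert pdf_only_hit("AAPL_2023.pdf", ["AAPL_2023_7", "MSFT_2022_1"], k=1)
```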
@@ -64,6 +64,11 @@ For **HTML** or **text** ingestion, use `--input-type html` or `--input-type txt - CLI entrypoint is nested under `retriever harness`. - First pass is LanceDB-only and enforces recall-required pass/fail by default. - Single-run artifact directories default to `_`. +- Dataset-specific recall adapters are supported via config: + - `recall_adapter: none` (default passthrough) + - `recall_adapter: page_plus_one` (convert zero-indexed `page` CSVs to `pdf_page`) + - `recall_adapter: financebench_json` (convert FinanceBench JSON to `query,expected_pdf`) + - `recall_match_mode: pdf_page|pdf_only` controls recall matching mode. ### Single run diff --git a/nemo_retriever/harness/HANDOFF.md b/nemo_retriever/harness/HANDOFF.md index 0f95586a5..ef3b0a2e8 100644 --- a/nemo_retriever/harness/HANDOFF.md +++ b/nemo_retriever/harness/HANDOFF.md @@ -24,6 +24,8 @@ It captures what exists now, what was intentionally chosen, and what to iterate - Stream parsing for ingest/throughput/recall metrics. - `nemo_retriever/src/nemo_retriever/harness/artifacts.py` - Artifact/session directory creation and session summary writing. +- `nemo_retriever/src/nemo_retriever/harness/recall_adapters.py` + - Dataset-specific query normalization adapters for recall inputs. - `nemo_retriever/harness/test_configs.yaml` - Active defaults, presets, dataset presets. - `nemo_retriever/harness/nightly_config.yaml` @@ -36,6 +38,10 @@ It captures what exists now, what was intentionally chosen, and what to iterate - Two main presets are available: - `single_gpu` - `dgx_8gpu` +- Adapter-capable datasets: + - `earnings` uses `recall_adapter: page_plus_one` (`page` -> `pdf_page` conversion). + - `bo10k` wiring is included (adapter + mode), with recall disabled by default until query path is set. + - `financebench` wiring is included for `pdf_only` matching with `financebench_json` adapter, with recall disabled by default until query path is set. ## Current CLI Usage diff --git a/nemo_retriever/harness/nightly_config.yaml b/nemo_retriever/harness/nightly_config.yaml index c67168e71..4e4c37f9b 100644 --- a/nemo_retriever/harness/nightly_config.yaml +++ b/nemo_retriever/harness/nightly_config.yaml @@ -10,3 +10,12 @@ runs: # - name: bo767_single # dataset: bo767 # preset: single_gpu + # - name: earnings_single + # dataset: earnings + # preset: single_gpu + # - name: financebench_single + # dataset: financebench + # preset: single_gpu + # - name: bo10k_single + # dataset: bo10k + # preset: single_gpu diff --git a/nemo_retriever/harness/test_configs.yaml b/nemo_retriever/harness/test_configs.yaml index 31833a3a1..80b5e2e8f 100644 --- a/nemo_retriever/harness/test_configs.yaml +++ b/nemo_retriever/harness/test_configs.yaml @@ -6,6 +6,8 @@ active: query_csv: data/jp20_query_gt.csv input_type: pdf recall_required: true + recall_match_mode: pdf_page + recall_adapter: none artifacts_dir: null ray_address: null lancedb_uri: lancedb @@ -68,3 +70,33 @@ datasets: query_csv: data/jp20_query_gt.csv input_type: pdf recall_required: true + recall_match_mode: pdf_page + recall_adapter: none + + earnings: + path: /datasets/nv-ingest/earnings_consulting + query_csv: data/earnings_consulting_multimodal.csv + input_type: pdf + recall_required: true + recall_match_mode: pdf_page + recall_adapter: page_plus_one + + # Wiring for bo10k-style CSVs with zero-based page numbers. + # Keep recall optional by default until a local query CSV path is configured. 
+ bo10k: + path: /datasets/nv-ingest/bo10k + query_csv: null + input_type: pdf + recall_required: false + recall_match_mode: pdf_page + recall_adapter: page_plus_one + + # FinanceBench is document-level recall. Provide a JSON ground-truth file path + # via query_csv when enabling recall for this dataset. + financebench: + path: /datasets/nv-ingest/financebench + query_csv: null + input_type: pdf + recall_required: false + recall_match_mode: pdf_only + recall_adapter: financebench_json diff --git a/nemo_retriever/src/nemo_retriever/examples/batch_pipeline.py b/nemo_retriever/src/nemo_retriever/examples/batch_pipeline.py index af41b2845..94cc81cc2 100644 --- a/nemo_retriever/src/nemo_retriever/examples/batch_pipeline.py +++ b/nemo_retriever/src/nemo_retriever/examples/batch_pipeline.py @@ -315,11 +315,24 @@ def _gold_to_doc_page(golden_key: str) -> tuple[str, str]: return doc, page -def _is_hit_at_k(golden_key: str, retrieved_keys: list[str], k: int) -> bool: +def _extract_doc_from_key(key: str) -> str: + s = str(key) + if "_" not in s: + return s + doc, _page = s.rsplit("_", 1) + return doc + + +def _is_hit_at_k(golden_key: str, retrieved_keys: list[str], k: int, *, match_mode: str) -> bool: + top = (retrieved_keys or [])[: int(k)] + if match_mode == "pdf_only": + gold_doc = str(golden_key).replace(".pdf", "") + top_docs = [_extract_doc_from_key(key) for key in top] + return gold_doc in top_docs + doc, page = _gold_to_doc_page(golden_key) specific_page = f"{doc}_{page}" entire_document = f"{doc}_-1" - top = (retrieved_keys or [])[: int(k)] return (specific_page in top) or (entire_document in top) @@ -375,6 +388,11 @@ def main( "(current directory). Recall is skipped if the file does not exist." ), ), + recall_match_mode: str = typer.Option( + "pdf_page", + "--recall-match-mode", + help="Recall match mode: 'pdf_page' or 'pdf_only'.", + ), no_recall_details: bool = typer.Option( False, "--no-recall-details", @@ -589,6 +607,9 @@ def main( ) -> None: log_handle, original_stdout, original_stderr = _configure_logging(log_file) try: + if recall_match_mode not in {"pdf_page", "pdf_only"}: + raise ValueError(f"Unsupported --recall-match-mode: {recall_match_mode}") + os.environ["RAY_LOG_TO_DRIVER"] = "1" if ray_log_to_driver else "0" # Use an absolute path so driver and Ray actors resolve the same LanceDB URI. lancedb_uri = str(Path(lancedb_uri).expanduser().resolve()) @@ -617,7 +638,7 @@ def main( input_dir = Path(input_dir) if input_type == "txt": - glob_pattern = str(input_dir / "*.txt") + glob_pattern = str(input_dir / "**" / "*.txt") if input_dir.is_dir() else str(input_dir) ingestor = create_ingestor( run_mode="batch", params=IngestorCreateParams(ray_address=ray_address, ray_log_to_driver=ray_log_to_driver), @@ -648,7 +669,7 @@ def main( ) ) elif input_type == "html": - glob_pattern = str(input_dir / "*.html") + glob_pattern = str(input_dir / "**" / "*.html") if input_dir.is_dir() else str(input_dir) ingestor = create_ingestor( run_mode="batch", params=IngestorCreateParams(ray_address=ray_address, ray_log_to_driver=ray_log_to_driver), @@ -680,7 +701,10 @@ def main( ) elif input_type == "doc": # DOCX/PPTX: same pipeline as PDF; DocToPdfConversionActor converts before split. 
- doc_globs = [str(input_dir / "*.docx"), str(input_dir / "*.pptx")] + if input_dir.is_dir(): + doc_globs = [str(input_dir / "**" / "*.docx"), str(input_dir / "**" / "*.pptx")] + else: + doc_globs = [str(input_dir)] ingestor = create_ingestor( run_mode="batch", params=IngestorCreateParams(ray_address=ray_address, ray_log_to_driver=ray_log_to_driver), @@ -743,7 +767,7 @@ def main( ) ) else: - pdf_glob = str(input_dir / "*.pdf") + pdf_glob = str(input_dir / "**" / "*.pdf") if input_dir.is_dir() else str(input_dir) ingestor = create_ingestor( run_mode="batch", params=IngestorCreateParams(ray_address=ray_address, ray_log_to_driver=ray_log_to_driver), @@ -875,6 +899,7 @@ def main( top_k=10, ks=(1, 5, 10), hybrid=hybrid, + match_mode=recall_match_mode, ) _df_query, _gold, _raw_hits, _retrieved_keys, metrics = retrieve_and_score(query_csv=query_csv, cfg=cfg) @@ -894,7 +919,10 @@ def main( _raw_hits, ) ): - doc, page = _gold_to_doc_page(g) + if recall_match_mode == "pdf_only": + doc, page = str(g), "" + else: + doc, page = _gold_to_doc_page(g) scored_hits: list[tuple[str, float | None]] = [] for h in hits: @@ -903,11 +931,14 @@ def main( scored_hits.append((key, dist)) top_keys = [k for (k, _d) in scored_hits] - hit = _is_hit_at_k(g, top_keys, cfg.top_k) + hit = _is_hit_at_k(g, top_keys, cfg.top_k, match_mode=recall_match_mode) if not no_recall_details: print(f"\nQuery {i}: {q}") - print(f" Gold: {g} (file: {doc}{ext}, page: {page})") + if recall_match_mode == "pdf_only": + print(f" Gold: {g} (file: {doc}{ext})") + else: + print(f" Gold: {g} (file: {doc}{ext}, page: {page})") print(f" Hit@{cfg.top_k}: {hit}") print(" Top hits:") if not scored_hits: @@ -920,15 +951,24 @@ def main( print(f" {rank:02d}. {key} distance={dist:.6f}") if not hit: - missed_gold.append((f"{doc}{ext}", str(page))) + if recall_match_mode == "pdf_only": + missed_gold.append((f"{doc}{ext}", "")) + else: + missed_gold.append((f"{doc}{ext}", str(page))) missed_unique = sorted(set(missed_gold), key=lambda x: (x[0], x[1])) - print("\nMissed gold (unique doc/page):") + if recall_match_mode == "pdf_only": + print("\nMissed gold (unique docs):") + else: + print("\nMissed gold (unique doc/page):") if not missed_unique: print(" (none)") else: for doc_page, page in missed_unique: - print(f" {doc_page} page {page}") + if recall_match_mode == "pdf_only": + print(f" {doc_page}") + else: + print(f" {doc_page} page {page}") print(f"\nTotal missed: {len(missed_unique)} / {len(_gold)}") print("\nRecall metrics (matching nemo_retriever.recall.core):") diff --git a/nemo_retriever/src/nemo_retriever/harness/config.py b/nemo_retriever/src/nemo_retriever/harness/config.py index 3a88b6ad4..0be2a15aa 100644 --- a/nemo_retriever/src/nemo_retriever/harness/config.py +++ b/nemo_retriever/src/nemo_retriever/harness/config.py @@ -15,6 +15,7 @@ REPO_ROOT = NEMO_RETRIEVER_ROOT.parent DEFAULT_TEST_CONFIG_PATH = NEMO_RETRIEVER_ROOT / "harness" / "test_configs.yaml" DEFAULT_NIGHTLY_CONFIG_PATH = NEMO_RETRIEVER_ROOT / "harness" / "nightly_config.yaml" +VALID_RECALL_ADAPTERS = {"none", "page_plus_one", "financebench_json"} TUNING_FIELDS = { "pdf_extract_workers", @@ -45,6 +46,8 @@ class HarnessConfig: query_csv: str | None = None input_type: str = "pdf" recall_required: bool = True + recall_match_mode: str = "pdf_page" + recall_adapter: str = "none" artifacts_dir: str | None = None ray_address: str | None = None @@ -86,6 +89,12 @@ def validate(self) -> list[str]: if self.input_type not in {"pdf", "txt", "html", "doc"}: errors.append(f"input_type must be one of 
pdf/txt/html/doc, got '{self.input_type}'") + if self.recall_match_mode not in {"pdf_page", "pdf_only"}: + errors.append("recall_match_mode must be one of pdf_page/pdf_only") + + if self.recall_adapter not in VALID_RECALL_ADAPTERS: + errors.append(f"recall_adapter must be one of {sorted(VALID_RECALL_ADAPTERS)}") + for name in TUNING_FIELDS: val = getattr(self, name) if name.startswith("gpu_") and float(val) < 0.0: @@ -143,6 +152,8 @@ def _apply_env_overrides(config_dict: dict[str, Any]) -> None: "HARNESS_QUERY_CSV": ("query_csv", str), "HARNESS_INPUT_TYPE": ("input_type", str), "HARNESS_RECALL_REQUIRED": ("recall_required", _parse_bool), + "HARNESS_RECALL_MATCH_MODE": ("recall_match_mode", str), + "HARNESS_RECALL_ADAPTER": ("recall_adapter", str), "HARNESS_ARTIFACTS_DIR": ("artifacts_dir", str), "HARNESS_RAY_ADDRESS": ("ray_address", str), "HARNESS_LANCEDB_URI": ("lancedb_uri", str), diff --git a/nemo_retriever/src/nemo_retriever/harness/recall_adapters.py b/nemo_retriever/src/nemo_retriever/harness/recall_adapters.py new file mode 100644 index 000000000..8e11edcdd --- /dev/null +++ b/nemo_retriever/src/nemo_retriever/harness/recall_adapters.py @@ -0,0 +1,89 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import json +from pathlib import Path + +import pandas as pd + +from nemo_retriever.harness.config import VALID_RECALL_ADAPTERS + + +def _normalize_pdf_name(value: object) -> str: + return str(value).replace(".pdf", "") + + +def _adapt_page_plus_one(query_csv: Path, output_csv: Path) -> Path: + df = pd.read_csv(query_csv) + if "gt_page" in df.columns and "page" not in df.columns: + df = df.rename(columns={"gt_page": "page"}) + + required = {"query", "pdf", "page"} + missing = required.difference(df.columns) + if missing: + raise ValueError( + "page_plus_one adapter requires ['query','pdf','page'] columns " + f"(missing: {sorted(missing)}) in {query_csv}" + ) + + page_numbers = pd.to_numeric(df["page"], errors="raise").astype(int) + 1 + normalized = pd.DataFrame( + { + "query": df["query"].astype(str), + "pdf_page": [_normalize_pdf_name(pdf) + f"_{page}" for pdf, page in zip(df["pdf"], page_numbers)], + } + ) + normalized.to_csv(output_csv, index=False) + return output_csv + + +def _adapt_financebench_json(query_json: Path, output_csv: Path) -> Path: + payload = json.loads(query_json.read_text(encoding="utf-8")) + if not isinstance(payload, list): + raise ValueError(f"financebench_json adapter expects a JSON list in {query_json}") + + rows: list[dict[str, str]] = [] + for item in payload: + if not isinstance(item, dict): + continue + question = item.get("question") + contexts = item.get("contexts") + if not isinstance(question, str) or not question.strip(): + continue + if not isinstance(contexts, list) or not contexts: + continue + context0 = contexts[0] + if not isinstance(context0, dict): + continue + filename = context0.get("filename") + if not isinstance(filename, str) or not filename.strip(): + continue + rows.append({"query": question, "expected_pdf": _normalize_pdf_name(filename)}) + + if not rows: + raise ValueError(f"financebench_json adapter found no valid rows in {query_json}") + + pd.DataFrame(rows).to_csv(output_csv, index=False) + return output_csv + + +def prepare_recall_query_file(*, query_csv: Path | None, recall_adapter: str, output_dir: Path) -> Path: + output_dir.mkdir(parents=True, exist_ok=True) + if query_csv is None: + return 
output_dir / "__query_csv_missing__.csv" + + adapter = str(recall_adapter or "none").strip().lower() + if adapter not in VALID_RECALL_ADAPTERS: + raise ValueError(f"Unknown recall adapter '{recall_adapter}'. Valid adapters: {sorted(VALID_RECALL_ADAPTERS)}") + + source = Path(query_csv) + if adapter == "none": + return source + + if adapter == "page_plus_one": + return _adapt_page_plus_one(source, output_dir / "query_adapter.page_plus_one.csv") + + return _adapt_financebench_json(source, output_dir / "query_adapter.financebench_json.csv") diff --git a/nemo_retriever/src/nemo_retriever/harness/run.py b/nemo_retriever/src/nemo_retriever/harness/run.py index 8e4b98351..62838a1b2 100644 --- a/nemo_retriever/src/nemo_retriever/harness/run.py +++ b/nemo_retriever/src/nemo_retriever/harness/run.py @@ -34,6 +34,7 @@ load_runs_config, ) from nemo_retriever.harness.parsers import StreamMetrics +from nemo_retriever.harness.recall_adapters import prepare_recall_query_file ANSI_ESCAPE_RE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") @@ -48,7 +49,7 @@ def _resolve_lancedb_uri(cfg: HarnessConfig, artifact_dir: Path) -> str: return str(p) -def _build_command(cfg: HarnessConfig, artifact_dir: Path, run_id: str) -> tuple[list[str], Path, Path]: +def _build_command(cfg: HarnessConfig, artifact_dir: Path, run_id: str) -> tuple[list[str], Path, Path, Path]: runtime_dir = artifact_dir / "runtime_metrics" runtime_dir.mkdir(parents=True, exist_ok=True) if cfg.write_detection_file: @@ -56,7 +57,11 @@ def _build_command(cfg: HarnessConfig, artifact_dir: Path, run_id: str) -> tuple else: # Keep detection summary out of top-level artifacts unless explicitly requested. detection_summary_file = runtime_dir / ".detection_summary.json" - query_csv = Path(cfg.query_csv) if cfg.query_csv else (artifact_dir / "__query_csv_missing__.csv") + query_csv = prepare_recall_query_file( + query_csv=Path(cfg.query_csv) if cfg.query_csv else None, + recall_adapter=cfg.recall_adapter, + output_dir=runtime_dir, + ) cmd = [ sys.executable, @@ -67,6 +72,8 @@ def _build_command(cfg: HarnessConfig, artifact_dir: Path, run_id: str) -> tuple cfg.input_type, "--query-csv", str(query_csv), + "--recall-match-mode", + cfg.recall_match_mode, "--no-recall-details", "--pdf-extract-workers", str(cfg.pdf_extract_workers), @@ -117,7 +124,7 @@ def _build_command(cfg: HarnessConfig, artifact_dir: Path, run_id: str) -> tuple if cfg.hybrid: cmd += ["--hybrid"] - return cmd, runtime_dir, detection_summary_file + return cmd, runtime_dir, detection_summary_file, query_csv def _evaluate_run_outcome( @@ -206,7 +213,7 @@ def _run_subprocess_with_tty(cmd: list[str], metrics: StreamMetrics) -> int: def _run_single(cfg: HarnessConfig, artifact_dir: Path, run_id: str) -> dict[str, Any]: - cmd, runtime_dir, detection_summary_file = _build_command(cfg, artifact_dir, run_id) + cmd, runtime_dir, detection_summary_file, effective_query_csv = _build_command(cfg, artifact_dir, run_id) command_text = " ".join(shlex.quote(token) for token in cmd) (artifact_dir / "command.txt").write_text(command_text + "\n", encoding="utf-8") @@ -243,8 +250,11 @@ def _run_single(cfg: HarnessConfig, artifact_dir: Path, run_id: str) -> dict[str "dataset_dir": cfg.dataset_dir, "preset": cfg.preset, "query_csv": cfg.query_csv, + "effective_query_csv": str(effective_query_csv), "input_type": cfg.input_type, "recall_required": cfg.recall_required, + "recall_match_mode": cfg.recall_match_mode, + "recall_adapter": cfg.recall_adapter, "ray_address": cfg.ray_address, "hybrid": cfg.hybrid, 
"embed_model_name": cfg.embed_model_name, diff --git a/nemo_retriever/src/nemo_retriever/recall/core.py b/nemo_retriever/src/nemo_retriever/recall/core.py index 9fc80d59f..35c900240 100644 --- a/nemo_retriever/src/nemo_retriever/recall/core.py +++ b/nemo_retriever/src/nemo_retriever/recall/core.py @@ -46,25 +46,52 @@ class RecallConfig: local_hf_device: Optional[str] = None local_hf_cache_dir: Optional[str] = None local_hf_batch_size: int = 64 + # Gold/retrieval comparison mode: + # - pdf_page: compare on "{pdf}_{page}" keys + # - pdf_only: compare on "{pdf}" document keys + match_mode: str = "pdf_page" -def _normalize_query_df(df: pd.DataFrame) -> pd.DataFrame: +def _normalize_pdf_name(value: str) -> str: + return str(value).replace(".pdf", "") + + +def _normalize_query_df(df: pd.DataFrame, *, match_mode: str) -> pd.DataFrame: """ Normalize a query CSV into: - query (string) - golden_answer (string key that should match LanceDB `pdf_page`) - Supported inputs: - - query,pdf_page - - query,pdf,page (or query,pdf,gt_page) + Supported inputs by match mode: + - pdf_page: + - query,pdf_page + - query,pdf,page (or query,pdf,gt_page) + - pdf_only: + - query,expected_pdf + - query,pdf """ + if match_mode not in {"pdf_page", "pdf_only"}: + raise ValueError(f"Unsupported recall match mode: {match_mode}") + df = df.copy() - if "gt_page" in df.columns and "page" not in df.columns: - df = df.rename(columns={"gt_page": "page"}) if "query" not in df.columns: raise KeyError("Query CSV must contain a 'query' column.") + if match_mode == "pdf_only": + if "expected_pdf" in df.columns: + df["golden_answer"] = df["expected_pdf"].astype(str).apply(_normalize_pdf_name) + return df + if "pdf" in df.columns: + df["golden_answer"] = df["pdf"].astype(str).apply(_normalize_pdf_name) + return df + raise KeyError( + "For pdf_only mode, query data must contain ['query','expected_pdf'] or ['query','pdf'] columns." + ) + + if "gt_page" in df.columns and "page" not in df.columns: + df = df.rename(columns={"gt_page": "page"}) + if "pdf_page" in df.columns: df["golden_answer"] = df["pdf_page"].astype(str) return df @@ -250,12 +277,24 @@ def _hits_to_keys(raw_hits: List[List[Dict[str, Any]]]) -> List[List[str]]: return retrieved_keys -def _is_hit(golden_key: str, retrieved: List[str], k: int) -> bool: +def _extract_doc_from_pdf_page(key: str) -> str: + parts = str(key).rsplit("_", 1) + if len(parts) != 2: + return str(key) + return parts[0] + + +def _is_hit(golden_key: str, retrieved: List[str], k: int, *, match_mode: str) -> bool: """Check if a golden key is found in the top-k retrieved keys. Handles filenames with underscores via ``rsplit`` and also accepts whole-document keys (page ``-1``). 
""" + if match_mode == "pdf_only": + gold_doc = _normalize_pdf_name(str(golden_key)) + top_docs = [_extract_doc_from_pdf_page(r) for r in retrieved[:k]] + return gold_doc in top_docs + parts = golden_key.rsplit("_", 1) if len(parts) != 2: return golden_key in retrieved[:k] @@ -266,8 +305,8 @@ def _is_hit(golden_key: str, retrieved: List[str], k: int) -> bool: return specific_page in top or entire_document in top -def _recall_at_k(gold: List[str], retrieved: List[List[str]], k: int) -> float: - hits = sum(_is_hit(g, r, k) for g, r in zip(gold, retrieved)) +def _recall_at_k(gold: List[str], retrieved: List[List[str]], k: int, *, match_mode: str) -> float: + hits = sum(_is_hit(g, r, k, match_mode=match_mode) for g, r in zip(gold, retrieved)) return hits / max(1, len(gold)) @@ -288,7 +327,7 @@ def retrieve_and_score( - retrieved keys (pdf_page-like) - metrics dict (recall@k) """ - df_query = _normalize_query_df(pd.read_csv(query_csv)) + df_query = _normalize_query_df(pd.read_csv(query_csv), match_mode=str(cfg.match_mode)) if limit is not None: df_query = df_query.head(int(limit)).copy() @@ -324,7 +363,9 @@ def retrieve_and_score( hybrid=bool(cfg.hybrid), ) retrieved_keys = _hits_to_keys(raw_hits) - metrics = {f"recall@{k}": _recall_at_k(gold, retrieved_keys, int(k)) for k in cfg.ks} + metrics = { + f"recall@{k}": _recall_at_k(gold, retrieved_keys, int(k), match_mode=str(cfg.match_mode)) for k in cfg.ks + } return df_query, gold, raw_hits, retrieved_keys, metrics @@ -347,8 +388,15 @@ def evaluate_recall( row = {"query_id": i, "query": q, "golden_answer": g, "top_retrieved": r[: cfg.top_k]} for k in cfg.ks: k = int(k) - row[f"hit@{k}"] = _is_hit(g, r, k) - row[f"rank@{k}"] = (r[: cfg.top_k].index(g) + 1) if (g in r[: cfg.top_k]) else None + row[f"hit@{k}"] = _is_hit(g, r, k, match_mode=str(cfg.match_mode)) + if str(cfg.match_mode) == "pdf_only": + top_docs = [_extract_doc_from_pdf_page(key) for key in r[: cfg.top_k]] + try: + row[f"rank@{k}"] = top_docs.index(_normalize_pdf_name(str(g))) + 1 + except ValueError: + row[f"rank@{k}"] = None + else: + row[f"rank@{k}"] = (r[: cfg.top_k].index(g) + 1) if (g in r[: cfg.top_k]) else None rows.append(row) results_df = pd.DataFrame(rows) diff --git a/nemo_retriever/tests/test_harness_config.py b/nemo_retriever/tests/test_harness_config.py index 0584007ed..87e4b57ef 100644 --- a/nemo_retriever/tests/test_harness_config.py +++ b/nemo_retriever/tests/test_harness_config.py @@ -108,3 +108,63 @@ def test_load_runs_config_parses_runs_list(tmp_path: Path) -> None: assert len(runs) == 2 assert runs[0]["name"] == "r1" assert runs[0]["overrides"]["gpu_embed"] == 0.25 + + +def test_load_harness_config_supports_recall_adapter_and_match_mode(tmp_path: Path) -> None: + dataset_dir = tmp_path / "dataset" + dataset_dir.mkdir() + query_csv = tmp_path / "query.csv" + query_csv.write_text("query,pdf,page\nq,doc,0\n", encoding="utf-8") + cfg_path = tmp_path / "test_configs.yaml" + cfg_path.write_text( + "\n".join( + [ + "active:", + " dataset: tiny", + " preset: base", + "presets:", + " base: {}", + "datasets:", + " tiny:", + f" path: {dataset_dir}", + f" query_csv: {query_csv}", + " recall_required: true", + " recall_adapter: page_plus_one", + " recall_match_mode: pdf_page", + ] + ), + encoding="utf-8", + ) + + cfg = load_harness_config(config_file=str(cfg_path)) + assert cfg.recall_adapter == "page_plus_one" + assert cfg.recall_match_mode == "pdf_page" + + +def test_load_harness_config_rejects_invalid_recall_adapter(tmp_path: Path) -> None: + dataset_dir = tmp_path / 
"dataset" + dataset_dir.mkdir() + query_csv = tmp_path / "query.csv" + query_csv.write_text("query,pdf_page\nq,doc_1\n", encoding="utf-8") + cfg_path = tmp_path / "test_configs.yaml" + cfg_path.write_text( + "\n".join( + [ + "active:", + " dataset: tiny", + " preset: base", + "presets:", + " base: {}", + "datasets:", + " tiny:", + f" path: {dataset_dir}", + f" query_csv: {query_csv}", + " recall_required: true", + " recall_adapter: unknown_adapter", + ] + ), + encoding="utf-8", + ) + + with pytest.raises(ValueError, match="recall_adapter must be one of"): + load_harness_config(config_file=str(cfg_path)) diff --git a/nemo_retriever/tests/test_harness_recall_adapters.py b/nemo_retriever/tests/test_harness_recall_adapters.py new file mode 100644 index 000000000..1387eb00b --- /dev/null +++ b/nemo_retriever/tests/test_harness_recall_adapters.py @@ -0,0 +1,36 @@ +from pathlib import Path + +import pytest + +from nemo_retriever.harness.recall_adapters import prepare_recall_query_file + + +def test_prepare_recall_query_file_none_adapter_returns_input(tmp_path: Path) -> None: + query_csv = tmp_path / "query.csv" + query_csv.write_text("query,pdf_page\nq,doc_1\n", encoding="utf-8") + + out = prepare_recall_query_file(query_csv=query_csv, recall_adapter="none", output_dir=tmp_path / "out") + assert out == query_csv + + +def test_prepare_recall_query_file_financebench_json(tmp_path: Path) -> None: + query_json = tmp_path / "financebench_train.json" + query_json.write_text( + '[{"question":"What is revenue?","contexts":[{"filename":"AAPL_2023.pdf"}]}]', + encoding="utf-8", + ) + + out = prepare_recall_query_file( + query_csv=query_json, recall_adapter="financebench_json", output_dir=tmp_path / "out" + ) + assert out.exists() + contents = out.read_text(encoding="utf-8") + assert "query,expected_pdf" in contents + assert "What is revenue?,AAPL_2023" in contents + + +def test_prepare_recall_query_file_rejects_unknown_adapter(tmp_path: Path) -> None: + query_csv = tmp_path / "query.csv" + query_csv.write_text("query,pdf_page\nq,doc_1\n", encoding="utf-8") + with pytest.raises(ValueError, match="Unknown recall adapter"): + prepare_recall_query_file(query_csv=query_csv, recall_adapter="bogus", output_dir=tmp_path / "out") diff --git a/nemo_retriever/tests/test_harness_run.py b/nemo_retriever/tests/test_harness_run.py index 31caa8a3d..05fec62b0 100644 --- a/nemo_retriever/tests/test_harness_run.py +++ b/nemo_retriever/tests/test_harness_run.py @@ -45,10 +45,13 @@ def test_build_command_uses_hidden_detection_file_by_default(tmp_path: Path) -> query_csv=str(query_csv), write_detection_file=False, ) - cmd, runtime_dir, detection_file = _build_command(cfg, tmp_path, run_id="r1") + cmd, runtime_dir, detection_file, effective_query_csv = _build_command(cfg, tmp_path, run_id="r1") assert "--detection-summary-file" in cmd + assert "--recall-match-mode" in cmd + assert "pdf_page" in cmd assert detection_file.parent == runtime_dir assert detection_file.name == ".detection_summary.json" + assert effective_query_csv == query_csv def test_build_command_uses_top_level_detection_file_when_enabled(tmp_path: Path) -> None: @@ -64,10 +67,35 @@ def test_build_command_uses_top_level_detection_file_when_enabled(tmp_path: Path query_csv=str(query_csv), write_detection_file=True, ) - cmd, runtime_dir, detection_file = _build_command(cfg, tmp_path, run_id="r1") + cmd, runtime_dir, detection_file, effective_query_csv = _build_command(cfg, tmp_path, run_id="r1") assert "--detection-summary-file" in cmd assert detection_file.parent == 
tmp_path assert detection_file.name == "detection_summary.json" + assert effective_query_csv == query_csv + + +def test_build_command_applies_page_plus_one_adapter(tmp_path: Path) -> None: + dataset_dir = tmp_path / "dataset" + dataset_dir.mkdir() + query_csv = tmp_path / "query.csv" + query_csv.write_text("query,pdf,page\nq,doc_name.pdf,0\n", encoding="utf-8") + + cfg = HarnessConfig( + dataset_dir=str(dataset_dir), + dataset_label="earnings", + preset="single_gpu", + query_csv=str(query_csv), + recall_adapter="page_plus_one", + ) + cmd, runtime_dir, _detection_file, effective_query_csv = _build_command(cfg, tmp_path, run_id="r1") + + assert effective_query_csv.parent == runtime_dir + assert effective_query_csv.name == "query_adapter.page_plus_one.csv" + assert "--query-csv" in cmd + assert str(effective_query_csv) in cmd + csv_contents = effective_query_csv.read_text(encoding="utf-8") + assert "query,pdf_page" in csv_contents + assert "q,doc_name_1" in csv_contents def test_run_entry_session_artifact_dir_uses_run_name(monkeypatch, tmp_path: Path) -> None: diff --git a/nemo_retriever/tests/test_recall_core.py b/nemo_retriever/tests/test_recall_core.py new file mode 100644 index 000000000..410caef24 --- /dev/null +++ b/nemo_retriever/tests/test_recall_core.py @@ -0,0 +1,21 @@ +import pandas as pd + +from nemo_retriever.recall.core import _is_hit, _normalize_query_df + + +def test_normalize_query_df_pdf_only_uses_expected_pdf() -> None: + df = pd.DataFrame({"query": ["q1"], "expected_pdf": ["Doc_A.pdf"]}) + out = _normalize_query_df(df, match_mode="pdf_only") + assert out["golden_answer"].tolist() == ["Doc_A"] + + +def test_normalize_query_df_pdf_page_uses_pdf_and_page() -> None: + df = pd.DataFrame({"query": ["q1"], "pdf": ["Doc_A.pdf"], "page": [2]}) + out = _normalize_query_df(df, match_mode="pdf_page") + assert out["golden_answer"].tolist() == ["Doc_A_2"] + + +def test_is_hit_pdf_only_matches_any_page_of_document() -> None: + retrieved = ["Doc_A_7", "Doc_B_1"] + assert _is_hit("Doc_A", retrieved, 1, match_mode="pdf_only") is True + assert _is_hit("Doc_B", retrieved, 1, match_mode="pdf_only") is False From a29df5d78deee70bc4dc843df7225eb1fcc4b93f Mon Sep 17 00:00:00 2001 From: Jacob Ioffe Date: Wed, 4 Mar 2026 21:03:41 +0000 Subject: [PATCH 2/3] Simplify recall matching and adapter dispatch - reuse recall-core hit checks from batch pipeline - use adapter registry for cleaner recall adapter routing - parameterize recall-mode tests to reduce duplication Signed-off-by: Jacob Ioffe Made-with: Cursor --- .../nemo_retriever/examples/batch_pipeline.py | 25 +----------- .../nemo_retriever/harness/recall_adapters.py | 15 ++++++-- .../src/nemo_retriever/recall/core.py | 9 ++++- nemo_retriever/tests/test_recall_core.py | 38 +++++++++++-------- 4 files changed, 44 insertions(+), 43 deletions(-) diff --git a/nemo_retriever/src/nemo_retriever/examples/batch_pipeline.py b/nemo_retriever/src/nemo_retriever/examples/batch_pipeline.py index 94cc81cc2..4e454fd20 100644 --- a/nemo_retriever/src/nemo_retriever/examples/batch_pipeline.py +++ b/nemo_retriever/src/nemo_retriever/examples/batch_pipeline.py @@ -27,7 +27,7 @@ from nemo_retriever.params import IngestorCreateParams from nemo_retriever.params import TextChunkParams from nemo_retriever.params import VdbUploadParams -from nemo_retriever.recall.core import RecallConfig, retrieve_and_score +from nemo_retriever.recall.core import RecallConfig, is_hit_at_k, retrieve_and_score app = typer.Typer() @@ -315,27 +315,6 @@ def _gold_to_doc_page(golden_key: str) 
-> tuple[str, str]: return doc, page -def _extract_doc_from_key(key: str) -> str: - s = str(key) - if "_" not in s: - return s - doc, _page = s.rsplit("_", 1) - return doc - - -def _is_hit_at_k(golden_key: str, retrieved_keys: list[str], k: int, *, match_mode: str) -> bool: - top = (retrieved_keys or [])[: int(k)] - if match_mode == "pdf_only": - gold_doc = str(golden_key).replace(".pdf", "") - top_docs = [_extract_doc_from_key(key) for key in top] - return gold_doc in top_docs - - doc, page = _gold_to_doc_page(golden_key) - specific_page = f"{doc}_{page}" - entire_document = f"{doc}_-1" - return (specific_page in top) or (entire_document in top) - - def _hit_key_and_distance(hit: dict) -> tuple[str | None, float | None]: try: res = json.loads(hit.get("metadata", "{}")) @@ -931,7 +910,7 @@ def main( scored_hits.append((key, dist)) top_keys = [k for (k, _d) in scored_hits] - hit = _is_hit_at_k(g, top_keys, cfg.top_k, match_mode=recall_match_mode) + hit = is_hit_at_k(g, top_keys, cfg.top_k, match_mode=recall_match_mode) if not no_recall_details: print(f"\nQuery {i}: {q}") diff --git a/nemo_retriever/src/nemo_retriever/harness/recall_adapters.py b/nemo_retriever/src/nemo_retriever/harness/recall_adapters.py index 8e11edcdd..6f018eb26 100644 --- a/nemo_retriever/src/nemo_retriever/harness/recall_adapters.py +++ b/nemo_retriever/src/nemo_retriever/harness/recall_adapters.py @@ -6,6 +6,7 @@ import json from pathlib import Path +from typing import Callable import pandas as pd @@ -70,6 +71,12 @@ def _adapt_financebench_json(query_json: Path, output_csv: Path) -> Path: return output_csv +_ADAPTER_HANDLERS: dict[str, tuple[Callable[[Path, Path], Path], str]] = { + "page_plus_one": (_adapt_page_plus_one, "query_adapter.page_plus_one.csv"), + "financebench_json": (_adapt_financebench_json, "query_adapter.financebench_json.csv"), +} + + def prepare_recall_query_file(*, query_csv: Path | None, recall_adapter: str, output_dir: Path) -> Path: output_dir.mkdir(parents=True, exist_ok=True) if query_csv is None: @@ -83,7 +90,9 @@ def prepare_recall_query_file(*, query_csv: Path | None, recall_adapter: str, ou if adapter == "none": return source - if adapter == "page_plus_one": - return _adapt_page_plus_one(source, output_dir / "query_adapter.page_plus_one.csv") + handler = _ADAPTER_HANDLERS.get(adapter) + if handler is None: + raise ValueError(f"Adapter '{adapter}' is valid but not implemented.") - return _adapt_financebench_json(source, output_dir / "query_adapter.financebench_json.csv") + adapter_fn, output_name = handler + return adapter_fn(source, output_dir / output_name) diff --git a/nemo_retriever/src/nemo_retriever/recall/core.py b/nemo_retriever/src/nemo_retriever/recall/core.py index 35c900240..2da3af48a 100644 --- a/nemo_retriever/src/nemo_retriever/recall/core.py +++ b/nemo_retriever/src/nemo_retriever/recall/core.py @@ -305,8 +305,13 @@ def _is_hit(golden_key: str, retrieved: List[str], k: int, *, match_mode: str) - return specific_page in top or entire_document in top +def is_hit_at_k(golden_key: str, retrieved: Sequence[str], k: int, *, match_mode: str) -> bool: + """Public wrapper for top-k hit checks across match modes.""" + return _is_hit(str(golden_key), list(retrieved), int(k), match_mode=str(match_mode)) + + def _recall_at_k(gold: List[str], retrieved: List[List[str]], k: int, *, match_mode: str) -> float: - hits = sum(_is_hit(g, r, k, match_mode=match_mode) for g, r in zip(gold, retrieved)) + hits = sum(is_hit_at_k(g, r, k, match_mode=match_mode) for g, r in zip(gold, retrieved)) return 
hits / max(1, len(gold)) @@ -388,7 +393,7 @@ def evaluate_recall( row = {"query_id": i, "query": q, "golden_answer": g, "top_retrieved": r[: cfg.top_k]} for k in cfg.ks: k = int(k) - row[f"hit@{k}"] = _is_hit(g, r, k, match_mode=str(cfg.match_mode)) + row[f"hit@{k}"] = is_hit_at_k(g, r, k, match_mode=str(cfg.match_mode)) if str(cfg.match_mode) == "pdf_only": top_docs = [_extract_doc_from_pdf_page(key) for key in r[: cfg.top_k]] try: diff --git a/nemo_retriever/tests/test_recall_core.py b/nemo_retriever/tests/test_recall_core.py index 410caef24..db365dd47 100644 --- a/nemo_retriever/tests/test_recall_core.py +++ b/nemo_retriever/tests/test_recall_core.py @@ -1,21 +1,29 @@ import pandas as pd +import pytest -from nemo_retriever.recall.core import _is_hit, _normalize_query_df +from nemo_retriever.recall.core import _normalize_query_df, is_hit_at_k -def test_normalize_query_df_pdf_only_uses_expected_pdf() -> None: - df = pd.DataFrame({"query": ["q1"], "expected_pdf": ["Doc_A.pdf"]}) - out = _normalize_query_df(df, match_mode="pdf_only") - assert out["golden_answer"].tolist() == ["Doc_A"] +@pytest.mark.parametrize( + "match_mode,df,expected", + [ + ("pdf_only", pd.DataFrame({"query": ["q1"], "expected_pdf": ["Doc_A.pdf"]}), ["Doc_A"]), + ("pdf_page", pd.DataFrame({"query": ["q1"], "pdf": ["Doc_A.pdf"], "page": [2]}), ["Doc_A_2"]), + ], +) +def test_normalize_query_df_modes(match_mode: str, df: pd.DataFrame, expected: list[str]) -> None: + out = _normalize_query_df(df, match_mode=match_mode) + assert out["golden_answer"].tolist() == expected -def test_normalize_query_df_pdf_page_uses_pdf_and_page() -> None: - df = pd.DataFrame({"query": ["q1"], "pdf": ["Doc_A.pdf"], "page": [2]}) - out = _normalize_query_df(df, match_mode="pdf_page") - assert out["golden_answer"].tolist() == ["Doc_A_2"] - - -def test_is_hit_pdf_only_matches_any_page_of_document() -> None: - retrieved = ["Doc_A_7", "Doc_B_1"] - assert _is_hit("Doc_A", retrieved, 1, match_mode="pdf_only") is True - assert _is_hit("Doc_B", retrieved, 1, match_mode="pdf_only") is False +@pytest.mark.parametrize( + "match_mode,golden,retrieved,k,expected", + [ + ("pdf_only", "Doc_A", ["Doc_A_7", "Doc_B_1"], 1, True), + ("pdf_only", "Doc_B", ["Doc_A_7", "Doc_B_1"], 1, False), + ("pdf_page", "Doc_A_2", ["Doc_A_2", "Doc_A_9"], 1, True), + ("pdf_page", "Doc_A_2", ["Doc_A_9", "Doc_A_-1"], 1, False), + ], +) +def test_is_hit_at_k_modes(match_mode: str, golden: str, retrieved: list[str], k: int, expected: bool) -> None: + assert is_hit_at_k(golden, retrieved, k, match_mode=match_mode) is expected From b43b4bc47d1f4feafa045eb3312179ae40a06702 Mon Sep 17 00:00:00 2001 From: Jacob Ioffe Date: Wed, 4 Mar 2026 21:19:42 +0000 Subject: [PATCH 3/3] Fix recall core import for CI test environments - move nv_ingest_api import into NIM embed helper - avoid module import failure when API package is absent Signed-off-by: Jacob Ioffe Made-with: Cursor --- nemo_retriever/src/nemo_retriever/recall/core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nemo_retriever/src/nemo_retriever/recall/core.py b/nemo_retriever/src/nemo_retriever/recall/core.py index 2da3af48a..f5dbe1e68 100644 --- a/nemo_retriever/src/nemo_retriever/recall/core.py +++ b/nemo_retriever/src/nemo_retriever/recall/core.py @@ -16,8 +16,6 @@ import numpy as np import pandas as pd -from nv_ingest_api.util.nim import infer_microservice - @dataclass(frozen=True) class RecallConfig: @@ -141,6 +139,8 @@ def _embed_queries_nim( api_key: str, grpc: bool, ) -> List[List[float]]: + from 
nv_ingest_api.util.nim import infer_microservice + # `infer_microservice` returns a list of embeddings. embeddings = infer_microservice( queries,
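The PATCH 3/3 change is the standard deferred-import pattern for optional dependencies: moving the module-level import into the helper that needs it means `recall.core` can be imported (for example during CI test collection) even when `nv_ingest_api` is absent. A condensed sketch of the effect; the real helper forwards additional endpoint, model, and auth parameters, which are truncated in the hunk above.

```python
# Condensed sketch of the PATCH 3/3 deferred-import pattern.
from typing import List


def _embed_queries(queries: List[str]) -> List[List[float]]:
    # Import at call time, not module-import time: environments without
    # nv_ingest_api can still import this module, and the ImportError
    # surfaces only when NIM embedding is actually requested.
    from nv_ingest_api.util.nim import infer_microservice

    # Shape only: the shipped call in recall/core.py passes further
    # keyword arguments alongside `queries`.
    return infer_microservice(queries)
```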