Merged
7 changes: 6 additions & 1 deletion nemo_retriever/README.md
@@ -50,7 +50,7 @@ uv pip install -e ./nemo_retriever
uv run python nemo_retriever/src/nemo_retriever/examples/batch_pipeline.py /path/to/pdfs
```

Pass the directory that contains your PDFs as the first argument (`input-dir`). For recall evaluation, the pipeline uses `bo767_query_gt.csv` in the current directory by default; override with `--query-csv <path>`. Recall is skipped if the query CSV file does not exist. By default, per-query details (query, gold, hits) are printed; use `--no-recall-details` to print only the missed-gold summary and recall metrics. To use an existing Ray cluster, pass `--ray-address auto`. If OCR fails with a missing `libcudart.so.13`, install the CUDA 13 runtime and set `LD_LIBRARY_PATH` as shown above.
Pass the directory that contains your PDFs as the first argument (`input-dir`). For recall evaluation, the pipeline uses `bo767_query_gt.csv` in the current directory by default; override with `--query-csv <path>`. For document-level recall, use `--recall-match-mode pdf_only` with a query file containing `query,expected_pdf` columns. Recall is skipped if the query file does not exist. By default, per-query details (query, gold, hits) are printed; use `--no-recall-details` to print only the missed-gold summary and recall metrics. To use an existing Ray cluster, pass `--ray-address auto`. If OCR fails with a missing `libcudart.so.13`, install the CUDA 13 runtime and set `LD_LIBRARY_PATH` as shown above.

For **HTML** or **text** ingestion, use `--input-type html` or `--input-type txt` with the same examples (e.g. `batch_pipeline.py <dir> --input-type html`). HTML files are converted to markdown via markitdown, then chunked with the same tokenizer as .txt. Staged CLI: `retriever html run --input-dir <dir>` writes `*.html_extraction.json`; then `retriever local stage5 run --input-dir <dir> --pattern "*.html_extraction.json"` and `retriever local stage6 run --input-dir <dir>`.

@@ -64,6 +64,11 @@ For **HTML** or **text** ingestion, use `--input-type html` or `--input-type txt
- CLI entrypoint is nested under `retriever harness`.
- First pass is LanceDB-only and enforces recall-required pass/fail by default.
- Single-run artifact directories default to `<dataset>_<timestamp>`.
- Dataset-specific recall adapters are supported via config:
- `recall_adapter: none` (default passthrough)
- `recall_adapter: page_plus_one` (convert zero-indexed `page` CSVs to `pdf_page`)
- `recall_adapter: financebench_json` (convert FinanceBench JSON to `query,expected_pdf`)
- `recall_match_mode: pdf_page|pdf_only` controls recall matching mode.

### Single run

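The two recall match modes described in the README change above can be sketched as a standalone re-implementation (this is illustrative only, not the library's `nemo_retriever.recall.core.is_hit_at_k`; the `hit_at_k` name and key formats are assumptions based on the diff):

```python
def hit_at_k(gold_key: str, retrieved_keys: list[str], k: int,
             match_mode: str = "pdf_page") -> bool:
    """Check whether a gold answer appears in the top-k retrieved keys."""
    top = list(retrieved_keys)[:k]
    if match_mode == "pdf_only":
        # Document-level recall: gold is a bare PDF name; any chunk key
        # from that document ("<doc>" or "<doc>_<page>") counts as a hit.
        return any(key == gold_key or key.startswith(gold_key + "_") for key in top)
    # Page-level recall: gold keys look like "<doc>_<page>"; a key of
    # "<doc>_-1" is treated as a whole-document match.
    doc, _, _page = gold_key.rpartition("_")
    return gold_key in top or f"{doc}_-1" in top
```

Usage mirrors the CLI flag: `--recall-match-mode pdf_only` would correspond to calling with `match_mode="pdf_only"`.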
6 changes: 6 additions & 0 deletions nemo_retriever/harness/HANDOFF.md
@@ -24,6 +24,8 @@ It captures what exists now, what was intentionally chosen, and what to iterate
- Stream parsing for ingest/throughput/recall metrics.
- `nemo_retriever/src/nemo_retriever/harness/artifacts.py`
- Artifact/session directory creation and session summary writing.
- `nemo_retriever/src/nemo_retriever/harness/recall_adapters.py`
- Dataset-specific query normalization adapters for recall inputs.
- `nemo_retriever/harness/test_configs.yaml`
- Active defaults, presets, dataset presets.
- `nemo_retriever/harness/nightly_config.yaml`
@@ -36,6 +38,10 @@ It captures what exists now, what was intentionally chosen, and what to iterate
- Two main presets are available:
- `single_gpu`
- `dgx_8gpu`
- Adapter-capable datasets:
- `earnings` uses `recall_adapter: page_plus_one` (`page` -> `pdf_page` conversion).
- `bo10k` wiring is included (adapter + mode), with recall disabled by default until query path is set.
- `financebench` wiring is included for `pdf_only` matching with `financebench_json` adapter, with recall disabled by default until query path is set.

## Current CLI Usage

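The `page_plus_one` conversion mentioned for the `earnings` and `bo10k` datasets turns zero-indexed `page` CSVs into the one-indexed `pdf_page` keys the recall scorer expects. A minimal stdlib-only sketch of that normalization (the real adapter lives in `recall_adapters.py` and uses pandas; the function name here is hypothetical):

```python
import csv
import io


def page_plus_one_rows(csv_text: str) -> list[dict[str, str]]:
    """Convert query,pdf,page rows (zero-indexed pages) to query,pdf_page."""
    out: list[dict[str, str]] = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        # "gt_page" is accepted as an alias for "page", as in the adapter.
        page = int(row.get("page") or row.get("gt_page"))
        pdf = row["pdf"].removesuffix(".pdf")
        out.append({"query": row["query"], "pdf_page": f"{pdf}_{page + 1}"})
    return out
```

For example, a row `("What is revenue", "acme.pdf", 0)` would normalize to the key `acme_1`.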
9 changes: 9 additions & 0 deletions nemo_retriever/harness/nightly_config.yaml
@@ -10,3 +10,12 @@ runs:
# - name: bo767_single
# dataset: bo767
# preset: single_gpu
# - name: earnings_single
# dataset: earnings
# preset: single_gpu
# - name: financebench_single
# dataset: financebench
# preset: single_gpu
# - name: bo10k_single
# dataset: bo10k
# preset: single_gpu
32 changes: 32 additions & 0 deletions nemo_retriever/harness/test_configs.yaml
@@ -6,6 +6,8 @@ active:
query_csv: data/jp20_query_gt.csv
input_type: pdf
recall_required: true
recall_match_mode: pdf_page
recall_adapter: none
artifacts_dir: null
ray_address: null
lancedb_uri: lancedb
@@ -68,3 +70,33 @@ datasets:
query_csv: data/jp20_query_gt.csv
input_type: pdf
recall_required: true
recall_match_mode: pdf_page
recall_adapter: none

earnings:
path: /datasets/nv-ingest/earnings_consulting
query_csv: data/earnings_consulting_multimodal.csv
input_type: pdf
recall_required: true
recall_match_mode: pdf_page
recall_adapter: page_plus_one

# Wiring for bo10k-style CSVs with zero-based page numbers.
# Keep recall optional by default until a local query CSV path is configured.
bo10k:
path: /datasets/nv-ingest/bo10k
query_csv: null
input_type: pdf
recall_required: false
recall_match_mode: pdf_page
recall_adapter: page_plus_one

# FinanceBench is document-level recall. Provide a JSON ground-truth file path
# via query_csv when enabling recall for this dataset.
financebench:
path: /datasets/nv-ingest/financebench
query_csv: null
input_type: pdf
recall_required: false
recall_match_mode: pdf_only
recall_adapter: financebench_json
57 changes: 38 additions & 19 deletions nemo_retriever/src/nemo_retriever/examples/batch_pipeline.py
@@ -27,7 +27,7 @@
from nemo_retriever.params import IngestorCreateParams
from nemo_retriever.params import TextChunkParams
from nemo_retriever.params import VdbUploadParams
from nemo_retriever.recall.core import RecallConfig, retrieve_and_score
from nemo_retriever.recall.core import RecallConfig, is_hit_at_k, retrieve_and_score

app = typer.Typer()

@@ -315,14 +315,6 @@ def _gold_to_doc_page(golden_key: str) -> tuple[str, str]:
return doc, page


def _is_hit_at_k(golden_key: str, retrieved_keys: list[str], k: int) -> bool:
doc, page = _gold_to_doc_page(golden_key)
specific_page = f"{doc}_{page}"
entire_document = f"{doc}_-1"
top = (retrieved_keys or [])[: int(k)]
return (specific_page in top) or (entire_document in top)


def _hit_key_and_distance(hit: dict) -> tuple[str | None, float | None]:
try:
res = json.loads(hit.get("metadata", "{}"))
@@ -375,6 +367,11 @@ def main(
"(current directory). Recall is skipped if the file does not exist."
),
),
recall_match_mode: str = typer.Option(
"pdf_page",
"--recall-match-mode",
help="Recall match mode: 'pdf_page' or 'pdf_only'.",
),
no_recall_details: bool = typer.Option(
False,
"--no-recall-details",
@@ -589,6 +586,9 @@
) -> None:
log_handle, original_stdout, original_stderr = _configure_logging(log_file)
try:
if recall_match_mode not in {"pdf_page", "pdf_only"}:
raise ValueError(f"Unsupported --recall-match-mode: {recall_match_mode}")

os.environ["RAY_LOG_TO_DRIVER"] = "1" if ray_log_to_driver else "0"
# Use an absolute path so driver and Ray actors resolve the same LanceDB URI.
lancedb_uri = str(Path(lancedb_uri).expanduser().resolve())
@@ -617,7 +617,7 @@ def main(

input_dir = Path(input_dir)
if input_type == "txt":
glob_pattern = str(input_dir / "*.txt")
glob_pattern = str(input_dir / "**" / "*.txt") if input_dir.is_dir() else str(input_dir)
ingestor = create_ingestor(
run_mode="batch",
params=IngestorCreateParams(ray_address=ray_address, ray_log_to_driver=ray_log_to_driver),
@@ -648,7 +648,7 @@ def main(
)
)
elif input_type == "html":
glob_pattern = str(input_dir / "*.html")
glob_pattern = str(input_dir / "**" / "*.html") if input_dir.is_dir() else str(input_dir)
ingestor = create_ingestor(
run_mode="batch",
params=IngestorCreateParams(ray_address=ray_address, ray_log_to_driver=ray_log_to_driver),
@@ -680,7 +680,10 @@ def main(
)
elif input_type == "doc":
# DOCX/PPTX: same pipeline as PDF; DocToPdfConversionActor converts before split.
doc_globs = [str(input_dir / "*.docx"), str(input_dir / "*.pptx")]
if input_dir.is_dir():
doc_globs = [str(input_dir / "**" / "*.docx"), str(input_dir / "**" / "*.pptx")]
else:
doc_globs = [str(input_dir)]
ingestor = create_ingestor(
run_mode="batch",
params=IngestorCreateParams(ray_address=ray_address, ray_log_to_driver=ray_log_to_driver),
@@ -743,7 +746,7 @@
)
)
else:
pdf_glob = str(input_dir / "*.pdf")
pdf_glob = str(input_dir / "**" / "*.pdf") if input_dir.is_dir() else str(input_dir)
ingestor = create_ingestor(
run_mode="batch",
params=IngestorCreateParams(ray_address=ray_address, ray_log_to_driver=ray_log_to_driver),
@@ -875,6 +878,7 @@ def main(
top_k=10,
ks=(1, 5, 10),
hybrid=hybrid,
match_mode=recall_match_mode,
)

_df_query, _gold, _raw_hits, _retrieved_keys, metrics = retrieve_and_score(query_csv=query_csv, cfg=cfg)
@@ -894,7 +898,10 @@ def main(
_raw_hits,
)
):
doc, page = _gold_to_doc_page(g)
if recall_match_mode == "pdf_only":
doc, page = str(g), ""
else:
doc, page = _gold_to_doc_page(g)

scored_hits: list[tuple[str, float | None]] = []
for h in hits:
@@ -903,11 +910,14 @@ def main(
scored_hits.append((key, dist))

top_keys = [k for (k, _d) in scored_hits]
hit = _is_hit_at_k(g, top_keys, cfg.top_k)
hit = is_hit_at_k(g, top_keys, cfg.top_k, match_mode=recall_match_mode)

if not no_recall_details:
print(f"\nQuery {i}: {q}")
print(f" Gold: {g} (file: {doc}{ext}, page: {page})")
if recall_match_mode == "pdf_only":
print(f" Gold: {g} (file: {doc}{ext})")
else:
print(f" Gold: {g} (file: {doc}{ext}, page: {page})")
print(f" Hit@{cfg.top_k}: {hit}")
print(" Top hits:")
if not scored_hits:
@@ -920,15 +930,24 @@ def main(
print(f" {rank:02d}. {key} distance={dist:.6f}")

if not hit:
missed_gold.append((f"{doc}{ext}", str(page)))
if recall_match_mode == "pdf_only":
missed_gold.append((f"{doc}{ext}", ""))
else:
missed_gold.append((f"{doc}{ext}", str(page)))

missed_unique = sorted(set(missed_gold), key=lambda x: (x[0], x[1]))
print("\nMissed gold (unique doc/page):")
if recall_match_mode == "pdf_only":
print("\nMissed gold (unique docs):")
else:
print("\nMissed gold (unique doc/page):")
if not missed_unique:
print(" (none)")
else:
for doc_page, page in missed_unique:
print(f" {doc_page} page {page}")
if recall_match_mode == "pdf_only":
print(f" {doc_page}")
else:
print(f" {doc_page} page {page}")
print(f"\nTotal missed: {len(missed_unique)} / {len(_gold)}")

print("\nRecall metrics (matching nemo_retriever.recall.core):")
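The glob changes in this file switch to recursive `**` patterns when the input path is a directory, while still accepting a single file path. A hedged sketch of that selection logic (standalone; the helper name is hypothetical):

```python
from pathlib import Path


def input_glob(input_path: str, suffix: str) -> str:
    """Recursive glob for directories; pass single file paths through unchanged."""
    p = Path(input_path)
    if p.is_dir():
        # Matches files at any depth under the directory, e.g. dir/**/*.pdf
        return str(p / "**" / f"*.{suffix}")
    return str(p)
```

This mirrors expressions like `str(input_dir / "**" / "*.pdf") if input_dir.is_dir() else str(input_dir)` in the diff above.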
11 changes: 11 additions & 0 deletions nemo_retriever/src/nemo_retriever/harness/config.py
@@ -15,6 +15,7 @@
REPO_ROOT = NEMO_RETRIEVER_ROOT.parent
DEFAULT_TEST_CONFIG_PATH = NEMO_RETRIEVER_ROOT / "harness" / "test_configs.yaml"
DEFAULT_NIGHTLY_CONFIG_PATH = NEMO_RETRIEVER_ROOT / "harness" / "nightly_config.yaml"
VALID_RECALL_ADAPTERS = {"none", "page_plus_one", "financebench_json"}

TUNING_FIELDS = {
"pdf_extract_workers",
@@ -45,6 +46,8 @@ class HarnessConfig:
query_csv: str | None = None
input_type: str = "pdf"
recall_required: bool = True
recall_match_mode: str = "pdf_page"
recall_adapter: str = "none"

artifacts_dir: str | None = None
ray_address: str | None = None
@@ -86,6 +89,12 @@ def validate(self) -> list[str]:
if self.input_type not in {"pdf", "txt", "html", "doc"}:
errors.append(f"input_type must be one of pdf/txt/html/doc, got '{self.input_type}'")

if self.recall_match_mode not in {"pdf_page", "pdf_only"}:
errors.append("recall_match_mode must be one of pdf_page/pdf_only")

if self.recall_adapter not in VALID_RECALL_ADAPTERS:
errors.append(f"recall_adapter must be one of {sorted(VALID_RECALL_ADAPTERS)}")

for name in TUNING_FIELDS:
val = getattr(self, name)
if name.startswith("gpu_") and float(val) < 0.0:
@@ -143,6 +152,8 @@ def _apply_env_overrides(config_dict: dict[str, Any]) -> None:
"HARNESS_QUERY_CSV": ("query_csv", str),
"HARNESS_INPUT_TYPE": ("input_type", str),
"HARNESS_RECALL_REQUIRED": ("recall_required", _parse_bool),
"HARNESS_RECALL_MATCH_MODE": ("recall_match_mode", str),
"HARNESS_RECALL_ADAPTER": ("recall_adapter", str),
"HARNESS_ARTIFACTS_DIR": ("artifacts_dir", str),
"HARNESS_RAY_ADDRESS": ("ray_address", str),
"HARNESS_LANCEDB_URI": ("lancedb_uri", str),
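The `_apply_env_overrides` table above maps `HARNESS_*` environment variables onto config fields, with the environment winning when a variable is set. A standalone sketch of that pattern (only the two variable names come from the diff; the rest is illustrative):

```python
import os

# Subset of the override table: env var -> (config field, caster).
ENV_OVERRIDES = {
    "HARNESS_RECALL_MATCH_MODE": ("recall_match_mode", str),
    "HARNESS_RECALL_ADAPTER": ("recall_adapter", str),
}


def apply_env_overrides(config: dict) -> dict:
    """Overwrite config fields from the environment when variables are set."""
    for env_name, (field, caster) in ENV_OVERRIDES.items():
        raw = os.environ.get(env_name)
        if raw is not None:
            config[field] = caster(raw)
    return config
```

Unset variables leave the file-based values untouched, so presets and env overrides compose cleanly.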
98 changes: 98 additions & 0 deletions nemo_retriever/src/nemo_retriever/harness/recall_adapters.py
@@ -0,0 +1,98 @@
# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

import json
from pathlib import Path
from typing import Callable

import pandas as pd

from nemo_retriever.harness.config import VALID_RECALL_ADAPTERS


def _normalize_pdf_name(value: object) -> str:
    return str(value).removesuffix(".pdf")


def _adapt_page_plus_one(query_csv: Path, output_csv: Path) -> Path:
df = pd.read_csv(query_csv)
if "gt_page" in df.columns and "page" not in df.columns:
df = df.rename(columns={"gt_page": "page"})

required = {"query", "pdf", "page"}
missing = required.difference(df.columns)
if missing:
raise ValueError(
"page_plus_one adapter requires ['query','pdf','page'] columns "
f"(missing: {sorted(missing)}) in {query_csv}"
)

page_numbers = pd.to_numeric(df["page"], errors="raise").astype(int) + 1
normalized = pd.DataFrame(
{
"query": df["query"].astype(str),
"pdf_page": [_normalize_pdf_name(pdf) + f"_{page}" for pdf, page in zip(df["pdf"], page_numbers)],
}
)
normalized.to_csv(output_csv, index=False)
return output_csv


def _adapt_financebench_json(query_json: Path, output_csv: Path) -> Path:
payload = json.loads(query_json.read_text(encoding="utf-8"))
if not isinstance(payload, list):
raise ValueError(f"financebench_json adapter expects a JSON list in {query_json}")

rows: list[dict[str, str]] = []
for item in payload:
if not isinstance(item, dict):
continue
question = item.get("question")
contexts = item.get("contexts")
if not isinstance(question, str) or not question.strip():
continue
if not isinstance(contexts, list) or not contexts:
continue
context0 = contexts[0]
if not isinstance(context0, dict):
continue
filename = context0.get("filename")
if not isinstance(filename, str) or not filename.strip():
continue
rows.append({"query": question, "expected_pdf": _normalize_pdf_name(filename)})

if not rows:
raise ValueError(f"financebench_json adapter found no valid rows in {query_json}")

pd.DataFrame(rows).to_csv(output_csv, index=False)
return output_csv


_ADAPTER_HANDLERS: dict[str, tuple[Callable[[Path, Path], Path], str]] = {
"page_plus_one": (_adapt_page_plus_one, "query_adapter.page_plus_one.csv"),
"financebench_json": (_adapt_financebench_json, "query_adapter.financebench_json.csv"),
}


def prepare_recall_query_file(*, query_csv: Path | None, recall_adapter: str, output_dir: Path) -> Path:
output_dir.mkdir(parents=True, exist_ok=True)
if query_csv is None:
return output_dir / "__query_csv_missing__.csv"

adapter = str(recall_adapter or "none").strip().lower()
if adapter not in VALID_RECALL_ADAPTERS:
raise ValueError(f"Unknown recall adapter '{recall_adapter}'. Valid adapters: {sorted(VALID_RECALL_ADAPTERS)}")

source = Path(query_csv)
if adapter == "none":
return source

handler = _ADAPTER_HANDLERS.get(adapter)
if handler is None:
raise ValueError(f"Adapter '{adapter}' is valid but not implemented.")

adapter_fn, output_name = handler
return adapter_fn(source, output_dir / output_name)
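The `financebench_json` adapter above flattens FinanceBench-style records (a JSON list of `question`/`contexts` objects) into `query,expected_pdf` rows for `pdf_only` matching. A stdlib-only sketch of the same extraction (the adapter itself also validates types and writes a CSV via pandas; the function name here is hypothetical):

```python
import json


def financebench_rows(json_text: str) -> list[dict[str, str]]:
    """Extract (query, expected_pdf) pairs from FinanceBench-style JSON."""
    rows: list[dict[str, str]] = []
    for item in json.loads(json_text):
        question = item.get("question")
        contexts = item.get("contexts") or []
        if not question or not contexts:
            continue  # skip records missing a question or any context
        filename = contexts[0].get("filename", "")
        if not filename:
            continue
        rows.append({"query": question,
                     "expected_pdf": filename.removesuffix(".pdf")})
    return rows
```

Only the first context's filename is used, matching the adapter's `contexts[0]` behavior in the diff.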