diff --git a/nemo_retriever/Dockerfile b/nemo_retriever/Dockerfile index e73077559..48250d1fc 100644 --- a/nemo_retriever/Dockerfile +++ b/nemo_retriever/Dockerfile @@ -89,7 +89,9 @@ COPY src src COPY api api COPY client client -# Use conda env's uv; create venv and install retriever in editable mode (path deps: ../src, ../api, ../client) +# Use conda env's uv; create venv and install retriever with uv pip install (no lock; path deps: ../src, ../api, ../client). +# INSTALL_ASR=1 installs the [asr] extra (transformers>=5, soundfile, scipy) for local Parakeet ASR; omit for vLLM/embedding-only images. +ARG INSTALL_ASR=0 SHELL ["/bin/bash", "-c"] RUN --mount=type=cache,target=/root/.cache/pip \ --mount=type=cache,target=/root/.cache/uv \ @@ -97,7 +99,7 @@ RUN --mount=type=cache,target=/root/.cache/pip \ && conda activate retriever_libcudart \ && uv venv .retriever \ && . .retriever/bin/activate \ - && uv pip install -e ./retriever + && if [ "$INSTALL_ASR" = "1" ]; then uv pip install -e "./retriever[asr]"; else uv pip install -e ./retriever; fi # Default: run in-process pipeline (help if no args) ENTRYPOINT ["/workspace/.retriever/bin/python", "-m", "retriever.examples.inprocess_pipeline"] diff --git a/nemo_retriever/README.md b/nemo_retriever/README.md index 6edd125e8..f5dc84e87 100644 --- a/nemo_retriever/README.md +++ b/nemo_retriever/README.md @@ -10,7 +10,7 @@ RAG ingestion pipeline for PDFs: extract structure (text, tables, charts, infogr ## Installation -Installation is done with **UV** from the **nv-ingest root**. UV manages the environment and dependencies; pip is not supported. +Installation is done with **UV** from the **nv-ingest root** using **uv pip install** (no lockfile/sync so optional extras stay independent). Pip is not supported. From the repo root: @@ -21,7 +21,17 @@ source .retriever/bin/activate uv pip install -e ./nemo_retriever ``` -This installs the retriever in editable mode and its in-repo dependencies. 
Core dependencies (see `nemo_retriever/pyproject.toml`) include Ray, pypdfium2, pandas, LanceDB, PyYAML, torch, transformers, and the Nemotron packages (page-elements, graphic-elements, table-structure). The retriever also depends on the sibling packages `nv-ingest`, `nv-ingest-api`, and `nv-ingest-client` in this repo. +This installs the retriever in editable mode and its in-repo dependencies. Core dependencies (see `nemo_retriever/pyproject.toml`) include Ray, pypdfium2, pandas, LanceDB, PyYAML, torch, transformers (4.x), vLLM 0.16, and the Nemotron packages (page-elements, graphic-elements, table-structure). The retriever also depends on the sibling packages `nv-ingest`, `nv-ingest-api`, and `nv-ingest-client` in this repo. + +### Optional: ASR extra (local Parakeet) + +For **local ASR** (nvidia/parakeet-ctc-1.1b with `audio_endpoints` unset), install the `[asr]` extra. This pulls in `transformers>=5`, `soundfile`, and `scipy` and is mutually exclusive with the default stack (vLLM 0.16 uses transformers<5): + +```bash +uv pip install -e "./nemo_retriever[asr]" +``` + +Docker: build with ASR support using `--build-arg INSTALL_ASR=1`. ### OCR and CUDA 13 runtime @@ -122,3 +132,65 @@ To stop and remove both stacks: docker compose -p ingest-gpu0 down docker compose -p ingest-gpu1 down ``` + +## Embedding backends + +Embeddings can be served by a **remote HTTP endpoint** (NIM, vLLM, or any OpenAI-compatible server) or by a **local HuggingFace model** when no endpoint is configured. + +- **Config**: Set `embedding_nim_endpoint` in `ingest-config.yaml` or stage config (e.g. `http://localhost:8000/v1`). Leave empty or null to use the local HF embedder. +- **CLI**: Use `--embed-invoke-url` (inprocess/batch pipelines) or `--embedding-endpoint` / `--embedding-http-endpoint` (recall CLI) to point at a remote server. + +### Using vLLM for embeddings + +You can serve an embedding model with [vLLM](https://docs.vllm.ai/) and point the retriever at it. 
vLLM exposes an OpenAI-compatible `/v1/embeddings` API. Set the embedding endpoint to the vLLM base URL (e.g. `http://localhost:8000/v1`). + +**vLLM compatibility**: The default NIM-style client sends `input_type` and `truncate` in the request body; some vLLM versions or configs may not accept these. When using a **vLLM** server, enable the vLLM-compatible payload: + +- **Ingest**: `--embed-use-vllm-compat` (inprocess pipeline) or set `embed_use_vllm_compat: true` in `EmbedParams`. +- **Recall**: `--embedding-use-vllm-compat` (recall CLI). + +This sends only `model`, `input`, and `encoding_format` (minimal OpenAI-compatible payload). + +### llama-nemotron-embed-1b-v2 with vLLM + +For **nvidia/llama-nemotron-embed-1b-v2**, follow the model’s official vLLM instructions: + +1. Use **vllm==0.11.0**. +2. Clone the [model repo](https://huggingface.co/nvidia/llama-nemotron-embed-1b-v2) and **overwrite `config.json` with `config_vllm.json`** from that repo. +3. Start the server (replace `<model-path>` and `<num-gpus>`): + + ```bash + vllm serve \ + <model-path> \ + --trust-remote-code \ + --runner pooling \ + --model-impl vllm \ + --override-pooler-config '{"pooling_type": "MEAN"}' \ + --data-parallel-size <num-gpus> \ + --dtype float32 \ + --port 8000 + ``` + +4. Set the retriever embedding endpoint to `http://localhost:8000/v1` and use `--embed-use-vllm-compat` / `--embedding-use-vllm-compat` as above. + +See the [model README](https://huggingface.co/nvidia/llama-nemotron-embed-1b-v2) for the canonical vLLM setup and client example. + +### Using vLLM offline batched inference + +You can run the same embedding model (e.g. llama-nemotron-embed-1b-v2) **without a vLLM server** by using vLLM’s Python API for batched inference. This loads the model in-process and runs `LLM.embed()` in batches. + +- **When to use**: No server to run; same model and behavior as vLLM server; good for batch ingest or recall in a single process. +- **Install**: vLLM is a core dependency of the retriever (`vllm==0.16.0` in `pyproject.toml`, which satisfies the `vllm>=0.11.0` requirement of llama-nemotron-embed-1b-v2), so no separate install step is needed.
+- **Model path**: You can pass a HuggingFace model id (e.g. `nvidia/llama-nemotron-embed-1b-v2`) or a **local path**. For llama-nemotron-embed-1b-v2, a local clone with `config.json` replaced by `config_vllm.json` (from the model repo) may be required for vLLM to load it correctly. +- **Ingest**: Set `embed_use_vllm_offline: true` in `EmbedParams` or use `--embed-use-vllm-offline` in the inprocess pipeline. Optionally set `embed_model_path` (or `--embed-model-path`) to a local model path. +- **Recall**: Use `--embedding-use-vllm-offline` (recall CLI). Optionally `--embedding-vllm-model-path` to override the model path. + +### Optional: vLLM attention extras (flash-attn, flashinfer-cubin, xformers) + +To add prebuilt attention-related packages without changing the project’s torch version, install the `[vllm-attention]` extra: + +```bash +uv pip install -e "./nemo_retriever[vllm-attention]" +``` + +This installs **flashinfer-cubin** (match version to flashinfer-python), **flash-attn** (prebuilt wheel for torch 2.9 + cu12, Linux py312), and **xformers** (0.0.33.x compatible with torch 2.9). It can reduce vLLM startup time (e.g. CUDA graph capture). The `flash-attn` wheel is sourced from GitHub releases; on other platforms you may need to install it separately. diff --git a/nemo_retriever/pyproject.toml b/nemo_retriever/pyproject.toml index 0e40fb5ad..c39eadbc3 100644 --- a/nemo_retriever/pyproject.toml +++ b/nemo_retriever/pyproject.toml @@ -44,10 +44,9 @@ dependencies = [ "numpy>=1.26.0", "debugpy>=1.8.0", "python-multipart>=0.0.9", - # transformers>=5 enables loading nvidia/parakeet-ctc-1.1b via pipeline (see - # parakeet-ctc-1.1b README). If using llama_nemotron_embed_1b_v2, verify - # compatibility with transformers 5 (it previously relied on HybridCache).
- "transformers>=5.0.0", + # transformers 4.x for vLLM 0.16 and embedding models (e.g. llama_nemotron_embed_1b_v2). + # Versions 4.54.0-4.55.x have a flash attention bug; exclude that range. + "transformers>=4.49.0,<5.0.0,!=4.54.*,!=4.55.*", "tokenizers>=0.20.3", "accelerate>=1.1.0", "torch~=2.9.1", @@ -64,12 +63,23 @@ dependencies = [ "accelerate==1.12.0", "albumentations==2.0.8", "open-clip-torch==3.2.0", - # Local ASR (Parakeet): read chunk files and resample to 16 kHz mono - "soundfile>=0.12.0", - "scipy>=1.11.0", + "vllm==0.16.0", ] [project.optional-dependencies] +asr = [ + # Local ASR (Parakeet nvidia/parakeet-ctc-1.1b): transformers>=5 required by model. + "transformers>=5.0.0", + "soundfile>=0.12.0", + "scipy>=1.11.0", +] +# Optional: prebuilt FlashInfer cubins + flash-attn + xformers for vLLM (faster startup / attention). +# Keeps existing torch~=2.9.1; flash-attn uses a Linux py3.12 wheel from GitHub. +vllm-attention = [ + "flashinfer-cubin==0.6.3", + "flash-attn>=2.8.3", + "xformers>=0.0.33,<0.0.34", +] dev = [ "build>=1.2.2", "pytest>=8.0.2", @@ -89,8 +99,10 @@ nemotron-page-elements-v3 = { index = "test-pypi" } nemotron-graphic-elements-v1 = { index = "test-pypi" } nemotron-table-structure-v1 = { index = "test-pypi" } nemotron-ocr = { index = "test-pypi" } -torch = { index = "torch-cuda"} -torchvision = { index ="torch-cuda"} +torch = { index = "torch-cuda" } +torchvision = { index = "torch-cuda" } +# Prebuilt wheel for torch 2.9 + cu12 (Linux x86_64, py312). Avoids source build and keeps torch as-is. 
+flash-attn = { url = "https://github.com/Dao-AILab/flash-attention/releases/download/v2.8.3/flash_attn-2.8.3+cu12torch2.9cxx11abiTRUE-cp312-cp312-linux_x86_64.whl" } [[tool.uv.index]] name = "test-pypi" diff --git a/nemo_retriever/scripts/README.md b/nemo_retriever/scripts/README.md new file mode 100644 index 000000000..98159fe17 --- /dev/null +++ b/nemo_retriever/scripts/README.md @@ -0,0 +1,142 @@ +# vLLM vs baseline embedding comparison scripts + +Scripts for comparing **local HF (baseline)** and **vLLM offline** embedding on pre-embedded parquet: ingest time and recall. Use them to run a single comparison or a parameter sweep and plot results. + +**Assumptions:** Run from the **repository root** (parent of `nemo_retriever/`) with the nemo_retriever env (e.g. `uv run`). The machine (or Ray cluster) must have **at least one GPU** for the embed service. + +--- + +## 1. Single-run comparison (`vllm_embedding_comparison.py`) + +Compares baseline (HF) and vLLM offline on a pre-embed parquet dir: starts a long-lived embed service per backend, warms it up, runs a **timed** ingest (model load excluded), then recall. + +### Usage + +```bash +# From repo root; unset RAY_ADDRESS so we start our own Ray cluster +unset RAY_ADDRESS + +uv run python nemo_retriever/scripts/vllm_embedding_comparison.py compare-from-pre-embed \ + --pre-embed-dir /path/to/pre_embed_dir \ + --query-csv /path/to/query_gt.csv \ + --embed-model-path /path/to/llama-nemotron-embed-1b-v2/main +``` + +### Options (often used) + +| Option | Description | +|--------|-------------| +| `--max-rows N` | Use only N rows from pre-embed (faster; e.g. 1000). | +| `--gpu-memory-utilization 0.55` | vLLM GPU memory fraction (default 0.55). | +| `--enforce-eager` | vLLM: disable CUDA graphs (slower, avoids some env issues). | +| `--sort-key COL` | Sort by column before limit so baseline and vLLM see the same rows. | +| `--output-csv FILE` | Append one row of metrics to CSV for later plotting. 
| + +### Example (1K rows, with output for sweep) + +```bash +unset RAY_ADDRESS +uv run python nemo_retriever/scripts/vllm_embedding_comparison.py compare-from-pre-embed \ + --pre-embed-dir /path/to/bo767_pre_embed \ + --query-csv data/bo767_query_gt.csv \ + --embed-model-path /path/to/llama-nemotron-embed-1b-v2/main \ + --max-rows 1000 \ + --output-csv comparison_sweep.csv +``` + +--- + +## 2. Sweep: grid of runs (`run_comparison_sweep.py`) + +Runs `compare-from-pre-embed` over a grid of **gpu_memory_utilization** × **max_rows**, appending one CSV row per run. Useful for collecting data to plot ingest time vs scale and GPU util. + +### Grid (defaults) + +- **gpu_utils:** 0.4, 0.5, 0.6, 0.7, 0.8 +- **max_rows:** 1000, 2000, 5000, 10000 + +Override with `--gpu-utils 0.4,0.6,0.8` and `--max-rows-list 1000,5000`. To sweep **embed_batch_size** (e.g. 256, 512, 768) with an env that has **flashinfer-cubin**, see [embed_batch_size_sweep_setup.md](embed_batch_size_sweep_setup.md). + +### FlashInfer / flashinfer-cubin + +By default the sweep **requires** an environment with **flashinfer-cubin** (e.g. `uv sync --extra vllm-attention` in `nemo_retriever`). If it is not installed, the script exits with install instructions. Use `--no-require-flashinfer-cubin` to run anyway (e.g. to collect flashinfer_cubin=false rows). + +`flashinfer_cubin` is **detected at runtime** and written to the CSV. To get both true/false in one CSV: + +1. **With FlashInfer:** run the sweep with `flashinfer-cubin` installed (e.g. `[vllm-attention]` extra). +2. **Without FlashInfer:** `uv pip uninstall flashinfer-cubin`, then run the **same** command and **same** `--output-csv` (with `--no-require-flashinfer-cubin`) so rows are appended. 
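The per-run detection described above can be sketched as follows. This is a minimal illustration, not the exact `_detect_flashinfer_cubin` helper in `vllm_embedding_comparison.py`, and it assumes the package's import name is `flashinfer_cubin`:

```python
import importlib.util


def detect_flashinfer_cubin() -> bool:
    # True when a package importable as "flashinfer_cubin" exists in the
    # current environment; no import side effects, just a spec lookup.
    return importlib.util.find_spec("flashinfer_cubin") is not None


print(f"flashinfer_cubin={detect_flashinfer_cubin()}")
```

Because the flag is detected at runtime and written into every CSV row, the plotting step can separate the true/false populations without any extra bookkeeping on your part.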
+ +### Usage + +```bash +unset RAY_ADDRESS + +uv run python nemo_retriever/scripts/run_comparison_sweep.py \ + --pre-embed-dir /path/to/pre_embed_dir \ + --query-csv /path/to/query_gt.csv \ + --embed-model-path /path/to/llama-nemotron-embed-1b-v2/main \ + --output-csv comparison_sweep.csv +``` + +### Tracking progress (log file) + +```bash +unset RAY_ADDRESS +uv run python nemo_retriever/scripts/run_comparison_sweep.py \ + --pre-embed-dir /path/to/pre_embed_dir \ + --query-csv /path/to/query_gt.csv \ + --embed-model-path /path/to/llama-nemotron-embed-1b-v2/main \ + --output-csv comparison_sweep.csv \ + 2>&1 | tee comparison_sweep.log +``` + +Then in another terminal: `tail -f comparison_sweep.log`. + +--- + +## 3. Plotting (`plot_comparison_sweep.py`) + +Reads the CSV produced by the sweep and writes figures (ingest time vs max_rows, ingest vs gpu_util, recall@10, optional FlashInfer impact). + +### Usage + +```bash +uv run python nemo_retriever/scripts/plot_comparison_sweep.py run \ + --input-csv comparison_sweep.csv \ + --output-dir ./comparison_plots +``` + +Requires **pandas** and **matplotlib** (usually already in the env). + +--- + +## 4. Optional: Ray GPU check (`check_ray_gpu.py`) + +Prints Ray cluster resources and actors that use or are waiting for GPU. Useful when the comparison is “stuck pending” (no GPU placement). + +- To see the comparison’s actors, the comparison must use a **persistent** Ray cluster (`ray start --head --num-gpus=1`) and you must set **RAY_ADDRESS** for both the comparison and this script. +- If the comparison is run **without** `--ray-address` (default), it uses `ray.init("local")` and no separate Ray server exists, so this script cannot attach. + +```bash +RAY_ADDRESS=127.0.0.1:6379 uv run python nemo_retriever/scripts/check_ray_gpu.py +``` + +--- + +## Running on other machines + +1. **Paths:** Replace `/path/to/pre_embed_dir`, `/path/to/query_gt.csv`, and `--embed-model-path` with paths valid on that machine. +2. 
**Environment:** Use the same Python env as for development (e.g. `uv run` from repo root, or activate the venv and run `python nemo_retriever/scripts/...`). +3. **RAY_ADDRESS:** Run `unset RAY_ADDRESS` before the comparison/sweep so the script starts its own Ray cluster; otherwise you may attach to another user’s cluster and see 0 GPUs. +4. **GPU:** The embed service needs 1 GPU. If Ray reports 0 GPUs, the script will error; ensure the node has a GPU and Ray can see it (e.g. `ray start --head --num-gpus=1` if using a persistent cluster). + +--- + +## Script summary + +| Script | Purpose | +|--------|--------| +| `vllm_embedding_comparison.py` | Single compare-from-pre-embed run (baseline + vLLM); optional `--output-csv`. | +| `run_comparison_sweep.py` | Sweep over gpu_util × max_rows; appends to `--output-csv`. | +| `plot_comparison_sweep.py` | Read sweep CSV, write figures to `--output-dir`. | +| `check_ray_gpu.py` | Print Ray GPU/resources and GPU actors (for debugging). | diff --git a/nemo_retriever/scripts/check_ray_gpu.py b/nemo_retriever/scripts/check_ray_gpu.py new file mode 100644 index 000000000..4c0d6e173 --- /dev/null +++ b/nemo_retriever/scripts/check_ray_gpu.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3 +# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Print Ray cluster GPU (and CPU) usage and list actors that use or are waiting for GPU. + +Run in a separate terminal to see what is holding the GPU while the comparison (or +another Ray job) is running. + +To see the comparison's actors, the comparison must use a persistent Ray cluster +(started with `ray start --head` and optional `--num-gpus=1`), and you must set +RAY_ADDRESS when running both the comparison and this script (e.g. RAY_ADDRESS=127.0.0.1:6379). +If the comparison uses ray.init("local") in-process, no separate Ray server exists +and this script cannot attach to it. 
Usage (from repo root): + RAY_ADDRESS=127.0.0.1:6379 uv run python nemo_retriever/scripts/check_ray_gpu.py + # Or, if a cluster is already running and RAY_ADDRESS is set in the env: + uv run python nemo_retriever/scripts/check_ray_gpu.py +""" + +from __future__ import annotations + +import os +import sys + + +def main() -> None: + import ray + + address = os.environ.get("RAY_ADDRESS", "auto") + ray_addr_env = os.environ.get("RAY_ADDRESS") + print( + f"RAY_ADDRESS env: {repr(ray_addr_env) if ray_addr_env is not None else 'unset'} -> connecting with " + f"address={address!r}" + ) + if not ray.is_initialized(): + try: + ray.init(address=address, ignore_reinit_error=True) + except Exception as e: + print(f"Failed to connect to Ray (address={address!r}): {e}", file=sys.stderr) + print("Set RAY_ADDRESS to the cluster (e.g. 127.0.0.1:6379) to see that cluster's actors.", file=sys.stderr) + print( + "If the comparison uses address='local' (no --ray-address), it starts an in-process cluster and this " + "script cannot attach.", + file=sys.stderr, + ) + sys.exit(1) + + cluster = ray.cluster_resources() + available = ray.available_resources() + gpu_total = cluster.get("GPU", 0) + gpu_avail = available.get("GPU", 0) + print("--- Ray resources ---") + print(f" cluster_resources: {cluster}") + print(f" available_resources: {available}") + print(f" GPU: total={gpu_total} available={gpu_avail} used={gpu_total - gpu_avail}") + print() + + try: + from ray.util.state import list_actors + except ImportError: + print("ray.util.state.list_actors not available (older Ray?). 
Run: ray status") + return + + actors = list_actors(detail=True, limit=500) + gpu_actors = [ + a for a in actors if getattr(a, "required_resources", None) and (a.required_resources or {}).get("GPU", 0) > 0 + ] + pending = [a for a in actors if getattr(a, "state", None) == "PENDING"] + pending_gpu = [ + a for a in pending if getattr(a, "required_resources", None) and (a.required_resources or {}).get("GPU", 0) > 0 + ] + + print("--- Actors requiring GPU ---") + if not gpu_actors: + print(" (none)") + else: + for a in gpu_actors: + name = getattr(a, "name", None) or "(no name)" + state = getattr(a, "state", "?") + res = getattr(a, "required_resources", None) or {} + pid = getattr(a, "pid", None) + print(f" name={name!r} state={state} required_resources={res} pid={pid}") + print() + + if pending_gpu: + print("--- PENDING actors waiting for GPU (likely cause of 'stuck') ---") + for a in pending_gpu: + name = getattr(a, "name", None) or "(no name)" + res = getattr(a, "required_resources", None) or {} + print(f" name={name!r} required_resources={res}") + print() + + print("For more detail: ray status and ray list actors") + return + + +if __name__ == "__main__": + main() diff --git a/nemo_retriever/scripts/plot_comparison_sweep.py b/nemo_retriever/scripts/plot_comparison_sweep.py new file mode 100644 index 000000000..43357bdd6 --- /dev/null +++ b/nemo_retriever/scripts/plot_comparison_sweep.py @@ -0,0 +1,193 @@ +#!/usr/bin/env python3 +# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Read the CSV produced by run_comparison_sweep.py and generate figures for +ingest time vs scale/GPU util, recall comparison, and FlashInfer impact. + +Requires: pandas, matplotlib (pip install matplotlib if needed). 
+ +Example (from repo root): + uv run python nemo_retriever/scripts/plot_comparison_sweep.py run \ + --input-csv comparison_sweep.csv \ + --output-dir ./comparison_plots +""" + +from __future__ import annotations + +from pathlib import Path + +import pandas as pd +import typer + +app = typer.Typer(help="Plot comparison sweep CSV (ingest time, recall, FlashInfer impact).") + + +def _check_matplotlib() -> None: + try: + import matplotlib # noqa: F401 + except ImportError: + raise ImportError("plot_comparison_sweep.py requires matplotlib. Install with: pip install matplotlib") + + +@app.command() +def run( + input_csv: Path = typer.Option(..., "--input-csv", path_type=Path), + output_dir: Path = typer.Option( + Path("comparison_plots"), + "--output-dir", + path_type=Path, + help="Directory to write PNG/PDF figures.", + ), +) -> None: + """Generate 2–4 figures from the sweep CSV.""" + _check_matplotlib() + import matplotlib.pyplot as plt + + input_csv = Path(input_csv) + output_dir = Path(output_dir) + if not input_csv.is_file(): + raise typer.BadParameter(f"Input CSV not found: {input_csv}") + output_dir.mkdir(parents=True, exist_ok=True) + + df = pd.read_csv(input_csv) + # Normalize column names (allow both with/without underscores) + df = df.rename(columns=lambda c: c.strip()) + required = [ + "gpu_memory_utilization", + "max_rows", + "baseline_ingest_s", + "vllm_ingest_s", + "baseline_recall_at_1", + "baseline_recall_at_5", + "baseline_recall_at_10", + "vllm_recall_at_1", + "vllm_recall_at_5", + "vllm_recall_at_10", + ] + missing = [c for c in required if c not in df.columns] + if missing: + raise typer.BadParameter(f"CSV missing columns: {missing}. 
Expected columns from run_comparison_sweep.") + + # Normalize flashinfer_cubin (CSV may have "True"/"False" strings) + if "flashinfer_cubin" in df.columns: + df["flashinfer_cubin"] = df["flashinfer_cubin"].map( + lambda x: str(x).lower() in ("true", "1", "yes") if pd.notna(x) else False + ) + has_flashinfer = "flashinfer_cubin" in df.columns and df["flashinfer_cubin"].nunique() > 1 + + # For figures 1–3 use one row per (max_rows, gpu_util) to avoid duplicate lines when both flashinfer states present + df_main = df.drop_duplicates(subset=["max_rows", "gpu_memory_utilization"], keep="first") + + # 1) Ingest time vs max_rows (baseline vs vLLM); one baseline series, vLLM by gpu_util + fig, ax = plt.subplots(figsize=(8, 5)) + base = df_main.groupby("max_rows", as_index=False).agg({"baseline_ingest_s": "first"}).sort_values("max_rows") + ax.plot(base["max_rows"], base["baseline_ingest_s"], "o-", label="Baseline (HF)", linewidth=2, alpha=0.9) + for gpu in sorted(df_main["gpu_memory_utilization"].unique()): + sub = df_main[df_main["gpu_memory_utilization"] == gpu].sort_values("max_rows") + if sub.empty: + continue + ax.plot( + sub["max_rows"], + sub["vllm_ingest_s"], + "s--", + label=f"vLLM (gpu_util={gpu})", + alpha=0.8, + ) + ax.set_xlabel("max_rows") + ax.set_ylabel("Ingest time (s)") + ax.set_title("Ingest time vs benchmark scale (max_rows)") + ax.legend(loc="best", fontsize=8) + ax.grid(True, alpha=0.3) + fig.tight_layout() + for ext in ("png", "pdf"): + out = output_dir / f"ingest_vs_max_rows.{ext}" + fig.savefig(out, dpi=150 if ext == "png" else None) + typer.echo(f"Wrote {out}") + plt.close(fig) + + # 2) Ingest time vs gpu_memory_utilization (baseline flat per max_rows, vLLM by max_rows) + fig, ax = plt.subplots(figsize=(8, 5)) + for max_r in sorted(df_main["max_rows"].unique()): + sub = df_main[df_main["max_rows"] == max_r].sort_values("gpu_memory_utilization") + if sub.empty: + continue + ax.plot( + sub["gpu_memory_utilization"], + sub["baseline_ingest_s"], + 
"o-", + label=f"Baseline (max_rows={max_r})", + alpha=0.8, + ) + ax.plot( + sub["gpu_memory_utilization"], + sub["vllm_ingest_s"], + "s--", + label=f"vLLM (max_rows={max_r})", + alpha=0.8, + ) + ax.set_xlabel("gpu_memory_utilization") + ax.set_ylabel("Ingest time (s)") + ax.set_title("Ingest time vs vLLM GPU utilization") + ax.legend(loc="best", fontsize=8) + ax.grid(True, alpha=0.3) + fig.tight_layout() + for ext in ("png", "pdf"): + out = output_dir / f"ingest_vs_gpu_util.{ext}" + fig.savefig(out, dpi=150 if ext == "png" else None) + typer.echo(f"Wrote {out}") + plt.close(fig) + + # 3) Recall comparison: recall@10 vs max_rows (baseline vs vLLM) + fig, ax = plt.subplots(figsize=(8, 5)) + for gpu in sorted(df_main["gpu_memory_utilization"].unique())[:5]: + s = df_main[df_main["gpu_memory_utilization"] == gpu].sort_values("max_rows") + if s.empty: + continue + ax.plot(s["max_rows"], s["baseline_recall_at_10"], "o-", label=f"Baseline (gpu_util={gpu})", alpha=0.8) + ax.plot(s["max_rows"], s["vllm_recall_at_10"], "s--", label=f"vLLM (gpu_util={gpu})", alpha=0.8) + ax.set_xlabel("max_rows") + ax.set_ylabel("recall@10") + ax.set_title("Recall@10 vs benchmark scale") + ax.legend(loc="best", fontsize=8) + ax.grid(True, alpha=0.3) + fig.tight_layout() + for ext in ("png", "pdf"): + out = output_dir / f"recall10_vs_max_rows.{ext}" + fig.savefig(out, dpi=150 if ext == "png" else None) + typer.echo(f"Wrote {out}") + plt.close(fig) + + # 4) FlashInfer impact (if we have both true/false) + if has_flashinfer: + fig, axes = plt.subplots(1, 2, figsize=(12, 5)) + for idx, (cubin_val, label) in enumerate([(True, "flashinfer_cubin=True"), (False, "flashinfer_cubin=False")]): + sub = df[df["flashinfer_cubin"] == cubin_val].sort_values(["max_rows", "gpu_memory_utilization"]) + if sub.empty: + continue + ax = axes[idx] + for gpu in sorted(sub["gpu_memory_utilization"].unique()): + s = sub[sub["gpu_memory_utilization"] == gpu].sort_values("max_rows") + ax.plot(s["max_rows"], 
s["baseline_ingest_s"], "o-", label=f"Baseline (gpu={gpu})", alpha=0.8) + ax.plot(s["max_rows"], s["vllm_ingest_s"], "s--", label=f"vLLM (gpu={gpu})", alpha=0.8) + ax.set_xlabel("max_rows") + ax.set_ylabel("Ingest time (s)") + ax.set_title(label) + ax.legend(loc="best", fontsize=7) + ax.grid(True, alpha=0.3) + fig.tight_layout() + for ext in ("png", "pdf"): + out = output_dir / f"flashinfer_impact.{ext}" + fig.savefig(out, dpi=150 if ext == "png" else None) + typer.echo(f"Wrote {out}") + plt.close(fig) + else: + typer.echo("No flashinfer_cubin variation in CSV; skipping FlashInfer impact figure.") + + typer.echo(f"Done. Figures in {output_dir}") + + +if __name__ == "__main__": + app() diff --git a/nemo_retriever/scripts/run_comparison_sweep.py b/nemo_retriever/scripts/run_comparison_sweep.py new file mode 100644 index 000000000..b732291a6 --- /dev/null +++ b/nemo_retriever/scripts/run_comparison_sweep.py @@ -0,0 +1,165 @@ +#!/usr/bin/env python3 +# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Run compare-from-pre-embed over a parameter grid (gpu_util x max_rows x embed_batch_size) +and append each run's metrics to a CSV (and optionally JSONL) for later plotting. + +Uses the long-lived embedder service (EmbedServiceActor); sweep --embed-batch-sizes +to see if 5K batch-size behavior holds with the service (e.g. 256,512,768). + +FlashInfer: not a CLI flag. Run this script twice — once with flashinfer-cubin +installed, once after uninstalling it — and append to the same --output-csv so +the file contains both flashinfer_cubin=true and false rows. 
+ +Example (sweep embed batch size at 5K only): + uv run python nemo_retriever/scripts/run_comparison_sweep.py \\ + --pre-embed-dir /path/to/bo767_pre_embed \\ + --query-csv data/bo767_query_gt.csv \\ + --embed-model-path /path/to/llama-nemotron-embed-1b-v2/main \\ + --output-csv comparison_sweep.csv \\ + --gpu-utils 0.7 --max-rows-list 5000 --embed-batch-sizes 256,512,768 + +Example (default grid: gpu_util x max_rows, single batch size 256): + uv run python nemo_retriever/scripts/run_comparison_sweep.py \\ + --pre-embed-dir /path/to/bo767_pre_embed --query-csv data/bo767_query_gt.csv \\ + --embed-model-path /path/to/model --output-csv comparison_sweep.csv +""" + +from __future__ import annotations + +import os +import sys +from pathlib import Path + +# Allow importing vllm_embedding_comparison from this directory (nemo_retriever/scripts) +_script_dir = Path(__file__).resolve().parent +if str(_script_dir) not in sys.path: + sys.path.insert(0, str(_script_dir)) + +import typer +from vllm_embedding_comparison import ( + _append_comparison_row_csv, + _append_comparison_row_json, + _detect_flashinfer_cubin, + run_compare_from_pre_embed, +) + +app = typer.Typer(help="Sweep compare-from-pre-embed over gpu_util and max_rows; append to CSV.") + +# Defaults from plan +DEFAULT_GPU_UTILS = [0.4, 0.5, 0.6, 0.7, 0.8] +DEFAULT_MAX_ROWS = [1000, 2000, 5000, 10000] + + +def _parse_float_list(s: str) -> list[float]: + return [float(x.strip()) for x in s.split(",") if x.strip()] + + +def _parse_int_list(s: str) -> list[int]: + return [int(x.strip()) for x in s.split(",") if x.strip()] + + +@app.callback(invoke_without_command=True) +def run_sweep( + pre_embed_dir: Path = typer.Option(..., "--pre-embed-dir", path_type=Path), + query_csv: Path = typer.Option(..., "--query-csv", path_type=Path), + embed_model_path: Path | None = typer.Option(None, "--embed-model-path", path_type=Path), + output_csv: Path = typer.Option(..., "--output-csv", path_type=Path), + output_json: Path | None = 
typer.Option(None, "--output-json", path_type=Path), + lancedb_uri: str = typer.Option("lancedb", "--lancedb-uri"), + embed_model_name: str = typer.Option("nvidia/llama-3.2-nv-embedqa-1b-v2", "--embed-model-name"), + gpu_utils: str = typer.Option( + ",".join(str(x) for x in DEFAULT_GPU_UTILS), + "--gpu-utils", + help="Comma-separated gpu_memory_utilization values (e.g. 0.4,0.5,0.6,0.7,0.8).", + ), + max_rows_list: str = typer.Option( + ",".join(str(x) for x in DEFAULT_MAX_ROWS), + "--max-rows-list", + help="Comma-separated max_rows values (e.g. 1000,2000,5000,10000).", + ), + embed_batch_sizes: str = typer.Option( + "256", + "--embed-batch-sizes", + help="Comma-separated embed_batch_size values (e.g. 256,512,768). Sweep to compare long-lived embedder service " + "at different batch sizes.", + ), + sort_key_column: str | None = typer.Option(None, "--sort-key"), + ray_address: str | None = typer.Option(None, "--ray-address"), + require_flashinfer_cubin: bool = typer.Option( + True, + "--require-flashinfer-cubin/--no-require-flashinfer-cubin", + help="Exit with instructions if flashinfer-cubin is not installed (use env with .[vllm-attention]).", + ), +) -> None: + """Run grid of compare-from-pre-embed; append one row per run to output_csv (and optional output_json).""" + pre_embed_dir = Path(pre_embed_dir) + query_csv = Path(query_csv) + output_csv = Path(output_csv) + if not pre_embed_dir.is_dir(): + raise typer.BadParameter(f"Pre-embed directory not found: {pre_embed_dir}") + if not query_csv.is_file(): + raise typer.BadParameter(f"Query CSV not found: {query_csv}") + + os.environ.pop("RAY_ADDRESS", None) + + has_cubin = _detect_flashinfer_cubin() + if require_flashinfer_cubin and not has_cubin: + typer.echo( + "flashinfer-cubin is not installed. 
Use an env with the vllm-attention extra, e.g.:", + err=True, + ) + typer.echo(" cd nemo_retriever && uv sync --extra vllm-attention", err=True) + typer.echo(" uv pip install -e '.[vllm-attention]' # or from repo root", err=True) + raise typer.Exit(1) + + gpu_util_values = _parse_float_list(gpu_utils) + max_rows_values = _parse_int_list(max_rows_list) + embed_batch_size_values = _parse_int_list(embed_batch_sizes) + total = len(gpu_util_values) * len(max_rows_values) * len(embed_batch_size_values) + typer.echo(f"flashinfer_cubin={has_cubin} (detected at start)") + typer.echo( + f"Grid: {len(gpu_util_values)} gpu_utils x {len(max_rows_values)} max_rows x {len(embed_batch_size_values)} " + f"embed_batch_sizes = {total} runs" + ) + typer.echo(f"Output CSV: {output_csv}") + + run_id = 0 + for gpu_util in gpu_util_values: + for max_rows in max_rows_values: + for embed_batch_size in embed_batch_size_values: + run_id += 1 + typer.echo( + f"[{run_id}/{total}] gpu_memory_utilization={gpu_util}, max_rows={max_rows}, " + f"embed_batch_size={embed_batch_size}" + ) + try: + row = run_compare_from_pre_embed( + pre_embed_dir=pre_embed_dir, + query_csv=query_csv, + lancedb_uri=lancedb_uri, + ray_address=ray_address, + embed_model_path=str(embed_model_path) if embed_model_path else None, + embed_model_name=embed_model_name, + max_rows=max_rows, + gpu_memory_utilization=gpu_util, + enforce_eager=False, + compile_cache_dir=None, + embed_batch_size=embed_batch_size, + sort_key_column=sort_key_column, + ) + _append_comparison_row_csv(row, output_csv) + if output_json is not None: + _append_comparison_row_json(row, Path(output_json)) + except Exception as e: + typer.echo(f"Run failed: {e}", err=True) + raise + + typer.echo(f"Wrote {total} rows to {output_csv}") + + +if __name__ == "__main__": + app() diff --git a/nemo_retriever/scripts/vllm_embedding_comparison.py b/nemo_retriever/scripts/vllm_embedding_comparison.py new file mode 100644 index 000000000..37fc13d9a --- /dev/null +++ 
b/nemo_retriever/scripts/vllm_embedding_comparison.py @@ -0,0 +1,1084 @@ +#!/usr/bin/env python3 +# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +jd5 recall barometer: ingest PDFs, run recall, report metrics. + +Use this script to verify that the vLLM embedding path does not regress +relative to the current implementation. Run once with local HF embeddings +(baseline), once with vLLM endpoint + --embed-use-vllm-compat, and/or once +with vLLM offline (--embed-use-vllm-offline), then compare recall@1/@5/@10. + +Query CSV format: must contain columns "query" and either "pdf_page" or +"pdf" + "page". Use "pdf" = PDF basename (with or without .pdf); page should +match the convention used when writing to LanceDB (e.g. 1-based). + +Example (baseline, local HF): + uv run python nemo_retriever/scripts/vllm_embedding_comparison.py run \\ + --pdf-dir /raid/charlesb/datasets/jd5/data \\ + --query-csv /raid/charlesb/datasets/jd5/jd5_query_gt.csv + +Example (vLLM server; requires vllm==0.11.0 and model config_vllm.json for +llama-nemotron-embed-1b-v2 — see model README): + uv run python nemo_retriever/scripts/vllm_embedding_comparison.py run \\ + --pdf-dir /raid/charlesb/datasets/jd5/data \\ + --query-csv /raid/charlesb/datasets/jd5/jd5_query_gt.csv \\ + --embed-invoke-url http://localhost:8000/v1 \\ + --embed-use-vllm-compat + +Example (vLLM offline, no server): + uv run python nemo_retriever/scripts/vllm_embedding_comparison.py run \\ + --pdf-dir /raid/charlesb/datasets/jd5/data \\ + --query-csv /raid/charlesb/datasets/jd5/jd5_query_gt.csv \\ + --embed-use-vllm-offline + +Example (compare baseline vs vLLM server): + uv run python nemo_retriever/scripts/vllm_embedding_comparison.py compare \\ + --pdf-dir /raid/charlesb/datasets/jd5/data \\ + --query-csv /raid/charlesb/datasets/jd5/jd5_query_gt.csv \\ + --embed-invoke-url http://localhost:8000/v1 + +Example (three-way compare: baseline, vLLM server, vLLM
offline): + uv run python nemo_retriever/scripts/vllm_embedding_comparison.py compare \\ + --pdf-dir /raid/charlesb/datasets/jd5/data \\ + --query-csv /raid/charlesb/datasets/jd5/jd5_query_gt.csv \\ + --embed-invoke-url http://localhost:8000/v1 \\ + --embed-use-vllm-offline +""" + +from __future__ import annotations + +import csv +import json +import os +import sys +import time +from pathlib import Path + +# Allow running from repo root or nemo_retriever root (package lives at .../src/nemo_retriever) +if __name__ == "__main__": + _root = Path(__file__).resolve().parents[1] + if _root.name == "scripts": + _root = _root.parent + _src = _root / "src" + if _src.exists() and str(_src) not in sys.path: + sys.path.insert(0, str(_src)) + +import typer +from nemo_retriever import create_ingestor +from nemo_retriever.ingest_modes.batch import EmbedServiceActor +from nemo_retriever.params import ( + BatchTuningParams, + EmbedParams, + ExtractParams, + IngestorCreateParams, + IngestExecuteParams, + ModelRuntimeParams, + VdbUploadParams, +) +from nemo_retriever.recall.core import RecallConfig, retrieve_and_score + +app = typer.Typer(help="jd5 recall barometer: ingest PDFs, run recall, report recall@1/@5/@10.") + +JD5_PDF_DIR = "/raid/charlesb/datasets/jd5/data" +JD5_QUERY_CSV = "/raid/charlesb/datasets/jd5/jd5_query_gt.csv" +LANCEDB_URI = "lancedb" +TABLE_BASELINE = "nv-ingest-baseline" +TABLE_VLLM = "nv-ingest-vllm" +TABLE_VLLM_OFFLINE = "nv-ingest-vllm-offline" + + +def _run_ingest_and_recall( + pdf_dir: Path, + query_csv: Path, + lancedb_uri: str, + table_name: str, + embed_invoke_url: str | None, + embed_use_vllm_compat: bool, + embed_use_vllm_offline: bool, + embed_model_name: str, + embed_model_path: str | None = None, + run_mode: str = "inprocess", + ray_address: str | None = None, +) -> dict: + """Run ingest (inprocess or batch) then recall; return metrics.""" + pdf_dir = Path(pdf_dir) + query_csv = Path(query_csv) + if not pdf_dir.is_dir(): + raise typer.BadParameter(f"PDF 
directory not found: {pdf_dir}") + if not query_csv.is_file(): + raise typer.BadParameter(f"Query CSV not found: {query_csv}") + + glob_pattern = str(pdf_dir / "*.pdf") + create_kwargs: dict = {"run_mode": run_mode} + if run_mode == "batch": + create_kwargs["params"] = IngestorCreateParams(ray_address=ray_address) + ingestor = create_ingestor(**create_kwargs) + ingestor = ( + ingestor.files(glob_pattern) + .extract( + ExtractParams( + method="pdfium", + extract_text=True, + extract_tables=True, + extract_charts=True, + extract_infographics=False, + ) + ) + .embed( + EmbedParams( + model_name=embed_model_name, + embed_invoke_url=embed_invoke_url, + embed_use_vllm_compat=embed_use_vllm_compat, + embed_use_vllm_offline=embed_use_vllm_offline, + embed_model_path=embed_model_path, + ) + ) + .vdb_upload( + VdbUploadParams( + lancedb={ + "lancedb_uri": lancedb_uri, + "table_name": table_name, + "overwrite": True, + "create_index": True, + } + ) + ) + ) + start = time.perf_counter() + ingestor.ingest( + params=IngestExecuteParams(parallel=True, max_workers=8, show_progress=True), + ) + ingest_elapsed = time.perf_counter() - start + + cfg = RecallConfig( + lancedb_uri=lancedb_uri, + lancedb_table=table_name, + embedding_model=embed_model_name, + embedding_http_endpoint=embed_invoke_url, + embedding_use_vllm_compat=embed_use_vllm_compat, + embedding_use_vllm_offline=embed_use_vllm_offline, + embedding_vllm_model_path=embed_model_path, + top_k=10, + ks=(1, 5, 10), + ) + _df, _gold, _raw, _keys, metrics = retrieve_and_score( + query_csv=query_csv, + cfg=cfg, + vector_column_name="vector", + ) + return { + "metrics": metrics, + "ingest_elapsed_s": ingest_elapsed, + "n_queries": len(_df), + } + + +@app.command("run") +def run( + pdf_dir: Path = typer.Option( + JD5_PDF_DIR, + "--pdf-dir", + path_type=Path, + help="Directory containing PDFs to ingest.", + ), + query_csv: Path = typer.Option( + JD5_QUERY_CSV, + "--query-csv", + path_type=Path, + help="Query ground-truth CSV 
(columns: query, pdf_page OR query, pdf, page).", + ), + lancedb_uri: str = typer.Option(LANCEDB_URI, "--lancedb-uri", help="LanceDB URI (directory path)."), + table_name: str = typer.Option( + "nv-ingest", + "--table-name", + help="LanceDB table name.", + ), + run_mode: str = typer.Option( + "inprocess", + "--run-mode", + help="Ingest run mode: inprocess or batch (batch uses Ray Data).", + ), + ray_address: str | None = typer.Option(None, "--ray-address", help="Ray cluster address for batch (e.g. auto)."), + embed_invoke_url: str | None = typer.Option( + None, + "--embed-invoke-url", + help="Embedding endpoint (e.g. http://localhost:8000/v1). Omit for local HF.", + ), + embed_use_vllm_compat: bool = typer.Option( + False, + "--embed-use-vllm-compat/--no-embed-use-vllm-compat", + help="Use vLLM-compatible HTTP payload. Set when endpoint is a vLLM server.", + ), + embed_use_vllm_offline: bool = typer.Option( + False, + "--embed-use-vllm-offline/--no-embed-use-vllm-offline", + help="Use vLLM offline Python API (no server).", + ), + embed_model_path: str | None = typer.Option( + None, + "--embed-model-path", + help="Local path to model for vLLM offline (optional; else uses --embed-model-name).", + ), + embed_model_name: str = typer.Option( + "nvidia/llama-3.2-nv-embedqa-1b-v2", + "--embed-model-name", + help="Embedding model name.", + ), +) -> None: + """Run ingest + recall once; print recall@1, @5, @10.""" + if run_mode not in ("inprocess", "batch"): + raise typer.BadParameter("run_mode must be 'inprocess' or 'batch'") + result = _run_ingest_and_recall( + pdf_dir=pdf_dir, + query_csv=query_csv, + lancedb_uri=lancedb_uri, + table_name=table_name, + embed_invoke_url=embed_invoke_url, + embed_use_vllm_compat=embed_use_vllm_compat, + embed_use_vllm_offline=embed_use_vllm_offline, + embed_model_name=embed_model_name, + embed_model_path=embed_model_path, + run_mode=run_mode, + ray_address=ray_address, + ) + m = result["metrics"] + print(f"Queries: {result['n_queries']}") + 
print(f"Ingest (s): {result['ingest_elapsed_s']:.2f}") + print(f"recall@1: {m.get('recall@1', 0):.4f}") + print(f"recall@5: {m.get('recall@5', 0):.4f}") + print(f"recall@10: {m.get('recall@10', 0):.4f}") + + +@app.command("check-server") +def check_server( + embed_invoke_url: str = typer.Argument( + ..., + help="vLLM embedding base URL (e.g. http://localhost:8000/v1).", + ), +) -> None: + """Probe the embedding server; print the first model id or an error.""" + from nemo_retriever.text_embed.vllm_http import get_model_id_from_server + + model_id = get_model_id_from_server(embed_invoke_url) + if model_id: + typer.echo(f"OK: server reachable, model id: {model_id}") + else: + typer.echo( + "Could not reach server or no models listed. " + f"Ensure vLLM is running (e.g. vllm serve --runner pooling ...) and try: {embed_invoke_url}/models", + err=True, + ) + raise typer.Exit(1) + + +@app.command("compare") +def compare( + pdf_dir: Path = typer.Option( + JD5_PDF_DIR, + "--pdf-dir", + path_type=Path, + help="Directory containing PDFs to ingest.", + ), + query_csv: Path = typer.Option( + JD5_QUERY_CSV, + "--query-csv", + path_type=Path, + help="Query ground-truth CSV.", + ), + lancedb_uri: str = typer.Option(LANCEDB_URI, "--lancedb-uri", help="LanceDB URI."), + run_mode: str = typer.Option( + "inprocess", + "--run-mode", + help="Ingest run mode: inprocess or batch (batch uses Ray Data).", + ), + ray_address: str | None = typer.Option(None, "--ray-address", help="Ray cluster address for batch (e.g. auto)."), + embed_invoke_url: str | None = typer.Option( + None, + "--embed-invoke-url", + help="vLLM embedding endpoint (e.g. http://localhost:8000/v1). 
Include to run vLLM server path.", + ), + embed_use_vllm_offline: bool = typer.Option( + False, + "--embed-use-vllm-offline/--no-embed-use-vllm-offline", + help="Include vLLM offline path in comparison.", + ), + embed_model_path: str | None = typer.Option( + None, + "--embed-model-path", + help="Local path to model for vLLM offline (optional).", + ), + embed_model_name: str = typer.Option( + "nvidia/llama-3.2-nv-embedqa-1b-v2", + "--embed-model-name", + help="Embedding model name.", + ), +) -> None: + """Run baseline (local HF); optionally vLLM server and/or vLLM offline; print metrics side-by-side.""" + if not embed_invoke_url and not embed_use_vllm_offline: + typer.echo( + "For compare, set at least one of: --embed-invoke-url (vLLM server) or --embed-use-vllm-offline.", + err=True, + ) + raise typer.Exit(1) + if run_mode not in ("inprocess", "batch"): + raise typer.BadParameter("run_mode must be 'inprocess' or 'batch'") + pdf_dir = Path(pdf_dir) + query_csv = Path(query_csv) + + vllm_model = embed_model_name + if embed_invoke_url: + from nemo_retriever.text_embed.vllm_http import get_model_id_from_server + + discovered = get_model_id_from_server(embed_invoke_url) + if discovered: + typer.echo(f"Using vLLM model id from server: {discovered}") + vllm_model = discovered + else: + typer.echo("Warning: could not discover model from server, using --embed-model-name.", err=True) + + _run = lambda **kw: _run_ingest_and_recall( # noqa: E731 + pdf_dir=pdf_dir, + query_csv=query_csv, + lancedb_uri=lancedb_uri, + run_mode=run_mode, + ray_address=ray_address, + **kw, + ) + typer.echo("Running baseline (local HF)...") + baseline = _run( + table_name=TABLE_BASELINE, + embed_invoke_url=None, + embed_use_vllm_compat=False, + embed_use_vllm_offline=False, + embed_model_name=embed_model_name, + embed_model_path=None, + ) + results = [("Baseline (HF)", baseline)] + + if embed_invoke_url: + typer.echo("Running vLLM server...") + vllm = _run( + table_name=TABLE_VLLM, + 
embed_invoke_url=embed_invoke_url, + embed_use_vllm_compat=True, + embed_use_vllm_offline=False, + embed_model_name=vllm_model, + embed_model_path=None, + ) + results.append(("vLLM server", vllm)) + + if embed_use_vllm_offline: + typer.echo("Running vLLM offline...") + vllm_off = _run( + table_name=TABLE_VLLM_OFFLINE, + embed_invoke_url=None, + embed_use_vllm_compat=False, + embed_use_vllm_offline=True, + embed_model_name=embed_model_name, + embed_model_path=embed_model_path, + ) + results.append(("vLLM offline", vllm_off)) + + # Print side-by-side table + names = [r[0] for r in results] + col_w = max(16, max(len(n) for n in names)) + header = f"{'Metric':<12} " + " ".join(f"{n:<{col_w}}" for n in names) + print("\n--- Comparison ---") + print(header) + print("-" * len(header)) + for k in ("recall@1", "recall@5", "recall@10"): + row = f"{k:<12} " + " ".join(f"{r[1]['metrics'].get(k, 0):<{col_w}.4f}" for r in results) + print(row) + ingest_row = "Ingest (s) " + " ".join(f"{r[1]['ingest_elapsed_s']:.2f}".ljust(col_w) for r in results) + print(f"\n{ingest_row}") + + +def _run_recall_only( + query_csv: Path, + lancedb_uri: str, + table_name: str, + embed_model_name: str, + embed_invoke_url: str | None = None, + embed_use_vllm_offline: bool = False, + embed_model_path: str | None = None, +) -> dict: + """Run recall only (no ingest); return metrics.""" + cfg = RecallConfig( + lancedb_uri=lancedb_uri, + lancedb_table=table_name, + embedding_model=embed_model_name, + embedding_http_endpoint=embed_invoke_url, + embedding_use_vllm_compat=False, + embedding_use_vllm_offline=embed_use_vllm_offline, + embedding_vllm_model_path=embed_model_path, + top_k=10, + ks=(1, 5, 10), + ) + _df, _gold, _raw, _keys, metrics = retrieve_and_score( + query_csv=query_csv, + cfg=cfg, + vector_column_name="vector", + ) + return {"metrics": metrics, "n_queries": len(_df)} + + +@app.command("save-pre-embed") +def save_pre_embed( + pdf_dir: Path = typer.Option( + JD5_PDF_DIR, + "--pdf-dir", + 
path_type=Path, + help="Directory containing PDFs to ingest.", + ), + pre_embed_dir: Path = typer.Option( + ..., + "--pre-embed-dir", + path_type=Path, + help="Output directory for pre-embed parquet (post-explode, pre-embed).", + ), + run_mode: str = typer.Option( + "batch", + "--run-mode", + help="Run mode (batch only for save-pre-embed).", + ), + ray_address: str | None = typer.Option(None, "--ray-address", help="Ray cluster address (e.g. auto)."), + embed_model_name: str = typer.Option( + "nvidia/llama-3.2-nv-embedqa-1b-v2", + "--embed-model-name", + help="Embedding model name (used for modality; no model loaded).", + ), +) -> None: + """Run pipeline through explode and write pre-embed parquet. No embed model loaded.""" + if run_mode != "batch": + raise typer.BadParameter("save-pre-embed requires --run-mode batch") + pdf_dir = Path(pdf_dir) + if not pdf_dir.is_dir(): + raise typer.BadParameter(f"PDF directory not found: {pdf_dir}") + + glob_pattern = str(pdf_dir / "*.pdf") + create_kwargs: dict = {"run_mode": "batch"} + if ray_address is not None: + create_kwargs["params"] = IngestorCreateParams(ray_address=ray_address) + ingestor = create_ingestor(**create_kwargs) + ingestor = ( + ingestor.files(glob_pattern) + .extract( + ExtractParams( + method="pdfium", + extract_text=True, + extract_tables=True, + extract_charts=True, + extract_infographics=False, + ) + ) + .prepare_for_embed(EmbedParams(model_name=embed_model_name)) + .save_intermediate_results(str(pre_embed_dir)) + ) + typer.echo(f"Wrote pre-embed parquet to {pre_embed_dir}") + + +def _detect_flashinfer_cubin() -> bool: + """Return True if flashinfer_cubin is importable (prebuilt FlashInfer kernels).""" + try: + import flashinfer_cubin # noqa: F401 + + return True + except ImportError: + return False + + +# CSV/JSON output column order for sweep and single-run logging +_COMPARISON_ROW_KEYS = [ + "gpu_memory_utilization", + "max_rows", + "embed_batch_size", + "enforce_eager", + "flashinfer_cubin", + 
"baseline_recall_at_1", + "baseline_recall_at_5", + "baseline_recall_at_10", + "baseline_ingest_s", + "vllm_recall_at_1", + "vllm_recall_at_5", + "vllm_recall_at_10", + "vllm_ingest_s", +] + + +def run_compare_from_pre_embed( + *, + pre_embed_dir: Path, + query_csv: Path, + lancedb_uri: str, + ray_address: str | None, + embed_model_path: str | None, + embed_model_name: str, + max_rows: int | None, + gpu_memory_utilization: float, + enforce_eager: bool, + compile_cache_dir: Path | None, + embed_batch_size: int = 256, + sort_key_column: str | None = None, +) -> dict: + """Run baseline + vLLM offline from pre-embed parquet; return one row of metrics as a dict. + + Used by compare-from-pre-embed (CLI) and by the sweep script. Does not print; + caller may append the returned dict to --output-csv / --output-json. + + Ray is started first. A long-lived EmbedServiceActor is created per backend (baseline, + then vLLM), warmed up with one batch, then the timed ingest pipeline calls the service + so model load is excluded from the timed run. The service is torn down after each backend. + """ + import ray + import ray.data as rd + + flashinfer_cubin = _detect_flashinfer_cubin() + create_kwargs: dict = {"run_mode": "batch"} + if ray_address is not None: + create_kwargs["params"] = IngestorCreateParams(ray_address=ray_address) + + # Start Ray cluster first so init time is not included in ingest timing. + # We use only the CLI --ray-address (not RAY_ADDRESS env); None => "local" so we start our own cluster. + ray_addr = ray_address or "local" + if not ray.is_initialized(): + ray.init(address=ray_addr, ignore_reinit_error=True, log_to_driver=True) + print( + f"Ray address: {ray_addr} (RAY_ADDRESS env is {'set' if os.environ.get('RAY_ADDRESS') else 'unset'})", + flush=True, + ) + _ = ray.cluster_resources() + # EmbedServiceActor requires num_gpus=1; without a GPU the actor stays pending forever. 
+ available = ray.available_resources() + cluster = ray.cluster_resources() + num_gpus = available.get("GPU", 0) + print(f"Ray cluster_resources={cluster} available_resources={available} (embed service needs 1 GPU)", flush=True) + if num_gpus < 1: + raise RuntimeError( + "Embed service requires at least 1 GPU. Ray reports 0 GPUs available " + f"(cluster_resources={cluster}, available_resources={available}). " + "Ensure the cluster has a GPU and Ray can see it " + "(e.g. run with CUDA_VISIBLE_DEVICES set, or use a Ray cluster started with --num-gpus=1)." + ) + + def _limit(ds: "rd.Dataset", n: int, sort_key: str | None) -> "rd.Dataset": + if sort_key is not None: + ds = ds.sort(sort_key) + return ds.limit(n) + + def _warmup_batch(pre_embed_dir: Path, sort_key: str | None): + ds = rd.read_parquet(str(pre_embed_dir)) + ds = _limit(ds, 1, sort_key) + return next(iter(ds.iter_batches(batch_format="pandas", batch_size=1))) + + # Baseline: long-lived service -> warmup -> timed ingest -> teardown + try: + ray.get_actor("embed_service_baseline") + ray.kill(ray.get_actor("embed_service_baseline")) + except ValueError: + pass + batch_tuning = BatchTuningParams(embed_batch_size=embed_batch_size) + baseline_params = EmbedParams( + model_name=embed_model_name, + embed_use_vllm_offline=False, + embed_model_path=None, + batch_tuning=batch_tuning, + ) + service_baseline = EmbedServiceActor.options(name="embed_service_baseline").remote(baseline_params) + warmup_df = _warmup_batch(pre_embed_dir, sort_key_column) + _warmup_timeout_s = 300 + try: + ray.get(service_baseline.embed_batch.remote(warmup_df), timeout=_warmup_timeout_s) + except ray.exceptions.GetTimeoutError: + ray.kill(service_baseline) + raise RuntimeError( + f"Baseline embed service warmup did not complete within {_warmup_timeout_s}s. " + "Actor may be stuck pending (no GPU placement). Check ray.available_resources() and cluster GPU config." 
+ ) from None + ds_baseline = rd.read_parquet(str(pre_embed_dir)) + if max_rows is not None: + ds_baseline = _limit(ds_baseline, max_rows, sort_key_column) + ingestor_b = create_ingestor(**create_kwargs) + ingestor_b._rd_dataset = ds_baseline + ingestor_b._input_documents = [] + t0 = time.perf_counter() + ingestor_b.embed_only( + EmbedParams( + model_name=embed_model_name, + embed_use_vllm_offline=False, + embed_model_path=None, + batch_tuning=batch_tuning, + ), + embedding_service_name="embed_service_baseline", + ).vdb_upload( + VdbUploadParams( + lancedb={ + "lancedb_uri": lancedb_uri, + "table_name": TABLE_BASELINE, + "overwrite": True, + "create_index": True, + } + ) + ).ingest( + params=IngestExecuteParams(parallel=True, max_workers=8, show_progress=True) + ) + baseline_ingest_s = time.perf_counter() - t0 + ray.kill(service_baseline) + baseline_recall = _run_recall_only( + query_csv=query_csv, + lancedb_uri=lancedb_uri, + table_name=TABLE_BASELINE, + embed_model_name=embed_model_name, + embed_use_vllm_offline=False, + ) + + # vLLM offline: long-lived service -> warmup -> timed ingest -> teardown + try: + ray.get_actor("embed_service_vllm") + ray.kill(ray.get_actor("embed_service_vllm")) + except ValueError: + pass + vllm_runtime = ModelRuntimeParams( + gpu_memory_utilization=gpu_memory_utilization, + enforce_eager=enforce_eager, + compile_cache_dir=str(compile_cache_dir) if compile_cache_dir else None, + ) + vllm_params = EmbedParams( + model_name=embed_model_name, + embed_use_vllm_offline=True, + embed_model_path=embed_model_path, + runtime=vllm_runtime, + batch_tuning=batch_tuning, + ) + service_vllm = EmbedServiceActor.options(name="embed_service_vllm").remote(vllm_params) + try: + ray.get(service_vllm.embed_batch.remote(warmup_df), timeout=_warmup_timeout_s) + except ray.exceptions.GetTimeoutError: + ray.kill(service_vllm) + raise RuntimeError( + f"vLLM embed service warmup did not complete within {_warmup_timeout_s}s. 
" + "Actor may be stuck pending (no GPU placement). Check ray.available_resources() and cluster GPU config." + ) from None + ds_vllm = rd.read_parquet(str(pre_embed_dir)) + if max_rows is not None: + ds_vllm = _limit(ds_vllm, max_rows, sort_key_column) + ingestor_v = create_ingestor(**create_kwargs) + ingestor_v._rd_dataset = ds_vllm + ingestor_v._input_documents = [] + t0 = time.perf_counter() + ingestor_v.embed_only( + EmbedParams( + model_name=embed_model_name, + embed_use_vllm_offline=True, + embed_model_path=embed_model_path, + runtime=vllm_runtime, + batch_tuning=batch_tuning, + ), + embedding_service_name="embed_service_vllm", + ).vdb_upload( + VdbUploadParams( + lancedb={ + "lancedb_uri": lancedb_uri, + "table_name": TABLE_VLLM_OFFLINE, + "overwrite": True, + "create_index": True, + } + ) + ).ingest( + params=IngestExecuteParams(parallel=True, max_workers=8, show_progress=True) + ) + vllm_ingest_s = time.perf_counter() - t0 + ray.kill(service_vllm) + vllm_recall = _run_recall_only( + query_csv=query_csv, + lancedb_uri=lancedb_uri, + table_name=TABLE_VLLM_OFFLINE, + embed_model_name=embed_model_name, + embed_use_vllm_offline=True, + embed_model_path=embed_model_path, + ) + + return { + "gpu_memory_utilization": gpu_memory_utilization, + "max_rows": max_rows, + "embed_batch_size": embed_batch_size, + "enforce_eager": enforce_eager, + "flashinfer_cubin": flashinfer_cubin, + "baseline_recall_at_1": baseline_recall["metrics"].get("recall@1"), + "baseline_recall_at_5": baseline_recall["metrics"].get("recall@5"), + "baseline_recall_at_10": baseline_recall["metrics"].get("recall@10"), + "baseline_ingest_s": baseline_ingest_s, + "vllm_recall_at_1": vllm_recall["metrics"].get("recall@1"), + "vllm_recall_at_5": vllm_recall["metrics"].get("recall@5"), + "vllm_recall_at_10": vllm_recall["metrics"].get("recall@10"), + "vllm_ingest_s": vllm_ingest_s, + } + + +def _append_comparison_row_csv(row: dict, path: Path) -> None: + """Append one row (with keys in 
_COMPARISON_ROW_KEYS) to a CSV file; create with header if missing.""" + path = Path(path) + file_exists = path.exists() + with path.open("a", newline="") as f: + writer = csv.DictWriter(f, fieldnames=_COMPARISON_ROW_KEYS, extrasaction="ignore") + if not file_exists: + writer.writeheader() + writer.writerow({k: row.get(k) for k in _COMPARISON_ROW_KEYS}) + + +def _append_comparison_row_json(row: dict, path: Path) -> None: + """Append one row as a JSON line to a JSONL file.""" + with path.open("a") as f: + f.write(json.dumps({k: row.get(k) for k in _COMPARISON_ROW_KEYS}) + "\n") + + +@app.command("compare-from-pre-embed") +def compare_from_pre_embed( + pre_embed_dir: Path = typer.Option( + ..., + "--pre-embed-dir", + path_type=Path, + help="Directory containing pre-embed parquet (from save-pre-embed).", + ), + query_csv: Path = typer.Option( + JD5_QUERY_CSV, + "--query-csv", + path_type=Path, + help="Query ground-truth CSV.", + ), + lancedb_uri: str = typer.Option(LANCEDB_URI, "--lancedb-uri", help="LanceDB URI."), + ray_address: str | None = typer.Option(None, "--ray-address", help="Ray cluster address for batch (e.g. auto)."), + embed_model_path: str | None = typer.Option( + None, + "--embed-model-path", + help="Local path to model for vLLM offline (optional).", + ), + embed_model_name: str = typer.Option( + "nvidia/llama-3.2-nv-embedqa-1b-v2", + "--embed-model-name", + help="Embedding model name.", + ), + max_rows: int | None = typer.Option( + None, + "--max-rows", + help="If set, use only this many rows from pre-embed for both baseline and vLLM (faster iteration).", + ), + gpu_memory_utilization: float = typer.Option( + 0.55, + "--gpu-memory-utilization", + help="vLLM: fraction of GPU memory for KV cache (default 0.55). Increase (e.g. 
0.6–0.7) for throughput.", + ), + enforce_eager: bool = typer.Option( + False, + "--enforce-eager/--no-enforce-eager", + help="vLLM: if set, disable CUDA graphs/torch.compile (slower but avoids noexec/compile issues).", + ), + compile_cache_dir: str | None = typer.Option( + None, + "--compile-cache-dir", + path_type=Path, + help="vLLM: dir for torch inductor/Triton cache when enforce_eager=False (must be on non-noexec fs).", + ), + output_csv: Path | None = typer.Option( + None, + "--output-csv", + path_type=Path, + help="If set, append one row of metrics (params + recall + ingest s) to this CSV for sweep/plotting.", + ), + output_json: Path | None = typer.Option( + None, + "--output-json", + path_type=Path, + help="If set, append one row as a JSON line (JSONL) to this file.", + ), + sort_key_column: str | None = typer.Option( + None, + "--sort-key", + help="If set, sort pre-embed dataset by this column before limit (deterministic same rows for baseline and " + "vLLM).", + ), + embed_batch_size: int = typer.Option( + 256, + "--embed-batch-size", + help="Ray Data batch size for embed stage (rows per batch sent to long-lived embedder service). " + "Sweep 256,512,768 to compare.", + ), +) -> None: + """Load pre-embed parquet, run baseline + vLLM offline embed+vdb, then recall; print comparison. + + For faster iteration use --max-rows (e.g. with bo767 pre-embed at + /raid/charlesb/datasets/bo767_pre_embed) to compare on a subset. + Use --output-csv to append one row for data collection sweeps. 
+ """ + pre_embed_dir = Path(pre_embed_dir) + query_csv = Path(query_csv) + if not pre_embed_dir.is_dir(): + raise typer.BadParameter(f"Pre-embed directory not found: {pre_embed_dir}") + if not query_csv.is_file(): + raise typer.BadParameter(f"Query CSV not found: {query_csv}") + + if max_rows is not None: + typer.echo(f"Using up to {max_rows} rows (--max-rows={max_rows})") + typer.echo(f"Embed batch size: {embed_batch_size}") + if sort_key_column is not None: + typer.echo(f"Sorting by {sort_key_column!r} before limit for deterministic row set.") + + typer.echo("Running baseline (local HF) then vLLM offline from pre-embed...") + row = run_compare_from_pre_embed( + pre_embed_dir=pre_embed_dir, + query_csv=query_csv, + lancedb_uri=lancedb_uri, + ray_address=ray_address, + embed_model_path=embed_model_path, + embed_model_name=embed_model_name, + max_rows=max_rows, + gpu_memory_utilization=gpu_memory_utilization, + enforce_eager=enforce_eager, + compile_cache_dir=Path(compile_cache_dir) if compile_cache_dir else None, + embed_batch_size=embed_batch_size, + sort_key_column=sort_key_column, + ) + + # Build results for human-readable table from returned row + results = [ + ( + "Baseline (HF)", + { + "metrics": { + "recall@1": row["baseline_recall_at_1"], + "recall@5": row["baseline_recall_at_5"], + "recall@10": row["baseline_recall_at_10"], + }, + "ingest_elapsed_s": row["baseline_ingest_s"], + }, + ), + ( + "vLLM offline", + { + "metrics": { + "recall@1": row["vllm_recall_at_1"], + "recall@5": row["vllm_recall_at_5"], + "recall@10": row["vllm_recall_at_10"], + }, + "ingest_elapsed_s": row["vllm_ingest_s"], + }, + ), + ] + names = [r[0] for r in results] + col_w = max(16, max(len(n) for n in names)) + header = f"{'Metric':<12} " + " ".join(f"{n:<{col_w}}" for n in names) + print("\n--- Comparison (from pre-embed) ---") + print(header) + print("-" * len(header)) + for k in ("recall@1", "recall@5", "recall@10"): + row_line = f"{k:<12} " + " ".join(f"{r[1]['metrics'].get(k, 
0):<{col_w}.4f}" for r in results)
+    print(row_line)
+    ingest_row = "Ingest (s) " + " ".join(f"{r[1]['ingest_elapsed_s']:.2f}".ljust(col_w) for r in results)
+    print(f"\n{ingest_row}")
+
+    if output_csv is not None:
+        _append_comparison_row_csv(row, Path(output_csv))
+        typer.echo(f"Appended row to {output_csv}")
+    if output_json is not None:
+        _append_comparison_row_json(row, Path(output_json))
+        typer.echo(f"Appended row to {output_json}")
+
+
+def _metrics_equal(a: dict, b: dict, tol: float = 1e-5) -> bool:
+    """Return True if recall@1/5/10 in a and b are within tol."""
+    for k in ("recall@1", "recall@5", "recall@10"):
+        va = a.get(k)
+        vb = b.get(k)
+        if va is None and vb is None:
+            continue
+        if va is None or vb is None:
+            return False
+        if abs(float(va) - float(vb)) > tol:
+            return False
+    return True
+
+
+@app.command("validate-recall-parity")
+def validate_recall_parity(
+    pdf_dir: Path = typer.Option(
+        JD5_PDF_DIR,
+        "--pdf-dir",
+        path_type=Path,
+        help="Directory containing PDFs.",
+    ),
+    query_csv: Path = typer.Option(
+        JD5_QUERY_CSV,
+        "--query-csv",
+        path_type=Path,
+        help="Query ground-truth CSV.",
+    ),
+    lancedb_uri: str = typer.Option(LANCEDB_URI, "--lancedb-uri", help="LanceDB URI."),
+    pre_embed_dir: Path = typer.Option(
+        ...,
+        "--pre-embed-dir",
+        path_type=Path,
+        help="Directory to write/read pre-embed parquet (will be overwritten).",
+    ),
+    ray_address: str | None = typer.Option(None, "--ray-address", help="Ray cluster address for batch (e.g. auto)."),
+    embed_model_path: str | None = typer.Option(
+        None,
+        "--embed-model-path",
+        help="Local path to model for vLLM offline (optional).",
+    ),
+    embed_model_name: str = typer.Option(
+        "nvidia/llama-3.2-nv-embedqa-1b-v2",
+        "--embed-model-name",
+        help="Embedding model name.",
+    ),
+    tolerance: float = typer.Option(
+        1e-5,
+        "--tolerance",
+        help="Max allowed difference in recall metrics between full and pre-embed paths.",
+    ),
+) -> None:
+    """Run full compare, then save-pre-embed + compare-from-pre-embed; exit non-zero if recall differs."""
+    pdf_dir = Path(pdf_dir)
+    query_csv = Path(query_csv)
+    pre_embed_dir = Path(pre_embed_dir)
+    if not pdf_dir.is_dir():
+        raise typer.BadParameter(f"PDF directory not found: {pdf_dir}")
+    if not query_csv.is_file():
+        raise typer.BadParameter(f"Query CSV not found: {query_csv}")
+
+    typer.echo("Step 1: Full compare (baseline + vLLM offline)...")
+    full_baseline = _run_ingest_and_recall(
+        pdf_dir=pdf_dir,
+        query_csv=query_csv,
+        lancedb_uri=lancedb_uri,
+        table_name=TABLE_BASELINE,
+        embed_invoke_url=None,
+        embed_use_vllm_compat=False,
+        embed_use_vllm_offline=False,
+        embed_model_name=embed_model_name,
+        embed_model_path=None,
+        run_mode="batch",
+        ray_address=ray_address,
+    )
+    full_vllm = _run_ingest_and_recall(
+        pdf_dir=pdf_dir,
+        query_csv=query_csv,
+        lancedb_uri=lancedb_uri,
+        table_name=TABLE_VLLM_OFFLINE,
+        embed_invoke_url=None,
+        embed_use_vllm_compat=False,
+        embed_use_vllm_offline=True,
+        embed_model_name=embed_model_name,
+        embed_model_path=embed_model_path,
+        run_mode="batch",
+        ray_address=ray_address,
+    )
+
+    typer.echo("Step 2: Save pre-embed parquet...")
+    pre_embed_dir.mkdir(parents=True, exist_ok=True)
+    create_kwargs: dict = {"run_mode": "batch"}
+    if ray_address is not None:
+        create_kwargs["params"] = IngestorCreateParams(ray_address=ray_address)
+    ingestor = create_ingestor(**create_kwargs)
+    ingestor = (
+        ingestor.files(str(pdf_dir / "*.pdf"))
+        .extract(
+            ExtractParams(
+                method="pdfium",
+                extract_text=True,
+                extract_tables=True,
+                extract_charts=True,
+                extract_infographics=False,
+            )
+        )
+        .prepare_for_embed(EmbedParams(model_name=embed_model_name))
+        .save_intermediate_results(str(pre_embed_dir))
+    )
+
+    typer.echo("Step 3: Compare from pre-embed...")
+    import ray.data as rd
+
+    create_kwargs2: dict = {"run_mode": "batch"}
+    if ray_address is not None:
+        create_kwargs2["params"] = IngestorCreateParams(ray_address=ray_address)
+
+    ds_b = rd.read_parquet(str(pre_embed_dir))
+    ingestor_b = create_ingestor(**create_kwargs2)
+    ingestor_b._rd_dataset = ds_b
+    ingestor_b._input_documents = []
+    ingestor_b.embed_only(
+        EmbedParams(model_name=embed_model_name, embed_use_vllm_offline=False, embed_model_path=None)
+    ).vdb_upload(
+        VdbUploadParams(
+            lancedb={
+                "lancedb_uri": lancedb_uri,
+                "table_name": TABLE_BASELINE,
+                "overwrite": True,
+                "create_index": True,
+            }
+        )
+    ).ingest(
+        params=IngestExecuteParams(parallel=True, max_workers=8, show_progress=True)
+    )
+    preembed_baseline = _run_recall_only(
+        query_csv=query_csv,
+        lancedb_uri=lancedb_uri,
+        table_name=TABLE_BASELINE,
+        embed_model_name=embed_model_name,
+        embed_use_vllm_offline=False,
+    )
+
+    ds_v = rd.read_parquet(str(pre_embed_dir))
+    ingestor_v = create_ingestor(**create_kwargs2)
+    ingestor_v._rd_dataset = ds_v
+    ingestor_v._input_documents = []
+    ingestor_v.embed_only(
+        EmbedParams(
+            model_name=embed_model_name,
+            embed_use_vllm_offline=True,
+            embed_model_path=embed_model_path,
+        )
+    ).vdb_upload(
+        VdbUploadParams(
+            lancedb={
+                "lancedb_uri": lancedb_uri,
+                "table_name": TABLE_VLLM_OFFLINE,
+                "overwrite": True,
+                "create_index": True,
+            }
+        )
+    ).ingest(
+        params=IngestExecuteParams(parallel=True, max_workers=8, show_progress=True)
+    )
+    preembed_vllm = _run_recall_only(
+        query_csv=query_csv,
+        lancedb_uri=lancedb_uri,
+        table_name=TABLE_VLLM_OFFLINE,
+        embed_model_name=embed_model_name,
+        embed_use_vllm_offline=True,
+        embed_model_path=embed_model_path,
+    )
+
+    typer.echo("Step 4: Compare metrics...")
+    ok = True
+    if not _metrics_equal(full_baseline["metrics"], preembed_baseline["metrics"], tol=tolerance):
+        typer.echo(
+            f"Baseline recall mismatch: full={full_baseline['metrics']} vs pre-embed={preembed_baseline['metrics']}",
+            err=True,
+        )
+        ok = False
+    if not _metrics_equal(full_vllm["metrics"], preembed_vllm["metrics"], tol=tolerance):
+        typer.echo(
+            f"vLLM recall mismatch: full={full_vllm['metrics']} vs pre-embed={preembed_vllm['metrics']}",
+            err=True,
+        )
+        ok = False
+    if ok:
+        typer.echo("Recall parity: OK (full pipeline vs pre-embed path match within tolerance).")
+    else:
+        raise typer.Exit(1)
+
+
+if __name__ == "__main__":
+    app()
diff --git a/nemo_retriever/src/nemo_retriever/examples/inprocess_pipeline.py b/nemo_retriever/src/nemo_retriever/examples/inprocess_pipeline.py
index 360194120..549ffa3f9 100644
--- a/nemo_retriever/src/nemo_retriever/examples/inprocess_pipeline.py
+++ b/nemo_retriever/src/nemo_retriever/examples/inprocess_pipeline.py
@@ -173,6 +173,22 @@ def main(
         min=0.0,
         help="Parse stage batch size (enables Parse-only path when > 0.0 with parse workers/GPU).",
     ),
+    embed_use_vllm_compat: bool = typer.Option(
+        False,
+        "--embed-use-vllm-compat/--no-embed-use-vllm-compat",
+        help="Use vLLM-compatible HTTP payload for embeddings (no input_type/truncate). "
+        "Set when --embed-invoke-url is a vLLM server.",
+    ),
+    embed_use_vllm_offline: bool = typer.Option(
+        False,
+        "--embed-use-vllm-offline/--no-embed-use-vllm-offline",
+        help="Use vLLM offline Python API for embeddings (no server). Same model as vLLM server path.",
+    ),
+    embed_model_path: Optional[str] = typer.Option(
+        None,
+        "--embed-model-path",
+        help="Local path to embedding model for vLLM offline (optional; else uses --embed-model-name).",
+    ),
     embed_modality: str = typer.Option(
         "text",
         "--embed-modality",
@@ -212,6 +228,9 @@ def main(
             EmbedParams(
                 model_name=str(embed_model_name),
                 embed_invoke_url=embed_invoke_url,
+                embed_use_vllm_compat=embed_use_vllm_compat,
+                embed_use_vllm_offline=embed_use_vllm_offline,
+                embed_model_path=embed_model_path,
                 embed_modality=embed_modality,
                 text_elements_modality=text_elements_modality,
                 structured_elements_modality=structured_elements_modality,
@@ -238,6 +257,9 @@ def main(
             EmbedParams(
                 model_name=str(embed_model_name),
                 embed_invoke_url=embed_invoke_url,
+                embed_use_vllm_compat=embed_use_vllm_compat,
+                embed_use_vllm_offline=embed_use_vllm_offline,
+                embed_model_path=embed_model_path,
                 embed_modality=embed_modality,
                 text_elements_modality=text_elements_modality,
                 structured_elements_modality=structured_elements_modality,
@@ -280,6 +302,9 @@ def main(
             EmbedParams(
                 model_name=str(embed_model_name),
                 embed_invoke_url=embed_invoke_url,
+                embed_use_vllm_compat=embed_use_vllm_compat,
+                embed_use_vllm_offline=embed_use_vllm_offline,
+                embed_model_path=embed_model_path,
                 embed_modality=embed_modality,
                 text_elements_modality=text_elements_modality,
                 structured_elements_modality=structured_elements_modality,
@@ -321,6 +346,9 @@ def main(
             EmbedParams(
                 model_name=str(embed_model_name),
                 embed_invoke_url=embed_invoke_url,
+                embed_use_vllm_compat=embed_use_vllm_compat,
+                embed_use_vllm_offline=embed_use_vllm_offline,
+                embed_model_path=embed_model_path,
                 embed_modality=embed_modality,
                 text_elements_modality=text_elements_modality,
                 structured_elements_modality=structured_elements_modality,
@@ -379,6 +407,7 @@ def main(
         embedding_http_endpoint=embed_invoke_url,
         top_k=10,
         ks=(1, 5, 10),
+        embedding_use_vllm_compat=bool(embed_use_vllm_compat),
     )
     _df_query, _gold, _raw_hits, _retrieved_keys, metrics = retrieve_and_score(query_csv=query_csv, cfg=cfg)
 
diff --git a/nemo_retriever/src/nemo_retriever/ingest_modes/batch.py b/nemo_retriever/src/nemo_retriever/ingest_modes/batch.py
index e33f82ee3..851740aae 100644
--- a/nemo_retriever/src/nemo_retriever/ingest_modes/batch.py
+++ b/nemo_retriever/src/nemo_retriever/ingest_modes/batch.py
@@ -219,6 +219,10 @@ def _build_rows(self, df: Any) -> list:
             rows.append(row_out)
         return rows
 
+    # Columns that can trigger "cannot call vectorize on size 0 inputs" when Ray/Lance
+    # compute size; safe to drop for perf comparison (we only need ingest to complete).
+    _DROP_COLS_FOR_SIZE_GUARD = ("images", "tables", "charts", "infographics", "infographic")
+
     def __call__(self, batch_df: Any) -> Any:
         rows = self._build_rows(batch_df)
         if rows:
@@ -229,6 +233,11 @@
 
         self._total_rows += len(rows)
 
+        # Drop list columns that cause "vectorize on size 0 inputs" in size calculation.
+        if hasattr(batch_df, "columns"):
+            drop = [c for c in self._DROP_COLS_FOR_SIZE_GUARD if c in batch_df.columns]
+            if drop:
+                batch_df = batch_df.drop(columns=drop)
         return batch_df
 
 
@@ -244,6 +253,7 @@ def __init__(self, params: EmbedParams) -> None:
         self._kwargs = {
             **params.model_dump(mode="python", exclude={"runtime", "batch_tuning", "fused_tuning"}, exclude_none=True),
             **params.runtime.model_dump(mode="python", exclude_none=True),
+            **params.batch_tuning.model_dump(mode="python", exclude_none=True),
         }
         if "embedding_endpoint" not in self._kwargs and self._kwargs.get("embed_invoke_url"):
             self._kwargs["embedding_endpoint"] = self._kwargs.get("embed_invoke_url")
@@ -254,6 +264,28 @@
             self._model = None
             return
 
+        # When using vLLM offline, create one LLM per actor and reuse for all batches.
+        if self._kwargs.get("embed_use_vllm_offline"):
+            self._model = None
+            from nemo_retriever.text_embed.vllm_offline import create_vllm_llm
+
+            embed_model = (
+                self._kwargs.get("embed_model_path")
+                or self._kwargs.get("embed_model_name")
+                or self._kwargs.get("model_name")
+                or ""
+            )
+            self._vllm_llm = create_vllm_llm(
+                str(embed_model),
+                dimensions=self._kwargs.get("dimensions"),
+                dtype=self._kwargs.get("dtype") or "float32",
+                gpu_memory_utilization=float(self._kwargs.get("gpu_memory_utilization", 0.45)),
+                enforce_eager=bool(self._kwargs.get("enforce_eager", False)),
+                compile_cache_dir=self._kwargs.get("compile_cache_dir"),
+                attention_backend=self._kwargs.get("attention_backend"),
+            )
+            return
+
         device = self._kwargs.get("device")
         hf_cache_dir = self._kwargs.get("hf_cache_dir")
         normalize = bool(self._kwargs.get("normalize", True))
@@ -290,7 +322,111 @@ def __init__(self, params: EmbedParams) -> None:
     def __call__(self, batch_df: Any) -> Any:
         from nemo_retriever.ingest_modes.inprocess import embed_text_main_text_embed
 
-        return embed_text_main_text_embed(batch_df, model=self._model, **self._kwargs)
+        kwargs = dict(self._kwargs)
+        if getattr(self, "_vllm_llm", None) is not None:
+            kwargs["vllm_llm"] = self._vllm_llm
+        # Use embed_batch_size as inference_batch_size so the full Ray batch is sent to vLLM in one go.
+        if kwargs.get("embed_use_vllm_offline") and "embed_batch_size" in kwargs:
+            kwargs["inference_batch_size"] = int(kwargs["embed_batch_size"])
+        return embed_text_main_text_embed(batch_df, model=self._model, **kwargs)
+
+
+class _EmbedServiceActorImpl:
+    """Long-lived actor that holds a single embedder (HF or vLLM). Same init as _BatchEmbedActor."""
+
+    def __init__(self, params: EmbedParams) -> None:
+        self._params = params
+        self._kwargs = {
+            **params.model_dump(mode="python", exclude={"runtime", "batch_tuning", "fused_tuning"}, exclude_none=True),
+            **params.runtime.model_dump(mode="python", exclude_none=True),
+            **params.batch_tuning.model_dump(mode="python", exclude_none=True),
+        }
+        if "embedding_endpoint" not in self._kwargs and self._kwargs.get("embed_invoke_url"):
+            self._kwargs["embedding_endpoint"] = self._kwargs.get("embed_invoke_url")
+
+        endpoint = (self._kwargs.get("embedding_endpoint") or self._kwargs.get("embed_invoke_url") or "").strip()
+        if endpoint:
+            self._model = None
+            return
+
+        if self._kwargs.get("embed_use_vllm_offline"):
+            self._model = None
+            from nemo_retriever.text_embed.vllm_offline import create_vllm_llm
+
+            embed_model = (
+                self._kwargs.get("embed_model_path")
+                or self._kwargs.get("embed_model_name")
+                or self._kwargs.get("model_name")
+                or ""
+            )
+            self._vllm_llm = create_vllm_llm(
+                str(embed_model),
+                dimensions=self._kwargs.get("dimensions"),
+                dtype=self._kwargs.get("dtype") or "float32",
+                gpu_memory_utilization=float(self._kwargs.get("gpu_memory_utilization", 0.45)),
+                enforce_eager=bool(self._kwargs.get("enforce_eager", False)),
+                compile_cache_dir=self._kwargs.get("compile_cache_dir"),
+                attention_backend=self._kwargs.get("attention_backend"),
+            )
+            return
+
+        device = self._kwargs.get("device")
+        hf_cache_dir = self._kwargs.get("hf_cache_dir")
+        normalize = bool(self._kwargs.get("normalize", True))
+        max_length = int(self._kwargs.get("max_length", 8192))
+        model_name_raw = self._kwargs.get("model_name")
+
+        from nemo_retriever.model import is_vl_embed_model, resolve_embed_model
+
+        model_id = resolve_embed_model(model_name_raw)
+
+        if is_vl_embed_model(model_name_raw):
+            from nemo_retriever.model.local.llama_nemotron_embed_vl_1b_v2_embedder import (
+                LlamaNemotronEmbedVL1BV2Embedder,
+            )
+
+            self._model = LlamaNemotronEmbedVL1BV2Embedder(
+                device=str(device) if device else None,
+                hf_cache_dir=str(hf_cache_dir) if hf_cache_dir else None,
+                model_id=model_id,
+            )
+        else:
+            from nemo_retriever.model.local.llama_nemotron_embed_1b_v2_embedder import (
+                LlamaNemotronEmbed1BV2Embedder,
+            )
+
+            self._model = LlamaNemotronEmbed1BV2Embedder(
+                device=str(device) if device else None,
+                hf_cache_dir=str(hf_cache_dir) if hf_cache_dir else None,
+                normalize=normalize,
+                max_length=max_length,
+                model_id=model_id,
+            )
+
+    def embed_batch(self, batch_df: Any) -> Any:
+        from nemo_retriever.ingest_modes.inprocess import embed_text_main_text_embed
+
+        kwargs = dict(self._kwargs)
+        if getattr(self, "_vllm_llm", None) is not None:
+            kwargs["vllm_llm"] = self._vllm_llm
+        # Use embed_batch_size as inference_batch_size so the full Ray batch is sent to vLLM in one go.
+        if kwargs.get("embed_use_vllm_offline") and "embed_batch_size" in kwargs:
+            kwargs["inference_batch_size"] = int(kwargs["embed_batch_size"])
+        return embed_text_main_text_embed(batch_df, model=self._model, **kwargs)
+
+
+EmbedServiceActor = ray.remote(num_gpus=1)(_EmbedServiceActorImpl)
+
+
+class EmbedViaService:
+    """Stateless callable for map_batches: forwards each batch to a named EmbedServiceActor."""
+
+    def __init__(self, service_name: str) -> None:
+        self._service_name = service_name
+
+    def __call__(self, batch_df: Any) -> Any:
+        actor = ray.get_actor(self._service_name)
+        return ray.get(actor.embed_batch.remote(batch_df))
 
 
 class BatchIngestor(Ingestor):
@@ -797,6 +933,61 @@ def extract_audio(
         )
         return self
 
+    def _add_explode_stage(
+        self,
+        *,
+        embed_modality: str,
+        text_elements_modality: str,
+        structured_elements_modality: str,
+        embed_batch_size: int = 256,
+    ) -> None:
+        """Append repartition + explode_content_to_rows to the current Ray Dataset."""
+        self._rd_dataset = self._rd_dataset.repartition(target_num_rows_per_block=256)
+        from functools import partial
+        from nemo_retriever.ingest_modes.inprocess import explode_content_to_rows
+
+        _explode_fn = partial(
+            explode_content_to_rows,
+            modality=embed_modality,
+            text_elements_modality=text_elements_modality,
+            structured_elements_modality=structured_elements_modality,
+        )
+        self._rd_dataset = self._rd_dataset.map_batches(
+            _explode_fn,
+            batch_size=embed_batch_size,
+            batch_format="pandas",
+            num_cpus=1,
+            num_gpus=0,
+        )
+
+    def prepare_for_embed(self, params: EmbedParams | None = None, **kwargs: Any) -> "BatchIngestor":
+        """
+        Add the explode stage (repartition + explode_content_to_rows) without the embed actor.
+
+        Use before ``save_intermediate_results(pre_embed_dir)`` to write pre-embed
+        parquet. Later, load with ``ray.data.read_parquet(pre_embed_dir)`` and call
+        ``embed_only(...).vdb_upload(...).ingest()``.
+        """
+        resolved = _coerce_params(params, EmbedParams, kwargs)
+        kwargs_copy = {
+            **resolved.model_dump(
+                mode="python", exclude={"runtime", "batch_tuning", "fused_tuning"}, exclude_none=True
+            ),
+            **resolved.runtime.model_dump(mode="python", exclude_none=True),
+            **resolved.batch_tuning.model_dump(mode="python", exclude_none=True),
+        }
+        embed_batch_size = int(kwargs_copy.pop("embed_batch_size", 256))
+        embed_modality = resolved.embed_modality
+        text_elements_modality = resolved.text_elements_modality or embed_modality
+        structured_elements_modality = resolved.structured_elements_modality or embed_modality
+        self._add_explode_stage(
+            embed_modality=embed_modality,
+            text_elements_modality=text_elements_modality,
+            structured_elements_modality=structured_elements_modality,
+            embed_batch_size=embed_batch_size,
+        )
+        return self
+
     def embed(self, params: EmbedParams | None = None, **kwargs: Any) -> "BatchIngestor":
         """
         Add a text-embedding stage to the batch pipeline.
@@ -814,7 +1005,6 @@
         delegates to the remote NIM instead of loading a local model, and no GPU
         is requested for this stage.
         """
-
         resolved = _coerce_params(params, EmbedParams, kwargs)
         kwargs = {
             **resolved.model_dump(
@@ -847,41 +1037,22 @@ def _endpoint_count(raw: Any) -> int:
             )
             embed_workers = int(endpoint_count)
 
-        # Remaining kwargs are forwarded to the actor constructor.
         embed_modality = resolved.embed_modality
         text_elements_modality = resolved.text_elements_modality or embed_modality
         structured_elements_modality = resolved.structured_elements_modality or embed_modality
 
         self._tasks.append(("embed", dict(kwargs)))
 
-        # Explode content rows before embedding so each table/chart/infographic
-        # gets its own embedding vector (mirrors nv-ingest per-element embeddings).
-        self._rd_dataset = self._rd_dataset.repartition(target_num_rows_per_block=256)
-
-        from functools import partial
-        from nemo_retriever.ingest_modes.inprocess import explode_content_to_rows
-
-        _explode_fn = partial(
-            explode_content_to_rows,
-            modality=embed_modality,
+        self._add_explode_stage(
+            embed_modality=embed_modality,
             text_elements_modality=text_elements_modality,
             structured_elements_modality=structured_elements_modality,
-        )
-        self._rd_dataset = self._rd_dataset.map_batches(
-            _explode_fn,
-            batch_size=embed_batch_size,
-            batch_format="pandas",
-            num_cpus=1,
-            num_gpus=0,
+            embed_batch_size=embed_batch_size,
         )
 
-        # When using a remote NIM endpoint, no GPU is needed for embedding.
         endpoint = (kwargs.get("embedding_endpoint") or kwargs.get("embed_invoke_url") or "").strip()
         if endpoint:
            gpu_per_stage = 0
        else:
-            # Embedding is GPU-bound; only needs modest CPU for tokenisation.
-            # Requesting all CPUs would prevent this stage from overlapping with
-            # upstream extraction/detection in Ray Data's streaming pipeline.
             gpu_per_stage = getattr(self, "_gpu_embed", 1.0)
 
         self._rd_dataset = self._rd_dataset.map_batches(
@@ -896,6 +1067,68 @@ def _endpoint_count(raw: Any) -> int:
 
         return self
 
+    def embed_only(self, params: EmbedParams | None = None, **kwargs: Any) -> "BatchIngestor":
+        """
+        Add only the _BatchEmbedActor stage (no repartition, no explode).
+
+        Assumes ``self._rd_dataset`` is already the post-explode dataset, e.g. from
+        ``ray.data.read_parquet(pre_embed_dir)``. Chain with ``vdb_upload(...).ingest()``.
+        """
+        # embedding_service_name is not on EmbedParams; pop from kwargs before _coerce_params so it is not lost.
+        embedding_service_name = kwargs.pop("embedding_service_name", None)
+        resolved = _coerce_params(params, EmbedParams, kwargs)
+        kwargs_copy = {
+            **resolved.model_dump(
+                mode="python", exclude={"runtime", "batch_tuning", "fused_tuning"}, exclude_none=True
+            ),
+            **resolved.runtime.model_dump(mode="python", exclude_none=True),
+            **resolved.batch_tuning.model_dump(mode="python", exclude_none=True),
+        }
+
+        def _endpoint_count(raw: Any) -> int:
+            s = str(raw or "").strip()
+            if not s:
+                return 0
+            return len([p for p in s.split(",") if p.strip()])
+
+        embed_workers = kwargs_copy.pop("embed_workers", 1)
+        embed_batch_size = kwargs_copy.pop("embed_batch_size", 256)
+        embed_cpus_per_actor = float(kwargs_copy.pop("embed_cpus_per_actor", 1))
+
+        if "embedding_endpoint" not in kwargs_copy and kwargs_copy.get("embed_invoke_url"):
+            kwargs_copy["embedding_endpoint"] = kwargs_copy.get("embed_invoke_url")
+
+        endpoint_count = _endpoint_count(kwargs_copy.get("embedding_endpoint"))
+        if endpoint_count > 0 and int(embed_workers) != int(endpoint_count):
+            embed_workers = int(endpoint_count)
+
+        endpoint = (kwargs_copy.get("embedding_endpoint") or kwargs_copy.get("embed_invoke_url") or "").strip()
+        if endpoint:
+            gpu_per_stage = 0
+        else:
+            gpu_per_stage = getattr(self, "_gpu_embed", 1.0)
+
+        if embedding_service_name is not None:
+            self._rd_dataset = self._rd_dataset.map_batches(
+                EmbedViaService(embedding_service_name),
+                batch_size=embed_batch_size,
+                batch_format="pandas",
+                num_cpus=1,
+                num_gpus=0,
+            )
+        else:
+            self._rd_dataset = self._rd_dataset.map_batches(
+                _BatchEmbedActor,
+                batch_size=embed_batch_size,
+                batch_format="pandas",
+                num_cpus=embed_cpus_per_actor,
+                num_gpus=gpu_per_stage,
+                compute=rd.ActorPoolStrategy(size=embed_workers),
+                fn_constructor_kwargs={"params": resolved},
+            )
+
+        return self
+
     def vdb_upload(self, params: VdbUploadParams | None = None, **kwargs: Any) -> "BatchIngestor":
         """
         Add a streaming LanceDB upload stage to the batch pipeline.
@@ -991,7 +1224,10 @@ def ingest(self, params: IngestExecuteParams | None = None, **kwargs: Any) -> in
             num_pages = self._rd_dataset.count()
 
         elapsed = time.monotonic() - t0
-        print(f"[done] {len(self._input_documents)} files, {num_pages} pages in {elapsed:.1f}s")
+        if len(self._input_documents) == 0:
+            print(f"[done] Embedded {num_pages} rows from pre-embed in {elapsed:.1f}s")
+        else:
+            print(f"[done] {len(self._input_documents)} files, {num_pages} pages in {elapsed:.1f}s")
         # region agent log
         _debug_log(
             run_id=str(runtime_metrics_prefix or "unknown"),
diff --git a/nemo_retriever/src/nemo_retriever/ingest_modes/inprocess.py b/nemo_retriever/src/nemo_retriever/ingest_modes/inprocess.py
index 963f64b97..9305c4116 100644
--- a/nemo_retriever/src/nemo_retriever/ingest_modes/inprocess.py
+++ b/nemo_retriever/src/nemo_retriever/ingest_modes/inprocess.py
@@ -240,6 +240,10 @@ def _embed_group(
     inference_batch_size: int,
     output_column: str,
     resolved_model_name: str,
+    use_vllm_compat: bool = False,
+    use_vllm_offline: bool = False,
+    embed_model_path: Optional[str] = None,
+    vllm_llm: Any = None,
 ) -> pd.DataFrame:
     """Embed a single modality group via ``create_text_embeddings_for_df``.
@@ -249,7 +253,10 @@ def _embed_group(
     _embed = None
     _multimodal_embedder = None
 
-    if endpoint is None and model is not None:
+    if endpoint is None and use_vllm_offline:
+        # vLLM offline path: no local model; task_config will trigger vLLM offline in create_text_embeddings_for_df.
+        pass
+    elif endpoint is None and model is not None:
         if group_modality in IMAGE_MODALITIES:
             _multimodal_embedder = model
         else:
@@ -285,14 +292,23 @@ def _embed(texts: Sequence[str]) -> Sequence[Sequence[float]]:  # noqa: F811
             embed_modality=group_modality,
         )
 
+    task_config = {
+        "embedder": _embed,
+        "multimodal_embedder": _multimodal_embedder,
+        "endpoint_url": endpoint,
+        "local_batch_size": int(inference_batch_size),
+    }
+    if use_vllm_compat:
+        task_config["use_vllm_compat"] = True
+    if use_vllm_offline:
+        task_config["use_vllm_offline"] = True
+        task_config["embed_model_name"] = resolved_model_name
+        task_config["embed_model_path"] = embed_model_path
+        if vllm_llm is not None:
+            task_config["vllm_llm"] = vllm_llm
     out_df, _ = create_text_embeddings_for_df(
         group_df,
-        task_config={
-            "embedder": _embed,
-            "multimodal_embedder": _multimodal_embedder,
-            "endpoint_url": endpoint,
-            "local_batch_size": int(inference_batch_size),
-        },
+        task_config=task_config,
         transform_config=cfg,
     )
     return out_df
@@ -307,6 +323,10 @@ def embed_text_main_text_embed(
     model_name: Optional[str] = None,
     embedding_endpoint: Optional[str] = None,
     embed_invoke_url: Optional[str] = None,
+    embed_use_vllm_compat: bool = False,
+    embed_use_vllm_offline: bool = False,
+    embed_model_path: Optional[str] = None,
+    vllm_llm: Any = None,
     text_column: str = "text",
     inference_batch_size: int = 16,
     output_column: str = "text_embeddings_1b_v2",
@@ -343,8 +363,8 @@ def embed_text_main_text_embed(
 
     # Resolve endpoint: strip whitespace, treat empty string as None.
     _endpoint = (embedding_endpoint or embed_invoke_url or "").strip() or None
-    if _endpoint is None and model is None:
-        raise ValueError("Either a local model or an embedding_endpoint must be provided.")
+    if _endpoint is None and model is None and not embed_use_vllm_offline:
+        raise ValueError("Either a local model, an embedding_endpoint, or embed_use_vllm_offline must be provided.")
 
     # Resolve NIM aliases to the actual HF model ID.
     from nemo_retriever.model import resolve_embed_model
@@ -372,6 +392,10 @@
             inference_batch_size=inference_batch_size,
             output_column=output_column,
             resolved_model_name=_resolved_model_name,
+            use_vllm_compat=bool(embed_use_vllm_compat),
+            use_vllm_offline=bool(embed_use_vllm_offline),
+            embed_model_path=embed_model_path,
+            vllm_llm=vllm_llm,
         )
     else:
         # Multiple modalities: group, embed each, reassemble in original order.
@@ -390,6 +414,10 @@
                 inference_batch_size=inference_batch_size,
                 output_column=output_column,
                 resolved_model_name=_resolved_model_name,
+                use_vllm_compat=bool(embed_use_vllm_compat),
+                use_vllm_offline=bool(embed_use_vllm_offline),
+                embed_model_path=embed_model_path,
+                vllm_llm=vllm_llm,
             )
             parts.append(part)
         out_df = pd.concat(parts).sort_index()
@@ -1408,6 +1436,14 @@ def embed(self, params: EmbedParams | None = None, **kwargs: Any) -> "InProcessI
             self._tasks.append((embed_text_main_text_embed, embed_kwargs))
             return self
 
+        # vLLM offline path: no server, no local HF model; embed stage will use vLLM Python API.
+        if resolved.embed_use_vllm_offline:
+            embed_kwargs["embed_use_vllm_offline"] = True
+            embed_kwargs["embed_model_path"] = getattr(resolved, "embed_model_path", None)
+            embed_kwargs.setdefault("input_type", "passage")
+            self._tasks.append((embed_text_main_text_embed, embed_kwargs))
+            return self
+
         # Local HF embedder path.
         # Allow callers to control device / max_length to avoid OOMs.
         device = embed_kwargs.pop("device", None)
diff --git a/nemo_retriever/src/nemo_retriever/params/models.py b/nemo_retriever/src/nemo_retriever/params/models.py
index 14462a414..dd8f7ae15 100644
--- a/nemo_retriever/src/nemo_retriever/params/models.py
+++ b/nemo_retriever/src/nemo_retriever/params/models.py
@@ -33,6 +33,16 @@ class ModelRuntimeParams(_ParamsModel):
     normalize: bool = True
     max_length: int = 8192
     model_name: Optional[str] = None
+    # vLLM offline: use CUDA graphs when False (faster); set True to avoid torch.compile issues (e.g. noexec mount).
+    enforce_eager: bool = False
+    # vLLM: fraction of GPU memory for KV cache / batching (default 0.45); increase (e.g. 0.55–0.7) for throughput.
+    gpu_memory_utilization: float = 0.45
+    # Optional dir for torch inductor/Triton cache when enforce_eager=False; must be on non-noexec fs.
+    compile_cache_dir: Optional[str] = None
+    # vLLM offline: dtype (default float32). Use "bfloat16" or "float16" to allow FLASH_ATTN (fp32 forces TRITON_ATTN).
+    dtype: Optional[str] = None
+    # vLLM offline: attention backend (e.g. "FLASH_ATTN", "TRITON_ATTN"). None = auto; FLASH_ATTN needs bf16/fp16.
+    attention_backend: Optional[str] = None
 
 
 class IngestorCreateParams(_ParamsModel):
@@ -181,6 +191,9 @@ class EmbedParams(_ParamsModel):
     model_name: Optional[str] = None
     embedding_endpoint: Optional[str] = None
     embed_invoke_url: Optional[str] = None
+    embed_use_vllm_compat: bool = False  # Use vLLM-compatible HTTP payload when using remote endpoint
+    embed_use_vllm_offline: bool = False  # Use vLLM offline Python API (no server) for embeddings
+    embed_model_path: Optional[str] = None  # Local path for vLLM model (optional; else uses model_name)
     input_type: str = "passage"
     embed_modality: str = "text"  # "text", "image", or "text_image" — default for all element types
     text_elements_modality: Optional[str] = None  # per-type override for page-text rows
diff --git a/nemo_retriever/src/nemo_retriever/recall/core.py b/nemo_retriever/src/nemo_retriever/recall/core.py
index 9fc80d59f..b25f047ee 100644
--- a/nemo_retriever/src/nemo_retriever/recall/core.py
+++ b/nemo_retriever/src/nemo_retriever/recall/core.py
@@ -46,6 +46,13 @@ class RecallConfig:
     local_hf_device: Optional[str] = None
     local_hf_cache_dir: Optional[str] = None
     local_hf_batch_size: int = 64
+    # When True and an HTTP embedding endpoint is set, use vLLM-compatible minimal
+    # payload (no input_type/truncate). Set this when the endpoint is a vLLM server.
+    embedding_use_vllm_compat: bool = False
+    # When True and no endpoint is set, use vLLM offline Python API for query embeddings.
+    embedding_use_vllm_offline: bool = False
+    # Optional override for vLLM model path/id when using embedding_use_vllm_offline.
+    embedding_vllm_model_path: Optional[str] = None
 
 
 def _normalize_query_df(df: pd.DataFrame) -> pd.DataFrame:
@@ -106,6 +113,45 @@ def _resolve_embedding_endpoint(cfg: RecallConfig) -> Tuple[Optional[str], Optio
     return None, None
 
 
+def _embed_queries_vllm_http(
+    queries: List[str],
+    *,
+    endpoint: str,
+    model: str,
+    api_key: str,
+    batch_size: int = 256,
+) -> List[List[float]]:
+    """Embed queries via vLLM-compatible HTTP (minimal payload, no input_type/truncate)."""
+    from nemo_retriever.text_embed.vllm_http import embed_via_vllm_http
+
+    # llama-nemotron-embed-1b-v2 expects "query: " prefix for queries (see model README).
+    return embed_via_vllm_http(
+        queries,
+        endpoint_url=endpoint,
+        model_name=model,
+        api_key=(api_key or "").strip() or None,
+        batch_size=batch_size,
+        prefix="query: ",
+    )
+
+
+def _embed_queries_vllm_offline(
+    queries: List[str],
+    *,
+    model_path_or_id: str,
+    batch_size: int = 256,
+) -> List[List[float]]:
+    """Embed queries via vLLM offline Python API (no server)."""
+    from nemo_retriever.text_embed.vllm_offline import embed_via_vllm_offline
+
+    return embed_via_vllm_offline(
+        queries,
+        model=model_path_or_id,
+        batch_size=batch_size,
+        prefix="query: ",
+    )
+
+
 def _embed_queries_nim(
     queries: List[str],
     *,
@@ -297,12 +343,28 @@ def retrieve_and_score(
     endpoint, use_grpc = _resolve_embedding_endpoint(cfg)
 
     if endpoint is not None and use_grpc is not None:
-        vectors = _embed_queries_nim(
+        if bool(cfg.embedding_use_vllm_compat) and not use_grpc:
+            vectors = _embed_queries_vllm_http(
+                queries,
+                endpoint=endpoint,
+                model=cfg.embedding_model,
+                api_key=cfg.embedding_api_key,
+                batch_size=256,
+            )
+        else:
+            vectors = _embed_queries_nim(
+                queries,
+                endpoint=endpoint,
+                model=cfg.embedding_model,
+                api_key=cfg.embedding_api_key,
+                grpc=bool(use_grpc),
+            )
+    elif cfg.embedding_use_vllm_offline:
+        model_path = cfg.embedding_vllm_model_path or cfg.embedding_model
+        vectors = _embed_queries_vllm_offline(
             queries,
-            endpoint=endpoint,
-            model=cfg.embedding_model,
-            api_key=cfg.embedding_api_key,
-            grpc=bool(use_grpc),
+            model_path_or_id=model_path,
+            batch_size=int(cfg.local_hf_batch_size),
         )
     else:
         vectors = _embed_queries_local_hf(
diff --git a/nemo_retriever/src/nemo_retriever/recall/vdb_recall.py b/nemo_retriever/src/nemo_retriever/recall/vdb_recall.py
index 8132f8268..3bdcf14ff 100644
--- a/nemo_retriever/src/nemo_retriever/recall/vdb_recall.py
+++ b/nemo_retriever/src/nemo_retriever/recall/vdb_recall.py
@@ -131,6 +131,21 @@ def recall_with_main(
         min=1,
         help="Batch size for local HF embedding inference.",
     ),
+    embedding_use_vllm_compat: bool = typer.Option(
+        False,
+        "--embedding-use-vllm-compat/--no-embedding-use-vllm-compat",
+        help="Use vLLM-compatible HTTP payload (no input_type/truncate). Set when endpoint is a vLLM server.",
+    ),
+    embedding_use_vllm_offline: bool = typer.Option(
+        False,
+        "--embedding-use-vllm-offline/--no-embedding-use-vllm-offline",
+        help="Use vLLM offline Python API for query embeddings (no server).",
+    ),
+    embedding_vllm_model_path: Optional[str] = typer.Option(
+        None,
+        "--embedding-vllm-model-path",
+        help="Model path/id for vLLM offline (optional; else uses --embedding-model).",
+    ),
 ) -> None:
     query_csv = _resolve_query_csv(Path(query_csv))
 
@@ -155,6 +170,9 @@ def recall_with_main(
         local_hf_device=_coerce_endpoint_str(local_hf_device),
         local_hf_cache_dir=(str(local_hf_cache_dir) if local_hf_cache_dir is not None else None),
         local_hf_batch_size=int(local_hf_batch_size),
+        embedding_use_vllm_compat=bool(embedding_use_vllm_compat),
+        embedding_use_vllm_offline=bool(embedding_use_vllm_offline),
+        embedding_vllm_model_path=_coerce_endpoint_str(embedding_vllm_model_path),
     )
 
     print("Reading and normalizing query CSV...")
@@ -251,6 +269,21 @@ def run(
         min=1,
         help="Batch size for local HF embedding inference.",
     ),
+    embedding_use_vllm_compat: bool = typer.Option(
+        False,
+        "--embedding-use-vllm-compat/--no-embedding-use-vllm-compat",
+        help="Use vLLM-compatible HTTP payload (no input_type/truncate). Set when endpoint is a vLLM server.",
+    ),
+    embedding_use_vllm_offline: bool = typer.Option(
+        False,
+        "--embedding-use-vllm-offline/--no-embedding-use-vllm-offline",
+        help="Use vLLM offline Python API for query embeddings (no server).",
+    ),
+    embedding_vllm_model_path: Optional[str] = typer.Option(
+        None,
+        "--embedding-vllm-model-path",
+        help="Model path/id for vLLM offline (optional; else uses --embedding-model).",
+    ),
     print_hits: bool = typer.Option(True, "--print-hits/--no-print-hits", help="Print top-k hits per query."),
 ) -> None:
     """
@@ -282,6 +315,9 @@ def run(
         local_hf_device=_coerce_endpoint_str(local_hf_device),
         local_hf_cache_dir=(str(local_hf_cache_dir) if local_hf_cache_dir is not None else None),
         local_hf_batch_size=int(local_hf_batch_size),
+        embedding_use_vllm_compat=bool(embedding_use_vllm_compat),
+        embedding_use_vllm_offline=bool(embedding_use_vllm_offline),
+        embedding_vllm_model_path=_coerce_endpoint_str(embedding_vllm_model_path),
     )
 
     df_query, gold, raw_hits, retrieved_keys, metrics = retrieve_and_score(
diff --git a/nemo_retriever/src/nemo_retriever/text_embed/commands.py b/nemo_retriever/src/nemo_retriever/text_embed/commands.py
index 5a47fd60a..197bdf169 100644
--- a/nemo_retriever/src/nemo_retriever/text_embed/commands.py
+++ b/nemo_retriever/src/nemo_retriever/text_embed/commands.py
@@ -139,6 +139,17 @@ def run(
         min=1,
         help="Batch size for local HF embedding inference.",
     ),
+    use_vllm_offline: bool = typer.Option(
+        False,
+        "--use-vllm-offline/--no-use-vllm-offline",
+        help="Use vLLM offline Python API (no server) for embeddings.",
+    ),
+    embed_model_path: Optional[Path] = typer.Option(
+        None,
+        "--embed-model-path",
+        path_type=Path,
+        help="Local path to model for vLLM offline (optional; else uses --model-name).",
+    ),
     overwrite: bool = typer.Option(False, "--overwrite", help="Overwrite existing outputs."),
     limit: Optional[int] = typer.Option(None, "--limit", min=1, help="Optionally limit number of input files."),
     write_embedding_input: bool = typer.Option(
@@ -162,14 +173,20 @@ def run(
     task_cfg: Dict[str, Any] = {}
     if api_key is not None:
         task_cfg["api_key"] = api_key
-    if endpoint_url is not None:
+    if use_vllm_offline:
+        task_cfg["use_vllm_offline"] = True
+        task_cfg["embed_model_path"] = str(embed_model_path) if embed_model_path is not None else None
+        if model_name is not None:
+            task_cfg["model_name"] = model_name
+        task_cfg["endpoint_url"] = None
+    elif endpoint_url is not None:
         value = endpoint_url.strip()
         task_cfg["endpoint_url"] = None if value.lower() in ("", "none", "null") else value
     elif not cfg_dict.get("embedding_nim_endpoint"):
         # No CLI endpoint and no config-file endpoint — override the schema default
         # so maybe_inject_local_hf_embedder() will inject the local HF fallback.
         task_cfg["endpoint_url"] = None
-    if model_name is not None:
+    if model_name is not None and "model_name" not in task_cfg:
         task_cfg["model_name"] = model_name
     if dimensions is not None:
         task_cfg["dimensions"] = int(dimensions)
diff --git a/nemo_retriever/src/nemo_retriever/text_embed/main_text_embed.py b/nemo_retriever/src/nemo_retriever/text_embed/main_text_embed.py
index 0c20f7eb6..9bb63ed80 100644
--- a/nemo_retriever/src/nemo_retriever/text_embed/main_text_embed.py
+++ b/nemo_retriever/src/nemo_retriever/text_embed/main_text_embed.py
@@ -455,6 +455,42 @@ def _async_runner(
     return flat_results
 
 
+def _vllm_compat_runner(
+    prompts: List[List[str]],
+    api_key: Optional[str],
+    endpoint_url: str,
+    embedding_model: str,
+    encoding_format: str,
+    dimensions: Optional[int] = None,
+    batch_size: int = 256,
+) -> dict:
+    """
+    Request embeddings using vLLM-compatible minimal payload (no input_type/truncate).
+    Returns the same {"embeddings": [...], "info_msgs": [...]} shape as _async_runner.
+ """ + from nemo_retriever.text_embed.vllm_http import embed_via_vllm_http + + flat_prompts: List[str] = [] + for batch in prompts: + flat_prompts.extend(batch) + if not flat_prompts: + return {"embeddings": [], "info_msgs": []} + # llama-nemotron-embed-1b-v2 expects "passage: " for documents (see model README). + vectors = embed_via_vllm_http( + flat_prompts, + endpoint_url=endpoint_url, + model_name=embedding_model, + api_key=api_key, + dimensions=dimensions, + encoding_format=encoding_format, + batch_size=batch_size, + prefix="passage: ", + ) + # Normalize to list of list (or None for missing) + embeddings = [v if v else None for v in vectors] + return {"embeddings": embeddings, "info_msgs": [None] * len(embeddings)} + + def _callable_runner( prompts: List[List[str]], *, @@ -482,6 +518,47 @@ def _callable_runner( return {"embeddings": flat_embeddings, "info_msgs": flat_info_msgs} +def _vllm_offline_runner( + prompts: List[List[str]], + *, + model: str, + batch_size: int = 64, + dimensions: Optional[int] = None, + llm: Optional[Any] = None, +) -> dict: + """ + Request embeddings using vLLM offline Python API (no server). + If llm is provided (e.g. from a batch actor), reuse it and skip model load/CUDA capture. + Returns the same {"embeddings": [...], "info_msgs": [...]} shape as _vllm_compat_runner. 
+ """ + flat_prompts: List[str] = [] + for batch in prompts: + flat_prompts.extend(batch) + if not flat_prompts: + return {"embeddings": [], "info_msgs": []} + if llm is not None: + from nemo_retriever.text_embed.vllm_offline import embed_with_vllm_llm + + vectors = embed_with_vllm_llm( + flat_prompts, + llm, + batch_size=batch_size, + prefix="passage: ", + ) + else: + from nemo_retriever.text_embed.vllm_offline import embed_via_vllm_offline + + vectors = embed_via_vllm_offline( + flat_prompts, + model=model, + batch_size=batch_size, + prefix="passage: ", + dimensions=dimensions, + ) + embeddings = [v if v else None for v in vectors] + return {"embeddings": embeddings, "info_msgs": [None] * len(embeddings)} + + # ------------------------------------------------------------------------------ # Row update helpers (adapted for retriever-local DataFrames) # ------------------------------------------------------------------------------ @@ -542,6 +619,8 @@ def create_text_embeddings_for_df( - **dimensions**: optional int - **embedder**: optional callable(texts)->vectors; used when endpoint_url is empty/None - **local_batch_size**: int; used to sub-batch for the callable embedder path + - **use_vllm_offline**: bool; if True and no endpoint_url, use vLLM offline Python API + - **embed_model_name** or **embed_model_path**: model id or path for vLLM offline transform_config: Optional TextEmbeddingConfig; if omitted, defaults are used. 
execution_trace_log: @@ -656,7 +735,28 @@ def _text_image_content(r: pd.Series) -> Optional[str]: filtered_content_list, batch_size=int(transform_config.batch_size) ) - if endpoint_url: + if not endpoint_url and task_config.get("use_vllm_offline"): + embed_model = ( + task_config.get("embed_model_path") or task_config.get("embed_model_name") or str(model_name) + ) + content_embeddings = _vllm_offline_runner( + filtered_content_batches, + model=str(embed_model), + batch_size=int(task_config.get("local_batch_size") or transform_config.batch_size), + dimensions=dimensions, + llm=task_config.get("vllm_llm"), + ) + elif endpoint_url and task_config.get("use_vllm_compat"): + content_embeddings = _vllm_compat_runner( + filtered_content_batches, + api_key=api_key, + endpoint_url=str(endpoint_url), + embedding_model=str(model_name), + encoding_format=str(transform_config.encoding_format), + dimensions=dimensions, + batch_size=int(transform_config.batch_size), + ) + elif endpoint_url: content_embeddings = _async_runner( filtered_content_batches, api_key, diff --git a/nemo_retriever/src/nemo_retriever/text_embed/processor.py b/nemo_retriever/src/nemo_retriever/text_embed/processor.py index 555f7e28b..2aaa7ecf3 100644 --- a/nemo_retriever/src/nemo_retriever/text_embed/processor.py +++ b/nemo_retriever/src/nemo_retriever/text_embed/processor.py @@ -76,6 +76,10 @@ def maybe_inject_local_hf_embedder(task_config: Dict[str, Any], transform_config if callable(task_config.get("embedder")): return + # When vLLM offline is requested, do not inject HF embedder; create_text_embeddings_for_df will use vLLM offline. + if task_config.get("use_vllm_offline"): + return + # Resolve endpoint_url with explicit None override support. 
if "endpoint_url" in task_config: endpoint_url = task_config.get("endpoint_url") diff --git a/nemo_retriever/src/nemo_retriever/text_embed/vllm_http.py b/nemo_retriever/src/nemo_retriever/text_embed/vllm_http.py new file mode 100644 index 000000000..a15fa1662 --- /dev/null +++ b/nemo_retriever/src/nemo_retriever/text_embed/vllm_http.py @@ -0,0 +1,189 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +vLLM-compatible HTTP embedding client. + +vLLM's OpenAI-compatible embeddings API expects a minimal payload (model, input, +encoding_format). It does not use NIM-specific fields like input_type or truncate. +Use this module when pointing the retriever at a vLLM embedding server (e.g. for +llama-nemotron-embed-1b-v2) to avoid request schema mismatches. +""" + +from __future__ import annotations + +import logging +from typing import Any, Dict, List, Optional + +logger = logging.getLogger(__name__) + +# Reduce HTTP client logging verbosity +logging.getLogger("httpx").setLevel(logging.ERROR) +logging.getLogger("httpcore").setLevel(logging.ERROR) + + +def _normalize_embeddings_endpoint(endpoint_url: str) -> str: + """Normalize endpoint to a concrete /embeddings URL.""" + s = (endpoint_url or "").strip().rstrip("/") + if not s: + raise ValueError("endpoint_url is empty") + if s.endswith("/embeddings"): + return s + return f"{s}/embeddings" + + +def get_model_id_from_server(endpoint_url: str, timeout_s: float = 10.0) -> str | None: + """ + GET {endpoint}/models (or {endpoint}/v1/models) and return the first model id. + Returns None if the request fails or the response has no models. + """ + try: + import httpx # type: ignore + except ImportError: + return None + base = (endpoint_url or "").strip().rstrip("/") + if not base: + return None + # Try base/models (e.g. 
.../v1/models) then root .../models + base_no_v1 = base.removesuffix("/v1").rstrip("/") if base.endswith("/v1") else base + candidates = [f"{base}/models", f"{base_no_v1}/models"] + for url in candidates: + try: + with httpx.Client(timeout=timeout_s) as client: + resp = client.get(url) + if resp.status_code != 200: + continue + data = resp.json() + if not isinstance(data, dict): + continue + models = data.get("data") + if isinstance(models, list) and len(models) > 0 and isinstance(models[0], dict): + return models[0].get("id") + except Exception: + continue + return None + + +def embed_via_vllm_http( + prompts: List[str], + *, + endpoint_url: str, + model_name: str, + api_key: Optional[str] = None, + dimensions: Optional[int] = None, + encoding_format: str = "float", + batch_size: int = 256, + timeout_s: float = 120.0, + prefix: Optional[str] = None, +) -> List[List[float]]: + """ + Request embeddings from a vLLM (or other OpenAI-compatible) server using a + minimal payload: model, input, encoding_format. No input_type or truncate. + + Parameters + ---------- + prompts : list of str + Texts to embed. If prefix is set, each prompt is prefixed (e.g. "query: " or "passage: "). + endpoint_url : str + Base URL of the server (e.g. http://localhost:8000/v1). + model_name : str + Model name as returned by the server (e.g. from models.list()). + api_key : str, optional + Optional Bearer token. + dimensions : int, optional + Optional embedding dimension (if supported by server). + encoding_format : str + "float" or "base64". + batch_size : int + Max prompts per request. + timeout_s : float + Request timeout in seconds. + + Returns + ------- + list of list of float + One embedding vector per prompt, in order. 
+ """ + try: + import httpx # type: ignore + except ImportError as e: + raise RuntimeError("vLLM HTTP embedding requires httpx.") from e + + # Resolve model name from server if not provided + resolved_model = (model_name or "").strip() + if not resolved_model: + resolved_model = get_model_id_from_server(endpoint_url, timeout_s=min(10.0, timeout_s)) or "" + if not resolved_model: + raise ValueError( + "model_name is empty and could not discover model id from server; " + "pass --embed-model-name or ensure server exposes /v1/models or /models" + ) + + headers: Dict[str, str] = {"accept": "application/json", "content-type": "application/json"} + if (api_key or "").strip(): + headers["Authorization"] = f"Bearer {(api_key or '').strip()}" + + # Minimal OpenAI-compatible payload; no input_type or truncate. + def make_payload(batch: List[str]) -> Dict[str, Any]: + payload: Dict[str, Any] = { + "model": resolved_model, + "input": batch, + "encoding_format": encoding_format, + } + if dimensions is not None: + payload["dimensions"] = int(dimensions) + return payload + + if prefix: + prompts = [str(prefix) + p for p in prompts] + + # Build candidate URLs: primary (e.g. 
.../v1/embeddings) and fallback (.../embeddings) + base = (endpoint_url or "").strip().rstrip("/") + base_no_v1 = base.removesuffix("/v1").rstrip("/") if base.endswith("/v1") else base + candidate_urls = [ + _normalize_embeddings_endpoint(endpoint_url), + f"{base_no_v1}/embeddings" if base_no_v1 != base else None, + ] + candidate_urls = [u for u in candidate_urls if u] + + last_error: Exception | None = None + with httpx.Client(timeout=float(timeout_s)) as client: + for url in candidate_urls: + all_embeddings = [] + try: + for i in range(0, len(prompts), batch_size): + batch = prompts[i : i + batch_size] + resp = client.post(url, headers=headers, json=make_payload(batch)) + resp.raise_for_status() + data = resp.json() + items = data.get("data") if isinstance(data, dict) else None + if not isinstance(items, list): + raise RuntimeError("Unexpected embeddings response (missing 'data' list).") + for j, it in enumerate(items): + if not isinstance(it, dict): + all_embeddings.append([]) + continue + emb = it.get("embedding") + if isinstance(emb, list): + all_embeddings.append([float(x) for x in emb]) + else: + all_embeddings.append([]) + return all_embeddings + except Exception as e: + last_error = e + if hasattr(e, "response") and getattr(e.response, "status_code", None) == 404: + all_embeddings = [] + continue + raise + if last_error is not None: + tried = ", ".join(candidate_urls) + raise RuntimeError( + f"Embeddings request failed (tried: {tried}). " + "Ensure the vLLM embedding server is running (e.g. `vllm serve --runner pooling ...`) " + "and that the base URL is correct (e.g. http://localhost:8000/v1)." 
+ ) from last_error + return all_embeddings + + +__all__ = ["embed_via_vllm_http", "_normalize_embeddings_endpoint", "get_model_id_from_server"] diff --git a/nemo_retriever/src/nemo_retriever/text_embed/vllm_offline.py b/nemo_retriever/src/nemo_retriever/text_embed/vllm_offline.py new file mode 100644 index 000000000..4f3660d4e --- /dev/null +++ b/nemo_retriever/src/nemo_retriever/text_embed/vllm_offline.py @@ -0,0 +1,236 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +vLLM offline batched embedding inference. + +Uses vLLM's Python API (LLM with task="embed" / runner="pooling" and llm.embed()) +to compute embeddings without running a vLLM server. Use this when you want +the same embedding model (e.g. llama-nemotron-embed-1b-v2) with vLLM's batched +inference and no HTTP server. + +Troubleshooting enforce_eager=False (torch.compile): + If you see "failed to map segment from shared object", the cache dir is on a + filesystem that does not allow executing shared libs (e.g. noexec mount). /tmp + and /dev/shm are often noexec on Linux. We auto-pick a dir under /run/user/ + (if present), then ~/.cache/vllm/torch_compile_inductor, then /dev/shm. Set + compile_cache_dir to a path on a non-noexec filesystem if needed, or use + enforce_eager=True. +""" + +from __future__ import annotations + +import logging +import os +from typing import Any, List, Optional + +logger = logging.getLogger(__name__) + + +def _default_compile_cache_dir() -> Optional[str]: + """Return a cache dir for torch inductor/triton that supports loading shared libs (no noexec). + Prefer /run/user/ (often exec-enabled tmpfs), then ~/.cache, then /dev/shm. 
+ Avoid /tmp and /dev/shm when they are mounted noexec (common on Linux).""" + candidates: List[tuple[str, str]] = [] + uid = os.getuid() if hasattr(os, "getuid") else 0 + run_user = f"/run/user/{uid}" + if os.path.isdir(run_user) and os.access(run_user, os.W_OK): + candidates.append((run_user, os.path.join(run_user, "vllm_torch_compile"))) + cache_home = os.environ.get("XDG_CACHE_HOME") or os.path.expanduser("~/.cache") + vllm_cache = os.path.join(cache_home, "vllm", "torch_compile_inductor") + if cache_home: + candidates.append((cache_home, vllm_cache)) + shm = "/dev/shm" + if os.path.isdir(shm) and os.access(shm, os.W_OK): + candidates.append((shm, os.path.join(shm, f"vllm_torch_compile_{uid}"))) + for _parent, path in candidates: + try: + os.makedirs(path, mode=0o700, exist_ok=True) + return path + except OSError: + continue + return None + + +def create_vllm_llm( + model: str, + *, + dimensions: Optional[int] = None, + tensor_parallel_size: int = 1, + dtype: str = "float32", + trust_remote_code: bool = True, + max_model_len: Optional[int] = None, + gpu_memory_utilization: float = 0.45, + enforce_eager: bool = False, + compile_cache_dir: Optional[str] = None, + attention_backend: Optional[str] = None, +) -> Any: + """ + Create and return a vLLM LLM instance for embedding (pooling runner). + Caller can reuse it across many embed batches to avoid repeated model load and CUDA graph capture. + + Note on attention backend: vLLM selects automatically by default. FLASH_ATTN only supports + fp16/bf16; with dtype float32 vLLM will fall back to TRITON_ATTN (or FLEX_ATTENTION). + To use FLASH_ATTN for better throughput, set dtype to "bfloat16" or "float16", and + optionally pass attention_backend="FLASH_ATTN". + """ + try: + from vllm import LLM + except ImportError as e: + raise RuntimeError( + "vLLM offline embedding requires the vllm package. 
" + "Install with: pip install vllm>=0.11.0 or uv pip install -e '.[vllm]'" + ) from e + + if not enforce_eager: + cache_dir = compile_cache_dir if compile_cache_dir is not None else _default_compile_cache_dir() + if cache_dir: + os.makedirs(cache_dir, mode=0o700, exist_ok=True) + os.environ["TORCHINDUCTOR_CACHE_DIR"] = cache_dir + os.environ["TRITON_CACHE_DIR"] = cache_dir + logger.debug("vLLM offline: using compile cache dir %s", cache_dir) + + pooler_config = None + try: + from vllm.config.pooler import PoolerConfig + + try: + pooler_config = PoolerConfig(seq_pooling_type="MEAN", dimensions=dimensions) + except TypeError: + pooler_config = PoolerConfig(pooling_type="MEAN", dimensions=dimensions) + except Exception: + pooler_config = None + + kwargs: dict = { + "model": model, + "trust_remote_code": trust_remote_code, + "tensor_parallel_size": tensor_parallel_size, + "dtype": dtype, + "runner": "pooling", + "gpu_memory_utilization": gpu_memory_utilization, + "enforce_eager": enforce_eager, + } + if max_model_len is not None: + kwargs["max_model_len"] = max_model_len + if pooler_config is not None: + kwargs["pooler_config"] = pooler_config + if attention_backend is not None: + kwargs["attention_backend"] = attention_backend + + return LLM(**kwargs) + + +def embed_with_vllm_llm( + prompts: List[str], + llm: Any, + *, + batch_size: int = 256, + prefix: Optional[str] = None, +) -> List[List[float]]: + """ + Compute embeddings using an existing vLLM LLM instance (no new model load). + Use this when the caller holds a shared LLM (e.g. one per Ray actor). 
+ """ + if prefix: + prompts = [str(prefix) + p for p in prompts] + if not prompts: + return [] + + all_embeddings: List[List[float]] = [] + for i in range(0, len(prompts), max(1, batch_size)): + batch = prompts[i : i + max(1, batch_size)] + outputs = llm.embed(batch) + for out in outputs: + emb = getattr(getattr(out, "outputs", None), "embedding", None) + if emb is not None: + if hasattr(emb, "tolist"): + all_embeddings.append(emb.tolist()) + elif isinstance(emb, list): + all_embeddings.append([float(x) for x in emb]) + else: + all_embeddings.append(list(emb)) + else: + all_embeddings.append([]) + return all_embeddings + + +def embed_via_vllm_offline( + prompts: List[str], + *, + model: str, + batch_size: int = 256, + prefix: Optional[str] = None, + dimensions: Optional[int] = None, + tensor_parallel_size: int = 1, + dtype: str = "float32", + trust_remote_code: bool = True, + max_model_len: Optional[int] = None, + gpu_memory_utilization: float = 0.45, + enforce_eager: bool = False, + compile_cache_dir: Optional[str] = None, + attention_backend: Optional[str] = None, +) -> List[List[float]]: + """ + Compute embeddings via vLLM's offline Python API (no server). + + Parameters + ---------- + prompts : list of str + Texts to embed. If prefix is set, each prompt is prefixed (e.g. "query: " or "passage: "). + model : str + HuggingFace model id (e.g. nvidia/llama-nemotron-embed-1b-v2) or local path + to a clone that uses vLLM config (e.g. config_vllm.json for this model). + batch_size : int + Max prompts per llm.embed() call (vLLM may batch internally as well). + prefix : str, optional + Prefix to prepend to each prompt (e.g. "passage: ", "query: "). + dimensions : int, optional + Optional embedding dimension (Matryoshka); if supported by model and vLLM. + tensor_parallel_size : int + Number of GPUs for tensor parallelism. + dtype : str + Model dtype (e.g. "float32", "float16"). + trust_remote_code : bool + Passed to LLM when loading the model. 
+    max_model_len : int, optional
+        Max sequence length; default uses model config.
+    gpu_memory_utilization : float
+        Fraction of GPU memory for vLLM (default 0.45). Kept low to avoid OOM; increase
+        (e.g. 0.6–0.8) for better throughput once stable.
+    enforce_eager : bool
+        If False (default), use torch.compile/CUDAGraphs for speed. If True, disable for
+        stability. When False, set compile_cache_dir (or rely on the auto-picked cache
+        dir) to avoid "failed to map segment from shared object" when /tmp is NFS or noexec.
+    compile_cache_dir : str, optional
+        Directory for PyTorch inductor/Triton JIT cache. Must be on a filesystem that
+        supports loading shared libs (e.g. local disk or /dev/shm). If None and
+        enforce_eager is False, TORCHINDUCTOR_CACHE_DIR and TRITON_CACHE_DIR are set to
+        an exec-capable dir (preferring /run/user/<uid>, then ~/.cache/vllm, then
+        /dev/shm), so /tmp is not used.
+    attention_backend : str, optional
+        vLLM attention backend (e.g. "FLASH_ATTN", "TRITON_ATTN"). If None, vLLM selects
+        automatically. FLASH_ATTN only supports fp16/bf16; with dtype float32 vLLM uses
+        TRITON_ATTN or FLEX_ATTENTION. Use dtype "bfloat16" (or "float16") to enable
+        FLASH_ATTN for better throughput.
+
+    Returns
+    -------
+    list of list of float
+        One embedding vector per prompt, in order.
+    """
+    llm = create_vllm_llm(
+        model,
+        dimensions=dimensions,
+        tensor_parallel_size=tensor_parallel_size,
+        dtype=dtype,
+        trust_remote_code=trust_remote_code,
+        max_model_len=max_model_len,
+        gpu_memory_utilization=gpu_memory_utilization,
+        enforce_eager=enforce_eager,
+        compile_cache_dir=compile_cache_dir,
+        attention_backend=attention_backend,
+    )
+    return embed_with_vllm_llm(prompts, llm, batch_size=batch_size, prefix=prefix)
+
+
+__all__ = ["create_vllm_llm", "embed_via_vllm_offline", "embed_with_vllm_llm"]
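For reference, the vLLM-compatible request shape that `embed_via_vllm_http` sends can be sketched standalone. The helpers below are illustrative copies mirroring `_normalize_embeddings_endpoint` and the payload builder from the new `vllm_http` module (they are not imports of it), showing that the payload carries only OpenAI-compatible fields and the document-side `"passage: "` prefix:

```python
from typing import Any, Dict, List, Optional


def normalize_embeddings_endpoint(endpoint_url: str) -> str:
    # Mirrors _normalize_embeddings_endpoint: append /embeddings unless present.
    s = (endpoint_url or "").strip().rstrip("/")
    if not s:
        raise ValueError("endpoint_url is empty")
    return s if s.endswith("/embeddings") else f"{s}/embeddings"


def make_vllm_payload(
    batch: List[str],
    model: str,
    encoding_format: str = "float",
    dimensions: Optional[int] = None,
    prefix: Optional[str] = None,
) -> Dict[str, Any]:
    # Minimal OpenAI-compatible payload: no NIM-specific input_type/truncate.
    inputs = [f"{prefix}{p}" for p in batch] if prefix else list(batch)
    payload: Dict[str, Any] = {"model": model, "input": inputs, "encoding_format": encoding_format}
    if dimensions is not None:
        payload["dimensions"] = int(dimensions)
    return payload


url = normalize_embeddings_endpoint("http://localhost:8000/v1")
payload = make_vllm_payload(["hello world"], "nvidia/llama-nemotron-embed-1b-v2", prefix="passage: ")
# url → "http://localhost:8000/v1/embeddings"
# payload["input"] → ["passage: hello world"]; no "input_type" or "truncate" keys
```

A NIM endpoint accepts extra fields like `input_type`; a vLLM server rejects them, which is why the compat path strips them.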