Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 46 additions & 28 deletions tools/harness/src/nv_ingest_harness/cases/e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,14 @@
from nv_ingest_client.util.document_analysis import analyze_document_chunks
from nv_ingest_client.util.milvus import nvingest_retrieval

from nv_ingest_harness.utils.interact import embed_info, kv_event_log, segment_results
from nv_ingest_harness.utils.interact import (
embed_info,
get_embed_task_endpoint,
get_embedding_api_base,
is_embedding_endpoint_reachable,
kv_event_log,
segment_results,
)
from nv_ingest_harness.utils.milvus import milvus_chunks
from nv_ingest_harness.utils.pdf import pdf_page_count
from nv_ingest_harness.utils.vdb import get_lancedb_path
Expand Down Expand Up @@ -78,7 +85,8 @@ def main(config=None, log_path: str = "test_results") -> int:
split_chunk_size = config.split_chunk_size
split_chunk_overlap = config.split_chunk_overlap

model_name, dense_dim = embed_info()
model_name, dense_dim = embed_info(config.embedding_model)
embedding_endpoint = get_embedding_api_base(hostname)

# Log configuration for transparency
print("=== Test Configuration ===")
Expand Down Expand Up @@ -181,7 +189,7 @@ def main(config=None, log_path: str = "test_results") -> int:
)

# Embed (must come before storage per pipeline ordering)
ingestor = ingestor.embed(model_name=model_name)
ingestor = ingestor.embed(model_name=model_name, endpoint_url=get_embed_task_endpoint())

# Store images to disk (server-side image storage) - optional
# Note: Supports both MinIO (s3://) and local disk (file://) via storage_uri
Expand Down Expand Up @@ -275,32 +283,42 @@ def main(config=None, log_path: str = "test_results") -> int:
"How many dollars does a power drill cost?",
]
querying_start = time.time()
if vdb_backend == "lancedb":
try:
from nv_ingest_client.util.vdb.lancedb import LanceDB
except ImportError as exc:
print(f"Warning: LanceDB retrieval not available ({exc}). Skipping retrieval sanity check.")
else:
lancedb_client = LanceDB(uri=lancedb_path, table_name=collection_name, hybrid=hybrid)
_ = lancedb_client.retrieval(
queries,
hybrid=hybrid,
embedding_endpoint=f"http://{hostname}:8012/v1",
model_name=model_name,
top_k=5,
)
if not is_embedding_endpoint_reachable(hostname):
print("Warning: Embedding endpoint is not reachable from host; skipping retrieval sanity check.")
retrieval_time = 0.0
else:
_ = nvingest_retrieval(
queries,
collection_name,
hybrid=sparse,
embedding_endpoint=f"http://{hostname}:8012/v1",
model_name=model_name,
top_k=5,
gpu_search=gpu_search,
nv_ranker=False,
)
retrieval_time = time.time() - querying_start
if vdb_backend == "lancedb":
try:
from nv_ingest_client.util.vdb.lancedb import LanceDB
except ImportError as exc:
print(f"Warning: LanceDB retrieval not available ({exc}). Skipping retrieval sanity check.")
else:
lancedb_client = LanceDB(uri=lancedb_path, table_name=collection_name, hybrid=hybrid)
try:
_ = lancedb_client.retrieval(
queries,
hybrid=hybrid,
embedding_endpoint=embedding_endpoint,
model_name=model_name,
top_k=5,
)
except Exception as exc:
print(f"Warning: LanceDB retrieval sanity check failed; continuing test run ({exc})")
else:
try:
_ = nvingest_retrieval(
queries,
collection_name,
hybrid=sparse,
embedding_endpoint=embedding_endpoint,
model_name=model_name,
top_k=5,
gpu_search=gpu_search,
nv_ranker=False,
)
except Exception as exc:
print(f"Warning: Milvus retrieval sanity check failed; continuing test run ({exc})")
retrieval_time = time.time() - querying_start
kv_event_log("retrieval_time_s", retrieval_time, log_path)

# Summarize - Build comprehensive results dict
Expand Down
37 changes: 24 additions & 13 deletions tools/harness/src/nv_ingest_harness/cases/e2e_with_llm_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
from nv_ingest_harness.utils.cases import get_repo_root
from nv_ingest_harness.utils.interact import (
embed_info,
get_embed_task_endpoint,
get_embedding_api_base,
is_embedding_endpoint_reachable,
segment_results,
kv_event_log,
) # noqa: E402
Expand Down Expand Up @@ -84,7 +87,8 @@ def main(config=None, log_path: str = "test_results") -> int:
print(f"Path to User-Defined Function: {str(udf_path)}")
llm_model = config.llm_summarization_model

model_name, dense_dim = embed_info()
model_name, dense_dim = embed_info(config.embedding_model)
embedding_endpoint = get_embedding_api_base(hostname)

# Log configuration for transparency
print("=== Configuration ===")
Expand Down Expand Up @@ -134,7 +138,7 @@ def main(config=None, log_path: str = "test_results") -> int:
# Embed and upload (core pipeline)
print("Uploading to collection:", collection_name)
ingestor = (
ingestor.embed(model_name=model_name)
ingestor.embed(model_name=model_name, endpoint_url=get_embed_task_endpoint())
.vdb_upload(
collection_name=collection_name,
dense_dim=dense_dim,
Expand Down Expand Up @@ -190,17 +194,24 @@ def main(config=None, log_path: str = "test_results") -> int:
"How many dollars does a power drill cost?",
]
querying_start = time.time()
_ = nvingest_retrieval(
queries,
collection_name,
hybrid=sparse,
embedding_endpoint=f"http://{hostname}:8012/v1",
embedding_model_name=model_name,
model_name=model_name,
top_k=5,
gpu_search=gpu_search,
)
kv_event_log("retrieval_time_s", time.time() - querying_start, log_path)
if not is_embedding_endpoint_reachable(hostname):
print("Warning: Embedding endpoint is not reachable from host; skipping retrieval sanity check.")
kv_event_log("retrieval_time_s", 0.0, log_path)
else:
try:
_ = nvingest_retrieval(
queries,
collection_name,
hybrid=sparse,
embedding_endpoint=embedding_endpoint,
embedding_model_name=model_name,
model_name=model_name,
top_k=5,
gpu_search=gpu_search,
)
except Exception as exc:
print(f"Warning: retrieval sanity check failed; continuing test run ({exc})")
kv_event_log("retrieval_time_s", time.time() - querying_start, log_path)

# Summarize
dataset_name = os.path.basename(data_dir.rstrip("/")) if data_dir else "unknown"
Expand Down
82 changes: 69 additions & 13 deletions tools/harness/src/nv_ingest_harness/utils/interact.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import os
import shutil
from urllib.parse import urlparse
import time
import zipfile
import urllib.request
Expand All @@ -20,7 +21,67 @@ def run_cmd(cmd: list[str], cwd: Path | None = None) -> int:
return subprocess.call(cmd, cwd=cwd)


# Known embedding models mapped to the dimensionality of the vectors they emit.
MODEL_DIMENSIONS = {
    "nvidia/llama-3.2-nemoretriever-1b-vlm-embed-v1": 2048,
    "nvidia/llama-3.2-nv-embedqa-1b-v2": 2048,
    "nvidia/llama-3.2-nemoretriever-300m-embed-v1": 2048,
    "nvidia/nv-embedqa-e5-v5": 1024,
}
# Fallbacks used when the model cannot be resolved from config, env, or the service.
DEFAULT_MODEL = "nvidia/llama-3.2-nv-embedqa-1b-v2"
DEFAULT_DIMENSION = 2048


def _ensure_v1_base(url: str) -> str:
    """Normalize *url* into an API base: strip trailing slashes and append ``/v1``
    only when the URL carries no path of its own.

    A URL that already has a path (``/v1`` or anything custom) is returned
    unchanged apart from trailing-slash removal.
    """
    trimmed = url.rstrip("/")
    path = urlparse(trimmed).path
    if path in ("", "/"):
        return trimmed + "/v1"
    # Already ends in /v1, or the caller picked a custom path deliberately.
    return trimmed


def get_embedding_api_base(hostname: str = "localhost") -> str:
    """Resolve the embedding API base URL as reachable from the harness host.

    Precedence:
      1. ``HARNESS_EMBEDDING_ENDPOINT`` — explicit override, always wins.
      2. ``EMBEDDING_NIM_ENDPOINT`` — honored unless it points at the
         docker-internal ``embedding`` hostname while we target localhost
         (that name does not resolve from outside the compose network).
      3. Conventional host port mapping ``http://<hostname>:8012/v1``.
    """
    override = os.getenv("HARNESS_EMBEDDING_ENDPOINT")
    if override:
        return _ensure_v1_base(override)

    nim_endpoint = os.getenv("EMBEDDING_NIM_ENDPOINT")
    if nim_endpoint:
        endpoint_host = (urlparse(nim_endpoint).hostname or "").lower()
        targeting_local = hostname in {"localhost", "127.0.0.1"}
        # De Morgan of: not (targeting_local and endpoint_host == "embedding")
        if not targeting_local or endpoint_host != "embedding":
            return _ensure_v1_base(nim_endpoint)

    return f"http://{hostname}:8012/v1"


def get_embedding_models_url(hostname: str = "localhost") -> str:
    """Return the embedding service's OpenAI-style ``/models`` listing URL."""
    api_base = get_embedding_api_base(hostname).rstrip("/")
    return api_base + "/models"


def get_embedding_health_url(hostname: str = "localhost") -> str:
    """Return the embedding service's readiness-probe URL (``/health/ready``)."""
    api_base = get_embedding_api_base(hostname).rstrip("/")
    return api_base + "/health/ready"


def is_embedding_endpoint_reachable(hostname: str = "localhost", timeout_s: float = 1.5) -> bool:
    """Best-effort readiness check against the embedding service.

    Any failure — DNS error, refused connection, timeout, non-200 status,
    malformed response — simply reports ``False``; this probe must never
    abort a test run.
    """
    health_url = get_embedding_health_url(hostname)
    try:
        with urllib.request.urlopen(health_url, timeout=timeout_s) as resp:
            return resp.status == 200
    except Exception:
        return False


def get_embed_task_endpoint() -> str:
    """Return the embedding endpoint the pipeline's embed task should call.

    This is the service-network view (inside the compose network), so the
    default uses the ``embedding`` service name rather than the host port.
    """
    configured = (os.getenv("EMBEDDING_NIM_ENDPOINT") or "").strip()
    if configured:
        return _ensure_v1_base(configured)
    # Default compose service name/port on the container network.
    return "http://embedding:8000/v1"


def embed_info(
preferred_model: str | None = None,
max_retries: int = 5,
initial_backoff: float = 1.0,
backoff_multiplier: float = 2.0,
Expand All @@ -41,19 +102,14 @@ def embed_info(
tuple: A tuple containing (model_name: str, embedding_dimension: int).
Returns a default model if the embedding service is not available after retries.
"""
# Model name to embedding dimension mapping
MODEL_DIMENSIONS = {
"nvidia/llama-3.2-nemoretriever-1b-vlm-embed-v1": 2048,
"nvidia/llama-3.2-nv-embedqa-1b-v2": 2048,
"nvidia/llama-3.2-nemoretriever-300m-embed-v1": 2048,
"nvidia/nv-embedqa-e5-v5": 1024,
}

# Default model
DEFAULT_MODEL = "nvidia/nv-embedqa-e5-v5"
DEFAULT_DIMENSION = 1024

url = "http://localhost:8012/v1/models"
if preferred_model and preferred_model.lower() != "auto":
return preferred_model, MODEL_DIMENSIONS.get(preferred_model, DEFAULT_DIMENSION)

env_model = os.getenv("EMBEDDING_NIM_MODEL_NAME")
if env_model:
return env_model, MODEL_DIMENSIONS.get(env_model, DEFAULT_DIMENSION)

url = get_embedding_models_url("localhost")

# Try to fetch model info from embedding service API with retry/backoff
for attempt in range(max_retries):
Expand Down
9 changes: 5 additions & 4 deletions tools/harness/src/nv_ingest_harness/utils/recall.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from nv_ingest_client.util.milvus import nvingest_retrieval

from nv_ingest_harness.utils.cases import get_repo_root
from nv_ingest_harness.utils.interact import get_embedding_api_base


def _get_retrieval_func(
Expand Down Expand Up @@ -123,7 +124,7 @@ def get_recall_scores(
if vdb_backend == "lancedb":
batch_answers = retrieval_func(
batch_queries,
embedding_endpoint=f"http://{hostname}:8012/v1",
embedding_endpoint=get_embedding_api_base(hostname),
model_name=model_name,
top_k=top_k,
nv_ranker=nv_ranker,
Expand All @@ -135,7 +136,7 @@ def get_recall_scores(
batch_queries,
collection_name,
hybrid=sparse,
embedding_endpoint=f"http://{hostname}:8012/v1",
embedding_endpoint=get_embedding_api_base(hostname),
model_name=model_name,
top_k=top_k,
gpu_search=gpu_search,
Expand Down Expand Up @@ -242,7 +243,7 @@ def get_recall_scores_pdf_only(
if vdb_backend == "lancedb":
batch_answers = retrieval_func(
batch_queries,
embedding_endpoint=f"http://{hostname}:8012/v1",
embedding_endpoint=get_embedding_api_base(hostname),
model_name=model_name,
top_k=top_k,
nv_ranker=nv_ranker,
Expand All @@ -254,7 +255,7 @@ def get_recall_scores_pdf_only(
batch_queries,
collection_name,
hybrid=sparse,
embedding_endpoint=f"http://{hostname}:8012/v1",
embedding_endpoint=get_embedding_api_base(hostname),
model_name=model_name,
top_k=top_k,
gpu_search=gpu_search,
Expand Down
Loading