From 797e67f8e26082cd4d13670d6efde7463da807bd Mon Sep 17 00:00:00 2001 From: Khalil Buckmire Date: Wed, 13 May 2026 17:26:05 -0400 Subject: [PATCH] feat(rag): add RAG pipeline scaffold Adds the skeleton of the RAG pipeline with stubbed interfaces. Two collections (permits + reference) kept separate. Schema-aware chunking respects REVIEW_SCHEMA boundaries. All-local inference. 12 smoke tests pass; real implementations are TODOs marked in each module. See rag/README.md for the layout and suggested fill-in order. --- rag/README.md | 59 +++++++++++++++ rag/__init__.py | 1 + rag/chunker.py | 71 +++++++++++++++++ rag/embeddings.py | 51 +++++++++++++ rag/generator.py | 33 ++++++++ rag/loaders.py | 61 +++++++++++++++ rag/pipeline.py | 67 +++++++++++++++++ rag/prompts.py | 49 ++++++++++++ rag/retriever.py | 41 ++++++++++ rag/types.py | 82 ++++++++++++++++++++ rag/vectorstore.py | 65 ++++++++++++++++ requirements_for_rag.txt | 19 +++++ tests/__init__.py | 0 tests/test_rag.py | 159 +++++++++++++++++++++++++++++++++++++++ 14 files changed, 758 insertions(+) create mode 100644 rag/README.md create mode 100644 rag/__init__.py create mode 100644 rag/chunker.py create mode 100644 rag/embeddings.py create mode 100644 rag/generator.py create mode 100644 rag/loaders.py create mode 100644 rag/pipeline.py create mode 100644 rag/prompts.py create mode 100644 rag/retriever.py create mode 100644 rag/types.py create mode 100644 rag/vectorstore.py create mode 100644 requirements_for_rag.txt create mode 100644 tests/__init__.py create mode 100644 tests/test_rag.py diff --git a/rag/README.md b/rag/README.md new file mode 100644 index 0000000..75ab6b0 --- /dev/null +++ b/rag/README.md @@ -0,0 +1,59 @@ +# RAG scaffold + +Skeleton of the RAG pipeline for NittCarb AI's Class VI permit review. +Every module has its interface defined and a `# TODO` for the real work. 
+ +## Layout + +``` +rag/ +├── types.py shared dataclasses + Collection enum +├── loaders.py PDF + DOCX loaders +├── chunker.py schema-aware chunking (the important one) +├── embeddings.py local sentence-transformers +├── vectorstore.py Chroma, two collections (permits + reference) +├── retriever.py picks collection + applies filters +├── prompts.py review-specific prompt templates +├── generator.py LLM placeholder +└── pipeline.py load -> chunk -> embed -> store +tests/ +└── test_rag.py 12 smoke tests for the public interfaces +``` + +## Run the tests + +```bash +pytest tests/ +``` + +You should see 12 tests pass. They exercise the interfaces, not the +real implementations (which are stubs that log warnings). + +## Key design decisions + +**Two collections, kept separate.** `Collection.PERMITS` holds chunks +from the seven approved permit applications. `Collection.REFERENCE` +holds chunks from 40 CFR Part 146 Subpart H and EPA guidance. A +retrieval for "what does the regulation say about casing" must never +return an applicant's casing description, and vice versa. + +**Schema-aware chunking.** Chunks respect REVIEW_SCHEMA section +boundaries and carry `section_id` / `subsection_id` in metadata. +This is what makes the MAIP validation chain enforceable later — every +chunk knows which section it came from, so cross-reference queries can +filter by section. + +**Everything runs locally.** Air-gapped deployment is a hard constraint. +No cloud API calls in the RAG path. + +## Filling in the TODOs + +Each module has TODOs marked with `# TODO`. Suggested order: + +1. `loaders.py` — implement `load_pdf` against one of your permit PDFs. +2. `chunker.py` — implement `chunk_document` and `detect_sections`. +3. `embeddings.py` — wire up `SentenceTransformer`. +4. `vectorstore.py` — wire up the Chroma client. +5. `pipeline.py` — should work end-to-end once 1–4 are done. +6. `retriever.py` — already works; tests will start returning real results. +7. 
`generator.py` — wait for the LLM benchmark decision before filling. diff --git a/rag/__init__.py b/rag/__init__.py new file mode 100644 index 0000000..8d60a44 --- /dev/null +++ b/rag/__init__.py @@ -0,0 +1 @@ +"""NittCarb RAG pipeline.""" diff --git a/rag/chunker.py b/rag/chunker.py new file mode 100644 index 0000000..785bb67 --- /dev/null +++ b/rag/chunker.py @@ -0,0 +1,71 @@ +""" +Schema-aware chunking for Class VI permit documents. + +This is the most important file in the scaffold. + +A blind character-based splitter would cut Section 7.2 (MAIP discussion) +mid-sentence and break the cross-reference logic the review engine needs. +Our chunker respects REVIEW_SCHEMA section boundaries: it detects section +headings, splits the document into section-bounded regions, and chunks +WITHIN regions — never across them. Every chunk carries its section_id +in metadata so the retriever can filter by section. +""" + +import logging +import re + +from rag.types import Chunk, ChunkMetadata, DocumentType + +logger = logging.getLogger(__name__) + + +# Default chunking parameters. Tuned for embedding model context windows +# (most sentence-transformers cap around 512 tokens / ~2000 chars). +CHUNK_SIZE = 1000 +CHUNK_OVERLAP = 150 + + +# Matches section headings like: +# "Section 7.2 Maximum Allowable Injection Pressure" +# "7.2 MAIP" +# "SECTION 7 - INJECTION OPERATIONS" +# NOTE: the dash form above is NOT matched yet — after the number the pattern requires whitespace followed directly by a letter. Phase 2 will refine against real permit text. +SECTION_HEADING = re.compile( + r"^(?:section\s+)?(\d{1,2})(?:\.(\d{1,2}))?\s+([A-Z][^\n]{3,100})$", + re.IGNORECASE | re.MULTILINE, +) + + +def chunk_document(pages, source_path, document_type, project_name=""): + """Chunk a loaded document into Chunk objects. + + Args: + pages: list of (text, page_number) tuples from a loader. + source_path: where the document came from (for metadata). + document_type: DocumentType enum value. + project_name: for permits only (e.g. "adm_decatur"). + + Returns: + list of Chunk objects ready to embed. + + TODO (Phase 2): + 1. 
Concatenate pages into one text stream, tracking offsets. + 2. Detect section boundaries with SECTION_HEADING. + 3. Validate boundaries against REVIEW_SCHEMA keys. + 4. Split into section regions; chunk within each region. + 5. Tag every chunk with section_id and subsection_id. + """ + if not pages: + return [] + + logger.warning("chunk_document not yet implemented") + return [] + + +def detect_sections(text): + """Find all section/subsection headings in text. + + Returns a list of (section_id, subsection_id, start_offset, heading_text). + """ + # TODO: implement using SECTION_HEADING, cross-check against REVIEW_SCHEMA + return [] diff --git a/rag/embeddings.py b/rag/embeddings.py new file mode 100644 index 0000000..c21bbbd --- /dev/null +++ b/rag/embeddings.py @@ -0,0 +1,51 @@ +""" +Local embeddings using sentence-transformers. + +Runs locally — no API calls. Class VI permit data is sensitive enough +that we can't send it to a cloud embedding service. + +The default model BAAI/bge-base-en-v1.5 is a strong general retrieval +model. Phase 2 may benchmark alternatives. +""" + +import logging + +logger = logging.getLogger(__name__) + + +DEFAULT_MODEL = "BAAI/bge-base-en-v1.5" + + +class Embeddings: + """Wraps a sentence-transformers model for embedding text. + + The model loads on first use (lazy), so creating one of these is cheap. + """ + + def __init__(self, model_name=DEFAULT_MODEL): + self.model_name = model_name + self._model = None + + def _load(self): + if self._model is not None: + return + # TODO: from sentence_transformers import SentenceTransformer + # self._model = SentenceTransformer(self.model_name) + logger.warning("Embeddings._load not yet implemented") + + def encode(self, texts): + """Embed a list of document texts. Returns one vector per text.""" + self._load() + # TODO: return self._model.encode(texts, normalize_embeddings=True).tolist() + return [[] for _ in texts] + + def encode_query(self, text): + """Embed a single query string. 
+ + BGE models recommend a prefix for queries — kept separate from + encode() so we can't accidentally embed queries like documents. + """ + self._load() + # TODO: prefixed = f"Represent this sentence for searching relevant passages: {text}" + # return self._model.encode(prefixed, normalize_embeddings=True).tolist() + return [] diff --git a/rag/generator.py b/rag/generator.py new file mode 100644 index 0000000..3854b5a --- /dev/null +++ b/rag/generator.py @@ -0,0 +1,33 @@ +""" +LLM generator — placeholder. + +Final choice between Llama 3 and Mistral comes from LLM benchmarking +in the next phase. Until then, this module just defines the interface. +""" + +import logging + +logger = logging.getLogger(__name__) + + +class Generator: + """Generates completions from a local LLM.""" + + def __init__(self, model_path=None, temperature=0.2, max_tokens=512): + self.model_path = model_path + self.temperature = temperature + self.max_tokens = max_tokens + self._llm = None + + def generate(self, prompt): + """Generate a completion for the given prompt.""" + if self._llm is None: + # TODO: from llama_cpp import Llama + # self._llm = Llama(model_path=str(self.model_path), + # n_ctx=4096, verbose=False) + logger.warning("Generator.generate not yet implemented") + return "[generator not yet implemented]" + # TODO: out = self._llm(prompt, max_tokens=self.max_tokens, + # temperature=self.temperature, stop=[""]) + # return out["choices"][0]["text"].strip() + return "" diff --git a/rag/loaders.py b/rag/loaders.py new file mode 100644 index 0000000..f4443be --- /dev/null +++ b/rag/loaders.py @@ -0,0 +1,61 @@ +""" +Document loaders for permit and reference documents. + +Takes a file on disk, returns a list of (text, page_number) tuples. +Phase 2 fills in the actual parsing. +""" + +import logging +from pathlib import Path + +logger = logging.getLogger(__name__) + + +def load_pdf(path): + """Load a PDF and return a list of (page_text, page_number) tuples. 
+ + TODO: implement using pdfplumber. + import pdfplumber + with pdfplumber.open(path) as pdf: + return [(page.extract_text() or "", i + 1) + for i, page in enumerate(pdf.pages)] + """ + path = Path(path) + if not path.exists(): + raise FileNotFoundError(f"PDF not found: {path}") + if path.suffix.lower() != ".pdf": + raise ValueError(f"Not a PDF: {path}") + + logger.warning("load_pdf not yet implemented") + return [] + + +def load_docx(path): + """Load a DOCX and return a list of (paragraph_text, page_number) tuples. + + Note: DOCX doesn't have native page numbers — they're approximated. + + TODO: implement using python-docx. + from docx import Document + doc = Document(path) + # walk paragraphs, track page breaks + """ + path = Path(path) + if not path.exists(): + raise FileNotFoundError(f"DOCX not found: {path}") + if path.suffix.lower() != ".docx": + raise ValueError(f"Not a DOCX: {path}") + + logger.warning("load_docx not yet implemented") + return [] + + +def load_document(path): + """Dispatch to the right loader based on file extension.""" + path = Path(path) + suffix = path.suffix.lower() + if suffix == ".pdf": + return load_pdf(path) + if suffix == ".docx": + return load_docx(path) + raise ValueError(f"Unsupported file type: {suffix}") diff --git a/rag/pipeline.py b/rag/pipeline.py new file mode 100644 index 0000000..f81a923 --- /dev/null +++ b/rag/pipeline.py @@ -0,0 +1,67 @@ +""" +Ingestion pipeline: load -> chunk -> embed -> store. + +Runs offline, once per document. Separate from the online retrieval/ +generation path because the two have different performance profiles. 
+""" + +import logging +from pathlib import Path + +from rag.chunker import chunk_document +from rag.embeddings import Embeddings +from rag.loaders import load_document +from rag.types import Collection, DocumentType +from rag.vectorstore import VectorStore + +logger = logging.getLogger(__name__) + + +class IngestionPipeline: + """Load -> chunk -> embed -> store, one document at a time.""" + + def __init__(self, embeddings=None, store=None): + self.embeddings = embeddings or Embeddings() + self.store = store or VectorStore() + + def ingest_permit(self, path, project_name): + """Ingest a permit document into the PERMITS collection.""" + return self._ingest( + path, + collection=Collection.PERMITS, + document_type=DocumentType.PERMIT_APPLICATION, + project_name=project_name, + ) + + def ingest_reference(self, path, document_type=DocumentType.CFR_TEXT): + """Ingest a reference document into the REFERENCE collection.""" + return self._ingest( + path, + collection=Collection.REFERENCE, + document_type=document_type, + project_name="", + ) + + def _ingest(self, path, collection, document_type, project_name): + path = Path(path) + logger.info("Ingesting %s into %s", path, collection.value) + + pages = load_document(path) + if not pages: + logger.warning("No pages from %s", path) + return 0 + + chunks = chunk_document( + pages, + source_path=str(path), + document_type=document_type, + project_name=project_name, + ) + if not chunks: + logger.warning("No chunks from %s", path) + return 0 + + vectors = self.embeddings.encode([c.text for c in chunks]) + self.store.add(collection, chunks, vectors) + logger.info("Added %d chunks from %s", len(chunks), path) + return len(chunks) diff --git a/rag/prompts.py b/rag/prompts.py new file mode 100644 index 0000000..3075706 --- /dev/null +++ b/rag/prompts.py @@ -0,0 +1,49 @@ +""" +Prompt templates for Class VI permit review. 
+ +Templates label reference (rules) and permit (precedent) context distinctly +so the model knows which is authoritative. +""" + +SYSTEM_PROMPT = """You are an expert reviewer of EPA Class VI Underground \ +Injection Control permit applications for geologic CO2 sequestration. \ +You evaluate applications against 40 CFR Part 146 Subpart H and precedent \ +from approved applications. + +Rules: +- Cite specific CFR provisions when identifying deficiencies. +- Distinguish what the regulation requires from what approved applicants have done. +- If the provided context is insufficient, say "Insufficient context to determine" \ +rather than guess. +- Quote section IDs (e.g. Section 7.2) when referring to permit content. +""" + + +def format_context(reference_results, permit_results): + """Render retrieval results into a labeled context block.""" + parts = [] + if reference_results: + parts.append("=== REGULATORY REFERENCE (authoritative) ===") + for i, r in enumerate(reference_results, 1): + cite = r.chunk.metadata.cfr_citation or "uncited" + parts.append(f"[REF-{i}] ({cite}) {r.chunk.text}") + if permit_results: + parts.append("\n=== PERMIT PRECEDENT (approved applications) ===") + for i, r in enumerate(permit_results, 1): + project = r.chunk.metadata.project_name or "unknown" + section = r.chunk.metadata.subsection_id or r.chunk.metadata.section_id or "?" + parts.append(f"[PERM-{i}] ({project}, Section {section}) {r.chunk.text}") + return "\n".join(parts) if parts else "No relevant context retrieved." + + +def build_review_prompt(question, reference_results, permit_results): + """Build the full prompt for a review question.""" + context = format_context(reference_results, permit_results) + return f"""{SYSTEM_PROMPT} + +Context: +{context} + +Question: {question} + +Answer:""" diff --git a/rag/retriever.py b/rag/retriever.py new file mode 100644 index 0000000..c900092 --- /dev/null +++ b/rag/retriever.py @@ -0,0 +1,41 @@ +""" +Schema-aware retriever. 
+ +Knows which collection to query for different kinds of review tasks: +- "What does the regulation require?" -> REFERENCE collection +- "How have approved applicants done this?" -> PERMITS collection +- Cross-reference review -> PERMITS with section_id filter + +The schema-awareness comes from chunks carrying section_id in metadata +(set during chunking) which the retriever uses as a where-clause filter. +""" + +from rag.types import Collection + + +class Retriever: + """Coordinates embedding, collection choice, and metadata filtering.""" + + def __init__(self, embeddings, store): + self.embeddings = embeddings + self.store = store + + def retrieve_reference(self, query_text, section_id="", k=5): + """Get authoritative regulatory text for a query.""" + return self._retrieve(Collection.REFERENCE, query_text, section_id, k) + + def retrieve_permits(self, query_text, section_id="", k=5): + """Get precedent text from approved permit applications.""" + return self._retrieve(Collection.PERMITS, query_text, section_id, k) + + def retrieve_both(self, query_text, section_id="", k=5): + """Query both collections; return a dict keyed by collection.""" + return { + Collection.REFERENCE: self.retrieve_reference(query_text, section_id, k), + Collection.PERMITS: self.retrieve_permits(query_text, section_id, k), + } + + def _retrieve(self, collection, query_text, section_id, k): + query_vec = self.embeddings.encode_query(query_text) + where = {"section_id": section_id} if section_id else None + return self.store.query(collection, query_vec, k=k, where=where) diff --git a/rag/types.py b/rag/types.py new file mode 100644 index 0000000..6bd72cf --- /dev/null +++ b/rag/types.py @@ -0,0 +1,82 @@ +""" +Shared data types for the RAG pipeline. + +Every module agrees on what a Chunk is and which collection it goes into. +If you change something here, you change the contract for everything else. 
+""" + +from dataclasses import dataclass, field +from enum import Enum + + +class Collection(str, Enum): + """The two vector store collections. + + PERMITS — the seven Class VI permit applications (precedent). + Answers "how have approved applicants handled X?" + REFERENCE — 40 CFR Part 146 Subpart H and EPA guidance (rules). + Answers "what does the regulation require for X?" + + These are kept strictly separate. Mixing them would let an applicant's + interpretation be retrieved when the user asked for the rule. + """ + + PERMITS = "permits" + REFERENCE = "reference" + + +class DocumentType(str, Enum): + """What kind of source document a chunk came from.""" + + PERMIT_APPLICATION = "permit_application" + CFR_TEXT = "cfr_text" + EPA_GUIDANCE = "epa_guidance" + + +@dataclass +class ChunkMetadata: + """Metadata attached to every chunk in the vector store.""" + + source_document: str + document_type: DocumentType + project_name: str = "" # e.g. "adm_decatur" for permits, "" for reference + section_id: str = "" # e.g. "section_07" from REVIEW_SCHEMA + subsection_id: str = "" # e.g. "7.2" for MAIP + page_number: int = 0 + chunk_index: int = 0 + cfr_citation: str = "" # e.g. 
"146.82(a)(1)" for reference chunks + + def to_dict(self): + """Flatten to a plain dict for vector store storage.""" + return { + "source_document": self.source_document, + "document_type": self.document_type.value, + "project_name": self.project_name, + "section_id": self.section_id, + "subsection_id": self.subsection_id, + "page_number": self.page_number, + "chunk_index": self.chunk_index, + "cfr_citation": self.cfr_citation, + } + + +@dataclass +class Chunk: + """A piece of text ready to be embedded and stored.""" + + text: str + metadata: ChunkMetadata + chunk_id: str = "" + + def __post_init__(self): + if not self.text.strip(): + raise ValueError("Chunk text cannot be empty") + + +@dataclass +class RetrievalResult: + """A chunk returned by retrieval, with its similarity score.""" + + chunk: Chunk + score: float + collection: Collection diff --git a/rag/vectorstore.py b/rag/vectorstore.py new file mode 100644 index 0000000..eb91a44 --- /dev/null +++ b/rag/vectorstore.py @@ -0,0 +1,65 @@ +""" +Vector store with two collections: permits and reference. + +Why two collections (not one with a metadata filter)? +- A retrieval for "what does the regulation say about casing" must never + return an applicant's casing description. +- A retrieval for "how have approved applicants handled fracture pressure" + must never return CFR text. + +The retriever picks which collection to query based on the review task. 
+""" + +import logging + +from rag.types import Collection + +logger = logging.getLogger(__name__) + + +class VectorStore: + """Vector store with separate collections for permits and reference.""" + + def __init__(self, persist_directory="./chroma_data"): + self.persist_directory = persist_directory + self._client = None + self._collections = {} + + def _connect(self): + if self._client is not None: + return + # TODO: import chromadb + # self._client = chromadb.PersistentClient(path=self.persist_directory) + # for c in Collection: + # self._collections[c] = self._client.get_or_create_collection( + # name=c.value, metadata={"hnsw:space": "cosine"}) + logger.warning("VectorStore._connect not yet implemented") + + def add(self, collection, chunks, embeddings): + """Add chunks and their vectors to a collection.""" + if len(chunks) != len(embeddings): + raise ValueError( + f"Got {len(chunks)} chunks but {len(embeddings)} embeddings" + ) + self._connect() + # TODO: self._collections[collection].add( + # ids=[c.chunk_id or str(i) for i, c in enumerate(chunks)], + # documents=[c.text for c in chunks], + # metadatas=[c.metadata.to_dict() for c in chunks], + # embeddings=embeddings, + # ) + + def query(self, collection, query_embedding, k=5, where=None): + """Return the top-k most similar chunks from a collection. + + Args: + collection: Collection.PERMITS or Collection.REFERENCE. + query_embedding: the vector to search with. + k: number of results to return. + where: optional metadata filter, e.g. {"section_id": "section_07"}. + """ + self._connect() + # TODO: results = self._collections[collection].query( + # query_embeddings=[query_embedding], n_results=k, where=where) + # return [RetrievalResult(...) 
for row in results] + return [] diff --git a/requirements_for_rag.txt b/requirements_for_rag.txt new file mode 100644 index 0000000..d8092ae --- /dev/null +++ b/requirements_for_rag.txt @@ -0,0 +1,19 @@ +# Append these to your existing requirements.txt when you're ready to +# fill in the TODOs in rag/. Minimum versions known to work together +# as of mid-2026. + +# Document parsing +pdfplumber>=0.10 +python-docx>=1.1 + +# Embeddings (local) +sentence-transformers>=2.5 + +# Vector store +chromadb>=0.5 + +# LLM serving (local) +llama-cpp-python>=0.2 + +# Testing +pytest>=8.0 diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_rag.py b/tests/test_rag.py new file mode 100644 index 0000000..ffb071e --- /dev/null +++ b/tests/test_rag.py @@ -0,0 +1,159 @@ +""" +Smoke tests for the RAG scaffold. + +These verify the public interfaces wire together. They DON'T test +actual loading/chunking/embedding/retrieval — that's implementation work. +As real logic gets added, these tests should keep passing. 
+""" + +from pathlib import Path + +import pytest + +from rag.chunker import chunk_document +from rag.embeddings import Embeddings +from rag.generator import Generator +from rag.loaders import load_document +from rag.pipeline import IngestionPipeline +from rag.prompts import SYSTEM_PROMPT, build_review_prompt, format_context +from rag.retriever import Retriever +from rag.types import ( + Chunk, + ChunkMetadata, + Collection, + DocumentType, + RetrievalResult, +) +from rag.vectorstore import VectorStore + + +# ----- types ----- + +def test_two_collections_exist(): + """The two-collection design is load-bearing.""" + assert {c.value for c in Collection} == {"permits", "reference"} + + +def test_chunk_rejects_empty_text(): + """Empty chunks are a bug, not a valid state.""" + meta = ChunkMetadata( + source_document="x.pdf", + document_type=DocumentType.PERMIT_APPLICATION, + ) + with pytest.raises(ValueError): + Chunk(text=" ", metadata=meta) + + +def test_chunk_metadata_to_dict_uses_primitive_types(): + """to_dict must produce values a vector store can serialize.""" + meta = ChunkMetadata( + source_document="data/permits/adm_decatur/application.pdf", + document_type=DocumentType.PERMIT_APPLICATION, + project_name="adm_decatur", + section_id="section_07", + subsection_id="7.2", + ) + d = meta.to_dict() + assert d["document_type"] == "permit_application" + assert d["subsection_id"] == "7.2" + for value in d.values(): + assert isinstance(value, (str, int, float)) + + +# ----- loaders ----- + +def test_load_document_rejects_unsupported_format(): + with pytest.raises(ValueError): + load_document(Path("notes.txt")) + + +def test_load_pdf_rejects_missing_file(): + with pytest.raises(FileNotFoundError): + load_document(Path("does_not_exist.pdf")) + + +# ----- chunker ----- + +def test_chunker_handles_empty_input(): + """No pages -> no chunks. 
Not a crash.""" + result = chunk_document( + pages=[], + source_path="x.pdf", + document_type=DocumentType.PERMIT_APPLICATION, + ) + assert result == [] + + +# ----- vectorstore ----- + +def test_store_rejects_length_mismatch(): + """Different number of chunks vs embeddings is a programming bug.""" + store = VectorStore() + meta = ChunkMetadata(source_document="x", document_type=DocumentType.CFR_TEXT) + chunks = [Chunk(text="hello", metadata=meta)] + with pytest.raises(ValueError): + store.add(Collection.REFERENCE, chunks, []) + + +# ----- retriever ----- + +def test_retriever_hits_both_collections(): + """retrieve_both must return results for both collections.""" + retriever = Retriever(Embeddings(), VectorStore()) + results = retriever.retrieve_both("any query") + assert set(results.keys()) == {Collection.PERMITS, Collection.REFERENCE} + + +# ----- prompts ----- + +def test_prompt_labels_reference_and_permit_distinctly(): + """The two-collection distinction shows up in the prompt.""" + ref_meta = ChunkMetadata( + source_document="146.txt", + document_type=DocumentType.CFR_TEXT, + cfr_citation="146.82(a)", + ) + perm_meta = ChunkMetadata( + source_document="adm.pdf", + document_type=DocumentType.PERMIT_APPLICATION, + project_name="adm_decatur", + subsection_id="7.2", + ) + ref = [RetrievalResult( + chunk=Chunk(text="The owner shall...", metadata=ref_meta), + score=0.9, + collection=Collection.REFERENCE, + )] + perm = [RetrievalResult( + chunk=Chunk(text="ADM proposes 5300 psi...", metadata=perm_meta), + score=0.85, + collection=Collection.PERMITS, + )] + context = format_context(ref, perm) + assert "REGULATORY REFERENCE" in context + assert "PERMIT PRECEDENT" in context + assert "146.82(a)" in context + assert "adm_decatur" in context + + +def test_build_review_prompt_includes_system_and_question(): + prompt = build_review_prompt("What is the MAIP?", [], []) + assert SYSTEM_PROMPT.strip() in prompt + assert "What is the MAIP?" 
in prompt + + +# ----- generator ----- + +def test_generator_construction_is_cheap(): + """Construction must not load the model — that happens on first generate().""" + g = Generator() + assert g._llm is None + + +# ----- pipeline ----- + +def test_pipeline_wires_together(): + """The full pipeline can be constructed without errors.""" + pipeline = IngestionPipeline() + assert pipeline.embeddings is not None + assert pipeline.store is not None