Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions rag/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# RAG scaffold

Skeleton of the RAG pipeline for NittCarb AI's Class VI permit review.
Every module has its interface defined and a `# TODO` for the real work.

## Layout

```
rag/
├── types.py shared dataclasses + Collection enum
├── loaders.py PDF + DOCX loaders
├── chunker.py schema-aware chunking (the important one)
├── embeddings.py local sentence-transformers
├── vectorstore.py Chroma, two collections (permits + reference)
├── retriever.py picks collection + applies filters
├── prompts.py review-specific prompt templates
├── generator.py LLM placeholder
└── pipeline.py load -> chunk -> embed -> store
tests/
└── test_rag.py 11 smoke tests for the public interfaces
```

## Run the tests

```bash
pytest tests/
```

You should see 11 tests pass. They exercise the interfaces, not the
real implementations (which are stubs that log warnings).

## Key design decisions

**Two collections, kept separate.** `Collection.PERMITS` holds chunks
from the seven approved permit applications. `Collection.REFERENCE`
holds chunks from 40 CFR Part 146 Subpart H and EPA guidance. A
retrieval for "what does the regulation say about casing" must never
return an applicant's casing description, and vice versa.

**Schema-aware chunking.** Chunks respect REVIEW_SCHEMA section
boundaries and carry `section_id` / `subsection_id` in metadata.
This is what makes the MAIP validation chain enforceable later — every
chunk knows which section it came from, so cross-reference queries can
filter by section.

**Everything runs locally.** Air-gapped deployment is a hard constraint.
No cloud API calls in the RAG path.

## Filling in the TODOs

Each module has TODOs marked with `# TODO`. Suggested order:

1. `loaders.py` — implement `load_pdf` against one of your permit PDFs.
2. `chunker.py` — implement `chunk_document` and `detect_sections`.
3. `embeddings.py` — wire up `SentenceTransformer`.
4. `vectorstore.py` — wire up the Chroma client.
5. `pipeline.py` — should work end-to-end once 1–4 are done.
6. `retriever.py` — already works; tests will start returning real results.
7. `generator.py` — wait for the LLM benchmark decision before filling.
1 change: 1 addition & 0 deletions rag/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""NittCarb RAG pipeline."""
71 changes: 71 additions & 0 deletions rag/chunker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""
Schema-aware chunking for Class VI permit documents.

This is the most important file in the scaffold.

A blind character-based splitter would cut Section 7.2 (MAIP discussion)
mid-sentence and break the cross-reference logic the review engine needs.
Our chunker respects REVIEW_SCHEMA section boundaries: it detects section
headings, splits the document into section-bounded regions, and chunks
WITHIN regions — never across them. Every chunk carries its section_id
in metadata so the retriever can filter by section.
"""

import logging
import re

from rag.types import Chunk, ChunkMetadata, DocumentType

logger = logging.getLogger(__name__)


# Default chunking parameters. Tuned for embedding model context windows
# (most sentence-transformers cap around 512 tokens / ~2000 chars).
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 150


# Matches section headings like:
# "Section 7.2 Maximum Allowable Injection Pressure"
# "7.2 MAIP"
# "SECTION 7 - INJECTION OPERATIONS"
# Phase 2 will refine against real permit text.
SECTION_HEADING = re.compile(
r"^(?:section\s+)?(\d{1,2})(?:\.(\d{1,2}))?\s+([A-Z][^\n]{3,100})$",
re.IGNORECASE | re.MULTILINE,
)


def chunk_document(pages, source_path, document_type, project_name=""):
"""Chunk a loaded document into Chunk objects.

Args:
pages: list of (text, page_number) tuples from a loader.
source_path: where the document came from (for metadata).
document_type: DocumentType enum value.
project_name: for permits only (e.g. "adm_decatur").

Returns:
list of Chunk objects ready to embed.

TODO (Phase 2):
1. Concatenate pages into one text stream, tracking offsets.
2. Detect section boundaries with SECTION_HEADING.
3. Validate boundaries against REVIEW_SCHEMA keys.
4. Split into section regions; chunk within each region.
5. Tag every chunk with section_id and subsection_id.
"""
if not pages:
return []

logger.warning("chunk_document not yet implemented")
return []


def detect_sections(text):
"""Find all section/subsection headings in text.

Returns a list of (section_id, subsection_id, start_offset, heading_text).
"""
# TODO: implement using SECTION_HEADING, cross-check against REVIEW_SCHEMA
return []
51 changes: 51 additions & 0 deletions rag/embeddings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""
Local embeddings using sentence-transformers.

Runs locally — no API calls. Class VI permit data is sensitive enough
that we can't send it to a cloud embedding service.

The default model BAAI/bge-base-en-v1.5 is a strong general retrieval
model. Phase 2 may benchmark alternatives.
"""

import logging

logger = logging.getLogger(__name__)


DEFAULT_MODEL = "BAAI/bge-base-en-v1.5"


class Embeddings:
"""Wraps a sentence-transformers model for embedding text.

The model loads on first use (lazy), so creating one of these is cheap.
"""

def __init__(self, model_name=DEFAULT_MODEL):
self.model_name = model_name
self._model = None

def _load(self):
if self._model is not None:
return
# TODO: from sentence_transformers import SentenceTransformer
# self._model = SentenceTransformer(self.model_name)
logger.warning("Embeddings._load not yet implemented")

def encode(self, texts):
"""Embed a list of document texts. Returns one vector per text."""
self._load()
# TODO: return self._model.encode(texts, normalize_embeddings=True).tolist()
return [[] for _ in texts]

def encode_query(self, text):
"""Embed a single query string.

BGE models recommend a prefix for queries — kept separate from
encode() so we can't accidentally embed queries like documents.
"""
self._load()
# TODO: prefixed = f"Represent this sentence for searching relevant passages: {text}"
# return self._model.encode(prefixed, normalize_embeddings=True).tolist()
return []
33 changes: 33 additions & 0 deletions rag/generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""
LLM generator — placeholder.

Final choice between Llama 3 and Mistral comes from LLM benchmarking
in the next phase. Until then, this module just defines the interface.
"""

import logging

logger = logging.getLogger(__name__)


class Generator:
"""Generates completions from a local LLM."""

def __init__(self, model_path=None, temperature=0.2, max_tokens=512):
self.model_path = model_path
self.temperature = temperature
self.max_tokens = max_tokens
self._llm = None

def generate(self, prompt):
"""Generate a completion for the given prompt."""
if self._llm is None:
# TODO: from llama_cpp import Llama
# self._llm = Llama(model_path=str(self.model_path),
# n_ctx=4096, verbose=False)
logger.warning("Generator.generate not yet implemented")
return "[generator not yet implemented]"
# TODO: out = self._llm(prompt, max_tokens=self.max_tokens,
# temperature=self.temperature, stop=["</s>"])
# return out["choices"][0]["text"].strip()
return ""
61 changes: 61 additions & 0 deletions rag/loaders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""
Document loaders for permit and reference documents.

Takes a file on disk, returns a list of (text, page_number) tuples.
Phase 2 fills in the actual parsing.
"""

import logging
from pathlib import Path

logger = logging.getLogger(__name__)


def load_pdf(path):
"""Load a PDF and return a list of (page_text, page_number) tuples.

TODO: implement using pdfplumber.
import pdfplumber
with pdfplumber.open(path) as pdf:
return [(page.extract_text() or "", i + 1)
for i, page in enumerate(pdf.pages)]
"""
path = Path(path)
if not path.exists():
raise FileNotFoundError(f"PDF not found: {path}")
if path.suffix.lower() != ".pdf":
raise ValueError(f"Not a PDF: {path}")

logger.warning("load_pdf not yet implemented")
return []


def load_docx(path):
"""Load a DOCX and return a list of (paragraph_text, page_number) tuples.

Note: DOCX doesn't have native page numbers — they're approximated.

TODO: implement using python-docx.
from docx import Document
doc = Document(path)
# walk paragraphs, track page breaks
"""
path = Path(path)
if not path.exists():
raise FileNotFoundError(f"DOCX not found: {path}")
if path.suffix.lower() != ".docx":
raise ValueError(f"Not a DOCX: {path}")

logger.warning("load_docx not yet implemented")
return []


def load_document(path):
"""Dispatch to the right loader based on file extension."""
path = Path(path)
suffix = path.suffix.lower()
if suffix == ".pdf":
return load_pdf(path)
if suffix == ".docx":
return load_docx(path)
raise ValueError(f"Unsupported file type: {suffix}")
67 changes: 67 additions & 0 deletions rag/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""
Ingestion pipeline: load -> chunk -> embed -> store.

Runs offline, once per document. Separate from the online retrieval/
generation path because the two have different performance profiles.
"""

import logging
from pathlib import Path

from rag.chunker import chunk_document
from rag.embeddings import Embeddings
from rag.loaders import load_document
from rag.types import Collection, DocumentType
from rag.vectorstore import VectorStore

logger = logging.getLogger(__name__)


class IngestionPipeline:
"""Load -> chunk -> embed -> store, one document at a time."""

def __init__(self, embeddings=None, store=None):
self.embeddings = embeddings or Embeddings()
self.store = store or VectorStore()

def ingest_permit(self, path, project_name):
"""Ingest a permit document into the PERMITS collection."""
return self._ingest(
path,
collection=Collection.PERMITS,
document_type=DocumentType.PERMIT_APPLICATION,
project_name=project_name,
)

def ingest_reference(self, path, document_type=DocumentType.CFR_TEXT):
"""Ingest a reference document into the REFERENCE collection."""
return self._ingest(
path,
collection=Collection.REFERENCE,
document_type=document_type,
project_name="",
)

def _ingest(self, path, collection, document_type, project_name):
path = Path(path)
logger.info("Ingesting %s into %s", path, collection.value)

pages = load_document(path)
if not pages:
logger.warning("No pages from %s", path)
return 0

chunks = chunk_document(
pages,
source_path=str(path),
document_type=document_type,
project_name=project_name,
)
if not chunks:
logger.warning("No chunks from %s", path)
return 0

vectors = self.embeddings.encode([c.text for c in chunks])
self.store.add(collection, chunks, vectors)
logger.info("Added %d chunks from %s", len(chunks), path)
return len(chunks)
49 changes: 49 additions & 0 deletions rag/prompts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""
Prompt templates for Class VI permit review.

Templates label reference (rules) and permit (precedent) context distinctly
so the model knows which is authoritative.
"""

SYSTEM_PROMPT = """You are an expert reviewer of EPA Class VI Underground \
Injection Control permit applications for geologic CO2 sequestration. \
You evaluate applications against 40 CFR Part 146 Subpart H and precedent \
from approved applications.

Rules:
- Cite specific CFR provisions when identifying deficiencies.
- Distinguish what the regulation requires from what approved applicants have done.
- If the provided context is insufficient, say "Insufficient context to determine" \
rather than guess.
- Quote section IDs (e.g. Section 7.2) when referring to permit content.
"""


def format_context(reference_results, permit_results):
"""Render retrieval results into a labeled context block."""
parts = []
if reference_results:
parts.append("=== REGULATORY REFERENCE (authoritative) ===")
for i, r in enumerate(reference_results, 1):
cite = r.chunk.metadata.cfr_citation or "uncited"
parts.append(f"[REF-{i}] ({cite}) {r.chunk.text}")
if permit_results:
parts.append("\n=== PERMIT PRECEDENT (approved applications) ===")
for i, r in enumerate(permit_results, 1):
project = r.chunk.metadata.project_name or "unknown"
section = r.chunk.metadata.subsection_id or r.chunk.metadata.section_id or "?"
parts.append(f"[PERM-{i}] ({project}, Section {section}) {r.chunk.text}")
return "\n".join(parts) if parts else "No relevant context retrieved."


def build_review_prompt(question, reference_results, permit_results):
"""Build the full prompt for a review question."""
context = format_context(reference_results, permit_results)
return f"""{SYSTEM_PROMPT}

Context:
{context}

Question: {question}

Answer:"""
Loading