Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions samples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Dozer samples

## LLM vector LangChain sample

The `llm-vector-langchain` sample shows how to expose customer, transaction,
and credit-card product CSV data through Dozer REST endpoints, then use
LangChain and Chroma to answer personalized recommendation questions.
64 changes: 64 additions & 0 deletions samples/llm-vector-langchain/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Dozer + LangChain customer profile sample

This sample turns Dozer REST endpoints into retrieval context for a credit-card
recommendation assistant. It follows the use case from the Dozer LLM article:
combine profile, transaction, and product data, index it in a vector database,
then answer with personalized product recommendations.

## What is included

- Three small CSV datasets under `data/`.
- `dozer-config.yaml`, which exposes the CSV files through Dozer LocalStorage
endpoints.
- A LangChain application that reads the Dozer REST API, writes documents to
Chroma, and answers a recommendation question.
- A small offline test for the recommendation/context-building logic.

## Run the sample

From this directory:

```bash
dozer --config-path dozer-config.yaml
```

In another terminal:

```bash
python -m venv .venv
. .venv/bin/activate
pip install -r requirements.txt
python app.py --customer-id cust_001
```

The application defaults to `http://localhost:8080`. Set `DOZER_BASE_URL` or
pass `--base-url` if Dozer is running elsewhere.

```bash
DOZER_BASE_URL=http://localhost:8080 python app.py --customer-id cust_003
```

If `OPENAI_API_KEY` is set, the sample uses OpenAI embeddings and chat
completion through LangChain. Without an API key it falls back to a local
hash-based embedding and a deterministic response, which keeps the sample
runnable for local development and CI.

## Dozer endpoints

The config exposes these endpoints:

- `/customer_profiles`: customer segment, income band, and declared goals.
- `/transactions`: transaction history, one row per transaction with its `customer_id`.
- `/credit_card_products`: available card products and eligibility metadata.

The Python app queries each endpoint, builds one customer profile document plus
one product document per card, and stores them in Chroma. The `rank_products`
helper chooses the best product based on both customer fit and recent spend,
and the Chroma retriever supplies the supporting context shown with the answer.

## Offline test

The core recommendation helpers do not require network access or LangChain, so
they can be checked with:

```bash
python -m unittest discover -s tests
```
168 changes: 168 additions & 0 deletions samples/llm-vector-langchain/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
from __future__ import annotations

import argparse
import hashlib
import os
from dataclasses import dataclass
from typing import Any

from dotenv import load_dotenv
from langchain_chroma import Chroma
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter

from dozer_client import DozerClient
from recommender import build_customer_context, build_product_context, rank_products


# Fallback Dozer REST address, used when neither --base-url nor the
# DOZER_BASE_URL environment variable supplies one.
DEFAULT_BASE_URL = "http://localhost:8080"


@dataclass(frozen=True)
class RetrievalBundle:
    """Immutable bundle of everything gathered for one customer."""

    # Documents to index: the customer document and the product documents.
    documents: list[Document]
    # Text context describing the customer.
    customer_context: str
    # Ranked product rows; consumers read "name", "score" and "reason".
    ranked_products: list[dict[str, Any]]


class HashEmbedding:
    """Small deterministic embedding used when no OpenAI key is configured.

    Text is lower-cased and split on whitespace; every token is hashed with
    SHA-256 into one of ``dimensions`` buckets and the bucket counts are
    scaled to unit length. The class exposes ``embed_documents`` and
    ``embed_query`` so it can be passed wherever an embedding object with
    those two methods is expected.
    """

    def __init__(self, dimensions: int = 64) -> None:
        """Create an embedder that produces vectors of length ``dimensions``.

        Raises:
            ValueError: If ``dimensions`` is smaller than 1. Without this
                check the failure would surface later, inside ``_embed``, as
                a ``ZeroDivisionError`` or ``IndexError``.
        """
        if dimensions < 1:
            raise ValueError(
                f"dimensions must be a positive integer, got {dimensions!r}"
            )
        self.dimensions = dimensions

    def embed_documents(self, texts: list[str]) -> list[list[float]]:
        """Return one embedding per entry of ``texts``, in the same order."""
        return [self._embed(text) for text in texts]

    def embed_query(self, text: str) -> list[float]:
        """Return the embedding of a single query string."""
        return self._embed(text)

    def _embed(self, text: str) -> list[float]:
        """Hash the tokens of ``text`` into a unit-length count vector."""
        vector = [0.0] * self.dimensions
        for token in text.lower().split():
            digest = hashlib.sha256(token.encode("utf-8")).digest()
            # The first four digest bytes are enough to pick a bucket.
            bucket = int.from_bytes(digest[:4], byteorder="big") % self.dimensions
            vector[bucket] += 1.0
        # Text without tokens yields an all-zero vector; ``or 1.0`` avoids
        # dividing by zero in that case.
        magnitude = sum(value * value for value in vector) ** 0.5 or 1.0
        return [value / magnitude for value in vector]


def load_bundle(client: DozerClient, customer_id: str) -> RetrievalBundle:
    """Fetch Dozer data for one customer and assemble a ``RetrievalBundle``.

    Reads the ``customer_profiles``, ``transactions`` and
    ``credit_card_products`` endpoints, ranks the products for the customer,
    and builds one customer document plus one document per product.

    Raises:
        ValueError: If no fetched profile row has the given ``customer_id``.
    """
    profile_rows = client.fetch_endpoint("customer_profiles")
    transaction_rows = client.fetch_endpoint("transactions")
    product_rows = client.fetch_endpoint("credit_card_products")

    # Take the first profile row belonging to the requested customer.
    profile = None
    for row in profile_rows:
        if row.get("customer_id") == customer_id:
            profile = row
            break
    if profile is None:
        raise ValueError(f"Customer {customer_id!r} was not returned by Dozer")

    own_transactions = []
    for row in transaction_rows:
        if row.get("customer_id") == customer_id:
            own_transactions.append(row)

    ranking = rank_products(profile, own_transactions, product_rows)
    profile_text = build_customer_context(profile, own_transactions)
    product_texts = [build_product_context(item) for item in product_rows]

    docs = [
        Document(
            page_content=profile_text,
            metadata={"kind": "customer", "customer_id": customer_id},
        )
    ]
    for text, item in zip(product_texts, product_rows):
        docs.append(
            Document(
                page_content=text,
                metadata={"kind": "product", "product_id": item["product_id"]},
            )
        )

    return RetrievalBundle(
        documents=docs,
        customer_context=profile_text,
        ranked_products=ranking,
    )


def build_vector_store(documents: list[Document]) -> Chroma:
    """Split ``documents`` into chunks and index them in a Chroma store.

    ``OpenAIEmbeddings`` is used when ``OPENAI_API_KEY`` is set to a
    non-empty value; otherwise the local ``HashEmbedding`` is used.
    """
    chunker = RecursiveCharacterTextSplitter(chunk_size=700, chunk_overlap=80)
    pieces = chunker.split_documents(documents)
    if os.getenv("OPENAI_API_KEY"):
        embedder = OpenAIEmbeddings()
    else:
        embedder = HashEmbedding()
    return Chroma.from_documents(pieces, embedding=embedder)


def answer(bundle: RetrievalBundle, question: str) -> str:
    """Answer ``question`` with a card recommendation for the bundle's customer.

    The bundle's documents are indexed, the four chunks closest to
    ``question`` are retrieved, and then:

    * without ``OPENAI_API_KEY`` a deterministic text naming the top-ranked
      product, its reason and the retrieved context is returned;
    * with ``OPENAI_API_KEY`` the customer context, the top three ranked
      products and the retrieved context are sent to ``gpt-4o-mini``.

    Raises:
        ValueError: If ``bundle.ranked_products`` is empty, since there is
            nothing to recommend.
    """
    # Fail fast with a clear message, before any embedding work is done,
    # instead of an IndexError on ``ranked_products[0]`` further down.
    if not bundle.ranked_products:
        raise ValueError("No ranked products are available to recommend")

    vector_store = build_vector_store(bundle.documents)
    retrieved = vector_store.as_retriever(search_kwargs={"k": 4}).invoke(question)
    retrieved_context = "\n\n".join(doc.page_content for doc in retrieved)

    if not os.getenv("OPENAI_API_KEY"):
        best = bundle.ranked_products[0]
        return (
            f"Recommended card: {best['name']}.\n"
            f"Reason: {best['reason']}.\n\n"
            f"Retrieved context:\n{retrieved_context}"
        )

    # Only the LLM prompt needs the ranked summary, so build it here.
    ranked_context = "\n".join(
        f"- {product['name']}: score={product['score']}, reason={product['reason']}"
        for product in bundle.ranked_products[:3]
    )
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                "You recommend credit-card products using only the supplied "
                "Dozer context. Mention the card, why it fits, and one caveat.",
            ),
            (
                "human",
                "Customer context:\n{customer_context}\n\n"
                "Ranked products:\n{ranked_context}\n\n"
                "Retrieved context:\n{retrieved_context}\n\n"
                "Question: {question}",
            ),
        ]
    )
    model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    message = (prompt | model).invoke(
        {
            "customer_context": bundle.customer_context,
            "ranked_context": ranked_context,
            "retrieved_context": retrieved_context,
            "question": question,
        }
    )
    return str(message.content)


def main() -> None:
    """Parse the command line, load the customer's bundle, print the answer."""
    # Runs before the parser is built so the DOZER_BASE_URL lookup below sees
    # whatever load_dotenv() put into the environment.
    load_dotenv()

    cli = argparse.ArgumentParser()
    cli.add_argument("--customer-id", default="cust_001")
    cli.add_argument(
        "--question",
        default="Which credit card should this customer be offered?",
    )
    fallback_url = os.getenv("DOZER_BASE_URL", DEFAULT_BASE_URL)
    cli.add_argument("--base-url", default=fallback_url)
    options = cli.parse_args()

    dozer = DozerClient(options.base_url)
    loaded = load_bundle(dozer, options.customer_id)
    print(answer(loaded, options.question))


if __name__ == "__main__":
    main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
product_id,name,annual_fee_usd,rewards_focus,intro_offer,eligibility,caveat
card_001,Aurora Travel Signature,395,"travel dining lounge","75000 bonus points after qualifying spend","premium income and excellent credit","High annual fee only works for frequent travelers"
card_002,Everyday Cashback Plus,0,"groceries streaming cashback","$200 statement credit after qualifying spend","starter or mass market income","Lower earn rate on travel"
card_003,Family Balance Saver,95,"groceries utilities balance transfer","0 percent intro APR for 15 months","mass affluent income and good credit","Balance transfer fee applies"
card_004,Nomad Rewards Flex,150,"travel dining subscriptions","50000 points after qualifying spend","premium income and good credit","Subscription credits require activation"
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
customer_id,age,income_band,segment,goals,risk_tolerance
cust_001,34,premium,frequent_traveler,"travel rewards, airport lounge access, lower foreign transaction costs",medium
cust_002,27,starter,young_professional,"cashback on groceries, no annual fee, credit building",low
cust_003,42,mass_affluent,family_planner,"groceries, school expenses, balance transfer",low
cust_004,31,premium,digital_nomad,"travel rewards, dining perks, subscription credits",medium
13 changes: 13 additions & 0 deletions samples/llm-vector-langchain/data/transactions/transactions.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
transaction_id,customer_id,merchant_category,amount_usd,txn_date
txn_001,cust_001,travel,820.50,2026-01-12
txn_002,cust_001,dining,188.10,2026-01-15
txn_003,cust_001,travel,1320.00,2026-02-03
txn_004,cust_002,groceries,122.23,2026-01-17
txn_005,cust_002,streaming,18.99,2026-01-19
txn_006,cust_002,groceries,98.11,2026-02-02
txn_007,cust_003,groceries,240.42,2026-01-07
txn_008,cust_003,utilities,210.00,2026-01-10
txn_009,cust_003,education,510.00,2026-02-05
txn_010,cust_004,dining,305.45,2026-01-21
txn_011,cust_004,travel,940.00,2026-02-08
txn_012,cust_004,subscriptions,64.99,2026-02-11
77 changes: 77 additions & 0 deletions samples/llm-vector-langchain/dozer-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Dozer configuration for the llm-vector-langchain sample.
app_name: llm-vector-langchain-sample
version: 1

api:
rest:
# REST port; the sample app's default base URL (http://localhost:8080) targets it.
port: 8080
url: "[::0]"
cors: true
grpc:
port: 50051
url: "[::0]"
cors: true
web: true
# NOTE(review): auth is disabled; fine for a local sample, confirm before reuse.
auth: false
internal:
port: 50052
host: "[::1]"

# One LocalStorage object-store connection rooted at ./data, with one CSV
# table per dataset. Each table `path` names a sub-directory of ./data
# (for example data/transactions/transactions.csv).
connections:
- db_type: ObjectStore
name: local_customer_data
authentication: !LocalStorage
details:
path: ./data
tables:
- !Table
name: customer_profiles
config: !CSV
path: customer_profiles
extension: .csv
- !Table
name: transactions
config: !CSV
path: transactions
extension: .csv
- !Table
name: credit_card_products
config: !CSV
path: credit_card_products
extension: .csv

# `columns` selects which table columns are ingested, so every column that an
# endpoint's SQL reads has to be listed here, not only the key column.
sources:
  - name: customer_profiles
    table_name: customer_profiles
    connection: local_customer_data
    columns:
      - customer_id
      - age
      - income_band
      - segment
      - goals
      - risk_tolerance
  - name: transactions
    table_name: transactions
    connection: local_customer_data
    columns:
      - transaction_id
      - customer_id
      - merchant_category
      - amount_usd
      - txn_date
  - name: credit_card_products
    table_name: credit_card_products
    connection: local_customer_data
    columns:
      - product_id
      - name
      - annual_fee_usd
      - rewards_focus
      - intro_offer
      - eligibility
      - caveat

# One REST endpoint per source. Each query selects the full column list of
# its CSV, and each endpoint is keyed by that dataset's id column.
endpoints:
- name: customer_profiles
path: /customer_profiles
sql: SELECT customer_id, age, income_band, segment, goals, risk_tolerance FROM customer_profiles;
index:
primary_key:
- customer_id
- name: transactions
path: /transactions
sql: SELECT transaction_id, customer_id, merchant_category, amount_usd, txn_date FROM transactions;
index:
primary_key:
- transaction_id
- name: credit_card_products
path: /credit_card_products
sql: SELECT product_id, name, annual_fee_usd, rewards_focus, intro_offer, eligibility, caveat FROM credit_card_products;
index:
primary_key:
- product_id
24 changes: 24 additions & 0 deletions samples/llm-vector-langchain/dozer_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from __future__ import annotations

from typing import Any

import requests


class DozerClient:
    """Minimal HTTP client that reads rows from Dozer REST endpoints."""

    # Keys under which an object-shaped response may carry its list of rows.
    _ROW_KEYS = ("data", "records", "items")

    def __init__(self, base_url: str) -> None:
        """Store ``base_url`` without trailing slashes."""
        self.base_url = base_url.rstrip("/")

    def fetch_endpoint(self, endpoint: str) -> list[dict[str, Any]]:
        """GET ``endpoint`` and return its rows as a list of dictionaries.

        Args:
            endpoint: Endpoint path, with or without a leading slash.

        Raises:
            requests.HTTPError: If the response status is an error status.
            ValueError: If the response body is not a list of JSON objects,
                nor an object holding such a list under ``data``,
                ``records`` or ``items``.
        """
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        response = requests.get(url, timeout=15)
        response.raise_for_status()
        return self._extract_rows(response.json(), url)

    @staticmethod
    def _extract_rows(payload: Any, url: str) -> list[dict[str, Any]]:
        """Pull the row list out of a decoded response body and copy each row."""
        if isinstance(payload, list):
            rows = payload
        elif isinstance(payload, dict):
            # The first recognised key that holds a list wins.
            rows = next(
                (
                    payload[key]
                    for key in DozerClient._ROW_KEYS
                    if isinstance(payload.get(key), list)
                ),
                None,
            )
        else:
            rows = None
        if rows is None:
            raise ValueError(f"Unexpected Dozer response shape from {url}: {payload!r}")

        # Check rows explicitly: dict(row) on a non-object row would either
        # raise a cryptic error or silently coerce a list of pairs.
        for position, row in enumerate(rows):
            if not isinstance(row, dict):
                raise ValueError(
                    f"Unexpected Dozer row at index {position} from {url}: {row!r}"
                )
        return [dict(row) for row in rows]
Loading