From ded7872f9bc1efc809ac92d25ee64971314775c4 Mon Sep 17 00:00:00 2001 From: Varun Singh Date: Thu, 11 Jun 2026 21:04:50 +0530 Subject: [PATCH] feat(projects): project-centric scoping foundation (UX redesign P1) - projects table (settings JSONB: default_timeline_window, auto_load_timeline, max_events) + project_id on log_chunks/watcher_configs/conversations/ investigations; real signature column on log_chunks (un-defers Path B from cluster_view) with idempotent backfill; Default project seed absorbs all pre-project rows - /projects CRUD + /projects/{id}/services; resolve_project shared name-or-id resolver (name get-or-create, uuid must exist, blank -> Default) - ingest stamps signature + project_id (API form field, worker via watcher_configs.project_id); /watchers accepts project_id - scoping: RetrievalFilters.project_id applies to both vector + FTS arms; every ReAct tool gains project_id injected via container closures (LLM never sees it; cache keys include it); chat + investigate accept/inherit project_id; known_services resolved per project - conversations list/detail return project_id + name for the sidebar Verified live: zk ingested into 'Infra', ssh into Default; Infra-scoped chat cites only Infra chunks; asking Infra about ssh-server clarifies with zero cross-project leakage. --- db/schema.sql | 50 ++++++++ repi/api/__init__.py | 2 + repi/api/chat.py | 16 ++- repi/api/conversations.py | 17 ++- repi/api/ingest.py | 13 +- repi/api/investigate.py | 34 ++++-- repi/api/projects.py | 202 +++++++++++++++++++++++++++++++ repi/api/watchers.py | 12 +- repi/core/container.py | 43 ++++++- repi/ingestion/log_ingestor.py | 11 +- repi/investigation/store.py | 6 +- repi/investigation/tools.py | 40 ++++-- repi/models/filters.py | 3 + repi/models/schema.py | 26 ++++ repi/retrieval/filter_builder.py | 3 + repi/retrieval/pgvector_store.py | 14 ++- repi/worker.py | 4 +- tests/api/test_projects.py | 97 +++++++++++++++ 18 files changed, 552 insertions(+), 41 deletions(-) create mode 100644 repi/api/projects.py create mode 100644 tests/api/test_projects.py diff --git a/db/schema.sql b/db/schema.sql index 7db6e9f..c3e846e 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -186,3 +186,53 @@ CREATE INDEX IF NOT EXISTS leaderboard_dataset_idx ON leaderboard (dataset); CREATE INDEX IF NOT EXISTS leaderboard_score_idx ON leaderboard (dataset, aggregate_score DESC); CREATE INDEX IF NOT EXISTS leaderboard_created_idx ON leaderboard (created_at DESC); CREATE INDEX IF NOT EXISTS leaderboard_backend_idx ON leaderboard (embedding_backend); + +-- ── Projects (UX redesign P1) ──────────────────────────────────────────────── +-- A project is a logical system/application (e.g. "Ecommerce Platform"). +-- Workers, services (via log_chunks), conversations and investigations are +-- scoped to a project. `settings` keys: default_timeline_window ("5h"), +-- auto_load_timeline (true), max_events (25) — read by /projects/{id}/overview. +CREATE TABLE IF NOT EXISTS projects ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + name TEXT UNIQUE NOT NULL, + settings JSONB NOT NULL DEFAULT '{}', + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +ALTER TABLE log_chunks ADD COLUMN IF NOT EXISTS project_id UUID REFERENCES projects(id) ON DELETE SET NULL; +ALTER TABLE watcher_configs ADD COLUMN IF NOT EXISTS project_id UUID REFERENCES projects(id) ON DELETE SET NULL; +ALTER TABLE conversations ADD COLUMN IF NOT EXISTS project_id UUID REFERENCES projects(id) ON DELETE SET NULL; +ALTER TABLE investigations ADD COLUMN IF NOT EXISTS project_id UUID REFERENCES projects(id) ON DELETE SET NULL; + +CREATE INDEX IF NOT EXISTS log_chunks_project_ts_idx ON log_chunks (project_id, timestamp_start); + +-- Real signature column ("Path B" from cluster_view.py): enables corpus-wide +-- GROUP BY signature for the project overview's clusters and event feed, +-- instead of re-parsing it out of `text` per row in Python. The ingestor +-- writes it for new rows; the UPDATE below backfills pre-existing rows from +-- the templated text body ("Signature: \nExamples: ..."). +ALTER TABLE log_chunks ADD COLUMN IF NOT EXISTS signature TEXT; +UPDATE log_chunks + SET signature = split_part(split_part(text, E'\n', 1), 'Signature: ', 2) + WHERE signature IS NULL; +CREATE INDEX IF NOT EXISTS log_chunks_signature_idx ON log_chunks (project_id, signature); + +-- Seed a Default project on first run and absorb all pre-project rows into it. +-- Idempotent: the seed only fires when `projects` is empty, and the backfills +-- only touch NULL project_id rows. +DO $$ +DECLARE + default_id UUID; +BEGIN + IF NOT EXISTS (SELECT 1 FROM projects) THEN + INSERT INTO projects (name) VALUES ('Default'); + END IF; + SELECT id INTO default_id FROM projects WHERE name = 'Default'; + IF default_id IS NOT NULL THEN + UPDATE log_chunks SET project_id = default_id WHERE project_id IS NULL; + UPDATE watcher_configs SET project_id = default_id WHERE project_id IS NULL; + UPDATE conversations SET project_id = default_id WHERE project_id IS NULL; + UPDATE investigations SET project_id = default_id WHERE project_id IS NULL; + END IF; +END $$; diff --git a/repi/api/__init__.py b/repi/api/__init__.py index 20116f3..7c5544b 100644 --- a/repi/api/__init__.py +++ b/repi/api/__init__.py @@ -11,6 +11,7 @@ from repi.api.config import router as config_router from repi.api.chat import router as chat_router from repi.api.conversations import router as conversations_router +from repi.api.projects import router as projects_router logger = logging.getLogger("repi.api") @@ -44,6 +45,7 @@ async def lifespan(app: FastAPI): app.include_router(config_router, tags=["config"]) app.include_router(chat_router, tags=["chat"]) app.include_router(conversations_router, tags=["conversations"]) +app.include_router(projects_router, tags=["projects"]) @app.get("/health", tags=["health"]) diff --git a/repi/api/chat.py b/repi/api/chat.py index 57a39cf..7c19874 100644 --- a/repi/api/chat.py +++ b/repi/api/chat.py @@ -84,6 +84,9 @@ class ChatRequest(BaseModel): # payloads early. The legitimate caller only ever sends the last # assistant turn's citations (≤10 in practice). previous_chunk_ids: List[str] = Field(default_factory=list, max_length=50) + # UX P1: scopes retrieval + known-services resolution to one project. If + # omitted but the conversation has a project, that project applies. + project_id: Optional[UUID] = None # ── Module-level constants ──────────────────────────────────────────────────── @@ -171,9 +174,10 @@ async def event_generator(): # Resolve or create the conversation row up front so every event the # client sees can carry the (eventual) conversation_id. conversation_id = req.conversation_id + project_id = req.project_id async with container.async_session_maker() as session: if conversation_id is None: - conv = Conversation(title=req.query[:80]) + conv = Conversation(title=req.query[:80], project_id=project_id) session.add(conv) await session.commit() await session.refresh(conv) @@ -185,9 +189,12 @@ async def event_generator(): res = await session.exec(stmt) existing = res.first() if existing is None: - conv = Conversation(id=conversation_id, title=req.query[:80]) + conv = Conversation(id=conversation_id, title=req.query[:80], project_id=project_id) session.add(conv) await session.commit() + elif project_id is None: + # Inherit the conversation's project when not pinned. + project_id = existing.project_id # Persist the user turn immediately — the client gets a citation-free # echo if the LLM call errors out mid-stream. @@ -202,7 +209,7 @@ async def event_generator(): try: # ── Intent resolution ──────────────────────────────────────────── now = _dh.now() - known_services = container.known_services or [] + known_services = await container.get_known_services(project_id) or [] resolution = resolve_intent(req.query, known_services, now) if isinstance(resolution, ClarificationNeeded): @@ -314,6 +321,7 @@ async def event_generator(): source_service=service, time_from=time_from, time_to=time_to, + project_id=project_id, ) # search_diverse over-fetches then service-stratifies the top-k # so a noisy single service can't crowd out the cross-service @@ -342,7 +350,7 @@ async def event_generator(): # Entity-bias merge: UNION RRF + find_logs_by_id, deduped by chunk_id. if entities and container.pool is not None: for ent in entities: - extra = await find_logs_by_id(container.pool, entity=ent, top_k=20) + extra = await find_logs_by_id(container.pool, entity=ent, top_k=20, project_id=project_id) for c in extra: if c["chunk_id"] in seen: continue diff --git a/repi/api/conversations.py b/repi/api/conversations.py index bf2c105..13548d5 100644 --- a/repi/api/conversations.py +++ b/repi/api/conversations.py @@ -15,7 +15,7 @@ from sqlmodel import select from repi.core.container import get_container -from repi.models.schema import ChatMessage, Conversation, Investigation +from repi.models.schema import ChatMessage, Conversation, Investigation, Project logger = logging.getLogger("repi.api.conversations") @@ -25,6 +25,8 @@ class ConversationSummary(BaseModel): id: str title: Optional[str] + project_id: Optional[str] = None + project_name: Optional[str] = None created_at: str updated_at: str @@ -43,6 +45,8 @@ class TranscriptTurn(BaseModel): class ConversationDetail(BaseModel): id: str title: Optional[str] + project_id: Optional[str] = None + project_name: Optional[str] = None created_at: str updated_at: str turns: List[TranscriptTurn] @@ -59,10 +63,14 @@ async def list_conversations(limit: int = 50): ) res = await session.exec(stmt) rows = list(res.all()) + name_res = await session.exec(select(Project.id, Project.name)) + project_names = {pid: name for pid, name in name_res.all()} return [ ConversationSummary( id=str(c.id), title=c.title, + project_id=str(c.project_id) if c.project_id else None, + project_name=project_names.get(c.project_id), created_at=c.created_at.isoformat(), updated_at=c.updated_at.isoformat(), ) @@ -84,6 +92,11 @@ async def get_conversation(conversation_id: str): if conv is None: raise HTTPException(status_code=404, detail="Conversation not found") + project_name = None + if conv.project_id is not None: + proj = await session.get(Project, conv.project_id) + project_name = proj.name if proj else None + msg_res = await session.exec( select(ChatMessage) .where(ChatMessage.conversation_id == cid) @@ -125,6 +138,8 @@ async def get_conversation(conversation_id: str): return ConversationDetail( id=str(conv.id), title=conv.title, + project_id=str(conv.project_id) if conv.project_id else None, + project_name=project_name, created_at=conv.created_at.isoformat(), updated_at=conv.updated_at.isoformat(), turns=turns, diff --git a/repi/api/ingest.py b/repi/api/ingest.py index 7d74bc7..c765fd4 100644 --- a/repi/api/ingest.py +++ b/repi/api/ingest.py @@ -9,6 +9,7 @@ class IngestResponse(BaseModel): service: str + project: str chunk_count: int lines_total: int lines_with_timestamp: int @@ -18,18 +19,25 @@ class IngestResponse(BaseModel): @router.post("/ingest", response_model=IngestResponse) async def ingest( service: str = Form(...), - file: UploadFile = File(...) + file: UploadFile = File(...), + project: str | None = Form(None), ): """ Ingest logs from a file for a specific service. + + `project` is a project name (get-or-create) or id (must exist); omitted + → the Default project. """ + from repi.api.projects import resolve_project + content = await file.read() content_str = content.decode("utf-8") container = get_container() async with container.get_session() as session: + project_row = await resolve_project(session, project) ingestor = container.get_ingestor(session) - stats = await ingestor.ingest(content_str, service) + stats = await ingestor.ingest(content_str, service, project_id=project_row.id) # Refresh the in-memory service list so a brand-new service is visible to # the intent resolver immediately, not only after a restart or GET /services. @@ -44,6 +52,7 @@ async def ingest( return IngestResponse( service=service, + project=project_row.name, chunk_count=stats.chunk_count, lines_total=stats.lines_total, lines_with_timestamp=stats.lines_with_timestamp, diff --git a/repi/api/investigate.py b/repi/api/investigate.py index bb9bc43..56e09bd 100644 --- a/repi/api/investigate.py +++ b/repi/api/investigate.py @@ -22,6 +22,9 @@ class InvestigateRequest(BaseModel): # conversation row is created and its id returned so the UI can attach # subsequent /chat turns to the same thread. conversation_id: Optional[UUID] = None + # UX P1: scopes retrieval + every ReAct tool to one project. If omitted + # but the conversation has a project, the conversation's project applies. + project_id: Optional[UUID] = None class InvestigationStepModel(BaseModel): step_number: int @@ -93,8 +96,9 @@ async def investigate(request: InvestigateRequest): from sqlalchemy import text as sa_text conversation_id = request.conversation_id + project_id = request.project_id if conversation_id is None: - conv = Conversation(title=request.query[:80]) + conv = Conversation(title=request.query[:80], project_id=project_id) session.add(conv) await session.commit() await session.refresh(conv) @@ -103,12 +107,18 @@ async def investigate(request: InvestigateRequest): # Validate it exists; if not, materialise with the caller's id. stmt = sm_select(Conversation).where(Conversation.id == conversation_id) res = await session.exec(stmt) - if res.first() is None: - session.add(Conversation(id=conversation_id, title=request.query[:80])) + existing = res.first() + if existing is None: + session.add(Conversation(id=conversation_id, title=request.query[:80], project_id=project_id)) await session.commit() + elif project_id is None: + # Inherit the conversation's project when the caller didn't pin one. + project_id = existing.project_id store = container.get_investigation_store(session) - investigation = await store.get_or_create(request.query, conversation_id=conversation_id) + investigation = await store.get_or_create( + request.query, conversation_id=conversation_id, project_id=project_id + ) # Bump the conversation's updated_at so the sidebar surfaces the # thread to the top even when activity is investigation-side rather @@ -173,14 +183,24 @@ async def stream_investigation(investigation_id: str): async def event_generator(): container = get_container() async with container.get_session() as session: - loop = container.get_investigation_loop(session) - store = loop.store + store = container.get_investigation_store(session) investigation = await store.get_by_id(uuid_obj) - + if not investigation: yield f"data: {json.dumps({'type': 'error', 'data': {'message': 'Investigation not found'}})}\n\n" return + # Build the loop scoped to the investigation's project: every tool + # call carries project_id and the resolver sees only that + # project's services. + scoped_services = await container.get_known_services(investigation.project_id) + loop = container.get_investigation_loop( + session, + project_id=investigation.project_id, + known_services=scoped_services, + ) + store = loop.store + steps = await store.get_steps(uuid_obj) for s in steps: # Persisted shape is {name, args}; the UI consumes {tool, args} diff --git a/repi/api/projects.py b/repi/api/projects.py new file mode 100644 index 0000000..3d3c2a3 --- /dev/null +++ b/repi/api/projects.py @@ -0,0 +1,202 @@ +"""Projects API (UX redesign P1). + +A project is a logical system/application — workers, services, conversations +and investigations are scoped to one. `resolve_project` is the shared +name-or-id resolver used by /ingest (and the worker indirectly via +watcher_configs.project_id). +""" +from __future__ import annotations + +import logging +from datetime import datetime +from typing import Any, List, Optional +from uuid import UUID + +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel +from sqlalchemy import text as sa_text +from sqlmodel import select + +from repi.core.container import get_container +from repi.models.schema import Project + +logger = logging.getLogger("repi.api.projects") + +router = APIRouter() + +DEFAULT_PROJECT_NAME = "Default" + +# Defaults merged under explicit per-project settings when read. +DEFAULT_SETTINGS: dict[str, Any] = { + "default_timeline_window": "5h", + "auto_load_timeline": True, + "max_events": 25, +} + + +def effective_settings(project: Project) -> dict[str, Any]: + return {**DEFAULT_SETTINGS, **(project.settings or {})} + + +async def resolve_project(session, project: Optional[str]) -> Project: + """Resolve a name-or-id reference to a Project row. + + - None/blank → the Default project (created if missing). + - UUID string → must exist (404 otherwise — a typo'd id should not + silently spawn a project named like a UUID). + - Anything else → get-or-create by name, so `curl -F project=payments` + works on first use. + """ + if not project or not project.strip(): + return await _get_or_create_by_name(session, DEFAULT_PROJECT_NAME) + + ref = project.strip() + try: + pid = UUID(ref) + except ValueError: + return await _get_or_create_by_name(session, ref) + + row = await session.get(Project, pid) + if row is None: + raise HTTPException(status_code=404, detail=f"Project {ref} not found") + return row + + +async def _get_or_create_by_name(session, name: str) -> Project: + res = await session.exec(select(Project).where(Project.name == name)) + row = res.first() + if row is not None: + return row + row = Project(name=name) + session.add(row) + await session.commit() + await session.refresh(row) + return row + + +# ── Models ─────────────────────────────────────────────────────────────────── + +class ProjectCreate(BaseModel): + name: str + settings: dict[str, Any] = {} + + +class ProjectUpdate(BaseModel): + name: Optional[str] = None + settings: Optional[dict[str, Any]] = None + + +class ProjectRead(BaseModel): + id: str + name: str + settings: dict[str, Any] + service_count: int = 0 + created_at: datetime + updated_at: datetime + + +class ProjectService(BaseModel): + name: str + chunk_count: int + last_seen: Optional[datetime] = None + + +# ── Endpoints ──────────────────────────────────────────────────────────────── + +@router.get("/projects", response_model=List[ProjectRead]) +async def list_projects(): + container = get_container() + async with container.get_session() as session: + res = await session.exec(select(Project).order_by(Project.created_at)) + projects = list(res.all()) + counts_res = await session.execute(sa_text( + "SELECT project_id, count(DISTINCT source_service) AS n " + "FROM log_chunks WHERE project_id IS NOT NULL GROUP BY project_id" + )) + counts = {row[0]: row[1] for row in counts_res} + return [ + ProjectRead( + id=str(p.id), + name=p.name, + settings=effective_settings(p), + service_count=counts.get(p.id, 0), + created_at=p.created_at, + updated_at=p.updated_at, + ) + for p in projects + ] + + +@router.post("/projects", response_model=ProjectRead) +async def create_project(body: ProjectCreate): + name = body.name.strip() + if not name: + raise HTTPException(status_code=400, detail="Project name must not be empty") + container = get_container() + async with container.get_session() as session: + res = await session.exec(select(Project).where(Project.name == name)) + if res.first() is not None: + raise HTTPException(status_code=409, detail=f"Project '{name}' already exists") + p = Project(name=name, settings=body.settings or {}) + session.add(p) + await session.commit() + await session.refresh(p) + return ProjectRead( + id=str(p.id), name=p.name, settings=effective_settings(p), + created_at=p.created_at, updated_at=p.updated_at, + ) + + +@router.get("/projects/{project_id}", response_model=ProjectRead) +async def get_project(project_id: UUID): + container = get_container() + async with container.get_session() as session: + p = await session.get(Project, project_id) + if p is None: + raise HTTPException(status_code=404, detail="Project not found") + return ProjectRead( + id=str(p.id), name=p.name, settings=effective_settings(p), + created_at=p.created_at, updated_at=p.updated_at, + ) + + +@router.patch("/projects/{project_id}", response_model=ProjectRead) +async def update_project(project_id: UUID, body: ProjectUpdate): + """Partial update; `settings` is merged over existing keys (same + merge-not-replace contract as PUT /config).""" + container = get_container() + async with container.get_session() as session: + p = await session.get(Project, project_id) + if p is None: + raise HTTPException(status_code=404, detail="Project not found") + if body.name is not None and body.name.strip(): + p.name = body.name.strip() + if body.settings is not None: + p.settings = {**(p.settings or {}), **body.settings} + p.updated_at = datetime.utcnow() + session.add(p) + await session.commit() + await session.refresh(p) + return ProjectRead( + id=str(p.id), name=p.name, settings=effective_settings(p), + created_at=p.created_at, updated_at=p.updated_at, + ) + + +@router.get("/projects/{project_id}/services", response_model=List[ProjectService]) +async def list_project_services(project_id: UUID): + container = get_container() + async with container.get_session() as session: + p = await session.get(Project, project_id) + if p is None: + raise HTTPException(status_code=404, detail="Project not found") + res = await session.execute(sa_text( + "SELECT source_service, count(*) AS n, max(timestamp_start) AS last_seen " + "FROM log_chunks WHERE project_id = :pid " + "GROUP BY source_service ORDER BY n DESC" + ), {"pid": project_id}) + rows = res.all() + return [ + ProjectService(name=r[0], chunk_count=r[1], last_seen=r[2]) + for r in rows + ] diff --git a/repi/api/watchers.py b/repi/api/watchers.py index 0227882..76d38e8 100644 --- a/repi/api/watchers.py +++ b/repi/api/watchers.py @@ -18,6 +18,7 @@ class WatcherConfigCreate(BaseModel): watch_path: str env: str = "production" enabled: bool = True + project_id: UUID | None = None class WatcherConfigRead(BaseModel): id: UUID @@ -25,6 +26,7 @@ class WatcherConfigRead(BaseModel): watch_path: str env: str enabled: bool + project_id: UUID | None = None created_at: datetime updated_at: datetime @@ -33,6 +35,7 @@ class WatcherConfigUpdate(BaseModel): watch_path: str = None env: str = None enabled: bool = None + project_id: UUID = None class WatcherStatus(BaseModel): file_path: str @@ -44,11 +47,18 @@ class WatcherStatus(BaseModel): async def create_watcher(config: WatcherConfigCreate): container = get_container() async with container.get_session() as session: + # No project given → Default, so every watcher (and the chunks its + # worker ingests) always lands in a project. + project_id = config.project_id + if project_id is None: + from repi.api.projects import resolve_project + project_id = (await resolve_project(session, None)).id db_config = WatcherConfig( service_name=config.service_name, watch_path=config.watch_path, env=config.env, - enabled=config.enabled + enabled=config.enabled, + project_id=project_id, ) session.add(db_config) await session.commit() diff --git a/repi/core/container.py b/repi/core/container.py index ca7885a..c4a9622 100644 --- a/repi/core/container.py +++ b/repi/core/container.py @@ -136,6 +136,24 @@ async def init_known_services(self) -> list[str]: logger.info(f"Loaded known services: {self.known_services}") return self.known_services + async def get_known_services(self, project_id=None) -> list[str]: + """Project-scoped service list (UX P1). Falls back to the global + cached list when no project is given. Queried per call — DISTINCT on + an indexed column; freshness matters more than the microseconds.""" + if project_id is None: + return self.known_services + services = await get_all_services(self.pool, project_id=project_id) + async with self.async_session_maker() as session: + from repi.models.schema import WatcherConfig + stmt = select(WatcherConfig.service_name).where( + WatcherConfig.enabled == True, WatcherConfig.project_id == project_id + ) + res = await session.exec(stmt) + for name in res.all(): + if name not in services: + services.append(name) + return services + def get_ingestor(self, session: AsyncSession) -> LogIngestor: vector_store = PgVectorStore(session) return LogIngestor(vector_store, self.embedding_func) @@ -145,13 +163,25 @@ def get_retrieval_service(self, session: AsyncSession) -> RRFRetrievalService: fts_retriever = PgFTSRetriever(session) return RRFRetrievalService(vector_store, fts_retriever, self.embedding_func) - def get_investigation_loop(self, session: AsyncSession) -> ReactInvestigationLoop: - """Create a ReAct loop with tools and persistence store.""" + def get_investigation_loop(self, session: AsyncSession, project_id=None, + known_services: list[str] | None = None) -> ReactInvestigationLoop: + """Create a ReAct loop with tools and persistence store. + + `project_id` scopes every tool to one project. It is injected here in + the tool closures — the LLM never sees (or controls) it, and the + cache key includes it so two projects can't share cached results. + """ llm = self.require_llm() retrieval_service = self.get_retrieval_service(session) store = InvestigationStore(session) + def scoped(kwargs: dict) -> dict: + if project_id is not None: + kwargs.setdefault("project_id", project_id) + return kwargs + async def cached_search_logs(**kwargs): + kwargs = scoped(kwargs) key = cache.make_key("search_logs", **kwargs) hit = await cache.get(key) if hit: return hit @@ -160,6 +190,7 @@ async def cached_search_logs(**kwargs): return res async def cached_service_summary(**kwargs): + kwargs = scoped(kwargs) key = cache.make_key("get_service_summary", **kwargs) hit = await cache.get(key) if hit: return hit @@ -168,6 +199,7 @@ async def cached_service_summary(**kwargs): return res async def cached_scan_window(**kwargs): + kwargs = scoped(kwargs) key = cache.make_key("scan_window", **kwargs) hit = await cache.get(key) if hit: return hit @@ -176,6 +208,7 @@ async def cached_scan_window(**kwargs): return res async def cached_find_logs_by_id(**kwargs): + kwargs = scoped(kwargs) key = cache.make_key("find_logs_by_id", **kwargs) hit = await cache.get(key) if hit: return hit @@ -185,16 +218,16 @@ async def cached_find_logs_by_id(**kwargs): tools = { "search_logs": cached_search_logs, - "get_timeline": lambda **kwargs: get_timeline(self.pool, **kwargs), + "get_timeline": lambda **kwargs: get_timeline(self.pool, **scoped(kwargs)), "scan_window": cached_scan_window, "get_service_summary": cached_service_summary, "find_logs_by_id": cached_find_logs_by_id, } - + return ReactInvestigationLoop( llm=llm, tools=tools, - known_services=self.known_services, + known_services=known_services if known_services is not None else self.known_services, pool=self.pool, store=store, enable_reflection=settings.ENABLE_REFLECTION, diff --git a/repi/ingestion/log_ingestor.py b/repi/ingestion/log_ingestor.py index cc0e6d5..2821359 100644 --- a/repi/ingestion/log_ingestor.py +++ b/repi/ingestion/log_ingestor.py @@ -7,6 +7,7 @@ from repi.ingestion.log_chunker import chunk_logs from repi.retrieval.pgvector_store import PgVectorStore import uuid +from uuid import UUID logger = logging.getLogger(__name__) @@ -25,9 +26,11 @@ def __init__(self, vector_store: PgVectorStore, embedding_func) -> None: self.vector_store = vector_store self.embedding_func = embedding_func - async def ingest(self, logs: str | List[str], source_service: str, source_env: str = "production") -> IngestStats: + async def ingest(self, logs: str | List[str], source_service: str, source_env: str = "production", + project_id: Optional[UUID] = None) -> IngestStats: """ - Ingest logs from a specific source. + Ingest logs from a specific source. `project_id` scopes the chunks to a + project (None = caller resolves to the Default project upstream). """ if not source_service: raise ValueError("source_service is required") @@ -65,7 +68,9 @@ async def ingest(self, logs: str | List[str], source_service: str, source_env: s source_env=source_env, log_level=chunk.log_level, timestamp_start=chunk.timestamp_start, - timestamp_end=chunk.timestamp_end + timestamp_end=chunk.timestamp_end, + signature=chunk.signature, + project_id=project_id, ) count += 1 diff --git a/repi/investigation/store.py b/repi/investigation/store.py index da3cba5..23ee4e1 100644 --- a/repi/investigation/store.py +++ b/repi/investigation/store.py @@ -28,6 +28,7 @@ async def get_or_create( self, query: str, conversation_id: Optional[UUID] = None, + project_id: Optional[UUID] = None, ) -> Investigation: """Find an existing active investigation for the same (query, conversation) or create a new one. Same query in a *different* conversation creates a @@ -45,15 +46,16 @@ async def get_or_create( logger.info(f"Resuming existing investigation: {investigation.id}") return investigation - return await self.create(query, conversation_id=conversation_id) + return await self.create(query, conversation_id=conversation_id, project_id=project_id) async def create( self, query: str, conversation_id: Optional[UUID] = None, + project_id: Optional[UUID] = None, ) -> Investigation: """Always create a fresh investigation.""" - investigation = Investigation(query=query, conversation_id=conversation_id) + investigation = Investigation(query=query, conversation_id=conversation_id, project_id=project_id) self.session.add(investigation) await self.session.commit() await self.session.refresh(investigation) diff --git a/repi/investigation/tools.py b/repi/investigation/tools.py index 4803a8c..92d130c 100644 --- a/repi/investigation/tools.py +++ b/repi/investigation/tools.py @@ -52,13 +52,15 @@ async def search_logs( time_to: str | None = None, level: str | None = None, top_k: int = 10, + project_id: uuid.UUID | None = None, ) -> list[dict]: """Search log chunks by query, service, time range, and level.""" filters = RetrievalFilters( source_service=service, log_level=level, time_from=_parse_iso_timestamp(time_from), - time_to=_parse_iso_timestamp(time_to) + time_to=_parse_iso_timestamp(time_to), + project_id=project_id, ) if query and query.strip(): @@ -89,14 +91,16 @@ async def search_logs( async def get_timeline( pool: asyncpg.Pool, chunk_ids: list[str], + project_id: uuid.UUID | None = None, ) -> list[dict]: """Sort chunks by timestamp to see the sequence of events.""" if not chunk_ids: return [] - + rows = await pool.fetch( - "SELECT chunk_id, source_service, log_level, timestamp_start, text FROM log_chunks WHERE chunk_id = ANY($1) ORDER BY timestamp_start", - chunk_ids + "SELECT chunk_id, source_service, log_level, timestamp_start, text FROM log_chunks " + "WHERE chunk_id = ANY($1) AND ($2::uuid IS NULL OR project_id = $2) ORDER BY timestamp_start", + chunk_ids, project_id ) return [{ @@ -116,6 +120,7 @@ async def scan_window( top_k: int = 50, pre_context_seconds: int = 60, pre_context_per_service_limit: int = 20, + project_id: uuid.UUID | None = None, ) -> dict: """ Two-phase scan: ERROR/WARNING symptoms + a level-agnostic walk-back that @@ -161,6 +166,7 @@ async def scan_window( WHERE timestamp_start BETWEEN $1 AND $2 AND log_level IN ('ERROR', 'WARNING') AND ($3::text[] IS NULL OR source_service = ANY($3)) + AND ($4::uuid IS NULL OR project_id = $4) GROUP BY source_service ORDER BY first_error NULLS LAST """ @@ -171,13 +177,14 @@ async def scan_window( WHERE timestamp_start BETWEEN $1 AND $2 AND log_level = ANY($3) AND ($4::text[] IS NULL OR source_service = ANY($4)) + AND ($6::uuid IS NULL OR project_id = $6) ORDER BY timestamp_start LIMIT $5 """ summary_rows, log_rows = await asyncio.gather( - pool.fetch(summary_sql, time_from_dt, time_to_dt, services), - pool.fetch(logs_sql, time_from_dt, time_to_dt, effective_level, services, top_k), + pool.fetch(summary_sql, time_from_dt, time_to_dt, services, project_id), + pool.fetch(logs_sql, time_from_dt, time_to_dt, effective_level, services, top_k, project_id), ) summary = { @@ -228,6 +235,7 @@ async def scan_window( WHERE timestamp_start BETWEEN $1 AND $2 AND log_level = 'ERROR' AND source_service = ANY($3) + AND ($6::uuid IS NULL OR project_id = $6) GROUP BY source_service ), candidates AS ( @@ -243,6 +251,7 @@ async def scan_window( AND lc.timestamp_start < fe.first_error AND lc.log_level IS DISTINCT FROM 'ERROR' AND lc.log_level IS DISTINCT FROM 'DEBUG' + AND ($6::uuid IS NULL OR lc.project_id = $6) ) SELECT chunk_id, source_service, log_level, timestamp_start, text FROM candidates @@ -256,6 +265,7 @@ async def scan_window( services_with_errors, str(pre_context_seconds), pre_context_per_service_limit, + project_id, ) pre_context_logs = [ { @@ -285,6 +295,7 @@ async def find_logs_by_id( entity: str, top_k: int = 50, min_similarity: float = 0.6, + project_id: uuid.UUID | None = None, ) -> list[dict]: """Find log chunks containing (or fuzzy-matching) an entity token. @@ -312,13 +323,15 @@ async def find_logs_by_id( CASE WHEN text ILIKE '%' || $1 || '%' THEN 1.0::real ELSE word_similarity($1, text) END AS sim FROM log_chunks - WHERE text ILIKE '%' || $1 || '%' - OR text %> $1 + WHERE (text ILIKE '%' || $1 || '%' + OR text %> $1) + AND ($3::uuid IS NULL OR project_id = $3) ORDER BY sim DESC, timestamp_start DESC LIMIT $2 """, entity.strip(), top_k, + project_id, ) return [ { @@ -339,6 +352,7 @@ async def get_service_summary( service: str, time_from: str | None = None, time_to: str | None = None, + project_id: uuid.UUID | None = None, ) -> dict: """Get high-level statistics for a service using raw SQL (Bug 1 Fix).""" time_from_dt = _parse_iso_timestamp(time_from) @@ -358,8 +372,9 @@ async def get_service_summary( WHERE source_service = $1 AND ($2::timestamptz IS NULL OR timestamp_start >= $2) AND ($3::timestamptz IS NULL OR timestamp_end <= $3) + AND ($4::uuid IS NULL OR project_id = $4) """, - service, time_from_dt, time_to_dt, + service, time_from_dt, time_to_dt, project_id, ) return { @@ -373,9 +388,12 @@ async def get_service_summary( "latest": DateHandler.to_iso(row["latest"]), } -async def get_all_services(pool: asyncpg.Pool) -> list[str]: +async def get_all_services(pool: asyncpg.Pool, project_id: uuid.UUID | None = None) -> list[str]: """Dynamically fetch all unique services currently in the database.""" - rows = await pool.fetch("SELECT DISTINCT source_service FROM log_chunks") + rows = await pool.fetch( + "SELECT DISTINCT source_service FROM log_chunks WHERE ($1::uuid IS NULL OR project_id = $1)", + project_id, + ) return [r["source_service"] for r in rows] TOOL_SCHEMAS = { diff --git a/repi/models/filters.py b/repi/models/filters.py index 7107a79..009a78d 100644 --- a/repi/models/filters.py +++ b/repi/models/filters.py @@ -1,6 +1,7 @@ from __future__ import annotations from dataclasses import dataclass from datetime import datetime +from uuid import UUID @dataclass class RetrievalFilters: @@ -9,3 +10,5 @@ class RetrievalFilters: log_level: str | list[str] | None = None time_from: datetime | None = None time_to: datetime | None = None + # Scopes retrieval to one project (UX P1). None = no project filter. + project_id: UUID | None = None diff --git a/repi/models/schema.py b/repi/models/schema.py index f726127..bb62aaa 100644 --- a/repi/models/schema.py +++ b/repi/models/schema.py @@ -8,6 +8,24 @@ from pydantic import field_validator from sqlalchemy.dialects.postgresql import JSONB, TSVECTOR +class Project(SQLModel, table=True): + __tablename__ = "projects" + + id: UUID = Field(default_factory=uuid4, primary_key=True) + name: str = Field(sa_column=Column(TEXT, unique=True, nullable=False)) + # Keys: default_timeline_window ("5h"), auto_load_timeline (true), + # max_events (25). Read by /projects/{id}/overview. + settings: dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSONB, nullable=False)) + created_at: datetime = Field( + default_factory=datetime.utcnow, + sa_column=Column(DateTime(timezone=True), nullable=False), + ) + updated_at: datetime = Field( + default_factory=datetime.utcnow, + sa_column=Column(DateTime(timezone=True), nullable=False), + ) + + class LogChunk(SQLModel, table=True): __tablename__ = "log_chunks" @@ -24,6 +42,10 @@ def coerce_embedding(cls, v): id: UUID = Field(default_factory=uuid4, primary_key=True) chunk_id: str = Field(index=True, unique=True) + project_id: Optional[UUID] = Field(default=None, index=True) + # Extracted at ingest (chunk_logs signature); enables corpus-wide + # GROUP BY signature for the project overview without re-parsing `text`. + signature: Optional[str] = Field(default=None) source_service: str = Field(index=True) source_env: str = Field(default="production", index=True) log_level: Optional[str] = Field(default=None, index=True) @@ -70,6 +92,7 @@ class Conversation(SQLModel, table=True): id: UUID = Field(default_factory=uuid4, primary_key=True) title: Optional[str] = Field(default=None, sa_column=Column(TEXT)) + project_id: Optional[UUID] = Field(default=None, index=True) created_at: datetime = Field( default_factory=datetime.utcnow, sa_column=Column(DateTime(timezone=True), nullable=False), @@ -114,6 +137,8 @@ class Investigation(SQLModel, table=True): # an interleaved transcript. Not read by the ReAct loop — Deep Research is # intentionally stateless w.r.t. prior chat turns. conversation_id: Optional[UUID] = Field(default=None, index=True) + # UX P1: scopes the investigation's retrieval + tools to one project. + project_id: Optional[UUID] = Field(default=None, index=True) class InvestigationStep(SQLModel, table=True): __tablename__ = "investigation_steps" @@ -144,6 +169,7 @@ class WatcherConfig(SQLModel, table=True): id: UUID = Field(default_factory=uuid4, primary_key=True) service_name: str = Field(index=True) watch_path: str = Field(index=True) + project_id: Optional[UUID] = Field(default=None, index=True) env: str = Field(default="production") enabled: bool = Field(default=True) created_at: datetime = Field(default_factory=datetime.utcnow) diff --git a/repi/retrieval/filter_builder.py b/repi/retrieval/filter_builder.py index c96fcf9..417eb75 100644 --- a/repi/retrieval/filter_builder.py +++ b/repi/retrieval/filter_builder.py @@ -12,6 +12,9 @@ def build_filter_expressions(filters: RetrievalFilters) -> list[Any]: if filters.source_service: exprs.append(LogChunk.source_service == filters.source_service) + if filters.project_id: + exprs.append(LogChunk.project_id == filters.project_id) + if filters.source_env: exprs.append(LogChunk.source_env == filters.source_env) diff --git a/repi/retrieval/pgvector_store.py b/repi/retrieval/pgvector_store.py index d323585..1b12ef3 100644 --- a/repi/retrieval/pgvector_store.py +++ b/repi/retrieval/pgvector_store.py @@ -1,5 +1,6 @@ from datetime import datetime from typing import List, Tuple, Dict, Any, Optional +from uuid import UUID from sqlmodel import select, Session, and_ from sqlmodel.ext.asyncio.session import AsyncSession from repi.core.dates import DateHandler @@ -15,10 +16,11 @@ class PgVectorStore: def __init__(self, session: AsyncSession) -> None: self.session = session - async def upsert(self, chunk_id: str, embedding: List[float], text: str, source_service: str, - source_env: str = "production", log_level: Optional[str] = None, + async def upsert(self, chunk_id: str, embedding: List[float], text: str, source_service: str, + source_env: str = "production", log_level: Optional[str] = None, timestamp_start: Optional[datetime] = None, timestamp_end: Optional[datetime] = None, - log_metadata: Optional[Dict[str, Any]] = None) -> None: + log_metadata: Optional[Dict[str, Any]] = None, + signature: Optional[str] = None, project_id: Optional[UUID] = None) -> None: """Upsert a log chunk with its embedding and metadata.""" # Check if exists statement = select(LogChunk).where(LogChunk.chunk_id == chunk_id) @@ -43,6 +45,8 @@ async def upsert(self, chunk_id: str, embedding: List[float], text: str, source_ chunk.timestamp_start = ts_start chunk.timestamp_end = ts_end chunk.log_metadata = log_metadata + chunk.signature = signature + chunk.project_id = project_id else: chunk = LogChunk( chunk_id=chunk_id, @@ -53,7 +57,9 @@ async def upsert(self, chunk_id: str, embedding: List[float], text: str, source_ log_level=log_level, timestamp_start=ts_start, timestamp_end=ts_end, - log_metadata=log_metadata + log_metadata=log_metadata, + signature=signature, + project_id=project_id, ) self.session.add(chunk) diff --git a/repi/worker.py b/repi/worker.py index 1615663..443c309 100644 --- a/repi/worker.py +++ b/repi/worker.py @@ -112,7 +112,9 @@ async def handle_file_change(self, file_path: str): return ingestor = self.container.get_ingestor(session) - stats = await ingestor.ingest(new_content, config.service_name) + stats = await ingestor.ingest( + new_content, config.service_name, project_id=config.project_id + ) logger.info(f"Ingested {stats.chunk_count} chunks from {file_path}") await self.update_offset(session, config.id, file_path, file_size) diff --git a/tests/api/test_projects.py b/tests/api/test_projects.py new file mode 100644 index 0000000..0451c64 --- /dev/null +++ b/tests/api/test_projects.py @@ -0,0 +1,97 @@ +"""Projects (UX P1): resolver branches + settings merge + filter clause. + +DB-backed CRUD is exercised in the live walkthrough; here we pin the pure +logic: name-or-id resolution, settings defaulting, and the retrieval filter. +""" +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock +from uuid import uuid4 + +import pytest +from fastapi import HTTPException + +from repi.api.projects import ( + DEFAULT_PROJECT_NAME, + DEFAULT_SETTINGS, + effective_settings, + resolve_project, +) +from repi.models.filters import RetrievalFilters +from repi.models.schema import Project +from repi.retrieval.filter_builder import build_filter_expressions + + +# ── effective_settings ──────────────────────────────────────────────────────── + +def test_effective_settings_defaults_when_empty(): + p = Project(name="x", settings={}) + assert effective_settings(p) == DEFAULT_SETTINGS + + +def test_effective_settings_overrides_win(): + p = Project(name="x", settings={"default_timeline_window": "24h"}) + s = effective_settings(p) + assert s["default_timeline_window"] == "24h" + assert s["max_events"] == DEFAULT_SETTINGS["max_events"] + + +# ── resolve_project ─────────────────────────────────────────────────────────── + +def _session_returning(row): + session = MagicMock() + res = MagicMock() + res.first.return_value = row + session.exec = AsyncMock(return_value=res) + session.get = AsyncMock(return_value=row) + session.add = MagicMock() + session.commit = AsyncMock() + session.refresh = AsyncMock() + return session + + +@pytest.mark.asyncio +async def test_resolve_blank_falls_back_to_default_name(): + existing = Project(name=DEFAULT_PROJECT_NAME) + session = _session_returning(existing) + assert (await resolve_project(session, None)) is existing + assert (await resolve_project(session, " ")) is existing + + +@pytest.mark.asyncio +async def test_resolve_name_creates_when_missing(): + session = _session_returning(None) + created = await resolve_project(session, "payments") + session.add.assert_called_once() + assert created.name == "payments" + + +@pytest.mark.asyncio +async def test_resolve_uuid_must_exist(): + session = _session_returning(None) + with pytest.raises(HTTPException) as exc: + await resolve_project(session, str(uuid4())) + assert exc.value.status_code == 404 + # A typo'd id must NOT silently create a project named like a UUID. + session.add.assert_not_called() + + +@pytest.mark.asyncio +async def test_resolve_uuid_returns_existing(): + pid = uuid4() + existing = Project(id=pid, name="infra") + session = _session_returning(existing) + assert (await resolve_project(session, str(pid))) is existing + + +# ── retrieval filter clause ─────────────────────────────────────────────────── + +def test_filter_builder_no_project_no_clause(): + exprs = build_filter_expressions(RetrievalFilters()) + assert all("project_id" not in str(e) for e in exprs) + + +def test_filter_builder_project_clause_present(): + pid = uuid4() + exprs = build_filter_expressions(RetrievalFilters(project_id=pid)) + assert any("project_id" in str(e) for e in exprs)