From ded7872f9bc1efc809ac92d25ee64971314775c4 Mon Sep 17 00:00:00 2001 From: Varun Singh Date: Thu, 11 Jun 2026 21:04:50 +0530 Subject: [PATCH 1/7] feat(projects): project-centric scoping foundation (UX redesign P1) - projects table (settings JSONB: default_timeline_window, auto_load_timeline, max_events) + project_id on log_chunks/watcher_configs/conversations/ investigations; real signature column on log_chunks (un-defers Path B from cluster_view) with idempotent backfill; Default project seed absorbs all pre-project rows - /projects CRUD + /projects/{id}/services; resolve_project shared name-or-id resolver (name get-or-create, uuid must exist, blank -> Default) - ingest stamps signature + project_id (API form field, worker via watcher_configs.project_id); /watchers accepts project_id - scoping: RetrievalFilters.project_id applies to both vector + FTS arms; every ReAct tool gains project_id injected via container closures (LLM never sees it; cache keys include it); chat + investigate accept/inherit project_id; known_services resolved per project - conversations list/detail return project_id + name for the sidebar Verified live: zk ingested into 'Infra', ssh into Default; Infra-scoped chat cites only Infra chunks; asking Infra about ssh-server clarifies with zero cross-project leakage. --- db/schema.sql | 50 ++++++++ repi/api/__init__.py | 2 + repi/api/chat.py | 16 ++- repi/api/conversations.py | 17 ++- repi/api/ingest.py | 13 +- repi/api/investigate.py | 34 ++++-- repi/api/projects.py | 202 +++++++++++++++++++++++++++++++ repi/api/watchers.py | 12 +- repi/core/container.py | 43 ++++++- repi/ingestion/log_ingestor.py | 11 +- repi/investigation/store.py | 6 +- repi/investigation/tools.py | 40 ++++-- repi/models/filters.py | 3 + repi/models/schema.py | 26 ++++ repi/retrieval/filter_builder.py | 3 + repi/retrieval/pgvector_store.py | 14 ++- repi/worker.py | 4 +- tests/api/test_projects.py | 97 +++++++++++++++ 18 files changed, 552 insertions(+), 41 deletions(-) create mode 100644 repi/api/projects.py create mode 100644 tests/api/test_projects.py diff --git a/db/schema.sql b/db/schema.sql index 7db6e9f..c3e846e 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -186,3 +186,53 @@ CREATE INDEX IF NOT EXISTS leaderboard_dataset_idx ON leaderboard (dataset); CREATE INDEX IF NOT EXISTS leaderboard_score_idx ON leaderboard (dataset, aggregate_score DESC); CREATE INDEX IF NOT EXISTS leaderboard_created_idx ON leaderboard (created_at DESC); CREATE INDEX IF NOT EXISTS leaderboard_backend_idx ON leaderboard (embedding_backend); + +-- ── Projects (UX redesign P1) ──────────────────────────────────────────────── +-- A project is a logical system/application (e.g. "Ecommerce Platform"). +-- Workers, services (via log_chunks), conversations and investigations are +-- scoped to a project. `settings` keys: default_timeline_window ("5h"), +-- auto_load_timeline (true), max_events (25) — read by /projects/{id}/overview. +CREATE TABLE IF NOT EXISTS projects ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + name TEXT UNIQUE NOT NULL, + settings JSONB NOT NULL DEFAULT '{}', + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +ALTER TABLE log_chunks ADD COLUMN IF NOT EXISTS project_id UUID REFERENCES projects(id) ON DELETE SET NULL; +ALTER TABLE watcher_configs ADD COLUMN IF NOT EXISTS project_id UUID REFERENCES projects(id) ON DELETE SET NULL; +ALTER TABLE conversations ADD COLUMN IF NOT EXISTS project_id UUID REFERENCES projects(id) ON DELETE SET NULL; +ALTER TABLE investigations ADD COLUMN IF NOT EXISTS project_id UUID REFERENCES projects(id) ON DELETE SET NULL; + +CREATE INDEX IF NOT EXISTS log_chunks_project_ts_idx ON log_chunks (project_id, timestamp_start); + +-- Real signature column ("Path B" from cluster_view.py): enables corpus-wide +-- GROUP BY signature for the project overview's clusters and event feed, +-- instead of re-parsing it out of `text` per row in Python. The ingestor +-- writes it for new rows; the UPDATE below backfills pre-existing rows from +-- the templated text body ("Signature: \nExamples: ..."). +ALTER TABLE log_chunks ADD COLUMN IF NOT EXISTS signature TEXT; +UPDATE log_chunks + SET signature = split_part(split_part(text, E'\n', 1), 'Signature: ', 2) + WHERE signature IS NULL; +CREATE INDEX IF NOT EXISTS log_chunks_signature_idx ON log_chunks (project_id, signature); + +-- Seed a Default project on first run and absorb all pre-project rows into it. +-- Idempotent: the seed only fires when `projects` is empty, and the backfills +-- only touch NULL project_id rows. +DO $$ +DECLARE + default_id UUID; +BEGIN + IF NOT EXISTS (SELECT 1 FROM projects) THEN + INSERT INTO projects (name) VALUES ('Default'); + END IF; + SELECT id INTO default_id FROM projects WHERE name = 'Default'; + IF default_id IS NOT NULL THEN + UPDATE log_chunks SET project_id = default_id WHERE project_id IS NULL; + UPDATE watcher_configs SET project_id = default_id WHERE project_id IS NULL; + UPDATE conversations SET project_id = default_id WHERE project_id IS NULL; + UPDATE investigations SET project_id = default_id WHERE project_id IS NULL; + END IF; +END $$; diff --git a/repi/api/__init__.py b/repi/api/__init__.py index 20116f3..7c5544b 100644 --- a/repi/api/__init__.py +++ b/repi/api/__init__.py @@ -11,6 +11,7 @@ from repi.api.config import router as config_router from repi.api.chat import router as chat_router from repi.api.conversations import router as conversations_router +from repi.api.projects import router as projects_router logger = logging.getLogger("repi.api") @@ -44,6 +45,7 @@ async def lifespan(app: FastAPI): app.include_router(config_router, tags=["config"]) app.include_router(chat_router, tags=["chat"]) app.include_router(conversations_router, tags=["conversations"]) +app.include_router(projects_router, tags=["projects"]) @app.get("/health", tags=["health"]) diff --git a/repi/api/chat.py b/repi/api/chat.py index 57a39cf..7c19874 100644 --- a/repi/api/chat.py +++ b/repi/api/chat.py @@ -84,6 +84,9 @@ class ChatRequest(BaseModel): # payloads early. The legitimate caller only ever sends the last # assistant turn's citations (≤10 in practice). previous_chunk_ids: List[str] = Field(default_factory=list, max_length=50) + # UX P1: scopes retrieval + known-services resolution to one project. If + # omitted but the conversation has a project, that project applies. + project_id: Optional[UUID] = None # ── Module-level constants ──────────────────────────────────────────────────── @@ -171,9 +174,10 @@ async def event_generator(): # Resolve or create the conversation row up front so every event the # client sees can carry the (eventual) conversation_id. conversation_id = req.conversation_id + project_id = req.project_id async with container.async_session_maker() as session: if conversation_id is None: - conv = Conversation(title=req.query[:80]) + conv = Conversation(title=req.query[:80], project_id=project_id) session.add(conv) await session.commit() await session.refresh(conv) @@ -185,9 +189,12 @@ async def event_generator(): res = await session.exec(stmt) existing = res.first() if existing is None: - conv = Conversation(id=conversation_id, title=req.query[:80]) + conv = Conversation(id=conversation_id, title=req.query[:80], project_id=project_id) session.add(conv) await session.commit() + elif project_id is None: + # Inherit the conversation's project when not pinned. + project_id = existing.project_id # Persist the user turn immediately — the client gets a citation-free # echo if the LLM call errors out mid-stream. @@ -202,7 +209,7 @@ async def event_generator(): try: # ── Intent resolution ──────────────────────────────────────────── now = _dh.now() - known_services = container.known_services or [] + known_services = await container.get_known_services(project_id) or [] resolution = resolve_intent(req.query, known_services, now) if isinstance(resolution, ClarificationNeeded): @@ -314,6 +321,7 @@ async def event_generator(): source_service=service, time_from=time_from, time_to=time_to, + project_id=project_id, ) # search_diverse over-fetches then service-stratifies the top-k # so a noisy single service can't crowd out the cross-service @@ -342,7 +350,7 @@ async def event_generator(): # Entity-bias merge: UNION RRF + find_logs_by_id, deduped by chunk_id. if entities and container.pool is not None: for ent in entities: - extra = await find_logs_by_id(container.pool, entity=ent, top_k=20) + extra = await find_logs_by_id(container.pool, entity=ent, top_k=20, project_id=project_id) for c in extra: if c["chunk_id"] in seen: continue diff --git a/repi/api/conversations.py b/repi/api/conversations.py index bf2c105..13548d5 100644 --- a/repi/api/conversations.py +++ b/repi/api/conversations.py @@ -15,7 +15,7 @@ from sqlmodel import select from repi.core.container import get_container -from repi.models.schema import ChatMessage, Conversation, Investigation +from repi.models.schema import ChatMessage, Conversation, Investigation, Project logger = logging.getLogger("repi.api.conversations") @@ -25,6 +25,8 @@ class ConversationSummary(BaseModel): id: str title: Optional[str] + project_id: Optional[str] = None + project_name: Optional[str] = None created_at: str updated_at: str @@ -43,6 +45,8 @@ class TranscriptTurn(BaseModel): class ConversationDetail(BaseModel): id: str title: Optional[str] + project_id: Optional[str] = None + project_name: Optional[str] = None created_at: str updated_at: str turns: List[TranscriptTurn] @@ -59,10 +63,14 @@ async def list_conversations(limit: int = 50): ) res = await session.exec(stmt) rows = list(res.all()) + name_res = await session.exec(select(Project.id, Project.name)) + project_names = {pid: name for pid, name in name_res.all()} return [ ConversationSummary( id=str(c.id), title=c.title, + project_id=str(c.project_id) if c.project_id else None, + project_name=project_names.get(c.project_id), created_at=c.created_at.isoformat(), updated_at=c.updated_at.isoformat(), ) @@ -84,6 +92,11 @@ async def get_conversation(conversation_id: str): if conv is None: raise HTTPException(status_code=404, detail="Conversation not found") + project_name = None + if conv.project_id is not None: + proj = await session.get(Project, conv.project_id) + project_name = proj.name if proj else None + msg_res = await session.exec( select(ChatMessage) .where(ChatMessage.conversation_id == cid) @@ -125,6 +138,8 @@ async def get_conversation(conversation_id: str): return ConversationDetail( id=str(conv.id), title=conv.title, + project_id=str(conv.project_id) if conv.project_id else None, + project_name=project_name, created_at=conv.created_at.isoformat(), updated_at=conv.updated_at.isoformat(), turns=turns, diff --git a/repi/api/ingest.py b/repi/api/ingest.py index 7d74bc7..c765fd4 100644 --- a/repi/api/ingest.py +++ b/repi/api/ingest.py @@ -9,6 +9,7 @@ class IngestResponse(BaseModel): service: str + project: str chunk_count: int lines_total: int lines_with_timestamp: int @@ -18,18 +19,25 @@ class IngestResponse(BaseModel): @router.post("/ingest", response_model=IngestResponse) async def ingest( service: str = Form(...), - file: UploadFile = File(...) + file: UploadFile = File(...), + project: str | None = Form(None), ): """ Ingest logs from a file for a specific service. + + `project` is a project name (get-or-create) or id (must exist); omitted + → the Default project. """ + from repi.api.projects import resolve_project + content = await file.read() content_str = content.decode("utf-8") container = get_container() async with container.get_session() as session: + project_row = await resolve_project(session, project) ingestor = container.get_ingestor(session) - stats = await ingestor.ingest(content_str, service) + stats = await ingestor.ingest(content_str, service, project_id=project_row.id) # Refresh the in-memory service list so a brand-new service is visible to # the intent resolver immediately, not only after a restart or GET /services. @@ -44,6 +52,7 @@ async def ingest( return IngestResponse( service=service, + project=project_row.name, chunk_count=stats.chunk_count, lines_total=stats.lines_total, lines_with_timestamp=stats.lines_with_timestamp, diff --git a/repi/api/investigate.py b/repi/api/investigate.py index bb9bc43..56e09bd 100644 --- a/repi/api/investigate.py +++ b/repi/api/investigate.py @@ -22,6 +22,9 @@ class InvestigateRequest(BaseModel): # conversation row is created and its id returned so the UI can attach # subsequent /chat turns to the same thread. conversation_id: Optional[UUID] = None + # UX P1: scopes retrieval + every ReAct tool to one project. If omitted + # but the conversation has a project, the conversation's project applies. + project_id: Optional[UUID] = None class InvestigationStepModel(BaseModel): step_number: int @@ -93,8 +96,9 @@ async def investigate(request: InvestigateRequest): from sqlalchemy import text as sa_text conversation_id = request.conversation_id + project_id = request.project_id if conversation_id is None: - conv = Conversation(title=request.query[:80]) + conv = Conversation(title=request.query[:80], project_id=project_id) session.add(conv) await session.commit() await session.refresh(conv) @@ -103,12 +107,18 @@ async def investigate(request: InvestigateRequest): # Validate it exists; if not, materialise with the caller's id. stmt = sm_select(Conversation).where(Conversation.id == conversation_id) res = await session.exec(stmt) - if res.first() is None: - session.add(Conversation(id=conversation_id, title=request.query[:80])) + existing = res.first() + if existing is None: + session.add(Conversation(id=conversation_id, title=request.query[:80], project_id=project_id)) await session.commit() + elif project_id is None: + # Inherit the conversation's project when the caller didn't pin one. + project_id = existing.project_id store = container.get_investigation_store(session) - investigation = await store.get_or_create(request.query, conversation_id=conversation_id) + investigation = await store.get_or_create( + request.query, conversation_id=conversation_id, project_id=project_id + ) # Bump the conversation's updated_at so the sidebar surfaces the # thread to the top even when activity is investigation-side rather @@ -173,14 +183,24 @@ async def stream_investigation(investigation_id: str): async def event_generator(): container = get_container() async with container.get_session() as session: - loop = container.get_investigation_loop(session) - store = loop.store + store = container.get_investigation_store(session) investigation = await store.get_by_id(uuid_obj) - + if not investigation: yield f"data: {json.dumps({'type': 'error', 'data': {'message': 'Investigation not found'}})}\n\n" return + # Build the loop scoped to the investigation's project: every tool + # call carries project_id and the resolver sees only that + # project's services. + scoped_services = await container.get_known_services(investigation.project_id) + loop = container.get_investigation_loop( + session, + project_id=investigation.project_id, + known_services=scoped_services, + ) + store = loop.store + steps = await store.get_steps(uuid_obj) for s in steps: # Persisted shape is {name, args}; the UI consumes {tool, args} diff --git a/repi/api/projects.py b/repi/api/projects.py new file mode 100644 index 0000000..3d3c2a3 --- /dev/null +++ b/repi/api/projects.py @@ -0,0 +1,202 @@ +"""Projects API (UX redesign P1). + +A project is a logical system/application — workers, services, conversations +and investigations are scoped to one. `resolve_project` is the shared +name-or-id resolver used by /ingest (and the worker indirectly via +watcher_configs.project_id). +""" +from __future__ import annotations + +import logging +from datetime import datetime +from typing import Any, List, Optional +from uuid import UUID + +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel +from sqlalchemy import text as sa_text +from sqlmodel import select + +from repi.core.container import get_container +from repi.models.schema import Project + +logger = logging.getLogger("repi.api.projects") + +router = APIRouter() + +DEFAULT_PROJECT_NAME = "Default" + +# Defaults merged under explicit per-project settings when read. +DEFAULT_SETTINGS: dict[str, Any] = { + "default_timeline_window": "5h", + "auto_load_timeline": True, + "max_events": 25, +} + + +def effective_settings(project: Project) -> dict[str, Any]: + return {**DEFAULT_SETTINGS, **(project.settings or {})} + + +async def resolve_project(session, project: Optional[str]) -> Project: + """Resolve a name-or-id reference to a Project row. + + - None/blank → the Default project (created if missing). + - UUID string → must exist (404 otherwise — a typo'd id should not + silently spawn a project named like a UUID). + - Anything else → get-or-create by name, so `curl -F project=payments` + works on first use. + """ + if not project or not project.strip(): + return await _get_or_create_by_name(session, DEFAULT_PROJECT_NAME) + + ref = project.strip() + try: + pid = UUID(ref) + except ValueError: + return await _get_or_create_by_name(session, ref) + + row = await session.get(Project, pid) + if row is None: + raise HTTPException(status_code=404, detail=f"Project {ref} not found") + return row + + +async def _get_or_create_by_name(session, name: str) -> Project: + res = await session.exec(select(Project).where(Project.name == name)) + row = res.first() + if row is not None: + return row + row = Project(name=name) + session.add(row) + await session.commit() + await session.refresh(row) + return row + + +# ── Models ─────────────────────────────────────────────────────────────────── + +class ProjectCreate(BaseModel): + name: str + settings: dict[str, Any] = {} + + +class ProjectUpdate(BaseModel): + name: Optional[str] = None + settings: Optional[dict[str, Any]] = None + + +class ProjectRead(BaseModel): + id: str + name: str + settings: dict[str, Any] + service_count: int = 0 + created_at: datetime + updated_at: datetime + + +class ProjectService(BaseModel): + name: str + chunk_count: int + last_seen: Optional[datetime] = None + + +# ── Endpoints ──────────────────────────────────────────────────────────────── + +@router.get("/projects", response_model=List[ProjectRead]) +async def list_projects(): + container = get_container() + async with container.get_session() as session: + res = await session.exec(select(Project).order_by(Project.created_at)) + projects = list(res.all()) + counts_res = await session.execute(sa_text( + "SELECT project_id, count(DISTINCT source_service) AS n " + "FROM log_chunks WHERE project_id IS NOT NULL GROUP BY project_id" + )) + counts = {row[0]: row[1] for row in counts_res} + return [ + ProjectRead( + id=str(p.id), + name=p.name, + settings=effective_settings(p), + service_count=counts.get(p.id, 0), + created_at=p.created_at, + updated_at=p.updated_at, + ) + for p in projects + ] + + +@router.post("/projects", response_model=ProjectRead) +async def create_project(body: ProjectCreate): + name = body.name.strip() + if not name: + raise HTTPException(status_code=400, detail="Project name must not be empty") + container = get_container() + async with container.get_session() as session: + res = await session.exec(select(Project).where(Project.name == name)) + if res.first() is not None: + raise HTTPException(status_code=409, detail=f"Project '{name}' already exists") + p = Project(name=name, settings=body.settings or {}) + session.add(p) + await session.commit() + await session.refresh(p) + return ProjectRead( + id=str(p.id), name=p.name, settings=effective_settings(p), + created_at=p.created_at, updated_at=p.updated_at, + ) + + +@router.get("/projects/{project_id}", response_model=ProjectRead) +async def get_project(project_id: UUID): + container = get_container() + async with container.get_session() as session: + p = await session.get(Project, project_id) + if p is None: + raise HTTPException(status_code=404, detail="Project not found") + return ProjectRead( + id=str(p.id), name=p.name, settings=effective_settings(p), + created_at=p.created_at, updated_at=p.updated_at, + ) + + +@router.patch("/projects/{project_id}", response_model=ProjectRead) +async def update_project(project_id: UUID, body: ProjectUpdate): + """Partial update; `settings` is merged over existing keys (same + merge-not-replace contract as PUT /config).""" + container = get_container() + async with container.get_session() as session: + p = await session.get(Project, project_id) + if p is None: + raise HTTPException(status_code=404, detail="Project not found") + if body.name is not None and body.name.strip(): + p.name = body.name.strip() + if body.settings is not None: + p.settings = {**(p.settings or {}), **body.settings} + p.updated_at = datetime.utcnow() + session.add(p) + await session.commit() + await session.refresh(p) + return ProjectRead( + id=str(p.id), name=p.name, settings=effective_settings(p), + created_at=p.created_at, updated_at=p.updated_at, + ) + + +@router.get("/projects/{project_id}/services", response_model=List[ProjectService]) +async def list_project_services(project_id: UUID): + container = get_container() + async with container.get_session() as session: + p = await session.get(Project, project_id) + if p is None: + raise HTTPException(status_code=404, detail="Project not found") + res = await session.execute(sa_text( + "SELECT source_service, count(*) AS n, max(timestamp_start) AS last_seen " + "FROM log_chunks WHERE project_id = :pid " + "GROUP BY source_service ORDER BY n DESC" + ), {"pid": project_id}) + rows = res.all() + return [ + ProjectService(name=r[0], chunk_count=r[1], last_seen=r[2]) + for r in rows + ] diff --git a/repi/api/watchers.py b/repi/api/watchers.py index 0227882..76d38e8 100644 --- a/repi/api/watchers.py +++ b/repi/api/watchers.py @@ -18,6 +18,7 @@ class WatcherConfigCreate(BaseModel): watch_path: str env: str = "production" enabled: bool = True + project_id: UUID | None = None class WatcherConfigRead(BaseModel): id: UUID @@ -25,6 +26,7 @@ class WatcherConfigRead(BaseModel): watch_path: str env: str enabled: bool + project_id: UUID | None = None created_at: datetime updated_at: datetime @@ -33,6 +35,7 @@ class WatcherConfigUpdate(BaseModel): watch_path: str = None env: str = None enabled: bool = None + project_id: UUID = None class WatcherStatus(BaseModel): file_path: str @@ -44,11 +47,18 @@ class WatcherStatus(BaseModel): async def create_watcher(config: WatcherConfigCreate): container = get_container() async with container.get_session() as session: + # No project given → Default, so every watcher (and the chunks its + # worker ingests) always lands in a project. + project_id = config.project_id + if project_id is None: + from repi.api.projects import resolve_project + project_id = (await resolve_project(session, None)).id db_config = WatcherConfig( service_name=config.service_name, watch_path=config.watch_path, env=config.env, - enabled=config.enabled + enabled=config.enabled, + project_id=project_id, ) session.add(db_config) await session.commit() diff --git a/repi/core/container.py b/repi/core/container.py index ca7885a..c4a9622 100644 --- a/repi/core/container.py +++ b/repi/core/container.py @@ -136,6 +136,24 @@ async def init_known_services(self) -> list[str]: logger.info(f"Loaded known services: {self.known_services}") return self.known_services + async def get_known_services(self, project_id=None) -> list[str]: + """Project-scoped service list (UX P1). Falls back to the global + cached list when no project is given. Queried per call — DISTINCT on + an indexed column; freshness matters more than the microseconds.""" + if project_id is None: + return self.known_services + services = await get_all_services(self.pool, project_id=project_id) + async with self.async_session_maker() as session: + from repi.models.schema import WatcherConfig + stmt = select(WatcherConfig.service_name).where( + WatcherConfig.enabled == True, WatcherConfig.project_id == project_id + ) + res = await session.exec(stmt) + for name in res.all(): + if name not in services: + services.append(name) + return services + def get_ingestor(self, session: AsyncSession) -> LogIngestor: vector_store = PgVectorStore(session) return LogIngestor(vector_store, self.embedding_func) @@ -145,13 +163,25 @@ def get_retrieval_service(self, session: AsyncSession) -> RRFRetrievalService: fts_retriever = PgFTSRetriever(session) return RRFRetrievalService(vector_store, fts_retriever, self.embedding_func) - def get_investigation_loop(self, session: AsyncSession) -> ReactInvestigationLoop: - """Create a ReAct loop with tools and persistence store.""" + def get_investigation_loop(self, session: AsyncSession, project_id=None, + known_services: list[str] | None = None) -> ReactInvestigationLoop: + """Create a ReAct loop with tools and persistence store. + + `project_id` scopes every tool to one project. It is injected here in + the tool closures — the LLM never sees (or controls) it, and the + cache key includes it so two projects can't share cached results. + """ llm = self.require_llm() retrieval_service = self.get_retrieval_service(session) store = InvestigationStore(session) + def scoped(kwargs: dict) -> dict: + if project_id is not None: + kwargs.setdefault("project_id", project_id) + return kwargs + async def cached_search_logs(**kwargs): + kwargs = scoped(kwargs) key = cache.make_key("search_logs", **kwargs) hit = await cache.get(key) if hit: return hit @@ -160,6 +190,7 @@ async def cached_search_logs(**kwargs): return res async def cached_service_summary(**kwargs): + kwargs = scoped(kwargs) key = cache.make_key("get_service_summary", **kwargs) hit = await cache.get(key) if hit: return hit @@ -168,6 +199,7 @@ async def cached_service_summary(**kwargs): return res async def cached_scan_window(**kwargs): + kwargs = scoped(kwargs) key = cache.make_key("scan_window", **kwargs) hit = await cache.get(key) if hit: return hit @@ -176,6 +208,7 @@ async def cached_scan_window(**kwargs): return res async def cached_find_logs_by_id(**kwargs): + kwargs = scoped(kwargs) key = cache.make_key("find_logs_by_id", **kwargs) hit = await cache.get(key) if hit: return hit @@ -185,16 +218,16 @@ async def cached_find_logs_by_id(**kwargs): tools = { "search_logs": cached_search_logs, - "get_timeline": lambda **kwargs: get_timeline(self.pool, **kwargs), + "get_timeline": lambda **kwargs: get_timeline(self.pool, **scoped(kwargs)), "scan_window": cached_scan_window, "get_service_summary": cached_service_summary, "find_logs_by_id": cached_find_logs_by_id, } - + return ReactInvestigationLoop( llm=llm, tools=tools, - known_services=self.known_services, + known_services=known_services if known_services is not None else self.known_services, pool=self.pool, store=store, enable_reflection=settings.ENABLE_REFLECTION, diff --git a/repi/ingestion/log_ingestor.py b/repi/ingestion/log_ingestor.py index cc0e6d5..2821359 100644 --- a/repi/ingestion/log_ingestor.py +++ b/repi/ingestion/log_ingestor.py @@ -7,6 +7,7 @@ from repi.ingestion.log_chunker import chunk_logs from repi.retrieval.pgvector_store import PgVectorStore import uuid +from uuid import UUID logger = logging.getLogger(__name__) @@ -25,9 +26,11 @@ def __init__(self, vector_store: PgVectorStore, embedding_func) -> None: self.vector_store = vector_store self.embedding_func = embedding_func - async def ingest(self, logs: str | List[str], source_service: str, source_env: str = "production") -> IngestStats: + async def ingest(self, logs: str | List[str], source_service: str, source_env: str = "production", + project_id: Optional[UUID] = None) -> IngestStats: """ - Ingest logs from a specific source. + Ingest logs from a specific source. `project_id` scopes the chunks to a + project (None = caller resolves to the Default project upstream). """ if not source_service: raise ValueError("source_service is required") @@ -65,7 +68,9 @@ async def ingest(self, logs: str | List[str], source_service: str, source_env: s source_env=source_env, log_level=chunk.log_level, timestamp_start=chunk.timestamp_start, - timestamp_end=chunk.timestamp_end + timestamp_end=chunk.timestamp_end, + signature=chunk.signature, + project_id=project_id, ) count += 1 diff --git a/repi/investigation/store.py b/repi/investigation/store.py index da3cba5..23ee4e1 100644 --- a/repi/investigation/store.py +++ b/repi/investigation/store.py @@ -28,6 +28,7 @@ async def get_or_create( self, query: str, conversation_id: Optional[UUID] = None, + project_id: Optional[UUID] = None, ) -> Investigation: """Find an existing active investigation for the same (query, conversation) or create a new one. Same query in a *different* conversation creates a @@ -45,15 +46,16 @@ async def get_or_create( logger.info(f"Resuming existing investigation: {investigation.id}") return investigation - return await self.create(query, conversation_id=conversation_id) + return await self.create(query, conversation_id=conversation_id, project_id=project_id) async def create( self, query: str, conversation_id: Optional[UUID] = None, + project_id: Optional[UUID] = None, ) -> Investigation: """Always create a fresh investigation.""" - investigation = Investigation(query=query, conversation_id=conversation_id) + investigation = Investigation(query=query, conversation_id=conversation_id, project_id=project_id) self.session.add(investigation) await self.session.commit() await self.session.refresh(investigation) diff --git a/repi/investigation/tools.py b/repi/investigation/tools.py index 4803a8c..92d130c 100644 --- a/repi/investigation/tools.py +++ b/repi/investigation/tools.py @@ -52,13 +52,15 @@ async def search_logs( time_to: str | None = None, level: str | None = None, top_k: int = 10, + project_id: uuid.UUID | None = None, ) -> list[dict]: """Search log chunks by query, service, time range, and level.""" filters = RetrievalFilters( source_service=service, log_level=level, time_from=_parse_iso_timestamp(time_from), - time_to=_parse_iso_timestamp(time_to) + time_to=_parse_iso_timestamp(time_to), + project_id=project_id, ) if query and query.strip(): @@ -89,14 +91,16 @@ async def search_logs( async def get_timeline( pool: asyncpg.Pool, chunk_ids: list[str], + project_id: uuid.UUID | None = None, ) -> list[dict]: """Sort chunks by timestamp to see the sequence of events.""" if not chunk_ids: return [] - + rows = await pool.fetch( - "SELECT chunk_id, source_service, log_level, timestamp_start, text FROM log_chunks WHERE chunk_id = ANY($1) ORDER BY timestamp_start", - chunk_ids + "SELECT chunk_id, source_service, log_level, timestamp_start, text FROM log_chunks " + "WHERE chunk_id = ANY($1) AND ($2::uuid IS NULL OR project_id = $2) ORDER BY timestamp_start", + chunk_ids, project_id ) return [{ @@ -116,6 +120,7 @@ async def scan_window( top_k: int = 50, pre_context_seconds: int = 60, pre_context_per_service_limit: int = 20, + project_id: uuid.UUID | None = None, ) -> dict: """ Two-phase scan: ERROR/WARNING symptoms + a level-agnostic walk-back that @@ -161,6 +166,7 @@ async def scan_window( WHERE timestamp_start BETWEEN $1 AND $2 AND log_level IN ('ERROR', 'WARNING') AND ($3::text[] IS NULL OR source_service = ANY($3)) + AND ($4::uuid IS NULL OR project_id = $4) GROUP BY source_service ORDER BY first_error NULLS LAST """ @@ -171,13 +177,14 @@ async def scan_window( WHERE timestamp_start BETWEEN $1 AND $2 AND log_level = ANY($3) AND ($4::text[] IS NULL OR source_service = ANY($4)) + AND ($6::uuid IS NULL OR project_id = $6) ORDER BY timestamp_start LIMIT $5 """ summary_rows, log_rows = await asyncio.gather( - pool.fetch(summary_sql, time_from_dt, time_to_dt, services), - pool.fetch(logs_sql, time_from_dt, time_to_dt, effective_level, services, top_k), + pool.fetch(summary_sql, time_from_dt, time_to_dt, services, project_id), + pool.fetch(logs_sql, time_from_dt, time_to_dt, effective_level, services, top_k, project_id), ) summary = { @@ -228,6 +235,7 @@ async def scan_window( WHERE timestamp_start BETWEEN $1 AND $2 AND log_level = 'ERROR' AND source_service = ANY($3) + AND ($6::uuid IS NULL OR project_id = $6) GROUP BY source_service ), candidates AS ( @@ -243,6 +251,7 @@ async def scan_window( AND lc.timestamp_start < fe.first_error AND lc.log_level IS DISTINCT FROM 'ERROR' AND lc.log_level IS DISTINCT FROM 'DEBUG' + AND ($6::uuid IS NULL OR lc.project_id = $6) ) SELECT chunk_id, source_service, log_level, timestamp_start, text FROM candidates @@ -256,6 +265,7 @@ async def scan_window( services_with_errors, str(pre_context_seconds), pre_context_per_service_limit, + project_id, ) pre_context_logs = [ { @@ -285,6 +295,7 @@ async def find_logs_by_id( entity: str, top_k: int = 50, min_similarity: float = 0.6, + project_id: uuid.UUID | None = None, ) -> list[dict]: """Find log chunks containing (or fuzzy-matching) an entity token. @@ -312,13 +323,15 @@ async def find_logs_by_id( CASE WHEN text ILIKE '%' || $1 || '%' THEN 1.0::real ELSE word_similarity($1, text) END AS sim FROM log_chunks - WHERE text ILIKE '%' || $1 || '%' - OR text %> $1 + WHERE (text ILIKE '%' || $1 || '%' + OR text %> $1) + AND ($3::uuid IS NULL OR project_id = $3) ORDER BY sim DESC, timestamp_start DESC LIMIT $2 """, entity.strip(), top_k, + project_id, ) return [ { @@ -339,6 +352,7 @@ async def get_service_summary( service: str, time_from: str | None = None, time_to: str | None = None, + project_id: uuid.UUID | None = None, ) -> dict: """Get high-level statistics for a service using raw SQL (Bug 1 Fix).""" time_from_dt = _parse_iso_timestamp(time_from) @@ -358,8 +372,9 @@ async def get_service_summary( WHERE source_service = $1 AND ($2::timestamptz IS NULL OR timestamp_start >= $2) AND ($3::timestamptz IS NULL OR timestamp_end <= $3) + AND ($4::uuid IS NULL OR project_id = $4) """, - service, time_from_dt, time_to_dt, + service, time_from_dt, time_to_dt, project_id, ) return { @@ -373,9 +388,12 @@ async def get_service_summary( "latest": DateHandler.to_iso(row["latest"]), } -async def get_all_services(pool: asyncpg.Pool) -> list[str]: +async def get_all_services(pool: asyncpg.Pool, project_id: uuid.UUID | None = None) -> list[str]: """Dynamically fetch all unique services currently in the database.""" - rows = await pool.fetch("SELECT DISTINCT source_service FROM log_chunks") + rows = await pool.fetch( + "SELECT DISTINCT source_service FROM log_chunks WHERE ($1::uuid IS NULL OR project_id = $1)", + project_id, + ) return [r["source_service"] for r in rows] TOOL_SCHEMAS = { diff --git a/repi/models/filters.py b/repi/models/filters.py index 7107a79..009a78d 100644 --- a/repi/models/filters.py +++ b/repi/models/filters.py @@ -1,6 +1,7 @@ from __future__ import annotations from dataclasses import dataclass from datetime import datetime +from uuid import UUID @dataclass class RetrievalFilters: @@ -9,3 +10,5 @@ class RetrievalFilters: log_level: str | list[str] | None = None time_from: datetime | None = None time_to: datetime | None = None + # Scopes retrieval to one project (UX P1). None = no project filter. + project_id: UUID | None = None diff --git a/repi/models/schema.py b/repi/models/schema.py index f726127..bb62aaa 100644 --- a/repi/models/schema.py +++ b/repi/models/schema.py @@ -8,6 +8,24 @@ from pydantic import field_validator from sqlalchemy.dialects.postgresql import JSONB, TSVECTOR +class Project(SQLModel, table=True): + __tablename__ = "projects" + + id: UUID = Field(default_factory=uuid4, primary_key=True) + name: str = Field(sa_column=Column(TEXT, unique=True, nullable=False)) + # Keys: default_timeline_window ("5h"), auto_load_timeline (true), + # max_events (25). Read by /projects/{id}/overview. + settings: dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSONB, nullable=False)) + created_at: datetime = Field( + default_factory=datetime.utcnow, + sa_column=Column(DateTime(timezone=True), nullable=False), + ) + updated_at: datetime = Field( + default_factory=datetime.utcnow, + sa_column=Column(DateTime(timezone=True), nullable=False), + ) + + class LogChunk(SQLModel, table=True): __tablename__ = "log_chunks" @@ -24,6 +42,10 @@ def coerce_embedding(cls, v): id: UUID = Field(default_factory=uuid4, primary_key=True) chunk_id: str = Field(index=True, unique=True) + project_id: Optional[UUID] = Field(default=None, index=True) + # Extracted at ingest (chunk_logs signature); enables corpus-wide + # GROUP BY signature for the project overview without re-parsing `text`. + signature: Optional[str] = Field(default=None) source_service: str = Field(index=True) source_env: str = Field(default="production", index=True) log_level: Optional[str] = Field(default=None, index=True) @@ -70,6 +92,7 @@ class Conversation(SQLModel, table=True): id: UUID = Field(default_factory=uuid4, primary_key=True) title: Optional[str] = Field(default=None, sa_column=Column(TEXT)) + project_id: Optional[UUID] = Field(default=None, index=True) created_at: datetime = Field( default_factory=datetime.utcnow, sa_column=Column(DateTime(timezone=True), nullable=False), @@ -114,6 +137,8 @@ class Investigation(SQLModel, table=True): # an interleaved transcript. Not read by the ReAct loop — Deep Research is # intentionally stateless w.r.t. prior chat turns. conversation_id: Optional[UUID] = Field(default=None, index=True) + # UX P1: scopes the investigation's retrieval + tools to one project. + project_id: Optional[UUID] = Field(default=None, index=True) class InvestigationStep(SQLModel, table=True): __tablename__ = "investigation_steps" @@ -144,6 +169,7 @@ class WatcherConfig(SQLModel, table=True): id: UUID = Field(default_factory=uuid4, primary_key=True) service_name: str = Field(index=True) watch_path: str = Field(index=True) + project_id: Optional[UUID] = Field(default=None, index=True) env: str = Field(default="production") enabled: bool = Field(default=True) created_at: datetime = Field(default_factory=datetime.utcnow) diff --git a/repi/retrieval/filter_builder.py b/repi/retrieval/filter_builder.py index c96fcf9..417eb75 100644 --- a/repi/retrieval/filter_builder.py +++ b/repi/retrieval/filter_builder.py @@ -12,6 +12,9 @@ def build_filter_expressions(filters: RetrievalFilters) -> list[Any]: if filters.source_service: exprs.append(LogChunk.source_service == filters.source_service) + if filters.project_id: + exprs.append(LogChunk.project_id == filters.project_id) + if filters.source_env: exprs.append(LogChunk.source_env == filters.source_env) diff --git a/repi/retrieval/pgvector_store.py b/repi/retrieval/pgvector_store.py index d323585..1b12ef3 100644 --- a/repi/retrieval/pgvector_store.py +++ b/repi/retrieval/pgvector_store.py @@ -1,5 +1,6 @@ from datetime import datetime from typing import List, Tuple, Dict, Any, Optional +from uuid import UUID from sqlmodel import select, Session, and_ from sqlmodel.ext.asyncio.session import AsyncSession from repi.core.dates import DateHandler @@ -15,10 +16,11 @@ class PgVectorStore: def __init__(self, session: AsyncSession) -> None: self.session = session - async def upsert(self, chunk_id: str, embedding: List[float], text: str, source_service: str, - source_env: str = "production", log_level: Optional[str] = None, + async def upsert(self, chunk_id: str, embedding: List[float], text: str, source_service: str, + source_env: str = "production", log_level: Optional[str] = None, timestamp_start: Optional[datetime] = None, timestamp_end: Optional[datetime] = None, - log_metadata: Optional[Dict[str, Any]] = None) -> None: + log_metadata: Optional[Dict[str, Any]] = None, + signature: Optional[str] = None, project_id: Optional[UUID] = None) -> None: """Upsert a log chunk with its embedding and metadata.""" # Check if exists statement = select(LogChunk).where(LogChunk.chunk_id == chunk_id) @@ -43,6 +45,8 @@ async def upsert(self, chunk_id: str, embedding: List[float], text: str, source_ chunk.timestamp_start = ts_start chunk.timestamp_end = ts_end chunk.log_metadata = log_metadata + chunk.signature = signature + chunk.project_id = project_id else: chunk = LogChunk( chunk_id=chunk_id, @@ -53,7 +57,9 @@ async def upsert(self, chunk_id: str, embedding: List[float], text: str, source_ log_level=log_level, timestamp_start=ts_start, timestamp_end=ts_end, - log_metadata=log_metadata + log_metadata=log_metadata, + signature=signature, + project_id=project_id, ) self.session.add(chunk) diff --git a/repi/worker.py b/repi/worker.py index 1615663..443c309 100644 --- a/repi/worker.py +++ b/repi/worker.py @@ -112,7 +112,9 @@ async def handle_file_change(self, file_path: str): return ingestor = self.container.get_ingestor(session) - stats = await ingestor.ingest(new_content, config.service_name) + stats = await ingestor.ingest( + new_content, config.service_name, project_id=config.project_id + ) logger.info(f"Ingested {stats.chunk_count} chunks from {file_path}") await self.update_offset(session, config.id, file_path, file_size) diff --git a/tests/api/test_projects.py b/tests/api/test_projects.py new file mode 100644 index 0000000..0451c64 --- /dev/null +++ b/tests/api/test_projects.py @@ -0,0 +1,97 @@ +"""Projects (UX P1): resolver branches + settings merge + filter clause. + +DB-backed CRUD is exercised in the live walkthrough; here we pin the pure +logic: name-or-id resolution, settings defaulting, and the retrieval filter. +""" +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock +from uuid import uuid4 + +import pytest +from fastapi import HTTPException + +from repi.api.projects import ( + DEFAULT_PROJECT_NAME, + DEFAULT_SETTINGS, + effective_settings, + resolve_project, +) +from repi.models.filters import RetrievalFilters +from repi.models.schema import Project +from repi.retrieval.filter_builder import build_filter_expressions + + +# ── effective_settings ──────────────────────────────────────────────────────── + +def test_effective_settings_defaults_when_empty(): + p = Project(name="x", settings={}) + assert effective_settings(p) == DEFAULT_SETTINGS + + +def test_effective_settings_overrides_win(): + p = Project(name="x", settings={"default_timeline_window": "24h"}) + s = effective_settings(p) + assert s["default_timeline_window"] == "24h" + assert s["max_events"] == DEFAULT_SETTINGS["max_events"] + + +# ── resolve_project ─────────────────────────────────────────────────────────── + +def _session_returning(row): + session = MagicMock() + res = MagicMock() + res.first.return_value = row + session.exec = AsyncMock(return_value=res) + session.get = AsyncMock(return_value=row) + session.add = MagicMock() + session.commit = AsyncMock() + session.refresh = AsyncMock() + return session + + +@pytest.mark.asyncio +async def test_resolve_blank_falls_back_to_default_name(): + existing = Project(name=DEFAULT_PROJECT_NAME) + session = _session_returning(existing) + assert (await resolve_project(session, None)) is existing + assert (await resolve_project(session, " ")) is existing + + +@pytest.mark.asyncio +async def test_resolve_name_creates_when_missing(): + session = _session_returning(None) + created = await resolve_project(session, "payments") + session.add.assert_called_once() + assert created.name == "payments" + + +@pytest.mark.asyncio +async def test_resolve_uuid_must_exist(): + session = _session_returning(None) + with pytest.raises(HTTPException) as exc: + await resolve_project(session, str(uuid4())) + assert exc.value.status_code == 404 + # A typo'd id must NOT silently create a project named like a UUID. + session.add.assert_not_called() + + +@pytest.mark.asyncio +async def test_resolve_uuid_returns_existing(): + pid = uuid4() + existing = Project(id=pid, name="infra") + session = _session_returning(existing) + assert (await resolve_project(session, str(pid))) is existing + + +# ── retrieval filter clause ─────────────────────────────────────────────────── + +def test_filter_builder_no_project_no_clause(): + exprs = build_filter_expressions(RetrievalFilters()) + assert all("project_id" not in str(e) for e in exprs) + + +def test_filter_builder_project_clause_present(): + pid = uuid4() + exprs = build_filter_expressions(RetrievalFilters(project_id=pid)) + assert any("project_id" in str(e) for e in exprs) From 3b3f38bd94727259f415d2dbbd4be566db66122a Mon Sep 17 00:00:00 2001 From: Varun Singh Date: Thu, 11 Jun 2026 21:29:05 +0530 Subject: [PATCH 2/7] =?UTF-8?q?feat(overview):=20timeline-first=20landing?= =?UTF-8?q?=20=E2=80=94=20event=20feed,=20project=20overview,=20guided=20a?= =?UTF-8?q?ctions=20(UX=20P2)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - event_feed.py: deterministic event rules over per-(service, signature) time buckets — begins / spike / subsides / new_pattern / health transitions. Pure rule engine, no LLM cost per load; 15 unit tests. - GET /projects/{id}/overview: events + corpus-wide signature clusters (the un-deferred Path B) + services + derived suggested actions. Window anchors to now and falls back to the project's latest data so the landing page always tells the most recent story available. - UI: ProjectPicker (0 projects -> create, 1 -> auto-select, 2+ -> cards) replaces the empty-chat hero; ProjectOverview renders the timeline, clusters, services and suggested-action chips as the landing panel; chips route to Deep Research (grounded query: signature + service + time range pre-filled) or /chat; sidebar shows per-conversation project badges; chat + investigate carry the conversation's project. Verified live on real LogHub data: Infra overview shows 'zookeeper enters degraded state -> recovers' with clusters and 5 action chips; clicking an Investigate chip ran a scoped DR investigation end-to-end. 6/6 puppeteer flows pass against the new flow. --- repi/api/projects.py | 130 +++++++++ repi/retrieval/event_feed.py | 266 ++++++++++++++++++ tests/retrieval/test_event_feed.py | 150 ++++++++++ web/app/page.tsx | 67 ++++- .../conversations/ConversationSidebar.tsx | 10 +- web/components/projects/ProjectOverview.tsx | 191 +++++++++++++ web/components/projects/ProjectPicker.tsx | 114 ++++++++ web/lib/api.ts | 21 +- 8 files changed, 931 insertions(+), 18 deletions(-) create mode 100644 repi/retrieval/event_feed.py create mode 100644 tests/retrieval/test_event_feed.py create mode 100644 web/components/projects/ProjectOverview.tsx create mode 100644 web/components/projects/ProjectPicker.tsx diff --git a/repi/api/projects.py b/repi/api/projects.py index 3d3c2a3..c179be8 100644 --- a/repi/api/projects.py +++ b/repi/api/projects.py @@ -18,7 +18,9 @@ from sqlmodel import select from repi.core.container import get_container +from repi.core.dates import DateHandler from repi.models.schema import Project +from repi.retrieval.event_feed import derive_events, fetch_window_aggregates, parse_window logger = logging.getLogger("repi.api.projects") @@ -183,6 +185,134 @@ async def update_project(project_id: UUID, body: ProjectUpdate): ) +@router.get("/projects/{project_id}/overview") +async def project_overview( + project_id: UUID, + window: Optional[str] = None, + service: Optional[str] = None, +): + """Landing-page payload: heuristic timeline events, corpus-wide error + clusters, services, and suggested actions for one project + time window. + + Window anchors to NOW; when the window contains no data (historical + imports, idle systems) it re-anchors to the project's latest chunk so the + landing page always tells the most recent story available + (`anchored_to_latest: true` flags this for the UI). + """ + from datetime import datetime, timezone + + container = get_container() + async with container.get_session() as session: + p = await session.get(Project, project_id) + if p is None: + raise HTTPException(status_code=404, detail="Project not found") + settings = effective_settings(p) + window_str = window or settings["default_timeline_window"] + span = parse_window(window_str) + max_events = int(settings.get("max_events", 25)) + + pool = container.pool + time_to = datetime.now(timezone.utc) + time_from = time_to - span + + anchored_to_latest = False + in_window = await pool.fetchval( + "SELECT 1 FROM log_chunks WHERE project_id = $1 " + "AND timestamp_start >= $2 AND timestamp_start < $3 LIMIT 1", + project_id, time_from, time_to, + ) + if in_window is None: + latest = await pool.fetchval( + "SELECT max(timestamp_start) FROM log_chunks WHERE project_id = $1", + project_id, + ) + if latest is not None: + time_to = latest + (span / 100) # nudge so the latest row is < time_to + time_from = time_to - span + anchored_to_latest = True + + buckets, first_seen = await fetch_window_aggregates( + pool, project_id, time_from, time_to, service=service, + ) + events = derive_events(buckets, first_seen, time_from, time_to, max_events=max_events) + + cluster_rows = await pool.fetch( + """ + SELECT signature, count(*) AS n, + array_agg(DISTINCT source_service) AS services, + min(timestamp_start) AS first_ts, max(timestamp_start) AS last_ts + FROM log_chunks + WHERE project_id = $1 + AND timestamp_start >= $2 AND timestamp_start < $3 + AND signature IS NOT NULL AND signature <> '' + AND log_level IN ('ERROR', 'CRITICAL', 'FATAL', 'WARN', 'WARNING') + AND ($4::text IS NULL OR source_service = $4) + GROUP BY signature + ORDER BY n DESC + LIMIT 10 + """, + project_id, time_from, time_to, service, + ) + clusters = [ + { + "signature": r["signature"], + "count": r["n"], + "services": list(r["services"]), + "first_ts": DateHandler.to_iso(r["first_ts"]), + "last_ts": DateHandler.to_iso(r["last_ts"]), + } + for r in cluster_rows + ] + + svc_rows = await pool.fetch( + "SELECT source_service, count(*) AS n, max(timestamp_start) AS last_seen " + "FROM log_chunks WHERE project_id = $1 GROUP BY source_service ORDER BY n DESC", + project_id, + ) + services = [ + {"name": r["source_service"], "chunk_count": r["n"], + "last_seen": DateHandler.to_iso(r["last_seen"])} + for r in svc_rows + ] + + # Suggested actions — derived, never LLM-generated. Top clusters become + # Deep-Research entry points with the service + time range pre-filled so + # the investigation starts grounded instead of from a bare phrase. + suggested: list[dict] = [] + for c in clusters[:3]: + svc_part = f" on {c['services'][0]}" if c["services"] else "" + suggested.append({ + "kind": "investigate", + "label": f"Investigate: {c['signature'][:60]}", + "query": ( + f"Investigate '{c['signature']}'{svc_part} " + f"between {c['first_ts']} and {c['last_ts']}" + ), + }) + suggested.append({ + "kind": "chat", + "label": f"Summarize the last {window_str}", + "query": f"summarize what happened in the last {window_str}", + }) + suggested.append({ + "kind": "chat", + "label": "Show affected services", + "query": "which services are having problems?", + }) + + return { + "project_id": str(project_id), + "window": window_str, + "time_from": DateHandler.to_iso(time_from), + "time_to": DateHandler.to_iso(time_to), + "anchored_to_latest": anchored_to_latest, + "events": events, + "clusters": clusters, + "services": services, + "suggested_actions": suggested, + } + + @router.get("/projects/{project_id}/services", response_model=List[ProjectService]) async def list_project_services(project_id: UUID): container = get_container() diff --git a/repi/retrieval/event_feed.py b/repi/retrieval/event_feed.py new file mode 100644 index 0000000..5f35801 --- /dev/null +++ b/repi/retrieval/event_feed.py @@ -0,0 +1,266 @@ +"""Heuristic timeline events for the project overview (UX redesign P2). + +The landing page answers "what happened recently?" with EVENTS, not raw +logs: "JWT verification failures begin", "checkout retries spike (×340)", +"auth-service enters degraded state". Events are derived deterministically +from per-(service, signature) time-bucket aggregates — no LLM call, so the +overview is free and instant to load. + +Split into a SQL fetch (`fetch_window_aggregates`) and a pure rule engine +(`derive_events`) so the rules are unit-testable without a database. + +Rules (per error-class (service, signature) series over N buckets): +- begins: first active bucket has count ≥ BEGINS_MIN and the previous + bucket was quiet (skipped when the series is already active + at the window edge — we didn't see it begin). +- spike: a bucket ≥ SPIKE_RATIO × the trailing active average and + ≥ SPIKE_MIN — emitted once, for the biggest such bucket. +- subsides: activity stops ≥ QUIET_BUCKETS before the window end + (total ≥ SUBSIDE_MIN so one stray line doesn't "subside"). +- new_pattern: the signature's first-ever occurrence falls inside the + window (replaces `begins` for that series). +- health: per-service error fraction crosses DEGRADED_FRAC with at + least HEALTH_MIN rows in the bucket → "enters degraded + state"; falls back below RECOVERED_FRAC → "recovers". +""" +from __future__ import annotations + +import logging +from dataclasses import dataclass, asdict +from datetime import datetime, timedelta +from typing import Optional +from uuid import UUID + +import asyncpg + +from repi.core.dates import DateHandler + +logger = logging.getLogger(__name__) + +ERROR_CLASS = {"ERROR", "CRITICAL", "FATAL", "WARN", "WARNING"} + +N_BUCKETS = 24 +BEGINS_MIN = 3 +SPIKE_RATIO = 3.0 +SPIKE_MIN = 5 +QUIET_BUCKETS = 2 +SUBSIDE_MIN = 5 +DEGRADED_FRAC = 0.5 +RECOVERED_FRAC = 0.25 +HEALTH_MIN = 10 + +# Rank used when the event list exceeds max_events: keep the most significant, +# then re-sort chronologically so the story still reads in order. +KIND_PRIORITY = { + "health_degraded": 0, + "new_pattern": 1, + "begins": 2, + "spike": 3, + "health_recovered": 4, + "subsides": 5, +} + + +@dataclass +class TimelineEvent: + kind: str + ts: str # ISO8601 — bucket boundary the event is anchored to + service: Optional[str] + signature: Optional[str] + level: Optional[str] + title: str + count: int = 0 + + def to_dict(self) -> dict: + return asdict(self) + + +def parse_window(window: str) -> timedelta: + """'5h' → timedelta(hours=5). Supports m/h/d suffixes; defaults to 5h on + anything unparseable (the overview should degrade, not 500).""" + try: + unit = window.strip()[-1].lower() + value = int(window.strip()[:-1]) + if value <= 0: + raise ValueError + return {"m": timedelta(minutes=value), + "h": timedelta(hours=value), + "d": timedelta(days=value)}[unit] + except (ValueError, KeyError, IndexError): + logger.warning("parse_window: unparseable window %r — defaulting to 5h", window) + return timedelta(hours=5) + + +async def fetch_window_aggregates( + pool: asyncpg.Pool, + project_id: UUID, + time_from: datetime, + time_to: datetime, + n_buckets: int = N_BUCKETS, + service: Optional[str] = None, +) -> tuple[list[dict], dict[str, datetime]]: + """Bucket counts per (service, signature, level) + first-ever timestamp + per signature (for new_pattern detection).""" + tf = DateHandler.to_aware_utc(time_from) + tt = DateHandler.to_aware_utc(time_to) + rows = await pool.fetch( + """ + SELECT source_service AS service, signature, log_level AS level, + width_bucket(extract(epoch FROM timestamp_start), + extract(epoch FROM $2::timestamptz), + extract(epoch FROM $3::timestamptz), $4) AS bucket, + count(*) AS n + FROM log_chunks + WHERE project_id = $1 + AND timestamp_start >= $2 AND timestamp_start < $3 + AND signature IS NOT NULL AND signature <> '' + AND ($5::text IS NULL OR source_service = $5) + GROUP BY 1, 2, 3, 4 + """, + project_id, tf, tt, n_buckets, service, + ) + buckets = [dict(r) for r in rows] + + sigs = sorted({r["signature"] for r in buckets}) + first_seen: dict[str, datetime] = {} + if sigs: + fs_rows = await pool.fetch( + """ + SELECT signature, MIN(timestamp_start) AS first_ever + FROM log_chunks + WHERE project_id = $1 AND signature = ANY($2) + GROUP BY signature + """, + project_id, sigs, + ) + first_seen = {r["signature"]: r["first_ever"] for r in fs_rows} + return buckets, first_seen + + +def derive_events( + buckets: list[dict], + first_seen: dict[str, datetime], + time_from: datetime, + time_to: datetime, + n_buckets: int = N_BUCKETS, + max_events: int = 25, +) -> list[dict]: + """Pure rule engine: bucket aggregates → chronological event dicts. + + `buckets` rows: {service, signature, level, bucket (1-based, from + width_bucket), n}. `first_seen`: signature → first-ever timestamp. + """ + tf = DateHandler.to_aware_utc(time_from) + tt = DateHandler.to_aware_utc(time_to) + bucket_span = (tt - tf) / n_buckets + + def bucket_start(b: int) -> str: + return DateHandler.to_iso(tf + bucket_span * (b - 1)) + + def bucket_end(b: int) -> str: + return DateHandler.to_iso(tf + bucket_span * b) + + # ── series per (service, signature), error-class rows only ────────────── + series: dict[tuple, dict] = {} + # ── per-service totals per bucket for health events ────────────────────── + svc_total: dict[str, dict[int, int]] = {} + svc_err: dict[str, dict[int, int]] = {} + + for r in buckets: + b = int(r["bucket"]) + if b < 1 or b > n_buckets: + continue + svc = r["service"] + level = (r["level"] or "").upper() + n = int(r["n"]) + svc_total.setdefault(svc, {}) + svc_total[svc][b] = svc_total[svc].get(b, 0) + n + if level not in ERROR_CLASS: + continue + svc_err.setdefault(svc, {}) + svc_err[svc][b] = svc_err[svc].get(b, 0) + n + key = (svc, r["signature"]) + s = series.setdefault(key, {"counts": {}, "level": level}) + s["counts"][b] = s["counts"].get(b, 0) + n + # Keep the most severe level seen for display. + rank = {"FATAL": 3, "CRITICAL": 3, "ERROR": 2, "WARNING": 1, "WARN": 1} + if rank.get(level, 0) > rank.get(s["level"], 0): + s["level"] = level + + events: list[TimelineEvent] = [] + + for (svc, sig), s in series.items(): + counts = s["counts"] + level = s["level"] + active = sorted(counts.keys()) + first_b, last_b = active[0], active[-1] + total = sum(counts.values()) + + is_new = False + fe = first_seen.get(sig) + if fe is not None and DateHandler.to_aware_utc(fe) >= tf: + is_new = True + events.append(TimelineEvent( + kind="new_pattern", ts=bucket_start(first_b), service=svc, + signature=sig, level=level, + title=f"New error pattern: {sig}", count=counts[first_b], + )) + + if not is_new and first_b > 1 and counts[first_b] >= BEGINS_MIN: + events.append(TimelineEvent( + kind="begins", ts=bucket_start(first_b), service=svc, + signature=sig, level=level, + title=f"{sig} begins", count=counts[first_b], + )) + + # Spike: biggest bucket vs trailing active average before it. + best = None + for b in active[1:]: + prior = [counts[x] for x in active if x < b] + trailing_avg = sum(prior) / len(prior) + if counts[b] >= SPIKE_MIN and counts[b] >= SPIKE_RATIO * trailing_avg: + if best is None or counts[b] > counts[best]: + best = b + if best is not None: + events.append(TimelineEvent( + kind="spike", ts=bucket_start(best), service=svc, + signature=sig, level=level, + title=f"{sig} spikes (×{counts[best]})", count=counts[best], + )) + + if total >= SUBSIDE_MIN and last_b <= n_buckets - QUIET_BUCKETS: + events.append(TimelineEvent( + kind="subsides", ts=bucket_end(last_b), service=svc, + signature=sig, level=level, + title=f"{sig} subsides", count=0, + )) + + # ── health transitions per service ─────────────────────────────────────── + for svc, totals in svc_total.items(): + errs = svc_err.get(svc, {}) + degraded = False + for b in range(1, n_buckets + 1): + tot = totals.get(b, 0) + if tot < HEALTH_MIN: + continue + frac = errs.get(b, 0) / tot + if not degraded and frac >= DEGRADED_FRAC: + degraded = True + events.append(TimelineEvent( + kind="health_degraded", ts=bucket_start(b), service=svc, + signature=None, level="ERROR", + title=f"{svc} enters degraded state", count=errs.get(b, 0), + )) + elif degraded and frac <= RECOVERED_FRAC: + degraded = False + events.append(TimelineEvent( + kind="health_recovered", ts=bucket_start(b), service=svc, + signature=None, level="INFO", + title=f"{svc} recovers", count=0, + )) + + if len(events) > max_events: + events.sort(key=lambda e: (KIND_PRIORITY.get(e.kind, 9), -e.count)) + events = events[:max_events] + events.sort(key=lambda e: e.ts) + return [e.to_dict() for e in events] diff --git a/tests/retrieval/test_event_feed.py b/tests/retrieval/test_event_feed.py new file mode 100644 index 0000000..9b244ee --- /dev/null +++ b/tests/retrieval/test_event_feed.py @@ -0,0 +1,150 @@ +"""Rule-engine tests for the project-overview event feed (UX P2). + +derive_events is a pure function over bucket aggregates — these tests pin +each rule (begins / spike / subsides / new_pattern / health) without a DB. +""" +from __future__ import annotations + +from datetime import datetime, timedelta, timezone + +from repi.retrieval.event_feed import N_BUCKETS, derive_events, parse_window + +TF = datetime(2026, 6, 1, 0, 0, 0, tzinfo=timezone.utc) +TT = TF + timedelta(hours=24) # bucket span = 1h with N_BUCKETS=24 + + +def _row(sig: str, bucket: int, n: int, service: str = "auth", level: str = "ERROR"): + return {"service": service, "signature": sig, "level": level, "bucket": bucket, "n": n} + + +def _events(buckets, first_seen=None, **kw): + return derive_events(buckets, first_seen or {}, TF, TT, **kw) + + +def _kinds(events, sig=None): + return [e["kind"] for e in events if sig is None or e.get("signature") == sig] + + +# ── parse_window ───────────────────────────────────────────────────────────── + +def test_parse_window_units(): + assert parse_window("5h") == timedelta(hours=5) + assert parse_window("30m") == timedelta(minutes=30) + assert parse_window("7d") == timedelta(days=7) + + +def test_parse_window_garbage_defaults_to_5h(): + assert parse_window("soon") == timedelta(hours=5) + assert parse_window("-3h") == timedelta(hours=5) + + +# ── begins ─────────────────────────────────────────────────────────────────── + +def test_begins_emitted_for_quiet_then_burst(): + events = _events([_row("jwt failed", bucket=5, n=10)]) + assert "begins" in _kinds(events, "jwt failed") + + +def test_no_begins_when_active_at_window_edge(): + """Already firing in bucket 1 — we didn't observe it begin.""" + events = _events([_row("jwt failed", bucket=1, n=10)]) + assert "begins" not in _kinds(events, "jwt failed") + + +def test_no_begins_below_threshold(): + events = _events([_row("jwt failed", bucket=5, n=2)]) + assert "begins" not in _kinds(events, "jwt failed") + + +def test_info_levels_never_produce_events(): + events = _events([_row("served request", bucket=5, n=500, level="INFO")]) + assert _kinds(events, "served request") == [] + + +# ── spike ──────────────────────────────────────────────────────────────────── + +def test_spike_on_3x_trailing_average(): + buckets = [ + _row("db timeout", 2, 4), + _row("db timeout", 3, 4), + _row("db timeout", 10, 40), # 10x the trailing average + ] + events = _events(buckets) + kinds = _kinds(events, "db timeout") + assert "spike" in kinds + spike = next(e for e in events if e["kind"] == "spike") + assert spike["count"] == 40 + assert "×40" in spike["title"] + + +def test_no_spike_for_flat_series(): + buckets = [_row("db timeout", b, 10) for b in range(2, 12)] + assert "spike" not in _kinds(_events(buckets), "db timeout") + + +# ── subsides ───────────────────────────────────────────────────────────────── + +def test_subsides_when_activity_stops_before_window_end(): + buckets = [_row("redis down", 4, 10), _row("redis down", 5, 8)] + events = _events(buckets) + assert "subsides" in _kinds(events, "redis down") + + +def test_no_subsides_when_still_active_at_end(): + buckets = [_row("redis down", N_BUCKETS - 1, 10), _row("redis down", N_BUCKETS, 8)] + assert "subsides" not in _kinds(_events(buckets), "redis down") + + +# ── new_pattern ────────────────────────────────────────────────────────────── + +def test_new_pattern_replaces_begins(): + first_seen = {"jwt sig mismatch": TF + timedelta(hours=4)} + events = _events([_row("jwt sig mismatch", 5, 10)], first_seen) + kinds = _kinds(events, "jwt sig mismatch") + assert "new_pattern" in kinds + assert "begins" not in kinds + + +def test_old_pattern_is_not_new(): + first_seen = {"jwt failed": TF - timedelta(days=30)} + events = _events([_row("jwt failed", 5, 10)], first_seen) + assert "new_pattern" not in _kinds(events, "jwt failed") + + +# ── health transitions ─────────────────────────────────────────────────────── + +def test_service_degraded_and_recovers(): + buckets = [ + # bucket 3: 20 rows, 15 errors → degraded (75%) + _row("boom", 3, 15, service="pay", level="ERROR"), + _row("ok", 3, 5, service="pay", level="INFO"), + # bucket 8: 20 rows, 2 errors → recovered (10%) + _row("boom", 8, 2, service="pay", level="ERROR"), + _row("ok", 8, 18, service="pay", level="INFO"), + ] + events = _events(buckets) + kinds = [e["kind"] for e in events if e["service"] == "pay"] + assert "health_degraded" in kinds + assert "health_recovered" in kinds + # Degraded must precede recovered chronologically. + assert kinds.index("health_degraded") < kinds.index("health_recovered") + + +def test_no_health_event_below_volume_floor(): + buckets = [ + _row("boom", 3, 4, service="pay", level="ERROR"), + _row("ok", 3, 1, service="pay", level="INFO"), + ] + events = _events(buckets) + assert all(e["kind"] != "health_degraded" for e in events) + + +# ── cap + ordering ─────────────────────────────────────────────────────────── + +def test_events_capped_and_chronological(): + buckets = [] + for i in range(40): + buckets.append(_row(f"sig-{i}", bucket=3 + (i % 10), n=10, service=f"svc{i}")) + events = _events(buckets, max_events=10) + assert len(events) == 10 + assert [e["ts"] for e in events] == sorted(e["ts"] for e in events) diff --git a/web/app/page.tsx b/web/app/page.tsx index 72ebb16..5e0d14a 100644 --- a/web/app/page.tsx +++ b/web/app/page.tsx @@ -6,6 +6,8 @@ import { ChatInput } from "@/components/chat/ChatInput" import { ChatMessageView, ChatMessageProps } from "@/components/chat/ChatMessage" import { ConversationSidebar } from "@/components/conversations/ConversationSidebar" import { InvestigationStepCard } from "@/components/investigation-step" +import { ProjectPicker } from "@/components/projects/ProjectPicker" +import { ProjectOverview, type SuggestedAction } from "@/components/projects/ProjectOverview" import { Step, useSSE } from "@/lib/sse" import { Badge } from "@/components/ui/badge" import { Sparkles } from "lucide-react" @@ -69,6 +71,9 @@ function buildChatHistory(turns: Turn[]): { role: "user" | "assistant"; content: export default function HomePage() { const [conversationId, setConversationId] = useState(null) + // Context-before-investigation: every conversation is scoped to a project. + // null → show the picker (which auto-selects when only one project exists). + const [project, setProject] = useState<{ id: string; name: string } | null>(null) const [deepResearch, setDeepResearch] = useState(false) const [turns, setTurns] = useState([]) const [busy, setBusy] = useState(false) @@ -97,11 +102,19 @@ export default function HomePage() { const loadConversation = useCallback(async (id: string | null) => { setConversationId(id) if (!id) { + // New conversation → back to project selection (the picker auto-skips + // itself when exactly one project exists). + setProject(null) setTurns([]) return } try { const detail = await api.conversations.get(id) + setProject( + detail.project_id + ? { id: detail.project_id, name: detail.project_name ?? "Project" } + : null, + ) const rendered: Turn[] = detail.turns.map((t: any, idx: number) => { if (t.mode === "chat") { return { @@ -136,7 +149,7 @@ export default function HomePage() { if (dr) { // Toggle ON → kick off a real investigation, embed its SSE stream. try { - const res = await api.investigations.create(query, conversationId ?? undefined) + const res = await api.investigations.create(query, conversationId ?? undefined, project?.id) if (!conversationId && res.conversation_id) { setConversationId(res.conversation_id) } @@ -170,6 +183,7 @@ export default function HomePage() { body: JSON.stringify({ query, conversation_id: conversationId ?? undefined, + project_id: project?.id ?? undefined, history, previous_chunk_ids: previousChunkIds, }), @@ -269,6 +283,34 @@ export default function HomePage() { const empty = turns.length === 0 + // Suggested-action chips from the overview: investigate → Deep Research + // path; chat → normal /chat turn. Both flow through handleSend so the + // conversation/threading behaviour is identical to typing the query. + function handleSuggestedAction(action: SuggestedAction) { + if (action.kind === "investigate") { + setDeepResearch(true) + handleSend(action.query, true) + } else { + handleSend(action.query, false) + } + } + + // New chat, no project yet → step 1 of the flow: pick (or create) a project. + if (!conversationId && !project) { + return ( +
+ +
+ setProject({ id: p.id, name: p.name })} /> +
+
+ ) + } + return (
{empty ? ( -
-
- -
-

Chat with your logs

-

- Hybrid retrieval over your ingested logs surfaces a chronological{" "} - timeline and the{" "} - event clusters behind your - question. Toggle{" "} - Deep Research for a - full autonomous root-cause investigation. -

-
+ // Timeline-first landing: no empty chat screen. The overview + // answers "what happened recently?" before the user types. + project ? ( + + ) : null ) : ( turns.map((t, i) => t.mode === "chat" ? ( diff --git a/web/components/conversations/ConversationSidebar.tsx b/web/components/conversations/ConversationSidebar.tsx index c9c1674..1f6be29 100644 --- a/web/components/conversations/ConversationSidebar.tsx +++ b/web/components/conversations/ConversationSidebar.tsx @@ -9,6 +9,7 @@ import { cn } from "@/lib/utils" type ConversationSummary = { id: string title: string | null + project_name?: string | null created_at: string updated_at: string } @@ -67,7 +68,14 @@ export function ConversationSidebar({ activeId, onSelect, refreshKey }: Conversa )} > - {c.title || "Untitled"} + + {c.title || "Untitled"} + {c.project_name && ( + + {c.project_name} + + )} + ))}
diff --git a/web/components/projects/ProjectOverview.tsx b/web/components/projects/ProjectOverview.tsx new file mode 100644 index 0000000..e23688f --- /dev/null +++ b/web/components/projects/ProjectOverview.tsx @@ -0,0 +1,191 @@ +"use client" + +import { useEffect, useState } from "react" +import { api } from "@/lib/api" +import { Badge } from "@/components/ui/badge" +import { Button } from "@/components/ui/button" +import { Spinner } from "@/components/ui/spinner" +import { EventClusters, type Cluster } from "@/components/chat/EventClusters" +import { cn } from "@/lib/utils" +import { levelTone } from "@/lib/log-levels" +import { Activity, Microscope, MessageSquare, Server } from "lucide-react" +import { toast } from "sonner" + +export type OverviewEvent = { + kind: string + ts: string + service: string | null + signature: string | null + level: string | null + title: string + count: number +} + +export type SuggestedAction = { + kind: "investigate" | "chat" + label: string + query: string +} + +type Overview = { + window: string + time_from: string + time_to: string + anchored_to_latest: boolean + events: OverviewEvent[] + clusters: Cluster[] + services: { name: string; chunk_count: number; last_seen: string | null }[] + suggested_actions: SuggestedAction[] +} + +function shortTs(iso: string): string { + if (!iso) return "" + return iso.includes("T") ? iso.split("T")[1].slice(0, 8) : iso +} + +function shortDate(iso: string): string { + return iso?.split("T")[0] ?? "" +} + +const KIND_LABEL: Record = { + begins: "begins", + spike: "spike", + subsides: "subsides", + new_pattern: "new pattern", + health_degraded: "degraded", + health_recovered: "recovered", +} + +interface ProjectOverviewProps { + projectId: string + projectName: string + onAction: (action: SuggestedAction) => void +} + +/** + * The landing panel of a new project conversation: heuristic event timeline + * ("what happened recently?"), top error clusters ("what is breaking?"), + * services, and suggested next actions. Fetched live on mount — it reflects + * the data at view time, deliberately not persisted as a chat message. + */ +export function ProjectOverview({ projectId, projectName, onAction }: ProjectOverviewProps) { + const [overview, setOverview] = useState(null) + const [failed, setFailed] = useState(false) + + useEffect(() => { + let cancelled = false + setOverview(null) + api.projects.overview(projectId) + .then((d: Overview) => { if (!cancelled) setOverview(d) }) + .catch((e: any) => { + if (!cancelled) { + setFailed(true) + toast.error("Could not load project overview: " + e.message) + } + }) + return () => { cancelled = true } + }, [projectId]) + + if (failed) return null + if (!overview) { + return ( +
+ +
+ ) + } + + const hasData = overview.events.length > 0 || overview.clusters.length > 0 + // Show the date alongside times when the window isn't "today". + const spansDays = shortDate(overview.time_from) !== shortDate(overview.time_to) + + return ( +
+
+ + {projectName} + last {overview.window} + {overview.anchored_to_latest && ( + + showing latest data ({shortDate(overview.time_to)}) + + )} +
+ + {!hasData ? ( +
+ No log data in this project yet. Ingest a file or register a watcher, + then this timeline fills in. +
+ ) : ( + <> + {overview.events.length > 0 && ( +
+
+ Timeline + + what happened recently + +
+
+ {overview.events.map((e, i) => ( +
+
+
{shortTs(e.ts)}
+ {spansDays &&
{shortDate(e.ts)}
} +
+
+ + {KIND_LABEL[e.kind] ?? e.kind} + + {e.service && ( + + {e.service} + + )} + {e.title} +
+
+ ))} +
+
+ )} + + {overview.clusters.length > 0 && ( + + )} + + )} + + {overview.services.length > 0 && ( +
+ + {overview.services.map((s) => ( + + {s.name} + + ))} +
+ )} + + {hasData && overview.suggested_actions.length > 0 && ( +
+ {overview.suggested_actions.map((a, i) => ( + + ))} +
+ )} +
+ ) +} diff --git a/web/components/projects/ProjectPicker.tsx b/web/components/projects/ProjectPicker.tsx new file mode 100644 index 0000000..65fea2f --- /dev/null +++ b/web/components/projects/ProjectPicker.tsx @@ -0,0 +1,114 @@ +"use client" + +import { useEffect, useState } from "react" +import { api } from "@/lib/api" +import { Button } from "@/components/ui/button" +import { Input } from "@/components/ui/input" +import { Spinner } from "@/components/ui/spinner" +import { FolderOpen, Plus } from "lucide-react" +import { toast } from "sonner" + +export type ProjectSummary = { + id: string + name: string + settings: Record + service_count: number +} + +interface ProjectPickerProps { + onSelect: (project: ProjectSummary) => void +} + +/** + * New-chat step 1: pick the project to scope the conversation to. + * - 0 projects → inline create form + * - exactly 1 → auto-select, no picker shown + * - 2+ → cards + */ +export function ProjectPicker({ onSelect }: ProjectPickerProps) { + const [projects, setProjects] = useState(null) + const [newName, setNewName] = useState("") + const [creating, setCreating] = useState(false) + + useEffect(() => { + let cancelled = false + api.projects.list() + .then((rows: ProjectSummary[]) => { + if (cancelled) return + if (rows.length === 1) onSelect(rows[0]) + else setProjects(rows) + }) + .catch((e: any) => { + if (!cancelled) toast.error("Could not load projects: " + e.message) + setProjects([]) + }) + return () => { cancelled = true } + // eslint-disable-next-line react-hooks/exhaustive-deps + }, []) + + async function createProject() { + const name = newName.trim() + if (!name) return + setCreating(true) + try { + const p = await api.projects.create(name) + onSelect(p) + } catch (e: any) { + toast.error("Could not create project: " + e.message) + } finally { + setCreating(false) + } + } + + if (projects === null) { + return ( +
+ +
+ ) + } + + return ( +
+
+ +
+

Select a project

+

+ Conversations, timelines and investigations are scoped to one project. +

+ + {projects.length > 0 && ( +
+ {projects.map((p) => ( + + ))} +
+ )} + +
+ setNewName(e.target.value)} + onKeyDown={(e) => { if (e.key === "Enter") createProject() }} + /> + +
+
+ ) +} diff --git a/web/lib/api.ts b/web/lib/api.ts index dd7fff1..201794f 100644 --- a/web/lib/api.ts +++ b/web/lib/api.ts @@ -36,10 +36,14 @@ export const api = { investigations: { list: () => fetchApi("/investigations"), get: (id: string) => fetchApi(`/investigations/${id}`), - create: (query: string, conversation_id?: string) => + create: (query: string, conversation_id?: string, project_id?: string) => fetchApi("/investigate", { method: "POST", - body: JSON.stringify(conversation_id ? { query, conversation_id } : { query }), + body: JSON.stringify({ + query, + ...(conversation_id ? { conversation_id } : {}), + ...(project_id ? { project_id } : {}), + }), }), clarify: (id: string, reply: string) => fetchApi(`/investigations/${id}/clarify`, { method: "POST", body: JSON.stringify({ reply }) }), }, @@ -47,5 +51,18 @@ export const api = { list: () => fetchApi("/conversations"), get: (id: string) => fetchApi(`/conversations/${id}`), }, + projects: { + list: () => fetchApi("/projects"), + create: (name: string) => fetchApi("/projects", { method: "POST", body: JSON.stringify({ name }) }), + update: (id: string, data: any) => fetchApi(`/projects/${id}`, { method: "PATCH", body: JSON.stringify(data) }), + services: (id: string) => fetchApi(`/projects/${id}/services`), + overview: (id: string, window?: string, service?: string) => { + const params = new URLSearchParams(); + if (window) params.set("window", window); + if (service) params.set("service", service); + const qs = params.toString(); + return fetchApi(`/projects/${id}/overview${qs ? `?${qs}` : ""}`); + }, + }, }; From e2b2e78dd97a3e245e4b1caa6acaa738ed0001cb Mon Sep 17 00:00:00 2001 From: Varun Singh Date: Fri, 12 Jun 2026 11:20:05 +0530 Subject: [PATCH 3/7] feat(ingestion): real-log parser coverage + readable signatures MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - parse Apache error-log format ([Sun Dec 04 ...] [error] ...) with level mapping - tag syslog pam/sshd auth-failure bodies WARNING so error-scans surface them - signature masking: collapse IPv4 to ; preserve HTTP status codes, protocol versions, and mid-identifier digits (jk2_init) — high-cardinality tokens still mask - sources.md records public datasets used for real-log testing --- repi/ingestion/log_chunker.py | 34 +++++++++++++- repi/ingestion/log_parser.py | 29 ++++++++++++ sources.md | 16 +++++++ tests/ingestion/test_log_chunker.py | 73 +++++++++++++++++++++++++++++ tests/ingestion/test_log_parser.py | 33 +++++++++++++ 5 files changed, 183 insertions(+), 2 deletions(-) create mode 100644 sources.md create mode 100644 tests/ingestion/test_log_chunker.py diff --git a/repi/ingestion/log_chunker.py b/repi/ingestion/log_chunker.py index 6e26975..95450f0 100644 --- a/repi/ingestion/log_chunker.py +++ b/repi/ingestion/log_chunker.py @@ -18,11 +18,41 @@ class ChunkedLog: time_range: str log_level: str = "INFO" +_IPV4_RE = re.compile(r'\b\d{1,3}(?:\.\d{1,3}){3}\b') +_NUM_RE = re.compile(r'\d+') +# Context immediately before a 3-digit number that marks it as an HTTP status +# code ('status 404', 'code=502', '"GET / HTTP/1.1" 200'). +_STATUS_CTX = re.compile(r'(?:status|code|http)[^a-zA-Z]{0,8}$', re.IGNORECASE) +# Protocol/version digits ('HTTP/1.1') — low-cardinality, keep readable. +_HTTP_VERSION_CTX = re.compile(r'http/[\d.]*$', re.IGNORECASE) + def get_signature(message: str) -> str: - """Mask numbers, hex IDs, and UUIDs to find log signatures.""" + """Mask high-cardinality tokens (hex IDs, UUIDs, IPs, numbers) to find log + signatures. Low-cardinality numbers that carry meaning are preserved: + HTTP status codes, protocol versions, and short digit runs inside + identifiers (jk2_init, utf8_decode) — masking those splits or mislabels + clusters without reducing cardinality.""" message = re.sub(r'0x[0-9a-fA-F]+', '', message) message = re.sub(r'[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}', '', message) - message = re.sub(r'\d+', '', message) + message = _IPV4_RE.sub('', message) + + def _mask(m: re.Match) -> str: + tok = m.group() + s, start, end = m.string, m.start(), m.end() + # Mid-identifier digits followed by more identifier chars (jk2_init). + # Trailing digits (node1, worker23) still mask so per-instance names + # collapse into one cluster. + if len(tok) <= 3 and start > 0 and s[start - 1].isalpha() \ + and end < len(s) and (s[end].isalpha() or s[end] == '_'): + return tok + prefix = s[max(0, start - 12):start] + if _HTTP_VERSION_CTX.search(prefix): + return tok + if len(tok) == 3 and tok[0] in '12345' and _STATUS_CTX.search(prefix): + return tok + return '' + + message = _NUM_RE.sub(_mask, message) return " ".join(message.split()) def chunk_logs(logs: List[ParsedLog], window_seconds: int = 30) -> List[ChunkedLog]: diff --git a/repi/ingestion/log_parser.py b/repi/ingestion/log_parser.py index fc2cfbf..00e490b 100644 --- a/repi/ingestion/log_parser.py +++ b/repi/ingestion/log_parser.py @@ -32,11 +32,29 @@ class ParsedLog: # ("error: maximum authentication attempts exceeded"). SYSLOG_LEVEL_HINT = re.compile(r"(?Perror|warning|fatal)\b[: ]", re.IGNORECASE) +# pam_unix/sshd log auth failures at default severity with no level token +# ("authentication failure; logname= ..."). Left as INFO, error-focused +# tools (scan_window) never surface real brute-force incidents. +SYSLOG_FAILURE_HINT = re.compile(r"authentication failure|failed password", re.IGNORECASE) + # Apache/nginx access log: '1.2.3.4 - - [10/Oct/2000:13:55:36 -0700] "GET / HTTP/1.0" 200 ...' ACCESS_LOG_PATTERN = re.compile( r"(?P\S+) \S+ \S+ \[(?P\d{2}/[A-Z][a-z]{2}/\d{4}:\d{2}:\d{2}:\d{2} [+-]\d{4})\] (?P.+)" ) +# Apache error log: '[Sun Dec 04 04:47:44 2005] [error] mod_jk child workerEnv in error state 6' +APACHE_ERROR_PATTERN = re.compile( + r"\[(?P[A-Z][a-z]{2} [A-Z][a-z]{2} {1,2}\d{1,2} \d{2}:\d{2}:\d{2} \d{4})\] \[(?P[a-z]+)\] (?P.+)" +) + +# Apache error-log severities → the level vocabulary the rest of the +# pipeline filters on (INFO/WARNING/ERROR/CRITICAL/DEBUG). +APACHE_LEVEL_MAP = { + "emerg": "CRITICAL", "alert": "CRITICAL", "crit": "CRITICAL", + "error": "ERROR", "warn": "WARNING", "notice": "INFO", + "info": "INFO", "debug": "DEBUG", +} + def _parse_timestamp(ts_str: str | None) -> datetime | None: if not ts_str: return None @@ -49,6 +67,7 @@ def _parse_timestamp(ts_str: str | None) -> datetime | None: "%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S.%fZ", "%d/%b/%Y:%H:%M:%S %z", # apache/nginx access log + "%a %b %d %H:%M:%S %Y", # apache error log "%b %d %H:%M:%S" # Syslog ] @@ -107,6 +126,14 @@ def parse_log_line(line: str) -> ParsedLog: logger.debug(f"Parser: Matched text log (level={level})") return ParsedLog(timestamp=ts_str, level=level, message=message, parsed_timestamp=_parse_timestamp(ts_str)) + # Try apache error log ("[Sun Dec 04 04:47:44 2005] [error] message") + match = APACHE_ERROR_PATTERN.match(line) + if match: + ts_str = match.group("timestamp") + level = APACHE_LEVEL_MAP.get(match.group("level"), "INFO") + logger.debug(f"Parser: Matched apache error log (level={level})") + return ParsedLog(timestamp=ts_str, level=level, message=match.group("message"), parsed_timestamp=_parse_timestamp(ts_str)) + # Try syslog ("Dec 10 06:55:46 host sshd[24200]: message") match = SYSLOG_PATTERN.match(line) if match: @@ -118,6 +145,8 @@ def parse_log_line(line: str) -> ParsedLog: hint = SYSLOG_LEVEL_HINT.match(body) if sep else None if hint: level = hint.group("level").upper() + elif sep and SYSLOG_FAILURE_HINT.search(body): + level = "WARNING" logger.debug(f"Parser: Matched syslog (level={level})") return ParsedLog(timestamp=ts_str, level=level, message=message, parsed_timestamp=_parse_timestamp(ts_str)) diff --git a/sources.md b/sources.md new file mode 100644 index 0000000..7138b06 --- /dev/null +++ b/sources.md @@ -0,0 +1,16 @@ +# Test data sources + +Provenance for all public datasets used to test repi against real-world (non-synthetic) logs. +Raw files live under `tmp-ui-tests/real-logs/` (not committed). + +| Dataset | Source | Format | Why it was chosen | +|---|---|---|---| +| `Linux_2k.log` | [loghub — Linux](https://github.com/logpai/loghub/blob/master/Linux/Linux_2k.log) | syslog (`Mon DD HH:MM:SS host proc[pid]: msg`) | Exercises the syslog parser path, year-less timestamps, and security-incident content (sshd auth failures, ftpd connections) — realistic "why did login fail" investigations. | +| `Apache_2k.log` | [loghub — Apache](https://github.com/logpai/loghub/blob/master/Apache/Apache_2k.log) | Apache error log (`[Day Mon DD HH:MM:SS YYYY] [level] msg`) | A common production format the parser did **not** initially support (0% timestamps, levels lost) — regression target for parser coverage. | +| `Zookeeper_2k.log` | [loghub — Zookeeper](https://github.com/logpai/loghub/blob/master/Zookeeper/Zookeeper_2k.log) | log4j (`YYYY-MM-DD HH:MM:SS,mmm - LEVEL [thread] - msg`) | Distributed-system app logs with real WARN/ERROR bursts (leader election, connection breaks) — good for multi-cluster investigation quality. | + +All loghub datasets are published by the [LogPAI](https://github.com/logpai/loghub) project, collected from real systems and freely available for research purposes (see their repo for citation/terms). + +Notes: +- loghub timestamps are historical (2005–2015), so relative-time queries ("last hour") won't match them; tests use explicit or entity-based queries, which also exercises the no-time-window investigation path. +- The 2k-line "_2k" samples are used instead of full datasets to keep ingest fast and LLM costs bounded. diff --git a/tests/ingestion/test_log_chunker.py b/tests/ingestion/test_log_chunker.py new file mode 100644 index 0000000..0e7b47e --- /dev/null +++ b/tests/ingestion/test_log_chunker.py @@ -0,0 +1,73 @@ +"""Signature masking coverage against real-world log lines (LogHub-style). + +Masking exists to collapse high-cardinality tokens (IDs, IPs, counts) so logs +cluster by template. But blanket digit-masking also destroyed meaningful, +low-cardinality numbers — HTTP status codes, protocol versions, digits inside +identifiers — making signatures unreadable and occasionally splitting clusters. +""" +from repi.ingestion.log_chunker import get_signature + + +# ── High-cardinality tokens still mask ─────────────────────────────────────── + +def test_pids_and_counts_masked(): + sig = get_signature("sshd(pam_unix)[19939]: authentication failure; uid=0 euid=0") + assert "[]" in sig + assert "uid=" in sig + assert "19939" not in sig + + +def test_uuid_and_hex_masked(): + sig = get_signature("request 550e8400-e29b-41d4-a716-446655440000 failed at 0xDEADBEEF") + assert "" in sig + assert "" in sig + + +def test_trailing_instance_digits_masked_so_nodes_cluster_together(): + # node1/node2/node3 must produce the SAME signature + assert get_signature("connection lost to node1") == get_signature("connection lost to node2") + assert "" in get_signature("connection lost to node1") + + +# ── IPv4 collapses to one readable token ───────────────────────────────────── + +def test_ipv4_masked_as_single_ip_token(): + sig = get_signature("connection from 218.188.2.4 () at Sun Jul 10 03:55:14 2005") + assert "" in sig + assert "." not in sig + + +def test_two_hosts_same_signature(): + a = get_signature("Received connection request /10.10.34.11:45307") + b = get_signature("Received connection request /10.10.34.12:45308") + assert a == b + + +# ── Meaningful numbers survive ─────────────────────────────────────────────── + +def test_http_status_code_preserved_in_access_log(): + sig = get_signature('66.249.66.1 - - "GET /index.html HTTP/1.1" 404 2326') + assert "404" in sig + assert "HTTP/1.1" in sig + assert "2326" not in sig # response bytes are high-cardinality + + +def test_status_keyword_preserves_code(): + sig = get_signature("upstream returned status 502 for request 8812345") + assert "status 502" in sig + assert "8812345" not in sig + + +def test_mid_identifier_digits_preserved(): + sig = get_signature("jk2_init() Found child 6725 in scoreboard slot 10") + assert "jk2_init()" in sig + assert "6725" not in sig + assert "slot " in sig + + +# ── Stability: same template, different values → same signature ────────────── + +def test_apache_template_clusters_across_values(): + a = get_signature("mod_jk child workerEnv in error state 6") + b = get_signature("mod_jk child workerEnv in error state 7") + assert a == b diff --git a/tests/ingestion/test_log_parser.py b/tests/ingestion/test_log_parser.py index 4f53cef..3bd495a 100644 --- a/tests/ingestion/test_log_parser.py +++ b/tests/ingestion/test_log_parser.py @@ -89,3 +89,36 @@ def test_unknown_format_falls_back_to_plain_message(): assert p.level == "INFO" assert p.parsed_timestamp is None assert p.message == "completely unstructured line with no timestamp" + +# ── Apache error log ───────────────────────────────────────────────────────── + +def test_apache_error_log_timestamp_and_level(): + p = parse_log_line("[Sun Dec 04 04:51:18 2005] [error] mod_jk child workerEnv in error state 6") + assert p.level == "ERROR" + assert p.message == "mod_jk child workerEnv in error state 6" + assert p.parsed_timestamp == datetime(2005, 12, 4, 4, 51, 18) + + +def test_apache_notice_maps_to_info(): + p = parse_log_line("[Sun Dec 04 04:47:44 2005] [notice] workerEnv.init() ok /etc/httpd/conf/workers2.properties") + assert p.level == "INFO" + assert p.parsed_timestamp == datetime(2005, 12, 4, 4, 47, 44) + + +def test_apache_warn_maps_to_warning(): + p = parse_log_line("[Mon Dec 05 07:57:02 2005] [warn] jk2_init() Can't find child 2114 in scoreboard") + assert p.level == "WARNING" + + +# ── Syslog auth-failure severity hint ──────────────────────────────────────── + +def test_syslog_pam_auth_failure_tagged_warning(): + p = parse_log_line( + "Jun 14 15:16:01 combo sshd(pam_unix)[19939]: authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=218.188.2.4" + ) + assert p.level == "WARNING" + + +def test_syslog_failed_password_tagged_warning(): + p = parse_log_line("Dec 10 07:13:31 LabSZ sshd[24206]: Failed password for invalid user test9 from 52.80.34.196 port 36060 ssh2") + assert p.level == "WARNING" From 0dc20d0a70aafc04033bc803b8b30727e310b0ec Mon Sep 17 00:00:00 2001 From: Varun Singh Date: Fri, 12 Jun 2026 11:25:50 +0530 Subject: [PATCH 4/7] perf(llm): cut token usage per turn + wait out 429s on every provider MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - _compact_observation clips tool results fed back to the LLM (lists to 10 items, text to 300 chars, valid-JSON fallbacks) — full results still persisted to DB/ledger; sweep context no longer pretty-printed - shared _post_with_429_retry: Retry-After-aware 429 waits for OpenAI, Anthropic, Gemini (previously Mistral-only) - loop retries honor retry_after and never retry LLMBadRequestError --- repi/investigation/react_loop.py | 66 +++++- repi/llm/adapters.py | 189 +++++++++--------- .../investigation/test_compact_observation.py | 52 +++++ tests/llm/test_adapters.py | 76 +++++++ 4 files changed, 288 insertions(+), 95 deletions(-) create mode 100644 tests/investigation/test_compact_observation.py diff --git a/repi/investigation/react_loop.py b/repi/investigation/react_loop.py index f0e6453..a598c52 100644 --- a/repi/investigation/react_loop.py +++ b/repi/investigation/react_loop.py @@ -12,6 +12,7 @@ import asyncpg from repi.core.dates import DateHandler, default_date_handler as _dh +from repi.llm.adapters import LLMRateLimitError, LLMBadRequestError from repi.llm.provider import LLMProvider, Message from repi.investigation.tools import ToolCall, ToolResult, TOOL_SCHEMAS from repi.retrieval.heuristics import cluster_logs @@ -159,6 +160,47 @@ async def _wait_for_rate_limit(self): self._llm_call_timestamps = [t for t in self._llm_call_timestamps if now - t < 60] self._llm_call_timestamps.append(now) + # Caps for what a single observation may contribute to the LLM + # conversation. Tool results are re-sent on EVERY subsequent turn, so an + # uncapped scan_window result multiplies across the whole loop — this is + # the main driver of token-per-minute 429s. The full, untruncated result + # is still persisted to the DB and the tool ledger. + MAX_OBS_ITEMS = 10 + MAX_OBS_TEXT_CHARS = 300 + MAX_OBS_TOTAL_CHARS = 6000 + + @classmethod + def _compact_observation(cls, result: Any) -> str: + """Serialize a tool result for the LLM conversation, clipping long + lists and long text fields with explicit markers so the model knows + evidence was elided (and can narrow its next query).""" + + def _walk(node: Any, max_items: int, max_chars: int) -> Any: + if isinstance(node, dict): + return {k: _walk(v, max_items, max_chars) for k, v in node.items()} + if isinstance(node, list): + clipped = [_walk(x, max_items, max_chars) for x in node[:max_items]] + if len(node) > max_items: + clipped.append(f"... {len(node) - max_items} more items truncated") + return clipped + if isinstance(node, str) and len(node) > max_chars: + return node[:max_chars] + "...[truncated]" + return node + + s = json.dumps(_walk(result, cls.MAX_OBS_ITEMS, cls.MAX_OBS_TEXT_CHARS), default=str) + if len(s) > cls.MAX_OBS_TOTAL_CHARS: + # Re-walk with much tighter caps rather than slicing the JSON + # string (which would hand the model malformed JSON). + s = json.dumps(_walk(result, 3, 120), default=str) + if len(s) > cls.MAX_OBS_TOTAL_CHARS: + # Pathological shape (e.g. hundreds of keys) — wrap a hard slice + # inside a fresh JSON object so the payload stays parseable. + s = json.dumps({ + "note": "observation too large; showing a truncated excerpt", + "excerpt": s[:cls.MAX_OBS_TOTAL_CHARS], + }) + return s + def _extract_chunks(self, tool_result: Any) -> list[dict]: """Collect every dict-with-chunk_id from a tool result. Walks nested list-valued fields (e.g. scan_window's `logs` and `pre_context_logs`) @@ -334,7 +376,7 @@ async def investigate( exclude_services=[] ) - sweep_msg = f"SWEEP CONTEXT:\n{json.dumps(sweep_results, indent=2)}\n\n" + sweep_msg = f"SWEEP CONTEXT:\n{self._compact_observation(sweep_results)}\n\n" if resolved_intent.assumed: sweep_msg += "ASSUMPTIONS:\n" + "\n".join(f"- {a}" for a in resolved_intent.assumed) + "\n" @@ -375,7 +417,7 @@ async def investigate( messages.append(Message(role="assistant", content=json.dumps(llm_payload))) if observation: res = observation.tool_result.result if observation.tool_result.result is not None else {"error": observation.tool_result.error} - messages.append(Message(role="user", content=f"Observation:\n{json.dumps(res, default=str)}")) + messages.append(Message(role="user", content=f"Observation:\n{self._compact_observation(res)}")) # Seed the ledger from replayed steps so dedupe survives resume. if action and observation and observation.tool_result.result is not None: @@ -425,10 +467,17 @@ async def investigate( await self._wait_for_rate_limit() raw_reflection = await self.llm.complete(messages) break + except LLMBadRequestError as e: + # Bad payload/auth — retrying the identical request can't succeed. + logger.error(f"Reflection {next_step_number}: non-retryable LLM error: {e}") + break except Exception as e: logger.warning(f"Reflection {next_step_number} attempt {_refl_retry+1}/3 failed: {e}") if _refl_retry < 2: - await asyncio.sleep(15 * (2 ** _refl_retry)) + delay = 15 * (2 ** _refl_retry) + if isinstance(e, LLMRateLimitError) and e.retry_after: + delay = e.retry_after + 1 + await asyncio.sleep(delay) if raw_reflection is None: logger.error(f"Reflection {next_step_number}: LLM call failed after 3 retries, skipping") @@ -522,10 +571,17 @@ async def investigate( await self._wait_for_rate_limit() raw_response = await self.llm.complete(messages) break + except LLMBadRequestError as e: + # Bad payload/auth — retrying the identical request can't succeed. + logger.error(f"Step {next_step_number}: non-retryable LLM error: {e}") + break except Exception as e: logger.warning(f"Step {next_step_number} LLM call attempt {_llm_retry+1}/3 failed: {e}") if _llm_retry < 2: - await asyncio.sleep(15 * (2 ** _llm_retry)) + delay = 15 * (2 ** _llm_retry) + if isinstance(e, LLMRateLimitError) and e.retry_after: + delay = e.retry_after + 1 + await asyncio.sleep(delay) if raw_response is None: logger.error(f"Step {next_step_number}: LLM call failed after 3 retries, ending gathering") @@ -677,7 +733,7 @@ async def investigate( if observation and observation.tool_result: res = observation.tool_result.result if observation.tool_result.result is not None else {"error": observation.tool_result.error} prefix = "(repeat call — returning cached result)\n" if is_repeat_call else "" - messages.append(Message(role="user", content=f"{prefix}Observation:\n{json.dumps(res, default=str)}")) + messages.append(Message(role="user", content=f"{prefix}Observation:\n{self._compact_observation(res)}")) if tool_call_ledger: messages[0] = Message( diff --git a/repi/llm/adapters.py b/repi/llm/adapters.py index 4525af1..2f1f382 100644 --- a/repi/llm/adapters.py +++ b/repi/llm/adapters.py @@ -35,6 +35,46 @@ def __init__(self, provider: str, model: str, status_code: int, body: str): self.body = body +async def _post_with_429_retry(url: str, *, provider: str, model: str, + headers: Optional[dict] = None, json_body: dict, + max_rate_limit_waits: int = 5, + base_delay: float = 15.0) -> httpx.Response: + """POST, waiting out 429s instead of failing: honor Retry-After when the + provider sends it, exponential backoff with jitter otherwise. Raises + LLMRateLimitError once max_rate_limit_waits is exhausted. Non-429 + responses are returned as-is for the caller to classify.""" + import random + waits = 0 + while True: + async with httpx.AsyncClient(timeout=60.0) as client: + response = await client.post(url, headers=headers, json=json_body) + + if response.status_code != 429: + return response + + waits += 1 + if waits > max_rate_limit_waits: + raise LLMRateLimitError(provider, model) + + delay = None + retry_after = response.headers.get("Retry-After") + if retry_after: + try: + delay = float(retry_after) + random.uniform(0, 2) + source = "Retry-After header" + except ValueError: + delay = None + if delay is None: + delay = base_delay * (2 ** min(waits, 3)) + random.uniform(0, 5) + source = "exponential backoff" + + logger.warning( + "%s 429 — waiting %.1fs (source: %s) — wait #%d/%d", + provider, delay, source, waits, max_rate_limit_waits, + ) + await asyncio.sleep(delay) + + def _check_4xx(response: httpx.Response, provider: str, model: str) -> None: """Raise typed errors for 4xx responses. 429 surfaces as a retryable rate-limit error; other 4xx as bad-request (do not retry).""" @@ -64,20 +104,19 @@ def __init__(self, api_key: str, model: str = "gpt-4o"): async def complete(self, messages: List[Message], max_tokens: int = 2000, temperature: float = 0.0) -> str: try: - async with httpx.AsyncClient(timeout=60.0) as client: - response = await client.post( - self._url, - headers={"Authorization": f"Bearer {self._api_key}"}, - json={ - "model": self._model, - "messages": [{"role": m.role, "content": m.content} for m in messages], - "max_tokens": max_tokens, - "temperature": temperature - } - ) - _check_4xx(response, "openai", self._model) - response.raise_for_status() - return response.json()["choices"][0]["message"]["content"] + response = await _post_with_429_retry( + self._url, provider="openai", model=self._model, + headers={"Authorization": f"Bearer {self._api_key}"}, + json_body={ + "model": self._model, + "messages": [{"role": m.role, "content": m.content} for m in messages], + "max_tokens": max_tokens, + "temperature": temperature + } + ) + _check_4xx(response, "openai", self._model) + response.raise_for_status() + return response.json()["choices"][0]["message"]["content"] except LLMError: raise except Exception as e: @@ -97,26 +136,25 @@ async def complete(self, messages: List[Message], max_tokens: int = 2000, temper try: system_msg = next((m.content for m in messages if m.role == "system"), None) user_messages = [{"role": m.role, "content": m.content} for m in messages if m.role != "system"] - - async with httpx.AsyncClient(timeout=60.0) as client: - response = await client.post( - self._url, - headers={ - "x-api-key": self._api_key, - "anthropic-version": "2023-06-01", - "content-type": "application/json" - }, - json={ - "model": self._model, - "system": system_msg, - "messages": user_messages, - "max_tokens": max_tokens, - "temperature": temperature - } - ) - _check_4xx(response, "anthropic", self._model) - response.raise_for_status() - return response.json()["content"][0]["text"] + + response = await _post_with_429_retry( + self._url, provider="anthropic", model=self._model, + headers={ + "x-api-key": self._api_key, + "anthropic-version": "2023-06-01", + "content-type": "application/json" + }, + json_body={ + "model": self._model, + "system": system_msg, + "messages": user_messages, + "max_tokens": max_tokens, + "temperature": temperature + } + ) + _check_4xx(response, "anthropic", self._model) + response.raise_for_status() + return response.json()["content"][0]["text"] except LLMError: raise except Exception as e: @@ -172,20 +210,19 @@ async def complete(self, messages: List[Message], max_tokens: int = 2000, temper contents.append({"role": role, "parts": [{"text": m.content}]}) url = self._url_tmpl.format(model=self._model, key=self._api_key) - async with httpx.AsyncClient(timeout=60.0) as client: - response = await client.post( - url, - json={ - "contents": contents, - "generationConfig": { - "maxOutputTokens": max_tokens, - "temperature": temperature - } + response = await _post_with_429_retry( + url, provider="gemini", model=self._model, + json_body={ + "contents": contents, + "generationConfig": { + "maxOutputTokens": max_tokens, + "temperature": temperature } - ) - _check_4xx(response, "gemini", self._model) - response.raise_for_status() - return response.json()["candidates"][0]["content"]["parts"][0]["text"] + } + ) + _check_4xx(response, "gemini", self._model) + response.raise_for_status() + return response.json()["candidates"][0]["content"]["parts"][0]["text"] except LLMError: raise except Exception as e: @@ -204,54 +241,26 @@ def __init__(self, api_key: str, model: str = "mistral-large-latest"): async def complete(self, messages: List[Message], max_tokens: int = 2000, temperature: float = 0.0) -> str: MAX_TRANSIENT_RETRIES = 3 # network blips, timeouts, 5xx MAX_RATE_LIMIT_WAITS = 10 # 429s — these don't count as failures, they're "wait and try again" - BASE_DELAY = 15.0 # seconds — Mistral free tier resets per minute - import random transient_attempts = 0 - rate_limit_waits = 0 while True: try: - async with httpx.AsyncClient(timeout=60.0) as client: - response = await client.post( - self._url, - headers={"Authorization": f"Bearer {self._api_key}"}, - json={ - "model": self._model, - "messages": [{"role": m.role, "content": m.content} for m in messages], - "max_tokens": max_tokens, - "temperature": temperature - } - ) - - if response.status_code == 429: - rate_limit_waits += 1 - if rate_limit_waits > MAX_RATE_LIMIT_WAITS: - raise LLMRateLimitError("mistral", self._model) - - retry_after = response.headers.get("Retry-After") - if retry_after: - try: - delay = float(retry_after) + random.uniform(0, 2) - source = "Retry-After header" - except ValueError: - delay = BASE_DELAY * (2 ** min(rate_limit_waits, 3)) + random.uniform(0, 5) - source = "exponential backoff (invalid header)" - else: - delay = BASE_DELAY * (2 ** min(rate_limit_waits, 3)) + random.uniform(0, 5) - source = "exponential backoff" - - logger.warning( - "Mistral 429 — waiting %.1fs (source: %s) — wait #%d/%d", - delay, source, rate_limit_waits, MAX_RATE_LIMIT_WAITS, - ) - await asyncio.sleep(delay) - continue # does NOT increment transient_attempts - - # 4xx (non-429): typed bad-request, no retry. - _check_4xx(response, "mistral", self._model) - response.raise_for_status() - return response.json()["choices"][0]["message"]["content"] + response = await _post_with_429_retry( + self._url, provider="mistral", model=self._model, + headers={"Authorization": f"Bearer {self._api_key}"}, + json_body={ + "model": self._model, + "messages": [{"role": m.role, "content": m.content} for m in messages], + "max_tokens": max_tokens, + "temperature": temperature + }, + max_rate_limit_waits=MAX_RATE_LIMIT_WAITS, + ) + # 4xx (non-429): typed bad-request, no retry. + _check_4xx(response, "mistral", self._model) + response.raise_for_status() + return response.json()["choices"][0]["message"]["content"] except LLMRateLimitError: raise except LLMBadRequestError: diff --git a/tests/investigation/test_compact_observation.py b/tests/investigation/test_compact_observation.py new file mode 100644 index 0000000..db96637 --- /dev/null +++ b/tests/investigation/test_compact_observation.py @@ -0,0 +1,52 @@ +"""_compact_observation caps what a tool result contributes to the LLM +conversation. Tool results are re-sent on every subsequent turn, so uncapped +observations multiply across the loop — the main driver of token-per-minute +429s. The DB/ledger keep the full result; only the conversation is clipped. +""" +import json + +from repi.investigation.react_loop import ReactInvestigationLoop + +_compact = ReactInvestigationLoop._compact_observation + + +def test_short_result_passes_through_unchanged(): + result = {"logs": [{"chunk_id": "a", "text": "short"}], "count": 1} + assert json.loads(_compact(result)) == result + + +def test_long_list_clipped_with_explicit_marker(): + result = {"logs": [{"chunk_id": str(i), "text": f"line {i}"} for i in range(50)]} + out = json.loads(_compact(result)) + # 10 kept + 1 truncation marker + assert len(out["logs"]) == ReactInvestigationLoop.MAX_OBS_ITEMS + 1 + assert "40 more items truncated" in out["logs"][-1] + + +def test_long_text_field_clipped(): + result = {"text": "x" * 5000} + out = json.loads(_compact(result)) + assert len(out["text"]) < 400 + assert out["text"].endswith("...[truncated]") + + +def test_nested_lists_clipped_too(): + result = {"services": [{"logs": [{"text": "y" * 1000}] * 30}]} + out = json.loads(_compact(result)) + inner = out["services"][0]["logs"] + assert len(inner) == ReactInvestigationLoop.MAX_OBS_ITEMS + 1 + + +def test_oversized_result_falls_back_to_tighter_caps_and_stays_valid_json(): + # Many medium-sized entries: per-field caps alone exceed the total cap. + result = {f"key{i}": [{"text": "z" * 299} for _ in range(10)] for i in range(20)} + s = _compact(result) + assert len(s) <= ReactInvestigationLoop.MAX_OBS_TOTAL_CHARS + 2000 # tight-cap pass, not a hard slice + json.loads(s) # must be valid JSON, never a sliced string + + +def test_input_not_mutated(): + result = {"logs": [{"text": "x" * 5000}] * 20} + _compact(result) + assert len(result["logs"]) == 20 + assert len(result["logs"][0]["text"]) == 5000 diff --git a/tests/llm/test_adapters.py b/tests/llm/test_adapters.py index 45caf49..db9f38e 100644 --- a/tests/llm/test_adapters.py +++ b/tests/llm/test_adapters.py @@ -147,3 +147,79 @@ async def _no_sleep(_): assert call_count["n"] == 6 # 5 rate-limit sleeps fired (one per 429). assert sleep_count["n"] == 5 + + +# ─── Shared 429 wait helper (all providers) ────────────────────────────────── + + +class TestSharedRateLimitRetry: + """Before this helper, only Mistral waited out 429s — OpenAI/Anthropic/ + Gemini raised immediately and the ReAct loop burned its retry budget.""" + + @pytest.mark.asyncio + async def test_openai_waits_through_429_then_succeeds(self, monkeypatch): + from repi.llm.adapters import OpenAIProvider + + call_count = {"n": 0} + sleeps: list[float] = [] + + class _Mock429: + status_code = 429 + headers = {"Retry-After": "7"} + text = "" + def json(self): return {} + def raise_for_status(self): pass + + class _Mock200: + status_code = 200 + headers: dict = {} + text = "" + def json(self): return {"choices": [{"message": {"content": "ok"}}]} + def raise_for_status(self): pass + + class _MockClient: + def __init__(self, **_kwargs): pass + async def __aenter__(self): return self + async def __aexit__(self, *_): return False + async def post(self, *_args, **_kwargs): + call_count["n"] += 1 + return _Mock429() if call_count["n"] == 1 else _Mock200() + + async def _record_sleep(d): + sleeps.append(d) + + monkeypatch.setattr("repi.llm.adapters.httpx.AsyncClient", _MockClient) + monkeypatch.setattr("repi.llm.adapters.asyncio.sleep", _record_sleep) + + provider = OpenAIProvider(api_key="test", model="gpt-4o") + result = await provider.complete([Message(role="user", content="hi")]) + + assert result == "ok" + assert call_count["n"] == 2 + # Retry-After honored: 7s base + up to 2s jitter. + assert 7.0 <= sleeps[0] <= 9.0 + + @pytest.mark.asyncio + async def test_exhausted_waits_raise_rate_limit_error(self, monkeypatch): + from repi.llm.adapters import _post_with_429_retry + + class _Mock429: + status_code = 429 + headers = {"Retry-After": "0"} + + class _MockClient: + def __init__(self, **_kwargs): pass + async def __aenter__(self): return self + async def __aexit__(self, *_): return False + async def post(self, *_args, **_kwargs): return _Mock429() + + async def _no_sleep(_): pass + + monkeypatch.setattr("repi.llm.adapters.httpx.AsyncClient", _MockClient) + monkeypatch.setattr("repi.llm.adapters.asyncio.sleep", _no_sleep) + + with pytest.raises(LLMRateLimitError): + await _post_with_429_retry( + "https://example.test", provider="test", model="m", + json_body={}, max_rate_limit_waits=2, + ) From 0d7c4fb5646c72e0ba75d97b0c56acb36e84acf5 Mon Sep 17 00:00:00 2001 From: Varun Singh Date: Fri, 12 Jun 2026 11:27:33 +0530 Subject: [PATCH 5/7] feat(ui): live thinking indicator during investigations Steps only stream after a tool observation completes, leaving 10-30s of dead air. ThinkingIndicator fills the gap with a contextual status line (per-tool wording, reflection, compiling) rotating through generic thinking words every 3s. Hidden on done/error/clarification. --- web/app/page.tsx | 6 +- web/components/chat/ThinkingIndicator.tsx | 68 +++++++++++++++++++++++ 2 files changed, 73 insertions(+), 1 deletion(-) create mode 100644 web/components/chat/ThinkingIndicator.tsx diff --git a/web/app/page.tsx b/web/app/page.tsx index 5e0d14a..c6117f6 100644 --- a/web/app/page.tsx +++ b/web/app/page.tsx @@ -9,6 +9,7 @@ import { InvestigationStepCard } from "@/components/investigation-step" import { ProjectPicker } from "@/components/projects/ProjectPicker" import { ProjectOverview, type SuggestedAction } from "@/components/projects/ProjectOverview" import { Step, useSSE } from "@/lib/sse" +import { ThinkingIndicator } from "@/components/chat/ThinkingIndicator" import { Badge } from "@/components/ui/badge" import { Sparkles } from "lucide-react" import { toast } from "sonner" @@ -392,7 +393,7 @@ interface InvestigateTurnViewProps { function InvestigateTurnView({ investigationId, alreadyHoisted, onComplete }: InvestigateTurnViewProps) { const streamUrl = `${API_BASE}/investigations/${investigationId}/stream` - const { steps, answer, error, done, clarificationQuestion, phase } = useSSE(streamUrl) + const { steps, answer, error, done, clarificationQuestion, awaitingClarification, phase } = useSSE(streamUrl) // Hoist the final answer into the parent's turns state once, so the next // /chat turn can include it as history context. @@ -425,6 +426,9 @@ function InvestigateTurnView({ investigationId, alreadyHoisted, onComplete }: In ))}
+ {!done && !error && !awaitingClarification && ( + + )} {answer && (
{answer diff --git a/web/components/chat/ThinkingIndicator.tsx b/web/components/chat/ThinkingIndicator.tsx new file mode 100644 index 0000000..157f3dd --- /dev/null +++ b/web/components/chat/ThinkingIndicator.tsx @@ -0,0 +1,68 @@ +"use client" + +import { useEffect, useState } from "react" +import { Spinner } from "@/components/ui/spinner" +import type { InvestigationPhase, Step } from "@/lib/sse" + +// Steps only stream in AFTER a tool observation completes, so the gaps +// between them (an LLM call + a tool call, 10–30s on throttled providers) +// previously showed nothing. This fills those gaps with a contextual status +// line derived from what the loop is actually doing. + +const TOOL_STATUS: Record = { + search_logs: "Searching logs", + scan_window: "Scanning the error window", + get_timeline: "Building a timeline", + get_service_summary: "Profiling service health", + find_logs_by_id: "Tracing that identifier", + done_gathering: "Wrapping up evidence gathering", +} + +const THINKING_WORDS = [ + "Thinking", + "Correlating events", + "Weighing evidence", + "Reading log clusters", + "Forming hypotheses", +] + +function contextLabel(phase: InvestigationPhase | null, lastStep?: Step): string | null { + if (phase === "done") return null + if (phase === "compiling") return "Compiling the answer" + if (phase === "gathering") { + if (!lastStep) return "Sweeping recent activity" + if (lastStep.kind === "reflection") return "Stepping back to re-plan" + const tool = lastStep.action?.tool + if (tool && TOOL_STATUS[tool]) return `${TOOL_STATUS[tool]} — deciding next move` + return null // fall through to rotating words + } + return "Reading your question" +} + +interface ThinkingIndicatorProps { + phase: InvestigationPhase | null + lastStep?: Step +} + +export function ThinkingIndicator({ phase, lastStep }: ThinkingIndicatorProps) { + const [tick, setTick] = useState(0) + + // Restart the rotation whenever a new step lands, so the contextual label + // shows first and generic thinking words take over while the gap drags on. + useEffect(() => { + setTick(0) + const id = setInterval(() => setTick((t) => t + 1), 3000) + return () => clearInterval(id) + }, [phase, lastStep?.step_number]) + + const context = contextLabel(phase, lastStep) + const rotation = context ? [context, ...THINKING_WORDS] : THINKING_WORDS + const label = rotation[tick % rotation.length] + + return ( +
+ + {label}… +
+ ) +} From c72b2e78de3dd57400b9dcb2fdb1228ce0f23736 Mon Sep 17 00:00:00 2001 From: Varun Singh Date: Fri, 12 Jun 2026 11:32:44 +0530 Subject: [PATCH 6/7] feat(ingestion): preserve API version segments (/api/v1/) in signatures --- repi/ingestion/log_chunker.py | 4 ++++ tests/ingestion/test_log_chunker.py | 6 ++++++ 2 files changed, 10 insertions(+) diff --git a/repi/ingestion/log_chunker.py b/repi/ingestion/log_chunker.py index 95450f0..f7236e7 100644 --- a/repi/ingestion/log_chunker.py +++ b/repi/ingestion/log_chunker.py @@ -45,6 +45,10 @@ def _mask(m: re.Match) -> str: if len(tok) <= 3 and start > 0 and s[start - 1].isalpha() \ and end < len(s) and (s[end].isalpha() or s[end] == '_'): return tok + # API version segments (/api/v1/, v2) — a standalone 'v' + short digits. + if len(tok) <= 2 and start > 0 and s[start - 1] in 'vV' \ + and (start == 1 or not s[start - 2].isalpha()): + return tok prefix = s[max(0, start - 12):start] if _HTTP_VERSION_CTX.search(prefix): return tok diff --git a/tests/ingestion/test_log_chunker.py b/tests/ingestion/test_log_chunker.py index 0e7b47e..bfa4139 100644 --- a/tests/ingestion/test_log_chunker.py +++ b/tests/ingestion/test_log_chunker.py @@ -71,3 +71,9 @@ def test_apache_template_clusters_across_values(): a = get_signature("mod_jk child workerEnv in error state 6") b = get_signature("mod_jk child workerEnv in error state 7") assert a == b + + +def test_api_version_segment_preserved(): + sig = get_signature("GET /api/v1/users/profile took 8231ms for user 99812") + assert "/api/v1/" in sig + assert "8231" not in sig and "99812" not in sig From aed450c132c1b31a72658b337687594ea5326dce Mon Sep 17 00:00:00 2001 From: Varun Singh Date: Sat, 13 Jun 2026 16:13:08 +0530 Subject: [PATCH 7/7] fix(ui): render compiled answer as structured card; harden SSE MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The compiled InvestigationAnswer is persisted as json.dumps(...), but the UI rendered it as plain text — so the final card showed raw JSON. Add a CompiledAnswer component that parses the answer and renders root cause, confidence, affected services, trigger, propagation chain, ruled-out hypotheses, assumptions and gaps; non-JSON answers (clarification text, legacy prose) fall back to plain text. SSE robustness: - Live clarifications now emit `clarification_request` instead of a meaningless `done` with "Awaiting clarification..." — previously the question was lost until a page reload hit the replay path. - json.dumps in the stream generator uses default=str so a stray non-serializable observation can't kill the stream. - Client drops a single unparseable frame instead of tearing down the connection, and tolerates EventSource auto-reconnect on transient disconnects rather than surfacing a hard error. --- repi/api/investigate.py | 20 ++- web/app/page.tsx | 9 +- web/components/chat/CompiledAnswer.tsx | 224 +++++++++++++++++++++++++ web/lib/sse.ts | 31 +++- 4 files changed, 271 insertions(+), 13 deletions(-) create mode 100644 web/components/chat/CompiledAnswer.tsx diff --git a/repi/api/investigate.py b/repi/api/investigate.py index 56e09bd..84dd30c 100644 --- a/repi/api/investigate.py +++ b/repi/api/investigate.py @@ -218,7 +218,7 @@ async def event_generator(): "observation": s.observation, "kind": getattr(s, "kind", None), } - yield f"data: {json.dumps({'type': 'step', 'data': step_data})}\n\n" + yield f"data: {json.dumps({'type': 'step', 'data': step_data}, default=str)}\n\n" if investigation.status in ("completed", "failed"): yield f"data: {json.dumps({'type': 'phase_change', 'data': {'phase': 'done'}})}\n\n" @@ -259,14 +259,26 @@ async def on_phase_change(phase: str): try: event = await asyncio.wait_for(queue.get(), timeout=1.0) - yield f"data: {json.dumps(event)}\n\n" + yield f"data: {json.dumps(event, default=str)}\n\n" except asyncio.TimeoutError: continue try: result = await task - done_payload = {"answer": result.answer, "stats": result.stats} - yield f"data: {json.dumps({'type': 'done', 'data': done_payload})}\n\n" + # The loop may have paused for clarification mid-flight rather + # than finishing. In that case it persisted status + # 'awaiting_clarification' (with a question) and returned a + # placeholder answer — surface the question so the UI can prompt + # for a reply, exactly as the replay path above does. Without + # this the live stream emits a meaningless 'done' and the user + # has no way to answer until they reload the page. + refreshed = await store.get_by_id(uuid_obj) + if refreshed and refreshed.status == "awaiting_clarification": + question = refreshed.pending_question or "" + yield f"data: {json.dumps({'type': 'clarification_request', 'data': {'question': question, 'investigation_id': investigation_id}})}\n\n" + else: + done_payload = {"answer": result.answer, "stats": result.stats} + yield f"data: {json.dumps({'type': 'done', 'data': done_payload}, default=str)}\n\n" except Exception as e: logger.error(f"Investigation failed: {e}") yield f"data: {json.dumps({'type': 'error', 'data': {'message': str(e)}})}\n\n" diff --git a/web/app/page.tsx b/web/app/page.tsx index c6117f6..af3bf3f 100644 --- a/web/app/page.tsx +++ b/web/app/page.tsx @@ -10,6 +10,7 @@ import { ProjectPicker } from "@/components/projects/ProjectPicker" import { ProjectOverview, type SuggestedAction } from "@/components/projects/ProjectOverview" import { Step, useSSE } from "@/lib/sse" import { ThinkingIndicator } from "@/components/chat/ThinkingIndicator" +import { CompiledAnswer } from "@/components/chat/CompiledAnswer" import { Badge } from "@/components/ui/badge" import { Sparkles } from "lucide-react" import { toast } from "sonner" @@ -429,13 +430,7 @@ function InvestigateTurnView({ investigationId, alreadyHoisted, onComplete }: In {!done && !error && !awaitingClarification && ( )} - {answer && ( -
- {answer - .replace(/\s*\[chunk:[^\]]+\]/gi, "") - .replace(/\s*\[chunk_id:[^\]]+\]/gi, "")} -
- )} + {answer && } {error && (
{error} diff --git a/web/components/chat/CompiledAnswer.tsx b/web/components/chat/CompiledAnswer.tsx new file mode 100644 index 0000000..1d2c027 --- /dev/null +++ b/web/components/chat/CompiledAnswer.tsx @@ -0,0 +1,224 @@ +"use client" + +import ReactMarkdown from "react-markdown" +import { Badge } from "@/components/ui/badge" +import { AlertTriangle, ArrowDown, Target, Ban, HelpCircle } from "lucide-react" + +// Shape of the compiled InvestigationAnswer (see repi/investigation/schema.py). +// Everything is optional here because the JSON comes off the wire and we render +// defensively — a missing section is simply skipped. +interface TriggerEvent { + chunk_id?: string + service?: string + timestamp?: string + log_line?: string +} +interface PropagationHop { + service?: string + chunk_id?: string + ts?: string + what?: string +} +interface RuledOut { + hypothesis?: string + why_ruled_out?: string +} +interface InvestigationAnswer { + incident_window?: { start?: string; end?: string } + affected_services?: string[] + trigger_event?: TriggerEvent + propagation_chain?: PropagationHop[] + root_cause?: string + ruled_out_hypotheses?: RuledOut[] + assumptions?: string[] + confidence?: string + gaps?: string[] +} + +// Strip the model's inline chunk references from prose — same cleanup the old +// plain-text card did. +function stripChunkRefs(s: string): string { + return s + .replace(/\s*\[chunk:[^\]]+\]/gi, "") + .replace(/\s*\[chunk_id:[^\]]+\]/gi, "") +} + +// The compiled answer is persisted as `json.dumps(InvestigationAnswer)`. Older +// investigations and clarification messages are plain prose. Try to parse; if +// it isn't the structured object, fall back to rendering text. +function tryParse(answer: string): InvestigationAnswer | null { + const trimmed = answer.trim() + if (!trimmed.startsWith("{")) return null + try { + const obj = JSON.parse(trimmed) + if (obj && typeof obj === "object" && ("root_cause" in obj || "affected_services" in obj)) { + return obj as InvestigationAnswer + } + } catch { + // not JSON — fall through to plain text + } + return null +} + +const CONFIDENCE_VARIANT: Record = { + high: "default", + medium: "secondary", + low: "destructive", +} + +function Section({ + icon, + title, + children, +}: { + icon: React.ReactNode + title: string + children: React.ReactNode +}) { + return ( +
+
+ {icon} + {title} +
+ {children} +
+ ) +} + +export function CompiledAnswer({ answer }: { answer: string }) { + const parsed = tryParse(answer) + + // Fallback: plain-text answer (clarification text or legacy prose). + if (!parsed) { + return ( +
+ {stripChunkRefs(answer)} +
+ ) + } + + const { + incident_window, + affected_services, + trigger_event, + propagation_chain, + root_cause, + ruled_out_hypotheses, + assumptions, + confidence, + gaps, + } = parsed + + const conf = (confidence || "").toLowerCase() + const window = incident_window || {} + + return ( +
+ {/* Header: confidence + incident window */} +
+ {conf && ( + + {conf} confidence + + )} + {(window.start || window.end) && ( + + {window.start || "?"} → {window.end || "?"} + + )} +
+ + {/* Root cause — the headline */} + {root_cause && ( +
} title="Root cause"> +
+ {stripChunkRefs(root_cause)} +
+
+ )} + + {/* Affected services */} + {affected_services && affected_services.length > 0 && ( +
} title="Affected services"> +
+ {affected_services.map((svc) => ( + + {svc} + + ))} +
+
+ )} + + {/* Trigger event */} + {trigger_event && (trigger_event.log_line || trigger_event.service) && ( +
} title="Trigger event"> +
+
+ {trigger_event.service && {trigger_event.service}} + {trigger_event.timestamp && {trigger_event.timestamp}} +
+ {trigger_event.log_line && ( +
+                {trigger_event.log_line}
+              
+ )} +
+
+ )} + + {/* Propagation chain */} + {propagation_chain && propagation_chain.length > 0 && ( +
} title="Propagation"> +
    + {propagation_chain.map((hop, i) => ( +
  1. + {i + 1}. + + {hop.service} + {hop.what ? ` — ${stripChunkRefs(hop.what)}` : ""} + {hop.ts ? ({hop.ts}) : null} + +
  2. + ))} +
+
+ )} + + {/* Ruled out */} + {ruled_out_hypotheses && ruled_out_hypotheses.length > 0 && ( +
} title="Ruled out"> +
    + {ruled_out_hypotheses.map((r, i) => ( +
  • + {r.hypothesis} + {r.why_ruled_out ? — {r.why_ruled_out} : null} +
  • + ))} +
+
+ )} + + {/* Assumptions + gaps — secondary context, shown last */} + {assumptions && assumptions.length > 0 && ( +
} title="Assumptions"> +
    + {assumptions.map((a, i) => ( +
  • {a}
  • + ))} +
+
+ )} + {gaps && gaps.length > 0 && ( +
} title="Gaps"> +
    + {gaps.map((g, i) => ( +
  • {g}
  • + ))} +
+
+ )} +
+ ) +} diff --git a/web/lib/sse.ts b/web/lib/sse.ts index f0d267b..09970ab 100644 --- a/web/lib/sse.ts +++ b/web/lib/sse.ts @@ -1,6 +1,6 @@ "use client" -import { useState, useEffect, useCallback } from "react" +import { useState, useEffect, useCallback, useRef } from "react" export type StepKind = null | "reflection" | "signal" | "compile" @@ -37,6 +37,9 @@ export function useSSE(url: string | null) { const [awaitingClarification, setAwaitingClarification] = useState(false) const [phase, setPhase] = useState(null) const [stats, setStats] = useState(null) + // Mirror of `done` readable synchronously inside the EventSource callbacks, + // which capture state from the connect-time render. + const doneRef = useRef(false) const connect = useCallback(() => { if (!url) return @@ -44,6 +47,7 @@ export function useSSE(url: string | null) { setSteps([]) setAnswer(null) setError(null) + doneRef.current = false setDone(false) setClarificationQuestion(null) setAwaitingClarification(false) @@ -53,7 +57,15 @@ export function useSSE(url: string | null) { const eventSource = new EventSource(url) eventSource.onmessage = (event) => { - const { type, data } = JSON.parse(event.data) + let type: string + let data: any + try { + ;({ type, data } = JSON.parse(event.data)) + } catch { + // A single malformed frame must not tear down the whole stream. + console.warn("SSE: dropped unparseable frame", event.data) + return + } if (type === "step") { setSteps((prev) => { @@ -69,6 +81,7 @@ export function useSSE(url: string | null) { setAnswer(data.answer) if (data.stats) setStats(data.stats) setPhase("done") + doneRef.current = true setDone(true) eventSource.close() } else if (type === "clarification_request") { @@ -77,12 +90,26 @@ export function useSSE(url: string | null) { setDone(false) // Stay open for more steps later } else if (type === "error") { setError(data.message) + doneRef.current = true setDone(true) eventSource.close() } } eventSource.onerror = (err) => { + // The server closes the stream once it has sent `done`; the resulting + // error here is expected — swallow it. + if (doneRef.current) { + eventSource.close() + return + } + // readyState === CONNECTING means the browser is auto-reconnecting. The + // stream endpoint replays prior steps (deduped above) and resumes the + // loop, so let the retry happen instead of tearing the view down. + if (eventSource.readyState === EventSource.CONNECTING) { + console.warn("SSE: transient disconnect, reconnecting…") + return + } console.error("SSE Error:", err) setError("Connection to investigation stream lost.") setDone(true)