diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 0000000..6622529 --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,25 @@ +name: tests + +on: + push: + branches: [main] + pull_request: + +jobs: + pytest: + runs-on: ubuntu-latest + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up uv + uses: astral-sh/setup-uv@v5 + with: + enable-cache: true + + - name: Install dependencies + run: uv sync --frozen + + - name: Run tests + run: uv run pytest tests/ -q diff --git a/.gitignore b/.gitignore index 57a7c22..7f5cd48 100644 --- a/.gitignore +++ b/.gitignore @@ -40,3 +40,5 @@ eval/results/ # Closure / launch tracker (private; not for public repo) todo/ +tmp-ui-tests/ +assets/ diff --git a/CLAUDE.md b/CLAUDE.md index 14c58ea..325d98b 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -53,10 +53,12 @@ make reset-investigations # TRUNCATE investigations CASCADE # Ingest a log file (via HTTP API) curl -X POST -F "service=my-svc" -F "file=@/path/to/app.log" http://localhost:8000/ingest -# Run an investigation (via HTTP API) +# Run an investigation (via HTTP API). POST only registers it and returns an +# id — execution happens while a client is attached to the SSE stream. curl -X POST http://localhost:8000/investigate \ -H "Content-Type: application/json" \ -d '{"query": "why did checkout fail last friday night"}' +curl -N http://localhost:8000/investigations/{id}/stream # Background worker for continuous ingestion uv run python -m repi.worker diff --git a/README.md b/README.md index 4dcdbb0..1d503c5 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # repi -Log ingestion and LLM-based investigation engine. Ingests log files into PostgreSQL (pgvector), retrieves relevant log clusters via hybrid search (BM25 + dense vectors with RRF), and runs a ReAct loop where an LLM autonomously investigates root causes. +Local-first log observability. repi continuously ingests logs, indexes them into a hybrid retrieval system (pgvector HNSW + Postgres FTS with weighted tsvector + pg_trgm fuzzy match), clusters related events, builds incident timelines, and can optionally launch an autonomous root-cause investigation through a ReAct loop. Designed to run on a single machine against a local Postgres — no SaaS, no shared state. ## Architecture @@ -45,7 +45,7 @@ On first start, the entrypoint seeds `/app/.repi/config.json` from a baked-in de - Visit the **Config** page in the UI, pick a provider, paste your API key, save. The API hot-reloads. - Your config persists across `docker compose down` (lost only on `down -v`). -Pin a release via `REPI_IMAGE=ghcr.io/varungitgood/repi:v0.1.0 docker compose up -d`. +Pin a release via `REPI_IMAGE=ghcr.io/varungitgood/repi:v0.2.0 docker compose up -d`. ### Option 1b — Hack on it (contributor / dev path) @@ -88,16 +88,17 @@ curl -X POST \ ### Investigate +Starting an investigation is a two-step flow: `POST /investigate` registers it and returns an `id`; attaching to the SSE stream is what actually executes the ReAct loop (the web UI does this for you). A `POST` with no stream consumer stays in `started` and never runs. + ```bash +# 1. Register the investigation — returns {"id": "...", ...} curl -X POST http://localhost:8000/investigate \ -H "Content-Type: application/json" \ -d '{"query": "why did checkout fail last friday night"}' -``` -Stream the ReAct steps live: - -```bash -curl -N http://localhost:8000/investigate/{id}/stream +# 2. Attach to the stream to execute it and watch the ReAct steps live. +# Reconnecting replays persisted steps, then continues. +curl -N http://localhost:8000/investigations/{id}/stream ``` ### Continuous ingestion with the Worker diff --git a/eval/dataset_1_cascading_inventory_migration/seed.py b/eval/dataset_1_cascading_inventory_migration/seed.py index f8ff763..90e10cd 100644 --- a/eval/dataset_1_cascading_inventory_migration/seed.py +++ b/eval/dataset_1_cascading_inventory_migration/seed.py @@ -54,7 +54,7 @@ async def main() -> None: print(f" SKIP {service}: {path} not found") continue content = path.read_text().replace(STORY_DATE, anchor_str) - count = await ingestor.ingest(content, source_service=service, source_env="eval") + count = (await ingestor.ingest(content, source_service=service, source_env="eval")).chunk_count print(f" {service:20s} {count:4d} chunks ({filename})") total += count diff --git a/eval/dataset_2_insufficient_logging/seed.py b/eval/dataset_2_insufficient_logging/seed.py index 75ee2b7..33cabd3 100644 --- a/eval/dataset_2_insufficient_logging/seed.py +++ b/eval/dataset_2_insufficient_logging/seed.py @@ -52,7 +52,7 @@ async def main() -> None: print(f" SKIP {service}: {path} not found") continue content = path.read_text().replace(STORY_DATE, anchor_str) - count = await ingestor.ingest(content, source_service=service, source_env="eval") + count = (await ingestor.ingest(content, source_service=service, source_env="eval")).chunk_count print(f" {service:20s} {count:4d} chunks ({filename})") total += count diff --git a/eval/dataset_3_jwt_key_rotation_noise/seed.py b/eval/dataset_3_jwt_key_rotation_noise/seed.py index 5382a3b..3d19b9b 100644 --- a/eval/dataset_3_jwt_key_rotation_noise/seed.py +++ b/eval/dataset_3_jwt_key_rotation_noise/seed.py @@ -55,7 +55,7 @@ async def main() -> None: print(f" SKIP {service}: {path} not found") continue content = path.read_text().replace(STORY_DATE, anchor_str) - count = await ingestor.ingest(content, source_service=service, source_env="eval") + count = (await ingestor.ingest(content, source_service=service, source_env="eval")).chunk_count print(f" {service:22s} {count:4d} chunks ({filename})") total += count diff --git a/pyproject.toml b/pyproject.toml index 6331573..92b16cd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "repi" -version = "0.1.0" +version = "0.2.0" description = "repi: Log Investigation Engine" readme = "README.md" requires-python = ">=3.11,<4" diff --git a/repi/api/chat.py b/repi/api/chat.py index 593f856..57a39cf 100644 --- a/repi/api/chat.py +++ b/repi/api/chat.py @@ -22,16 +22,18 @@ import asyncio import json import logging -from datetime import datetime, timezone +from collections import Counter +from datetime import datetime, timedelta, timezone from typing import List, Literal, Optional from uuid import UUID, uuid4 from fastapi import APIRouter from fastapi.responses import StreamingResponse -from pydantic import BaseModel +from pydantic import BaseModel, Field from sqlalchemy import select, text as sa_text from repi.core.container import get_container +from repi.core.config import get_settings from repi.core.dates import default_date_handler as _dh from repi.intent.resolver import ( ClarificationNeeded, @@ -42,6 +44,8 @@ from repi.llm.provider import Message from repi.models.filters import RetrievalFilters from repi.models.schema import ChatMessage, Conversation +from repi.retrieval.cluster_view import cluster_chunks +from repi.retrieval.timeline_view import build_timeline logger = logging.getLogger("repi.api.chat") @@ -68,6 +72,32 @@ class ChatRequest(BaseModel): history: List[ChatTurn] = [] filters: Optional[ChatFilters] = None conversation_id: Optional[UUID] = None + # Followup-bias hint: chunk_ids the previous assistant turn cited. When + # the current query is missing EITHER an explicit service or an explicit + # time window, the chat path fills in just the missing dimension from + # the previous turn's chunks — service via dominant-source check, time + # via a `Settings.FOLLOWUP_BIAS_WINDOW_MINUTES` envelope around their + # timestamps. Soft — never overrides an explicit filter, silently + # ignored if the IDs no longer resolve. + # + # Capped at 50 to bound the indexed-PK fetch and reject malformed + # payloads early. The legitimate caller only ever sends the last + # assistant turn's citations (≤10 in practice). + previous_chunk_ids: List[str] = Field(default_factory=list, max_length=50) + + +# ── Module-level constants ──────────────────────────────────────────────────── + +# Caller-visible window on cited-chunk `text` in the SSE done payload. Locked +# to the same length the LLM prompt's evidence block uses so the UI never +# surfaces content the model didn't see. +CHUNK_TEXT_WINDOW = 600 + +# When the dominant service's count is at least this multiple of the +# runner-up's, treat it as "this is the conversation's service" and bias +# retrieval toward it. Below this ratio the previous turn straddled +# services (cross-service incident) — we let the resolver fan out instead. +SERVICE_DOMINANCE_RATIO = 2.0 # ── SSE envelope helpers ────────────────────────────────────────────────────── @@ -77,6 +107,23 @@ def _sse(event_type: str, data: dict) -> str: return f"data: {json.dumps({'type': event_type, 'data': data})}\n\n" +def _normalize_ts(value): + """Canonicalise a `timestamp_start` field for the chat path's chunks list. + + The chunks the LLM, the cluster_view, and the timeline_view all read share + one rule: `timestamp` is ISO 8601 string or None. Downstream comparisons + (`<`, `>`, `sorted(...)`) rely on this — mixing `datetime` and `str` in + one list would TypeError. Two source paths (RRF + entity-bias merge) feed + this list; both run their `timestamp_start` through here so a future + change to either source can't reintroduce mixed types. + """ + if value is None: + return None + if hasattr(value, "isoformat"): + return _dh.to_iso(value) + return value # already string-ish + + # ── Confidence heuristic (chat path) ────────────────────────────────────────── # Compiler floors don't apply — /chat has no compile step. Deterministic rules: # - 0 chunks gathered → low @@ -215,6 +262,52 @@ async def event_generator(): if caller_entity and caller_entity not in entities: entities.append(caller_entity) + # Followup bias: when the current query is missing EITHER an + # explicit service or an explicit time window, fill in just the + # missing dimension from the previous turn's cited chunks. + # Indexed PK lookup, so cost is negligible vs the LLM call. Soft: + # explicit filters always win. + if req.previous_chunk_ids and (service is None or (time_from is None and time_to is None)): + async with container.async_session_maker() as session: + prev_meta = await container.get_retrieval_service(session).vector_store.get_chunks_by_ids( + list(req.previous_chunk_ids) + ) + if prev_meta: + prev_services = [m.get("source_service") for m in prev_meta.values() if m.get("source_service")] + if service is None and prev_services: + # Narrow to the dominant service only if it's clearly + # dominant — top count >= SERVICE_DOMINANCE_RATIO × + # runner-up. Otherwise the previous turn straddled + # services (a cross-service incident), and pinning + # one would hide the other half on the followup. + counts = Counter(prev_services).most_common() + top_svc, top_n = counts[0] + runner_up = counts[1][1] if len(counts) > 1 else 0 + if runner_up == 0 or top_n >= SERVICE_DOMINANCE_RATIO * runner_up: + service = top_svc + logger.debug( + "chat followup-bias: pinned service=%s (top=%d, runner-up=%d)", + top_svc, top_n, runner_up, + ) + else: + logger.debug( + "chat followup-bias: skipped service pin — " + "no dominant service (top=%d, runner-up=%d)", + top_n, runner_up, + ) + if time_from is None and time_to is None: + prev_ts = [m.get("timestamp_start") for m in prev_meta.values() if m.get("timestamp_start")] + if prev_ts: + envelope = timedelta(minutes=get_settings().FOLLOWUP_BIAS_WINDOW_MINUTES) + anchor_min = min(prev_ts) + anchor_max = max(prev_ts) + time_from = anchor_min - envelope + time_to = anchor_max + envelope + logger.debug( + "chat followup-bias: time window %s → %s", + time_from, time_to, + ) + async with container.async_session_maker() as session: retrieval = container.get_retrieval_service(session) rrf_filters = RetrievalFilters( @@ -241,9 +334,7 @@ async def event_generator(): "chunk_id": cid, "service": data.get("source_service"), "level": data.get("log_level"), - "timestamp": _dh.to_iso(data.get("timestamp_start")) - if hasattr(data.get("timestamp_start"), "isoformat") - else data.get("timestamp_start"), + "timestamp": _normalize_ts(data.get("timestamp_start")), "text": data.get("text") or "", "score": float(score), }) @@ -261,7 +352,7 @@ async def event_generator(): "chunk_id": c["chunk_id"], "service": c.get("service"), "level": c.get("level"), - "timestamp": c.get("timestamp_start"), + "timestamp": _normalize_ts(c.get("timestamp_start")), "text": c.get("text") or "", "score": 0.0, # ILIKE has no score; use sentinel }) @@ -277,7 +368,7 @@ async def event_generator(): "service": c.get("service"), "level": c.get("level"), "timestamp": str(c.get("timestamp") or ""), - "text": (c.get("text") or "")[:600], + "text": (c.get("text") or "")[:CHUNK_TEXT_WINDOW], } for c in chunks ], indent=2, default=str) @@ -318,11 +409,54 @@ async def event_generator(): ) await session.commit() + # Event clusters across the retrieved top-K. Singletons are + # dropped (they're already in the per-turn timeline); the panel + # gives the user the "1842x JWT failures, 347x DB timeouts" + # compression rather than a raw chunk list. Caveat the UI must + # carry: this is *per-turn* over the retrieved chunks, not a + # corpus-wide aggregate. + clusters = [ + { + "signature": v.signature, + "count": v.count, + "services": v.services, + "first_ts": v.first_ts, + "last_ts": v.last_ts, + } + for v in cluster_chunks(chunks) + ] + + # Incident timeline — chronologically ordered, run-collapsed view + # of the same retrieved chunks. Reuses the in-memory list rather + # than re-fetching via investigation.tools.get_timeline (one less + # DB roundtrip per turn). Singletons stay so the user can see + # the gap between events; runs collapse so 12 identical lines + # become "x12 over 14:02–14:04". + timeline = build_timeline(chunks) + + # Minimal projection of cited chunks for the UI's raw-evidence tab. + # Saves a follow-up GET — the chat path already has the hydrated + # list. Keep the text at the same 600-char window used in the LLM + # prompt so the UI doesn't surface content the model didn't see. + cited_chunks = [ + { + "chunk_id": c["chunk_id"], + "service": c.get("service"), + "level": c.get("level"), + "timestamp": str(c.get("timestamp") or "") or None, + "text": (c.get("text") or "")[:CHUNK_TEXT_WINDOW], + } + for c in chunks + ] + yield _sse("done", { "chunk_ids": cited_ids, "confidence": confidence, "conversation_id": str(conversation_id), "entities": entities, + "clusters": clusters, + "timeline": timeline, + "cited_chunks": cited_chunks, }) except Exception as e: diff --git a/repi/api/ingest.py b/repi/api/ingest.py index e11dfe3..7d74bc7 100644 --- a/repi/api/ingest.py +++ b/repi/api/ingest.py @@ -10,6 +10,9 @@ class IngestResponse(BaseModel): service: str chunk_count: int + lines_total: int + lines_with_timestamp: int + level_counts: dict[str, int] message: str @router.post("/ingest", response_model=IngestResponse) @@ -22,14 +25,28 @@ async def ingest( """ content = await file.read() content_str = content.decode("utf-8") - + container = get_container() async with container.get_session() as session: ingestor = container.get_ingestor(session) - count = await ingestor.ingest(content_str, service) - + stats = await ingestor.ingest(content_str, service) + + # Refresh the in-memory service list so a brand-new service is visible to + # the intent resolver immediately, not only after a restart or GET /services. + await container.init_known_services() + + message = f"Successfully ingested {stats.chunk_count} chunks for {service}" + if stats.lines_total and stats.lines_with_timestamp == 0: + message += ( + " (warning: no timestamps could be parsed from these logs — " + "time-based filters will not match them)" + ) + return IngestResponse( service=service, - chunk_count=count, - message=f"Successfully ingested {count} chunks for {service}" + chunk_count=stats.chunk_count, + lines_total=stats.lines_total, + lines_with_timestamp=stats.lines_with_timestamp, + level_counts=stats.level_counts, + message=message, ) diff --git a/repi/core/config.py b/repi/core/config.py index 5f89fb8..ef4fada 100644 --- a/repi/core/config.py +++ b/repi/core/config.py @@ -6,8 +6,23 @@ from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, SettingsConfigDict from pydantic import Field -CONFIG_DIR = Path(".repi") -CONFIG_PATH = CONFIG_DIR / "config.json" +def _resolve_config_path() -> Path: + """Locate .repi/config.json: cwd first (docker runs from /app), then parent + directories (running from a subdir of a checkout), then alongside the + package (where `repi init` writes it). Falls back to the cwd-relative + default so a fresh PUT /config can still create the file.""" + rel = Path(".repi") / "config.json" + for base in [Path.cwd(), *Path.cwd().parents]: + candidate = base / rel + if candidate.exists(): + return candidate + pkg_anchored = Path(__file__).resolve().parent.parent.parent / rel + if pkg_anchored.exists(): + return pkg_anchored + return rel + +CONFIG_PATH = _resolve_config_path() +CONFIG_DIR = CONFIG_PATH.parent class Settings(BaseSettings): REPI_ENV: str = Field(default="production", description="Runtime environment") @@ -56,6 +71,14 @@ class Settings(BaseSettings): ENABLE_REFLECTION: bool = True REFLECTION_INTERVAL: int = 3 + # /chat followup-bias envelope. When a turn omits an explicit time + # window AND the previous assistant turn cited chunks, the chat path + # widens the previous turn's first/last timestamps by this much on + # each side. Same conceptual dial as TIME_WINDOW_INITIAL_MINUTES — kept + # separate because the followup window is much narrower than a fresh + # search and operators want to tune them independently. + FOLLOWUP_BIAS_WINDOW_MINUTES: int = 5 + # Extra entity-detection regex patterns. Industry-standard IDs (UUID, W3C # trace/span ids, ULID, Stripe/Twilio-style prefixed ids, AWS resource ids, # git SHAs, hyphenated IDs containing a digit) are matched out of the box diff --git a/repi/ingestion/log_ingestor.py b/repi/ingestion/log_ingestor.py index d29f619..cc0e6d5 100644 --- a/repi/ingestion/log_ingestor.py +++ b/repi/ingestion/log_ingestor.py @@ -1,5 +1,7 @@ from __future__ import annotations import logging +from collections import Counter +from dataclasses import dataclass, field from typing import List, Optional from repi.ingestion.log_parser import parse_log_line from repi.ingestion.log_chunker import chunk_logs @@ -8,12 +10,22 @@ logger = logging.getLogger(__name__) +@dataclass +class IngestStats: + """Parse-quality report for one ingest call. A run with + lines_with_timestamp == 0 means time filters will never match these logs — + surface that to the caller instead of failing silently at query time.""" + chunk_count: int = 0 + lines_total: int = 0 + lines_with_timestamp: int = 0 + level_counts: dict[str, int] = field(default_factory=dict) + class LogIngestor: def __init__(self, vector_store: PgVectorStore, embedding_func) -> None: self.vector_store = vector_store self.embedding_func = embedding_func - async def ingest(self, logs: str | List[str], source_service: str, source_env: str = "production") -> int: + async def ingest(self, logs: str | List[str], source_service: str, source_env: str = "production") -> IngestStats: """ Ingest logs from a specific source. """ @@ -28,11 +40,17 @@ async def ingest(self, logs: str | List[str], source_service: str, source_env: s parsed_logs = [parse_log_line(line) for line in lines if line.strip()] chunks = chunk_logs(parsed_logs) + stats = IngestStats( + lines_total=len(parsed_logs), + lines_with_timestamp=sum(1 for p in parsed_logs if p.parsed_timestamp is not None), + level_counts=dict(Counter(p.level for p in parsed_logs)), + ) + chunk_texts = [f"Signature: {c.signature}\nExamples: {' '.join(c.examples)}" for c in chunks] - + if not chunk_texts: - return 0 - + return stats + embeddings = self.embedding_func(chunk_texts) count = 0 @@ -51,5 +69,11 @@ async def ingest(self, logs: str | List[str], source_service: str, source_env: s ) count += 1 + stats.chunk_count = count + if stats.lines_total and stats.lines_with_timestamp == 0: + logger.warning( + f"No timestamps parsed from any of {stats.lines_total} lines for " + f"{source_service} — time-based filters will not match these chunks" + ) logger.info(f"Ingested {count} chunks from service {source_service}") - return count + return stats diff --git a/repi/ingestion/log_parser.py b/repi/ingestion/log_parser.py index 111eda2..fc2cfbf 100644 --- a/repi/ingestion/log_parser.py +++ b/repi/ingestion/log_parser.py @@ -2,7 +2,7 @@ import json import re from typing import Optional, Dict, Any -from datetime import datetime +from datetime import datetime, timedelta, timezone from dataclasses import dataclass import logging @@ -17,45 +17,72 @@ class ParsedLog: # Common log patterns TEXT_LOG_PATTERN = re.compile( - r"(?P\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(?:\.\d+)?Z?)?\s*" + r"(?P\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(?:[.,]\d+)?(?:Z|[+-]\d{2}:?\d{2})?)?\s*-?\s*" r"\[?(?PINFO|ERROR|WARN|WARNING|DEBUG|CRITICAL|FATAL)\]?\s*" r"(?P.*)", re.IGNORECASE ) +# Syslog: "Dec 10 06:55:46 host proc[pid]: message" +SYSLOG_PATTERN = re.compile( + r"(?P[A-Z][a-z]{2}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\s+(?P.+)" +) + +# Daemons like sshd prefix some syslog message bodies with a level token +# ("error: maximum authentication attempts exceeded"). +SYSLOG_LEVEL_HINT = re.compile(r"(?Perror|warning|fatal)\b[: ]", re.IGNORECASE) + +# Apache/nginx access log: '1.2.3.4 - - [10/Oct/2000:13:55:36 -0700] "GET / HTTP/1.0" 200 ...' +ACCESS_LOG_PATTERN = re.compile( + r"(?P\S+) \S+ \S+ \[(?P\d{2}/[A-Z][a-z]{2}/\d{4}:\d{2}:\d{2}:\d{2} [+-]\d{4})\] (?P.+)" +) + def _parse_timestamp(ts_str: str | None) -> datetime | None: if not ts_str: return None - + # Try formats formats = [ "%Y-%m-%d %H:%M:%S.%f", + "%Y-%m-%d %H:%M:%S,%f", # log4j / logback "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S.%fZ", - "%b %d %H:%M:%S" # Syslog + "%d/%b/%Y:%H:%M:%S %z", # apache/nginx access log + "%b %d %H:%M:%S" # Syslog ] - + + dt: datetime | None = None for fmt in formats: try: dt = datetime.strptime(ts_str, fmt) - if fmt == "%b %d %H:%M:%S": - # Assume current year for syslog - dt = dt.replace(year=datetime.utcnow().year) - return dt except ValueError: continue - - # Try ISO format - try: - return datetime.fromisoformat(ts_str.replace("Z", "+00:00")) - except (ValueError, TypeError): - return None + if fmt == "%b %d %H:%M:%S": + # Syslog omits the year — assume the most recent occurrence. + now = datetime.utcnow() + dt = dt.replace(year=now.year) + if dt > now + timedelta(days=1): + dt = dt.replace(year=now.year - 1) + break + + if dt is None: + # Try ISO format + try: + dt = datetime.fromisoformat(ts_str.replace("Z", "+00:00")) + except (ValueError, TypeError): + return None + + # Normalise to naive UTC so timestamps sort/compare consistently downstream. + if dt.tzinfo is not None: + dt = dt.astimezone(timezone.utc).replace(tzinfo=None) + return dt def parse_log_line(line: str) -> ParsedLog: """ Parse a single log line into a structured ParsedLog. - Supports JSON logs and common plain text formats. + Supports JSON logs and common plain text formats + (ISO/log4j app logs, syslog, apache/nginx access logs). """ line = line.strip() if not line: @@ -80,6 +107,28 @@ def parse_log_line(line: str) -> ParsedLog: logger.debug(f"Parser: Matched text log (level={level})") return ParsedLog(timestamp=ts_str, level=level, message=message, parsed_timestamp=_parse_timestamp(ts_str)) + # Try syslog ("Dec 10 06:55:46 host sshd[24200]: message") + match = SYSLOG_PATTERN.match(line) + if match: + ts_str = match.group("timestamp") + message = match.group("message") + level = "INFO" + # "host proc[pid]: body" — sniff a leading level token in the body. + _, sep, body = message.partition(": ") + hint = SYSLOG_LEVEL_HINT.match(body) if sep else None + if hint: + level = hint.group("level").upper() + logger.debug(f"Parser: Matched syslog (level={level})") + return ParsedLog(timestamp=ts_str, level=level, message=message, parsed_timestamp=_parse_timestamp(ts_str)) + + # Try apache/nginx access log + match = ACCESS_LOG_PATTERN.match(line) + if match: + ts_str = match.group("timestamp") + message = f"{match.group('host')} {match.group('message')}" + logger.debug("Parser: Matched access log") + return ParsedLog(timestamp=ts_str, level="INFO", message=message, parsed_timestamp=_parse_timestamp(ts_str)) + # Fallback logger.debug("Parser: Falling back to plain message (no match)") return ParsedLog(timestamp=None, level="INFO", message=line) diff --git a/repi/retrieval/cluster_view.py b/repi/retrieval/cluster_view.py new file mode 100644 index 0000000..65938ee --- /dev/null +++ b/repi/retrieval/cluster_view.py @@ -0,0 +1,123 @@ +"""Runtime event clustering over a retrieved chunk set. + +The user-visible product framing: "Repi ingests logs, indexes them with +hybrid retrieval, **clusters related events**, builds incident timelines, +and can launch autonomous root-cause investigation." This module is the +**clusters** word in that sentence. + +We do not re-run the ingest-time clustering. log_chunks already stores +each row's signature inline in `text` (see log_ingestor.py — the rows are +templated as `"Signature: \\nExamples: ..."`). We extract the +signature back out, then group the retrieved top-K so the UI can render +"3 events, 1842x · 347x · 92x" instead of 1842 individual log lines. + +This is **not** a corpus-wide cluster aggregate — it covers only the +chunks the retrieval pipeline returned for this turn. The UI label must +say so. For a corpus-wide aggregate we would add a /clusters endpoint +with a real `signature` column (Path B in the drift-alignment plan); +that's intentionally deferred. +""" +from __future__ import annotations + +import logging +from dataclasses import dataclass +from typing import List, Optional + +logger = logging.getLogger(__name__) + + +def extract_signature(chunk_text: str) -> str: + """Pull the signature back out of the templated chunk body. + + The ingestor writes `"Signature: \\nExamples: ..."`. We + take the slice between `"Signature: "` and the first newline. + + A chunk without that prefix is dual-source state — external imports or + pre-ingestor data. Re-running get_signature() over the whole body would + mask numerics inside the "Examples: ..." portion too, producing a + signature that doesn't match what the ingestor would have stored for + the same raw line. That silently mis-clusters. Log instead so we can + spot the drift, and return empty so the caller skips the chunk. + """ + if not chunk_text: + return "" + prefix = "Signature: " + if chunk_text.startswith(prefix): + rest = chunk_text[len(prefix):] + nl = rest.find("\n") + return (rest[:nl] if nl != -1 else rest).strip() + logger.warning( + "cluster_view: chunk without 'Signature:' prefix — skipping. " + "Indicates dual-source state (external import or pre-ingestor data).", + ) + return "" + + +@dataclass(frozen=True) +class ClusterView: + signature: str + count: int + services: List[str] + first_ts: Optional[str] + last_ts: Optional[str] + + +def cluster_chunks( + chunks: List[dict], + min_count: int = 2, +) -> List[ClusterView]: + """Group `chunks` by extracted signature, drop singletons, return by count desc. + + Each chunk dict is expected in the shape the chat path already produces: + `{chunk_id, service, level, timestamp, text, ...}`. Timestamps may be ISO + strings (chat path) or naive — sort behaviour is left to lexical string + comparison, which is correct for ISO8601. + + `min_count=2` is the default because singletons are already covered by + the per-turn timeline; surfacing them here would dilute the "compress + thousands of logs into a few meaningful incidents" framing. + """ + if not chunks: + return [] + + groups: dict[str, dict] = {} + for c in chunks: + sig = extract_signature(c.get("text") or "") + if not sig: + continue + svc = c.get("service") + ts = c.get("timestamp") + g = groups.get(sig) + if g is None: + g = { + "count": 0, + "services": set(), + "first_ts": None, + "last_ts": None, + } + groups[sig] = g + g["count"] += 1 + if svc: + g["services"].add(svc) + if ts is not None: + if g["first_ts"] is None or ts < g["first_ts"]: + g["first_ts"] = ts + if g["last_ts"] is None or ts > g["last_ts"]: + g["last_ts"] = ts + + views: list[ClusterView] = [] + for sig, g in groups.items(): + if g["count"] < min_count: + continue + views.append( + ClusterView( + signature=sig, + count=g["count"], + services=sorted(g["services"]), + first_ts=g["first_ts"], + last_ts=g["last_ts"], + ) + ) + + views.sort(key=lambda v: v.count, reverse=True) + return views diff --git a/repi/retrieval/timeline_view.py b/repi/retrieval/timeline_view.py new file mode 100644 index 0000000..f7a34af --- /dev/null +++ b/repi/retrieval/timeline_view.py @@ -0,0 +1,100 @@ +"""Build a user-facing timeline from a retrieved chunk set. + +The user-visible product framing: "Repi ingests logs, indexes them with +hybrid retrieval, clusters related events, **builds incident timelines**, +and can launch autonomous root-cause investigation." This module is the +**timelines** word in that sentence. + +Unlike repi.investigation.tools.get_timeline (an internal ReAct tool that +takes chunk_ids and does a SELECT), this runs over the chunks the chat +path already hydrated — no second DB roundtrip. The output is a +chronologically ordered, run-collapsed view: adjacent rows with the same +(service, level, signature) become one entry carrying `repeat_count` and +a `first_ts` / `last_ts` range, so the UI shows "auth-service ERROR x12 +14:02–14:04" instead of twelve near-identical lines. + +Signature extraction reuses cluster_view.extract_signature to pull the +templated `text` body apart — the ingestor stores rows as +`"Signature: \\nExamples: ..."` (see log_ingestor.py). +""" +from __future__ import annotations + +import logging +from typing import List, Optional, TypedDict + +from repi.retrieval.cluster_view import extract_signature + +logger = logging.getLogger(__name__) + + +class TimelineEntry(TypedDict): + service: Optional[str] + level: Optional[str] + signature: str + first_ts: str + last_ts: str + repeat_count: int + + +def build_timeline(chunks: List[dict]) -> List[TimelineEntry]: + """Project `chunks` to a chronological, run-collapsed timeline. + + Each input dict is expected in the shape the chat path produces: + `{chunk_id, service, level, timestamp, text, ...}`. Chunks without a + timestamp are dropped — placing them in chronological order would + require fabricating a position, and a "where exactly" question is + what a timeline answers. ISO8601 strings sort lexically correctly, + which is what `_dh.to_iso` already produces upstream. + + Collapsing is on identical (service, level, signature). Two ERROR + hits and one INFO hit with the same signature stay separate — they + are different events for the human reader. + """ + timestamped = [c for c in chunks if c.get("timestamp")] + if not timestamped: + return [] + + ordered = sorted(timestamped, key=lambda c: c["timestamp"]) + + entries: list[TimelineEntry] = [] + skipped = 0 + for c in ordered: + sig = extract_signature(c.get("text") or "") + if not sig: + # No signature → can't form a run key, can't render meaningfully. + # Tally so a spike in untemplated chunks shows up in logs (signals + # ingest drift or external import contamination). + skipped += 1 + continue + service = c.get("service") + level = c.get("level") + ts = c["timestamp"] + + if entries: + last = entries[-1] + if ( + last["service"] == service + and last["level"] == level + and last["signature"] == sig + ): + # Same run — extend the range, bump the counter. + last["last_ts"] = ts + last["repeat_count"] += 1 + continue + + entries.append( + TimelineEntry( + service=service, + level=level, + signature=sig, + first_ts=ts, + last_ts=ts, + repeat_count=1, + ) + ) + + if skipped: + logger.debug( + "build_timeline: skipped %d chunk(s) without a signature", skipped + ) + return entries diff --git a/repi/worker.py b/repi/worker.py index 1f7b814..1615663 100644 --- a/repi/worker.py +++ b/repi/worker.py @@ -112,8 +112,8 @@ async def handle_file_change(self, file_path: str): return ingestor = self.container.get_ingestor(session) - count = await ingestor.ingest(new_content, config.service_name) - logger.info(f"Ingested {count} chunks from {file_path}") + stats = await ingestor.ingest(new_content, config.service_name) + logger.info(f"Ingested {stats.chunk_count} chunks from {file_path}") await self.update_offset(session, config.id, file_path, file_size) diff --git a/tests/api/test_chat_clusters.py b/tests/api/test_chat_clusters.py new file mode 100644 index 0000000..b8cb96a --- /dev/null +++ b/tests/api/test_chat_clusters.py @@ -0,0 +1,67 @@ +"""Verify the /chat `done` event carries a `clusters` payload. + +We test the serialization shape — cluster_chunks itself is exercised +exhaustively in tests/retrieval/test_cluster_view.py. The chat path +is a streaming SSE handler; full end-to-end coverage lives in the +eval suite. +""" +from __future__ import annotations + +from repi.retrieval.cluster_view import cluster_chunks + + +def _chunk(sig: str, service: str, ts: str): + return { + "chunk_id": "id-" + sig, + "service": service, + "level": "ERROR", + "timestamp": ts, + "text": f"Signature: {sig}\nExamples: x", + } + + +def test_chat_done_clusters_payload_shape(): + """The done event's `clusters` key is a list[dict] with the exact + field names the UI panel binds to. This is the contract that + web/components/chat/EventClusters.tsx renders against.""" + chunks = [ + _chunk("jwt failed", "auth-service", "2026-06-08T14:02:00"), + _chunk("jwt failed", "auth-service", "2026-06-08T14:03:00"), + _chunk("jwt failed", "api-gateway", "2026-06-08T14:04:00"), + _chunk("db timeout", "payments-api", "2026-06-08T14:05:00"), + _chunk("db timeout", "payments-api", "2026-06-08T14:06:00"), + ] + + # Mirror the in-handler projection from repi/api/chat.py + clusters_payload = [ + { + "signature": v.signature, + "count": v.count, + "services": v.services, + "first_ts": v.first_ts, + "last_ts": v.last_ts, + } + for v in cluster_chunks(chunks) + ] + + assert len(clusters_payload) == 2 + # Sorted by count desc. + assert clusters_payload[0]["signature"] == "jwt failed" + assert clusters_payload[0]["count"] == 3 + assert clusters_payload[0]["services"] == ["api-gateway", "auth-service"] + assert clusters_payload[1]["signature"] == "db timeout" + assert clusters_payload[1]["count"] == 2 + + +def test_chat_done_clusters_empty_when_all_singletons(): + """No cluster has ≥2 hits → empty list. The UI can hide the panel + entirely; we don't yield a sentinel.""" + chunks = [ + _chunk("a", "svc-a", "2026-06-08T14:02:00"), + _chunk("b", "svc-b", "2026-06-08T14:03:00"), + ] + payload = [ + {"signature": v.signature, "count": v.count} + for v in cluster_chunks(chunks) + ] + assert payload == [] diff --git a/tests/api/test_chat_timeline.py b/tests/api/test_chat_timeline.py new file mode 100644 index 0000000..2ef3ad8 --- /dev/null +++ b/tests/api/test_chat_timeline.py @@ -0,0 +1,46 @@ +"""Verify the /chat `done` event carries a `timeline` payload. + +build_timeline itself is exercised in tests/retrieval/test_timeline_view.py. +Here we pin the wire-contract shape the UI's Timeline.tsx binds to. +""" +from __future__ import annotations + +from repi.retrieval.timeline_view import build_timeline + + +def _chunk(sig: str, service: str, level: str, ts: str): + return { + "chunk_id": f"id-{sig}-{ts}", + "service": service, + "level": level, + "timestamp": ts, + "text": f"Signature: {sig}\nExamples: x", + } + + +def test_chat_done_timeline_payload_shape(): + """Exact keys the web/components/chat/Timeline.tsx component reads.""" + chunks = [ + _chunk("jwt failed", "auth-service", "ERROR", "2026-06-09T14:02:00"), + _chunk("jwt failed", "auth-service", "ERROR", "2026-06-09T14:03:00"), + _chunk("db timeout", "payments", "ERROR", "2026-06-09T14:05:00"), + ] + + timeline = build_timeline(chunks) + + assert len(timeline) == 2 + assert set(timeline[0].keys()) == { + "service", "level", "signature", + "first_ts", "last_ts", "repeat_count", + } + assert timeline[0]["service"] == "auth-service" + assert timeline[0]["repeat_count"] == 2 + assert timeline[1]["service"] == "payments" + assert timeline[1]["repeat_count"] == 1 + + +def test_chat_done_timeline_empty_when_no_timestamps(): + chunks = [ + {"text": "Signature: x\nExamples: y", "service": "svc", "level": "ERROR", "timestamp": None} + ] + assert build_timeline(chunks) == [] diff --git a/tests/api/test_chat_timestamp_normalisation.py b/tests/api/test_chat_timestamp_normalisation.py new file mode 100644 index 0000000..82c8d57 --- /dev/null +++ b/tests/api/test_chat_timestamp_normalisation.py @@ -0,0 +1,39 @@ +"""Pin the chat path's timestamp invariant. + +Downstream cluster_view and timeline_view do string comparisons (`<`, `>`, +`sorted(...)`) on `chunks[i]["timestamp"]`. If one chunk in that list +carries a `datetime` and another a `str`, the comparison TypeErrors at +runtime — which is exactly the failure mode the PR #71 review flagged. + +`repi/api/chat.py::_normalize_ts` is the single point both feed paths +(RRF retrieval, entity-bias merge from find_logs_by_id) run through. +This test pins its contract so a future change can't reintroduce mixed +shapes. +""" +from __future__ import annotations + +from datetime import datetime, timezone + +from repi.api.chat import _normalize_ts + + +def test_normalize_ts_none_passthrough(): + assert _normalize_ts(None) is None + + +def test_normalize_ts_aware_datetime_becomes_iso_string(): + out = _normalize_ts(datetime(2026, 6, 9, 14, 2, 0, tzinfo=timezone.utc)) + assert isinstance(out, str) + assert out.startswith("2026-06-09T14:02:00") + + +def test_normalize_ts_naive_datetime_becomes_iso_string(): + out = _normalize_ts(datetime(2026, 6, 9, 14, 2, 0)) + assert isinstance(out, str) + assert out.startswith("2026-06-09T14:02:00") + + +def test_normalize_ts_existing_string_passthrough(): + """find_logs_by_id already ISO-stringifies upstream; passing its output + through the helper is idempotent.""" + assert _normalize_ts("2026-06-09T14:02:00") == "2026-06-09T14:02:00" diff --git a/tests/ingestion/__init__.py b/tests/ingestion/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/ingestion/test_log_parser.py b/tests/ingestion/test_log_parser.py new file mode 100644 index 0000000..4f53cef --- /dev/null +++ b/tests/ingestion/test_log_parser.py @@ -0,0 +1,91 @@ +"""Parser coverage for real-world log formats (LogHub-style samples). + +These formats previously fell through to the plain-message fallback, which +stored every line as level=INFO with no timestamp — silently disabling +time-window and level filtering for real logs. +""" +from datetime import datetime, timedelta + +from repi.ingestion.log_parser import parse_log_line + + +# ── Synthetic / app-log format (regression) ────────────────────────────────── + +def test_iso_z_format_still_parses(): + p = parse_log_line("2026-05-01T21:50:00Z [ERROR] DB connection refused") + assert p.level == "ERROR" + assert p.message == "DB connection refused" + assert p.parsed_timestamp == datetime(2026, 5, 1, 21, 50, 0) + + +def test_json_format_still_parses(): + p = parse_log_line('{"timestamp": "2026-05-01T21:50:00Z", "level": "warn", "message": "slow query"}') + assert p.level == "WARN" + assert p.message == "slow query" + assert p.parsed_timestamp == datetime(2026, 5, 1, 21, 50, 0) + + +# ── log4j / logback (e.g. Zookeeper, Kafka) ────────────────────────────────── + +def test_log4j_comma_millis_timestamp_and_level(): + p = parse_log_line( + "2015-07-29 17:41:44,747 - INFO [QuorumPeer[myid=1]:FastLeaderElection@774] - Notification time out: 3200" + ) + assert p.level == "INFO" + assert p.parsed_timestamp == datetime(2015, 7, 29, 17, 41, 44, 747000) + + +def test_log4j_warn_level_not_downgraded_to_info(): + p = parse_log_line( + "2015-07-29 17:41:45,000 - WARN [SendWorker:188978561024:QuorumCnxManager$SendWorker@679] - Interrupted while waiting for message" + ) + assert p.level == "WARN" + assert p.parsed_timestamp == datetime(2015, 7, 29, 17, 41, 45) + + +# ── Syslog (e.g. sshd) ──────────────────────────────────────────────────────── + +def test_syslog_timestamp_extracted(): + p = parse_log_line("Dec 10 06:55:46 LabSZ sshd[24200]: Invalid user webmaster from 173.234.31.186") + assert p.parsed_timestamp is not None + assert (p.parsed_timestamp.month, p.parsed_timestamp.day) == (12, 10) + assert p.parsed_timestamp.hour == 6 + assert p.message == "LabSZ sshd[24200]: Invalid user webmaster from 173.234.31.186" + assert p.level == "INFO" + + +def test_syslog_year_inference_never_lands_in_future(): + p = parse_log_line("Dec 10 06:55:46 LabSZ sshd[24200]: Invalid user webmaster from 173.234.31.186") + assert p.parsed_timestamp <= datetime.utcnow() + timedelta(days=1) + + +def test_syslog_single_digit_day(): + p = parse_log_line("Jun 9 11:42:01 host cron[123]: (root) CMD (run-parts /etc/cron.hourly)") + assert p.parsed_timestamp is not None + assert (p.parsed_timestamp.month, p.parsed_timestamp.day) == (6, 9) + + +def test_syslog_error_body_tagged_error(): + p = parse_log_line( + "Dec 10 07:28:03 LabSZ sshd[24245]: error: maximum authentication attempts exceeded for root [preauth]" + ) + assert p.level == "ERROR" + + +# ── Apache/nginx access log ────────────────────────────────────────────────── + +def test_access_log_timestamp_normalised_to_utc(): + p = parse_log_line('66.249.66.1 - - [10/Oct/2025:13:55:36 -0700] "GET /index.html HTTP/1.1" 200 2326') + assert p.parsed_timestamp == datetime(2025, 10, 10, 20, 55, 36) + assert p.parsed_timestamp.tzinfo is None + assert p.message.startswith("66.249.66.1 ") + assert '"GET /index.html HTTP/1.1" 200 2326' in p.message + + +# ── Fallback ───────────────────────────────────────────────────────────────── + +def test_unknown_format_falls_back_to_plain_message(): + p = parse_log_line("completely unstructured line with no timestamp") + assert p.level == "INFO" + assert p.parsed_timestamp is None + assert p.message == "completely unstructured line with no timestamp" diff --git a/tests/retrieval/test_cluster_view.py b/tests/retrieval/test_cluster_view.py new file mode 100644 index 0000000..ad04256 --- /dev/null +++ b/tests/retrieval/test_cluster_view.py @@ -0,0 +1,97 @@ +"""Pure-function tests for runtime event clustering over a retrieved chunk set. + +The contract: + - Group by signature extracted from the templated `text` field. + - Aggregate count, service set, and time range per signature. + - Drop singletons (default min_count=2) so the panel surfaces real incidents. + - Sort by count desc — the dominant event leads. +""" +from __future__ import annotations + +from repi.retrieval.cluster_view import ( + ClusterView, + cluster_chunks, + extract_signature, +) + + +def _chunk(text: str, service: str, timestamp: str): + return {"text": text, "service": service, "timestamp": timestamp} + + +def testextract_signature_from_templated_text(): + """Ingest path stores `"Signature: ...\\nExamples: ..."` — pull the + signature back out for clustering.""" + body = "Signature: JWT verification failed for token \nExamples: token=1 token=2" + assert extract_signature(body) == "JWT verification failed for token " + + +def testextract_signature_returns_empty_for_un_templated_text(caplog): + """A chunk without the 'Signature: ' prefix is dual-source state + (external import or pre-ingestor data). Re-running the masking regex + over the whole body would silently mis-cluster — wrong signatures + derived from 'Examples: ...' tokens. Contract: return empty + warn so + cluster_chunks skips the chunk and we can spot the drift in logs.""" + import logging + with caplog.at_level(logging.WARNING, logger="repi.retrieval.cluster_view"): + sig = extract_signature("INFO: user 1234 logged in from 10.0.0.1") + assert sig == "" + assert any("without 'Signature:' prefix" in r.message for r in caplog.records) + + +def test_returns_empty_for_no_chunks(): + assert cluster_chunks([]) == [] + + +def test_drops_singletons_by_default(): + """A cluster of size 1 isn't a meaningful incident — the timeline panel + already shows it once. The clusters panel surfaces compressions.""" + chunks = [ + _chunk("Signature: lonely event\nExamples: x", "svc-a", "2026-06-08T14:00:00"), + _chunk("Signature: repeated\nExamples: y", "svc-a", "2026-06-08T14:01:00"), + _chunk("Signature: repeated\nExamples: z", "svc-b", "2026-06-08T14:02:00"), + ] + out = cluster_chunks(chunks) + assert [v.signature for v in out] == ["repeated"] + + +def test_aggregates_count_services_and_time_range(): + chunks = [ + _chunk("Signature: jwt failed\nExamples: a", "auth-service", "2026-06-08T14:02:00"), + _chunk("Signature: jwt failed\nExamples: b", "auth-service", "2026-06-08T14:04:00"), + _chunk("Signature: jwt failed\nExamples: c", "api-gateway", "2026-06-08T14:03:00"), + ] + out = cluster_chunks(chunks) + assert len(out) == 1 + v = out[0] + assert v.signature == "jwt failed" + assert v.count == 3 + assert v.services == ["api-gateway", "auth-service"] # sorted, deduped + assert v.first_ts == "2026-06-08T14:02:00" + assert v.last_ts == "2026-06-08T14:04:00" + + +def test_sorted_by_count_descending(): + """Dominant event leads — 'compress thousands of logs into a few + meaningful incidents' framing requires the biggest cluster first.""" + chunks = ( + [_chunk("Signature: small\nExamples: x", "svc-a", "t1")] * 2 + + [_chunk("Signature: huge\nExamples: y", "svc-b", "t2")] * 8 + + [_chunk("Signature: medium\nExamples: z", "svc-c", "t3")] * 4 + ) + out = cluster_chunks(chunks) + assert [v.signature for v in out] == ["huge", "medium", "small"] + assert [v.count for v in out] == [8, 4, 2] + + +def test_missing_timestamps_dont_break_aggregation(): + """Some chunks may have a None timestamp (legacy or bad parse). They + should still join their signature group; just no time-range contribution.""" + chunks = [ + {"text": "Signature: orphan\nExamples: a", "service": "svc-a", "timestamp": None}, + {"text": "Signature: orphan\nExamples: b", "service": "svc-a", "timestamp": "2026-06-08T14:02:00"}, + ] + out = cluster_chunks(chunks) + assert out[0].count == 2 + assert out[0].first_ts == "2026-06-08T14:02:00" + assert out[0].last_ts == "2026-06-08T14:02:00" diff --git a/tests/retrieval/test_timeline_view.py b/tests/retrieval/test_timeline_view.py new file mode 100644 index 0000000..3bf4e29 --- /dev/null +++ b/tests/retrieval/test_timeline_view.py @@ -0,0 +1,125 @@ +"""Pure-function tests for the chat-path timeline view. + +Contract: + - Sort retrieved chunks by timestamp. + - Collapse runs of adjacent (service, level, signature) into one entry + with first_ts/last_ts and repeat_count. + - Drop chunks without timestamps; a timeline can't place them. + - Different log levels for the same signature stay as separate entries. +""" +from __future__ import annotations + +from repi.retrieval.timeline_view import build_timeline + + +def _chunk(text: str, service: str, level: str, timestamp: str | None): + return { + "chunk_id": f"c-{service}-{timestamp}", + "service": service, + "level": level, + "timestamp": timestamp, + "text": text, + } + + +def test_returns_empty_for_no_chunks(): + assert build_timeline([]) == [] + + +def test_returns_empty_when_all_chunks_lack_timestamp(): + chunks = [_chunk("Signature: x\nExamples: y", "svc", "ERROR", None)] + assert build_timeline(chunks) == [] + + +def test_single_chunk_produces_one_entry(): + chunks = [_chunk("Signature: jwt failed\nExamples: a", "auth", "ERROR", "2026-06-09T14:02:00")] + out = build_timeline(chunks) + assert len(out) == 1 + assert out[0]["repeat_count"] == 1 + assert out[0]["first_ts"] == out[0]["last_ts"] == "2026-06-09T14:02:00" + assert out[0]["signature"] == "jwt failed" + + +def test_consecutive_run_collapses_with_range_and_count(): + """The whole point of the run-collapse — 'x12 over 14:02–14:04'.""" + chunks = [ + _chunk("Signature: jwt failed\nExamples: a", "auth", "ERROR", "2026-06-09T14:02:00"), + _chunk("Signature: jwt failed\nExamples: b", "auth", "ERROR", "2026-06-09T14:03:00"), + _chunk("Signature: jwt failed\nExamples: c", "auth", "ERROR", "2026-06-09T14:04:00"), + ] + out = build_timeline(chunks) + assert len(out) == 1 + assert out[0]["repeat_count"] == 3 + assert out[0]["first_ts"] == "2026-06-09T14:02:00" + assert out[0]["last_ts"] == "2026-06-09T14:04:00" + + +def test_different_services_dont_collapse_even_with_same_signature(): + """Cross-service signature match is a coincidence, not a run. + Two services emitting "auth check failed" is two events, not one.""" + chunks = [ + _chunk("Signature: auth check failed\nExamples: x", "auth-service", "ERROR", "2026-06-09T14:02:00"), + _chunk("Signature: auth check failed\nExamples: y", "api-gateway", "ERROR", "2026-06-09T14:03:00"), + ] + out = build_timeline(chunks) + assert [e["service"] for e in out] == ["auth-service", "api-gateway"] + + +def test_different_levels_dont_collapse_even_with_same_signature(): + """An ERROR and a WARNING with the same masked template are two + different observations — INFO setup ≠ ERROR fallout.""" + chunks = [ + _chunk("Signature: token validation\nExamples: a", "auth", "INFO", "2026-06-09T14:02:00"), + _chunk("Signature: token validation\nExamples: b", "auth", "ERROR", "2026-06-09T14:03:00"), + ] + out = build_timeline(chunks) + assert len(out) == 2 + assert out[0]["level"] == "INFO" + assert out[1]["level"] == "ERROR" + + +def test_runs_break_then_resume_produce_separate_entries(): + """A B A → three entries. The two A's are not consecutive.""" + chunks = [ + _chunk("Signature: A\nExamples: 1", "svc-a", "ERROR", "2026-06-09T14:00:00"), + _chunk("Signature: A\nExamples: 2", "svc-a", "ERROR", "2026-06-09T14:01:00"), + _chunk("Signature: B\nExamples: 1", "svc-a", "ERROR", "2026-06-09T14:02:00"), + _chunk("Signature: A\nExamples: 3", "svc-a", "ERROR", "2026-06-09T14:03:00"), + ] + out = build_timeline(chunks) + sigs = [(e["signature"], e["repeat_count"]) for e in out] + assert sigs == [("A", 2), ("B", 1), ("A", 1)] + + +def test_unsorted_input_is_sorted_chronologically(): + """The chat path's chunks are RRF-ranked, not time-sorted. The timeline + must impose chronological order.""" + chunks = [ + _chunk("Signature: B\nExamples: y", "svc", "ERROR", "2026-06-09T14:03:00"), + _chunk("Signature: A\nExamples: x", "svc", "ERROR", "2026-06-09T14:01:00"), + _chunk("Signature: C\nExamples: z", "svc", "ERROR", "2026-06-09T14:05:00"), + ] + out = build_timeline(chunks) + assert [e["signature"] for e in out] == ["A", "B", "C"] + + +def test_chunks_without_timestamps_are_dropped(): + """Mixed input — only the timestamped ones land in the timeline.""" + chunks = [ + _chunk("Signature: orphan\nExamples: x", "svc", "ERROR", None), + _chunk("Signature: anchored\nExamples: y", "svc", "ERROR", "2026-06-09T14:00:00"), + ] + out = build_timeline(chunks) + assert len(out) == 1 + assert out[0]["signature"] == "anchored" + + +def test_chunks_without_signature_are_dropped(): + """Defensive: empty text → no signature → can't form a run key.""" + chunks = [ + {"text": "", "service": "svc", "level": "ERROR", "timestamp": "2026-06-09T14:00:00"}, + _chunk("Signature: real\nExamples: y", "svc", "ERROR", "2026-06-09T14:01:00"), + ] + out = build_timeline(chunks) + assert len(out) == 1 + assert out[0]["signature"] == "real" diff --git a/tests/test_cli_version_doctor.py b/tests/test_cli_version_doctor.py index 9165a70..f434e92 100644 --- a/tests/test_cli_version_doctor.py +++ b/tests/test_cli_version_doctor.py @@ -6,11 +6,25 @@ from typer.testing import CliRunner +import repi.cli as cli_mod from repi.cli import app runner = CliRunner() +def _isolate_config(monkeypatch, tmp_path): + """Point the doctor's config-presence check at a tmp config so the test + doesn't depend on whether the dev machine (or CI runner) has a real + .repi/config.json — CI runs from a fresh checkout where it never exists.""" + cfg_dir = tmp_path / ".repi" + cfg_dir.mkdir() + cfg_file = cfg_dir / "config.json" + cfg_file.write_text("{}") + monkeypatch.setattr(cli_mod, "REPO_ROOT", tmp_path) + monkeypatch.setattr(cli_mod, "CONFIG_DIR", cfg_dir) + monkeypatch.setattr(cli_mod, "CONFIG_FILE", cfg_file) + + def test_version_flag_prints_version_and_exits(): result = runner.invoke(app, ["--version"]) assert result.exit_code == 0 @@ -18,8 +32,9 @@ def test_version_flag_prints_version_and_exits(): assert re.match(r"^(\d+\.\d+\.\d+|0\.0\.0\+unknown)", result.stdout.strip()) -def test_doctor_passes_when_all_checks_pass(): +def test_doctor_passes_when_all_checks_pass(monkeypatch, tmp_path): """All checks mocked to PASS — doctor exits 0.""" + _isolate_config(monkeypatch, tmp_path) with patch("repi.cli._read_db_url", return_value="postgresql+asyncpg://x@y/z"), \ patch("repi.cli._check_postgres", new=AsyncMock(return_value=(True, "PostgreSQL 16"))), \ patch("repi.cli._check_pgvector", new=AsyncMock(return_value=(True, "vector 0.8.2"))), \ @@ -31,8 +46,9 @@ def test_doctor_passes_when_all_checks_pass(): assert "FAIL" not in result.stdout -def test_doctor_fails_when_postgres_unreachable(): +def test_doctor_fails_when_postgres_unreachable(monkeypatch, tmp_path): """A failing check produces nonzero exit + FAIL in output.""" + _isolate_config(monkeypatch, tmp_path) with patch("repi.cli._read_db_url", return_value="postgresql+asyncpg://x@y/z"), \ patch("repi.cli._check_postgres", new=AsyncMock(return_value=(False, "ConnectionRefusedError"))), \ patch("repi.cli._check_redis", new=AsyncMock(return_value=(True, "PING ok"))), \ diff --git a/tests/test_config_path_resolution.py b/tests/test_config_path_resolution.py new file mode 100644 index 0000000..123cb8a --- /dev/null +++ b/tests/test_config_path_resolution.py @@ -0,0 +1,36 @@ +"""CONFIG_PATH must not depend on which directory the process happens to start +in. Before _resolve_config_path, `repi serve` launched from any directory other +than the repo root silently fell back to class defaults (OpenAI provider, no +key) because the cwd-relative `.repi/config.json` didn't exist there.""" +import json +import os +from pathlib import Path + +from repi.core.config import _resolve_config_path + + +def test_finds_config_in_cwd(tmp_path, monkeypatch): + (tmp_path / ".repi").mkdir() + cfg = tmp_path / ".repi" / "config.json" + cfg.write_text(json.dumps({"LLM_PROVIDER": "mistral"})) + monkeypatch.chdir(tmp_path) + assert _resolve_config_path() == cfg + + +def test_finds_config_in_parent_when_run_from_subdir(tmp_path, monkeypatch): + (tmp_path / ".repi").mkdir() + cfg = tmp_path / ".repi" / "config.json" + cfg.write_text(json.dumps({"LLM_PROVIDER": "mistral"})) + subdir = tmp_path / "tmp-ui-tests" / "deep" + subdir.mkdir(parents=True) + monkeypatch.chdir(subdir) + assert _resolve_config_path() == cfg + + +def test_falls_back_to_relative_default_when_nothing_exists(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + resolved = _resolve_config_path() + # Either the cwd-relative default (fresh machine) or the repo checkout's + # own .repi/config.json found via the package anchor — both are stable + # locations; what matters is it never silently resolves elsewhere. + assert resolved.name == "config.json" and resolved.parent.name == ".repi" diff --git a/tests/worker/test_offset_resume.py b/tests/worker/test_offset_resume.py index 09f6832..3035e75 100644 --- a/tests/worker/test_offset_resume.py +++ b/tests/worker/test_offset_resume.py @@ -2,6 +2,7 @@ from unittest.mock import AsyncMock, MagicMock from uuid import uuid4 from repi.worker import IngestionWorker +from repi.ingestion.log_ingestor import IngestStats from repi.models.schema import WatcherConfig, WatcherOffset @pytest.mark.asyncio @@ -32,7 +33,7 @@ async def test_offset_resume(tmp_path): mock_session.commit = AsyncMock() mock_session.refresh = AsyncMock() mock_ingestor = AsyncMock() - mock_ingestor.ingest = AsyncMock(return_value=2) + mock_ingestor.ingest = AsyncMock(return_value=IngestStats(chunk_count=2)) # get_session() returns an async context manager that yields mock_session mock_ctx = MagicMock() diff --git a/tests/worker/test_watcher.py b/tests/worker/test_watcher.py index a85fc89..08eed6c 100644 --- a/tests/worker/test_watcher.py +++ b/tests/worker/test_watcher.py @@ -6,6 +6,7 @@ from datetime import datetime from repi.worker import IngestionWorker +from repi.ingestion.log_ingestor import IngestStats from repi.models.schema import WatcherConfig, WatcherOffset @pytest.mark.asyncio @@ -31,7 +32,7 @@ async def test_handle_file_change(tmp_path): mock_session.commit = AsyncMock() mock_session.refresh = AsyncMock() mock_ingestor = AsyncMock() - mock_ingestor.ingest = AsyncMock(return_value=2) + mock_ingestor.ingest = AsyncMock(return_value=IngestStats(chunk_count=2)) # get_session() returns an async context manager that yields mock_session mock_ctx = MagicMock() diff --git a/uv.lock b/uv.lock index 71bb0d6..e17ac3f 100644 --- a/uv.lock +++ b/uv.lock @@ -1350,7 +1350,7 @@ wheels = [ [[package]] name = "repi" -version = "0.1.0" +version = "0.2.0" source = { editable = "." } dependencies = [ { name = "asyncpg" }, diff --git a/web/app/page.tsx b/web/app/page.tsx index 9846063..72ebb16 100644 --- a/web/app/page.tsx +++ b/web/app/page.tsx @@ -35,6 +35,20 @@ type Turn = // `/investigate` path is intentionally stateless and ignores this. const CHAT_HISTORY_TURNS = 6 +// Pull the most recent assistant chat turn's cited chunks so the next /chat +// turn can bias retrieval toward the same service + time envelope. Stream 4 +// followup awareness — the backend treats this as a soft hint and never +// overrides explicit intent. +function lastChatChunkIds(turns: Turn[]): string[] { + for (let i = turns.length - 1; i >= 0; i--) { + const t = turns[i] + if (t.mode === "chat" && t.role === "assistant" && t.chunkIds && t.chunkIds.length > 0) { + return t.chunkIds + } + } + return [] +} + function buildChatHistory(turns: Turn[]): { role: "user" | "assistant"; content: string }[] { // Treat both chat turns and completed investigation answers as conversation // history. An investigation that hasn't produced a final answer yet is @@ -148,6 +162,7 @@ export default function HomePage() { // Send last N turns as history so the server can keep the assistant // contextual across followups (lite — no DB lookup, no compaction). const history = buildChatHistory(turns) + const previousChunkIds = lastChatChunkIds(turns) try { const resp = await fetch(`${API_BASE}/chat`, { method: "POST", @@ -156,6 +171,7 @@ export default function HomePage() { query, conversation_id: conversationId ?? undefined, history, + previous_chunk_ids: previousChunkIds, }), }) if (!resp.ok || !resp.body) { @@ -170,7 +186,7 @@ export default function HomePage() { arr.findIndex((t) => t.mode === "chat" && (t as any).pendingId === pendingId) setTurns((prev) => [ ...prev, - { mode: "chat", role: "assistant", content: "", chunkIds: [], confidence: null, streaming: true, pendingId } as any, + { mode: "chat", role: "assistant", content: "", chunkIds: [], confidence: null, streaming: true, pendingId, query } as any, ]) const reader = resp.body.getReader() @@ -225,6 +241,9 @@ export default function HomePage() { ...cur, chunkIds: data.chunk_ids ?? [], confidence: data.confidence ?? null, + clusters: data.clusters ?? [], + timeline: data.timeline ?? [], + citedChunks: data.cited_chunks ?? [], streaming: false, } return next @@ -266,15 +285,30 @@ export default function HomePage() {

Chat with your logs

- Ask a quick question for a single-shot RAG answer. Toggle{" "} - Deep Research on to run a full - multi-step investigation. + Hybrid retrieval over your ingested logs surfaces a chronological{" "} + timeline and the{" "} + event clusters behind your + question. Toggle{" "} + Deep Research for a + full autonomous root-cause investigation.

) : ( turns.map((t, i) => t.mode === "chat" ? ( - + { + // handleSend reads its second arg as the deep-research + // decision (not React state) — so the setDeepResearch + // call below is purely for UI sync (the toggle visually + // moves), not a precondition for the routing. Order + // doesn't matter; setState's asynchrony doesn't race. + setDeepResearch(true) + handleSend(q, true) + }} + /> ) : ( void } // Strip raw chunk citations the LLM may still inline despite the system @@ -31,9 +41,43 @@ export function ChatMessageView({ confidence, isClarification, streaming, + clusters, + timeline, + citedChunks, + query, + onInvestigateDeeper, }: ChatMessageProps) { const isUser = role === "user" const displayed = isUser ? content : cleanContent(content) + + // Lift open state out of the three panels so the quick-action buttons can + // open them on demand. Uncontrolled fallback inside each panel handles + // the no-button path. + const [timelineOpen, setTimelineOpen] = useState(undefined) + const [clustersOpen, setClustersOpen] = useState(undefined) + const [chunksOpen, setChunksOpen] = useState(undefined) + + const timelineRef = useRef(null) + const clustersRef = useRef(null) + const chunksRef = useRef(null) + + const showAndScroll = ( + setter: (v: boolean) => void, + ref: React.RefObject, + ) => { + setter(true) + // Defer the scroll so the panel has rendered open before measuring. + requestAnimationFrame(() => { + ref.current?.scrollIntoView({ behavior: "smooth", block: "nearest" }) + }) + } + + const hasTimeline = !!(timeline && timeline.length > 0) + const hasClusters = !!(clusters && clusters.length > 0) + const hasChunks = !!(citedChunks && citedChunks.length > 0) + const showQuickActions = + !isUser && !streaming && (hasTimeline || hasClusters || (onInvestigateDeeper && query)) + return (
{!isUser && ( @@ -58,6 +102,7 @@ export function ChatMessageView({ )} {displayed || (streaming ? : "")}
+ {!isUser && confidence && (
@@ -65,6 +110,60 @@ export function ChatMessageView({
)} + + {showQuickActions && ( +
+ {hasTimeline && ( + + )} + {hasClusters && ( + + )} + {onInvestigateDeeper && query && ( + + )} +
+ )} + + {hasTimeline && ( +
+ +
+ )} + {hasClusters && ( +
+ +
+ )} + {hasChunks && ( +
+ +
+ )} {isUser && (
diff --git a/web/components/chat/CitedChunks.tsx b/web/components/chat/CitedChunks.tsx new file mode 100644 index 0000000..df57ac7 --- /dev/null +++ b/web/components/chat/CitedChunks.tsx @@ -0,0 +1,84 @@ +"use client" + +import { useState } from "react" +import { Badge } from "@/components/ui/badge" +import { ChevronDown, ChevronRight, FileText } from "lucide-react" + +export type CitedChunk = { + chunk_id: string + service: string | null + level: string | null + timestamp: string | null + text: string +} + +function shortTs(iso: string | null): string { + if (!iso) return "" + return iso.includes("T") ? iso.split("T")[1].slice(0, 8) : iso +} + +export function CitedChunks({ + chunks, + open: openProp, + onOpenChange, +}: { + chunks: CitedChunk[] + open?: boolean + onOpenChange?: (open: boolean) => void +}) { + // Default closed — raw chunks are the debug view; users seeking them + // know to click. Timeline and clusters carry the story. + const [openUncontrolled, setOpenUncontrolled] = useState(false) + const open = openProp ?? openUncontrolled + const setOpen = (v: boolean) => { + if (onOpenChange) onOpenChange(v) + else setOpenUncontrolled(v) + } + + if (!chunks || chunks.length === 0) return null + + return ( +
+ + {open && ( +
+ {chunks.map((c) => ( +
+
+ {c.service && ( + + {c.service} + + )} + {c.level && ( + + {c.level} + + )} + {c.timestamp && ( + + {shortTs(c.timestamp)} + + )} +
+
+                {c.text}
+              
+
+ ))} +
+ )} +
+ ) +} diff --git a/web/components/chat/EventClusters.tsx b/web/components/chat/EventClusters.tsx new file mode 100644 index 0000000..d7cc00f --- /dev/null +++ b/web/components/chat/EventClusters.tsx @@ -0,0 +1,98 @@ +"use client" + +import { useState } from "react" +import { Badge } from "@/components/ui/badge" +import { ChevronDown, ChevronRight, Layers } from "lucide-react" +import { cn } from "@/lib/utils" + +export type Cluster = { + signature: string + count: number + services: string[] + first_ts: string | null + last_ts: string | null +} + +function formatRange(first: string | null, last: string | null): string { + if (!first && !last) return "" + if (first === last || !last) return formatTs(first) + if (!first) return formatTs(last) + return `${formatTs(first)}–${formatTs(last)}` +} + +function formatTs(iso: string | null): string { + if (!iso) return "" + // Show HH:MM:SS for tight inline display. Full timestamp on hover via title. + const t = iso.includes("T") ? iso.split("T")[1].slice(0, 8) : iso + return t +} + +export function EventClusters({ + clusters, + open: openProp, + onOpenChange, +}: { + clusters: Cluster[] + open?: boolean + onOpenChange?: (open: boolean) => void +}) { + const [openUncontrolled, setOpenUncontrolled] = useState(clusters.length > 0 && clusters.length <= 5) + const open = openProp ?? openUncontrolled + const setOpen = (v: boolean) => { + if (onOpenChange) onOpenChange(v) + else setOpenUncontrolled(v) + } + + if (!clusters || clusters.length === 0) return null + + return ( +
+ + {open && ( +
+ {clusters.map((c, i) => ( +
+
+ + {c.count}× + + + {c.signature} + +
+
+ {c.services.map((s) => ( + + {s} + + ))} + {(c.first_ts || c.last_ts) && ( + + {formatRange(c.first_ts, c.last_ts)} + + )} +
+
+ ))} +
+ )} +
+ ) +} diff --git a/web/components/chat/Timeline.tsx b/web/components/chat/Timeline.tsx new file mode 100644 index 0000000..1414d0d --- /dev/null +++ b/web/components/chat/Timeline.tsx @@ -0,0 +1,124 @@ +"use client" + +import { useState } from "react" +import { Badge } from "@/components/ui/badge" +import { ChevronDown, ChevronRight, Clock } from "lucide-react" +import { cn } from "@/lib/utils" + +export type TimelineEntry = { + service: string | null + level: string | null + signature: string + first_ts: string + last_ts: string + repeat_count: number +} + +function formatTs(iso: string): string { + if (!iso) return "" + return iso.includes("T") ? iso.split("T")[1].slice(0, 8) : iso +} + +function formatRange(first: string, last: string): string { + if (first === last) return formatTs(first) + return `${formatTs(first)}–${formatTs(last)}` +} + +function levelTone(level: string | null): string { + switch ((level || "").toUpperCase()) { + case "ERROR": + case "CRITICAL": + case "FATAL": + return "text-red-500 border-red-500/30" + case "WARNING": + case "WARN": + return "text-amber-500 border-amber-500/30" + case "INFO": + return "text-blue-500 border-blue-500/30" + default: + return "text-muted-foreground border-border" + } +} + +// Optional controlled `open` so a parent (e.g. a quick-action button on the +// assistant turn) can force the panel open. Falls back to uncontrolled +// internal state when omitted. +export function Timeline({ + entries, + open: openProp, + onOpenChange, +}: { + entries: TimelineEntry[] + open?: boolean + onOpenChange?: (open: boolean) => void +}) { + const [openUncontrolled, setOpenUncontrolled] = useState(entries.length > 0 && entries.length <= 15) + const open = openProp ?? openUncontrolled + const setOpen = (v: boolean) => { + if (onOpenChange) onOpenChange(v) + else setOpenUncontrolled(v) + } + + if (!entries || entries.length === 0) return null + + return ( +
+ + {open && ( +
+ {entries.map((e, i) => ( +
+
+
{formatTs(e.first_ts)}
+ {e.last_ts !== e.first_ts && ( +
+ →{formatTs(e.last_ts)} +
+ )} +
+
+
+ {e.service && ( + + {e.service} + + )} + {e.level && ( + + {e.level} + + )} + {e.repeat_count > 1 && ( + + ×{e.repeat_count} + + )} +
+ + {e.signature} + +
+
+ ))} +
+ )} +
+ ) +} diff --git a/web/package.json b/web/package.json index 8f39a96..76f6fc7 100644 --- a/web/package.json +++ b/web/package.json @@ -1,6 +1,6 @@ { "name": "web", - "version": "0.1.0", + "version": "0.2.0", "private": true, "scripts": { "dev": "next dev",