From 0b1b00e9fa3526e0e354d9541ffb8dbfcb3f4282 Mon Sep 17 00:00:00 2001 From: Varun Singh Date: Mon, 8 Jun 2026 22:08:21 +0530 Subject: [PATCH 01/12] feat(chat): event clusters surfaced in /chat done payload + UI panel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stream 2 of the drift-alignment plan — promote signature-based event clustering from an ingest-time implementation detail to a first-class product surface. Per the feedback: "Humans don't want 5000 logs. They want event clusters." - repi/retrieval/cluster_view.py extracts the signature from each retrieved chunk's templated text body (log_chunks rows store "Signature: <signature>\nExamples: ..." — we read the prefix back out rather than re-running get_signature() over the templated string) and groups by signature. Aggregates: count, deduped service set, first/last timestamp. Singletons are dropped by default; they're the per-turn timeline's job, not this panel's. - /chat emits a `clusters: [...]` key on the SSE done event. Each entry: signature, count, services, first_ts, last_ts. Empty list when nothing crosses the min_count=2 threshold; the UI then hides the panel entirely. - web/components/chat/EventClusters.tsx renders an inline collapsible card under the assistant turn: count badge, signature in mono, service badges, time range. Default-open when ≤5 clusters. - Caveat documented in the docstring and the UI subtitle: clusters are over the retrieved top-K, not a corpus-wide aggregate. A real /clusters endpoint with a first-class signature column is the next step if that distinction starts to matter. 
--- repi/api/chat.py | 19 +++++ repi/retrieval/cluster_view.py | 113 ++++++++++++++++++++++++++ tests/api/test_chat_clusters.py | 67 +++++++++++++++ tests/retrieval/test_cluster_view.py | 91 +++++++++++++++++++++ web/app/page.tsx | 1 + web/components/chat/ChatMessage.tsx | 6 ++ web/components/chat/EventClusters.tsx | 86 ++++++++++++++++++++ 7 files changed, 383 insertions(+) create mode 100644 repi/retrieval/cluster_view.py create mode 100644 tests/api/test_chat_clusters.py create mode 100644 tests/retrieval/test_cluster_view.py create mode 100644 web/components/chat/EventClusters.tsx diff --git a/repi/api/chat.py b/repi/api/chat.py index 593f856..8e8d39b 100644 --- a/repi/api/chat.py +++ b/repi/api/chat.py @@ -42,6 +42,7 @@ from repi.llm.provider import Message from repi.models.filters import RetrievalFilters from repi.models.schema import ChatMessage, Conversation +from repi.retrieval.cluster_view import cluster_chunks logger = logging.getLogger("repi.api.chat") @@ -318,11 +319,29 @@ async def event_generator(): ) await session.commit() + # Event clusters across the retrieved top-K. Singletons are + # dropped (they're already in the per-turn timeline); the panel + # gives the user the "1842x JWT failures, 347x DB timeouts" + # compression rather than a raw chunk list. Caveat the UI must + # carry: this is *per-turn* over the retrieved chunks, not a + # corpus-wide aggregate. 
+ clusters = [ + { + "signature": v.signature, + "count": v.count, + "services": v.services, + "first_ts": v.first_ts, + "last_ts": v.last_ts, + } + for v in cluster_chunks(chunks) + ] + yield _sse("done", { "chunk_ids": cited_ids, "confidence": confidence, "conversation_id": str(conversation_id), "entities": entities, + "clusters": clusters, }) except Exception as e: diff --git a/repi/retrieval/cluster_view.py b/repi/retrieval/cluster_view.py new file mode 100644 index 0000000..b28f9a2 --- /dev/null +++ b/repi/retrieval/cluster_view.py @@ -0,0 +1,113 @@ +"""Runtime event clustering over a retrieved chunk set. + +The user-visible product framing: "Repi ingests logs, indexes them with +hybrid retrieval, **clusters related events**, builds incident timelines, +and can launch autonomous root-cause investigation." This module is the +**clusters** word in that sentence. + +We do not re-run the ingest-time clustering. log_chunks already stores +each row's signature inline in `text` (see log_ingestor.py — the rows are +templated as `"Signature: \\nExamples: ..."`). We extract the +signature back out, then group the retrieved top-K so the UI can render +"3 events, 1842x · 347x · 92x" instead of 1842 individual log lines. + +This is **not** a corpus-wide cluster aggregate — it covers only the +chunks the retrieval pipeline returned for this turn. The UI label must +say so. For a corpus-wide aggregate we would add a /clusters endpoint +with a real `signature` column (Path B in the drift-alignment plan); +that's intentionally deferred. +""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import List, Optional + +from repi.ingestion.log_chunker import get_signature + + +def _extract_signature(chunk_text: str) -> str: + """Pull the signature back out of the templated chunk body. + + The ingestor writes `"Signature: \\nExamples: ..."`. We + take the slice between `"Signature: "` and the first newline. 
For chunks + not produced by our ingestor (defensive — external imports, older data), + fall back to re-running get_signature() on the raw text. + """ + if not chunk_text: + return "" + prefix = "Signature: " + if chunk_text.startswith(prefix): + rest = chunk_text[len(prefix):] + nl = rest.find("\n") + return (rest[:nl] if nl != -1 else rest).strip() + return get_signature(chunk_text).strip() + + +@dataclass(frozen=True) +class ClusterView: + signature: str + count: int + services: List[str] + first_ts: Optional[str] + last_ts: Optional[str] + + +def cluster_chunks( + chunks: List[dict], + min_count: int = 2, +) -> List[ClusterView]: + """Group `chunks` by extracted signature, drop singletons, return by count desc. + + Each chunk dict is expected in the shape the chat path already produces: + `{chunk_id, service, level, timestamp, text, ...}`. Timestamps may be ISO + strings (chat path) or naive — sort behaviour is left to lexical string + comparison, which is correct for ISO8601. + + `min_count=2` is the default because singletons are already covered by + the per-turn timeline; surfacing them here would dilute the "compress + thousands of logs into a few meaningful incidents" framing. 
+ """ + if not chunks: + return [] + + groups: dict[str, dict] = {} + for c in chunks: + sig = _extract_signature(c.get("text") or "") + if not sig: + continue + svc = c.get("service") + ts = c.get("timestamp") + g = groups.get(sig) + if g is None: + g = { + "count": 0, + "services": set(), + "first_ts": None, + "last_ts": None, + } + groups[sig] = g + g["count"] += 1 + if svc: + g["services"].add(svc) + if ts is not None: + if g["first_ts"] is None or ts < g["first_ts"]: + g["first_ts"] = ts + if g["last_ts"] is None or ts > g["last_ts"]: + g["last_ts"] = ts + + views: list[ClusterView] = [] + for sig, g in groups.items(): + if g["count"] < min_count: + continue + views.append( + ClusterView( + signature=sig, + count=g["count"], + services=sorted(g["services"]), + first_ts=g["first_ts"], + last_ts=g["last_ts"], + ) + ) + + views.sort(key=lambda v: v.count, reverse=True) + return views diff --git a/tests/api/test_chat_clusters.py b/tests/api/test_chat_clusters.py new file mode 100644 index 0000000..b8cb96a --- /dev/null +++ b/tests/api/test_chat_clusters.py @@ -0,0 +1,67 @@ +"""Verify the /chat `done` event carries a `clusters` payload. + +We test the serialization shape — cluster_chunks itself is exercised +exhaustively in tests/retrieval/test_cluster_view.py. The chat path +is a streaming SSE handler; full end-to-end coverage lives in the +eval suite. +""" +from __future__ import annotations + +from repi.retrieval.cluster_view import cluster_chunks + + +def _chunk(sig: str, service: str, ts: str): + return { + "chunk_id": "id-" + sig, + "service": service, + "level": "ERROR", + "timestamp": ts, + "text": f"Signature: {sig}\nExamples: x", + } + + +def test_chat_done_clusters_payload_shape(): + """The done event's `clusters` key is a list[dict] with the exact + field names the UI panel binds to. 
This is the contract that + web/components/chat/EventClusters.tsx renders against.""" + chunks = [ + _chunk("jwt failed", "auth-service", "2026-06-08T14:02:00"), + _chunk("jwt failed", "auth-service", "2026-06-08T14:03:00"), + _chunk("jwt failed", "api-gateway", "2026-06-08T14:04:00"), + _chunk("db timeout", "payments-api", "2026-06-08T14:05:00"), + _chunk("db timeout", "payments-api", "2026-06-08T14:06:00"), + ] + + # Mirror the in-handler projection from repi/api/chat.py + clusters_payload = [ + { + "signature": v.signature, + "count": v.count, + "services": v.services, + "first_ts": v.first_ts, + "last_ts": v.last_ts, + } + for v in cluster_chunks(chunks) + ] + + assert len(clusters_payload) == 2 + # Sorted by count desc. + assert clusters_payload[0]["signature"] == "jwt failed" + assert clusters_payload[0]["count"] == 3 + assert clusters_payload[0]["services"] == ["api-gateway", "auth-service"] + assert clusters_payload[1]["signature"] == "db timeout" + assert clusters_payload[1]["count"] == 2 + + +def test_chat_done_clusters_empty_when_all_singletons(): + """No cluster has ≥2 hits → empty list. The UI can hide the panel + entirely; we don't yield a sentinel.""" + chunks = [ + _chunk("a", "svc-a", "2026-06-08T14:02:00"), + _chunk("b", "svc-b", "2026-06-08T14:03:00"), + ] + payload = [ + {"signature": v.signature, "count": v.count} + for v in cluster_chunks(chunks) + ] + assert payload == [] diff --git a/tests/retrieval/test_cluster_view.py b/tests/retrieval/test_cluster_view.py new file mode 100644 index 0000000..746c63c --- /dev/null +++ b/tests/retrieval/test_cluster_view.py @@ -0,0 +1,91 @@ +"""Pure-function tests for runtime event clustering over a retrieved chunk set. + +The contract: + - Group by signature extracted from the templated `text` field. + - Aggregate count, service set, and time range per signature. + - Drop singletons (default min_count=2) so the panel surfaces real incidents. + - Sort by count desc — the dominant event leads. 
+""" +from __future__ import annotations + +from repi.retrieval.cluster_view import ( + ClusterView, + cluster_chunks, + _extract_signature, +) + + +def _chunk(text: str, service: str, timestamp: str): + return {"text": text, "service": service, "timestamp": timestamp} + + +def test_extract_signature_from_templated_text(): + """Ingest path stores `"Signature: ...\\nExamples: ..."` — pull the + signature back out for clustering.""" + body = "Signature: JWT verification failed for token \nExamples: token=1 token=2" + assert _extract_signature(body) == "JWT verification failed for token " + + +def test_extract_signature_falls_back_on_external_text(): + """Defensive: a chunk not written by our ingestor (no `Signature: ` prefix) + still produces a signature via the masking regex from log_chunker.""" + sig = _extract_signature("INFO: user 1234 logged in from 10.0.0.1") + assert "" in sig # numeric masking applied + + +def test_returns_empty_for_no_chunks(): + assert cluster_chunks([]) == [] + + +def test_drops_singletons_by_default(): + """A cluster of size 1 isn't a meaningful incident — the timeline panel + already shows it once. 
The clusters panel surfaces compressions.""" + chunks = [ + _chunk("Signature: lonely event\nExamples: x", "svc-a", "2026-06-08T14:00:00"), + _chunk("Signature: repeated\nExamples: y", "svc-a", "2026-06-08T14:01:00"), + _chunk("Signature: repeated\nExamples: z", "svc-b", "2026-06-08T14:02:00"), + ] + out = cluster_chunks(chunks) + assert [v.signature for v in out] == ["repeated"] + + +def test_aggregates_count_services_and_time_range(): + chunks = [ + _chunk("Signature: jwt failed\nExamples: a", "auth-service", "2026-06-08T14:02:00"), + _chunk("Signature: jwt failed\nExamples: b", "auth-service", "2026-06-08T14:04:00"), + _chunk("Signature: jwt failed\nExamples: c", "api-gateway", "2026-06-08T14:03:00"), + ] + out = cluster_chunks(chunks) + assert len(out) == 1 + v = out[0] + assert v.signature == "jwt failed" + assert v.count == 3 + assert v.services == ["api-gateway", "auth-service"] # sorted, deduped + assert v.first_ts == "2026-06-08T14:02:00" + assert v.last_ts == "2026-06-08T14:04:00" + + +def test_sorted_by_count_descending(): + """Dominant event leads — 'compress thousands of logs into a few + meaningful incidents' framing requires the biggest cluster first.""" + chunks = ( + [_chunk("Signature: small\nExamples: x", "svc-a", "t1")] * 2 + + [_chunk("Signature: huge\nExamples: y", "svc-b", "t2")] * 8 + + [_chunk("Signature: medium\nExamples: z", "svc-c", "t3")] * 4 + ) + out = cluster_chunks(chunks) + assert [v.signature for v in out] == ["huge", "medium", "small"] + assert [v.count for v in out] == [8, 4, 2] + + +def test_missing_timestamps_dont_break_aggregation(): + """Some chunks may have a None timestamp (legacy or bad parse). 
They + should still join their signature group; just no time-range contribution.""" + chunks = [ + {"text": "Signature: orphan\nExamples: a", "service": "svc-a", "timestamp": None}, + {"text": "Signature: orphan\nExamples: b", "service": "svc-a", "timestamp": "2026-06-08T14:02:00"}, + ] + out = cluster_chunks(chunks) + assert out[0].count == 2 + assert out[0].first_ts == "2026-06-08T14:02:00" + assert out[0].last_ts == "2026-06-08T14:02:00" diff --git a/web/app/page.tsx b/web/app/page.tsx index 9846063..7e53aa7 100644 --- a/web/app/page.tsx +++ b/web/app/page.tsx @@ -225,6 +225,7 @@ export default function HomePage() { ...cur, chunkIds: data.chunk_ids ?? [], confidence: data.confidence ?? null, + clusters: data.clusters ?? [], streaming: false, } return next diff --git a/web/components/chat/ChatMessage.tsx b/web/components/chat/ChatMessage.tsx index 28fbf57..4d6b7dc 100644 --- a/web/components/chat/ChatMessage.tsx +++ b/web/components/chat/ChatMessage.tsx @@ -3,6 +3,7 @@ import { Badge } from "@/components/ui/badge" import { cn } from "@/lib/utils" import { AlertTriangle, Sparkles, User } from "lucide-react" +import { EventClusters, type Cluster } from "@/components/chat/EventClusters" export type ChatMessageProps = { role: "user" | "assistant" @@ -11,6 +12,7 @@ export type ChatMessageProps = { confidence?: "low" | "medium" | "high" | null isClarification?: boolean streaming?: boolean + clusters?: Cluster[] } // Strip raw chunk citations the LLM may still inline despite the system @@ -31,6 +33,7 @@ export function ChatMessageView({ confidence, isClarification, streaming, + clusters, }: ChatMessageProps) { const isUser = role === "user" const displayed = isUser ? content : cleanContent(content) @@ -65,6 +68,9 @@ export function ChatMessageView({ )} + {!isUser && clusters && clusters.length > 0 && ( + + )} {isUser && (
diff --git a/web/components/chat/EventClusters.tsx b/web/components/chat/EventClusters.tsx new file mode 100644 index 0000000..1b0124c --- /dev/null +++ b/web/components/chat/EventClusters.tsx @@ -0,0 +1,86 @@ +"use client" + +import { useState } from "react" +import { Badge } from "@/components/ui/badge" +import { ChevronDown, ChevronRight, Layers } from "lucide-react" +import { cn } from "@/lib/utils" + +export type Cluster = { + signature: string + count: number + services: string[] + first_ts: string | null + last_ts: string | null +} + +function formatRange(first: string | null, last: string | null): string { + if (!first && !last) return "" + if (first === last || !last) return formatTs(first) + if (!first) return formatTs(last) + return `${formatTs(first)}–${formatTs(last)}` +} + +function formatTs(iso: string | null): string { + if (!iso) return "" + // Show HH:MM:SS for tight inline display. Full timestamp on hover via title. + const t = iso.includes("T") ? iso.split("T")[1].slice(0, 8) : iso + return t +} + +export function EventClusters({ clusters }: { clusters: Cluster[] }) { + // Default open when small enough to scan at a glance. + const [open, setOpen] = useState(clusters.length > 0 && clusters.length <= 5) + + if (!clusters || clusters.length === 0) return null + + return ( +
+ + {open && ( +
+ {clusters.map((c, i) => ( +
+
+ + {c.count}× + + + {c.signature} + +
+
+ {c.services.map((s) => ( + + {s} + + ))} + {(c.first_ts || c.last_ts) && ( + + {formatRange(c.first_ts, c.last_ts)} + + )} +
+
+ ))} +
+ )} +
+ ) +} From c481d3fdf2f3c4e9b967e1b5c5b6e789e7777e59 Mon Sep 17 00:00:00 2001 From: Varun Singh Date: Tue, 9 Jun 2026 12:04:05 +0530 Subject: [PATCH 02/12] fix(chat): unify chunk-timestamp shape + harden signature fallback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses PR #71 review. - repi/api/chat.py: factor `_normalize_ts` and route both chunk-construction sites through it. The RRF path and the find_logs_by_id entity-bias path used different inline forms; both now produce ISO 8601 string or None, full stop. Closes the mixed-type hazard that cluster_view's `<`/`>` (and Stream 3's `sorted(...)`) would have hit if a future change to either source path reintroduced raw datetimes. - repi/retrieval/cluster_view.py: drop the get_signature fallback for chunks without a `Signature:` prefix. Re-running the masking regex over the whole templated body would also mask numerics inside `Examples: ...`, producing a signature that doesn't match what the ingestor would have stored for the same raw line — silent mis-clustering. Log a warning so we notice dual-source state and return empty so cluster_chunks skips the chunk. - web/components/chat/EventClusters.tsx: break-all → break-words on the signature code element. break-all chops mid-word and reads ugly on code-shaped strings. - tests/api/test_chat_timestamp_normalisation.py pins the _normalize_ts contract (None passthrough, naive/aware datetime → ISO, string idempotent). - tests/retrieval/test_cluster_view.py updated for the new empty-on-untemplated contract, asserts the warning lands. 
--- repi/api/chat.py | 23 +++++++++-- repi/retrieval/cluster_view.py | 20 +++++++--- .../api/test_chat_timestamp_normalisation.py | 39 +++++++++++++++++++ tests/retrieval/test_cluster_view.py | 16 +++++--- web/components/chat/EventClusters.tsx | 2 +- 5 files changed, 85 insertions(+), 15 deletions(-) create mode 100644 tests/api/test_chat_timestamp_normalisation.py diff --git a/repi/api/chat.py b/repi/api/chat.py index 8e8d39b..22c3194 100644 --- a/repi/api/chat.py +++ b/repi/api/chat.py @@ -78,6 +78,23 @@ def _sse(event_type: str, data: dict) -> str: return f"data: {json.dumps({'type': event_type, 'data': data})}\n\n" +def _normalize_ts(value): + """Canonicalise a `timestamp_start` field for the chat path's chunks list. + + The chunks the LLM, the cluster_view, and the timeline_view all read share + one rule: `timestamp` is ISO 8601 string or None. Downstream comparisons + (`<`, `>`, `sorted(...)`) rely on this — mixing `datetime` and `str` in + one list would TypeError. Two source paths (RRF + entity-bias merge) feed + this list; both run their `timestamp_start` through here so a future + change to either source can't reintroduce mixed types. + """ + if value is None: + return None + if hasattr(value, "isoformat"): + return _dh.to_iso(value) + return value # already string-ish + + # ── Confidence heuristic (chat path) ────────────────────────────────────────── # Compiler floors don't apply — /chat has no compile step. 
Deterministic rules: # - 0 chunks gathered → low @@ -242,9 +259,7 @@ async def event_generator(): "chunk_id": cid, "service": data.get("source_service"), "level": data.get("log_level"), - "timestamp": _dh.to_iso(data.get("timestamp_start")) - if hasattr(data.get("timestamp_start"), "isoformat") - else data.get("timestamp_start"), + "timestamp": _normalize_ts(data.get("timestamp_start")), "text": data.get("text") or "", "score": float(score), }) @@ -262,7 +277,7 @@ async def event_generator(): "chunk_id": c["chunk_id"], "service": c.get("service"), "level": c.get("level"), - "timestamp": c.get("timestamp_start"), + "timestamp": _normalize_ts(c.get("timestamp_start")), "text": c.get("text") or "", "score": 0.0, # ILIKE has no score; use sentinel }) diff --git a/repi/retrieval/cluster_view.py b/repi/retrieval/cluster_view.py index b28f9a2..5dbbce4 100644 --- a/repi/retrieval/cluster_view.py +++ b/repi/retrieval/cluster_view.py @@ -19,19 +19,25 @@ """ from __future__ import annotations +import logging from dataclasses import dataclass from typing import List, Optional -from repi.ingestion.log_chunker import get_signature +logger = logging.getLogger(__name__) def _extract_signature(chunk_text: str) -> str: """Pull the signature back out of the templated chunk body. The ingestor writes `"Signature: \\nExamples: ..."`. We - take the slice between `"Signature: "` and the first newline. For chunks - not produced by our ingestor (defensive — external imports, older data), - fall back to re-running get_signature() on the raw text. + take the slice between `"Signature: "` and the first newline. + + A chunk without that prefix is dual-source state — external imports or + pre-ingestor data. Re-running get_signature() over the whole body would + mask numerics inside the "Examples: ..." portion too, producing a + signature that doesn't match what the ingestor would have stored for + the same raw line. That silently mis-clusters. 
Log instead so we can + spot the drift, and return empty so the caller skips the chunk. """ if not chunk_text: return "" @@ -40,7 +46,11 @@ def _extract_signature(chunk_text: str) -> str: rest = chunk_text[len(prefix):] nl = rest.find("\n") return (rest[:nl] if nl != -1 else rest).strip() - return get_signature(chunk_text).strip() + logger.warning( + "cluster_view: chunk without 'Signature:' prefix — skipping. " + "Indicates dual-source state (external import or pre-ingestor data).", + ) + return "" @dataclass(frozen=True) diff --git a/tests/api/test_chat_timestamp_normalisation.py b/tests/api/test_chat_timestamp_normalisation.py new file mode 100644 index 0000000..82c8d57 --- /dev/null +++ b/tests/api/test_chat_timestamp_normalisation.py @@ -0,0 +1,39 @@ +"""Pin the chat path's timestamp invariant. + +Downstream cluster_view and timeline_view do string comparisons (`<`, `>`, +`sorted(...)`) on `chunks[i]["timestamp"]`. If one chunk in that list +carries a `datetime` and another a `str`, the comparison TypeErrors at +runtime — which is exactly the failure mode the PR #71 review flagged. + +`repi/api/chat.py::_normalize_ts` is the single point both feed paths +(RRF retrieval, entity-bias merge from find_logs_by_id) run through. +This test pins its contract so a future change can't reintroduce mixed +shapes. 
+""" +from __future__ import annotations + +from datetime import datetime, timezone + +from repi.api.chat import _normalize_ts + + +def test_normalize_ts_none_passthrough(): + assert _normalize_ts(None) is None + + +def test_normalize_ts_aware_datetime_becomes_iso_string(): + out = _normalize_ts(datetime(2026, 6, 9, 14, 2, 0, tzinfo=timezone.utc)) + assert isinstance(out, str) + assert out.startswith("2026-06-09T14:02:00") + + +def test_normalize_ts_naive_datetime_becomes_iso_string(): + out = _normalize_ts(datetime(2026, 6, 9, 14, 2, 0)) + assert isinstance(out, str) + assert out.startswith("2026-06-09T14:02:00") + + +def test_normalize_ts_existing_string_passthrough(): + """find_logs_by_id already ISO-stringifies upstream; passing its output + through the helper is idempotent.""" + assert _normalize_ts("2026-06-09T14:02:00") == "2026-06-09T14:02:00" diff --git a/tests/retrieval/test_cluster_view.py b/tests/retrieval/test_cluster_view.py index 746c63c..df9eb7a 100644 --- a/tests/retrieval/test_cluster_view.py +++ b/tests/retrieval/test_cluster_view.py @@ -26,11 +26,17 @@ def test_extract_signature_from_templated_text(): assert _extract_signature(body) == "JWT verification failed for token " -def test_extract_signature_falls_back_on_external_text(): - """Defensive: a chunk not written by our ingestor (no `Signature: ` prefix) - still produces a signature via the masking regex from log_chunker.""" - sig = _extract_signature("INFO: user 1234 logged in from 10.0.0.1") - assert "" in sig # numeric masking applied +def test_extract_signature_returns_empty_for_un_templated_text(caplog): + """A chunk without the 'Signature: ' prefix is dual-source state + (external import or pre-ingestor data). Re-running the masking regex + over the whole body would silently mis-cluster — wrong signatures + derived from 'Examples: ...' tokens. 
Contract: return empty + warn so + cluster_chunks skips the chunk and we can spot the drift in logs.""" + import logging + with caplog.at_level(logging.WARNING, logger="repi.retrieval.cluster_view"): + sig = _extract_signature("INFO: user 1234 logged in from 10.0.0.1") + assert sig == "" + assert any("without 'Signature:' prefix" in r.message for r in caplog.records) def test_returns_empty_for_no_chunks(): diff --git a/web/components/chat/EventClusters.tsx b/web/components/chat/EventClusters.tsx index 1b0124c..5aa2bae 100644 --- a/web/components/chat/EventClusters.tsx +++ b/web/components/chat/EventClusters.tsx @@ -56,7 +56,7 @@ export function EventClusters({ clusters }: { clusters: Cluster[] }) { {c.count}× {c.signature} From 9aee1c7edc432bd78c078291d2f4d3843e290f3c Mon Sep 17 00:00:00 2001 From: Varun Singh Date: Tue, 9 Jun 2026 10:49:38 +0530 Subject: [PATCH 03/12] feat(chat): user-facing incident timeline on /chat done payload + UI panel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stream 3 of the drift-alignment plan. Promotes the timeline view from an internal ReAct tool (investigation.tools.get_timeline) to a first-class chat artifact. Per the feedback: timelines are "insanely useful" and people love them — a chronological narrative beats a chunk dump for any RCA story. - repi/retrieval/timeline_view.py builds the timeline from the chunks the chat path has already hydrated — no second DB roundtrip. Sorts chronologically (ISO strings sort lexically; chat path normalises to UTC upstream via _dh.to_iso), then collapses consecutive runs with identical (service, level, signature) into one entry carrying first_ts / last_ts / repeat_count. The user sees "auth-service ERROR x12 14:02–14:04" instead of twelve near-identical lines. - Collapse key is (service, level, signature). 
Two ERRORs and a WARNING with the same masked template stay separate — INFO setup ≠ ERROR fallout, and cross-service hits with the same signature are coincidence, not a run. - Chunks without a timestamp are dropped. Placing them in chronological order would require fabricating a position, and "where exactly" is precisely what a timeline answers. - /chat emits a `timeline: [...]` key on the SSE done event alongside the existing clusters payload from Stream 2. - web/components/chat/Timeline.tsx renders a vertical timeline under the assistant message: HH:MM:SS on the left (full ISO on hover), service + level badges (level color-coded — ERROR red, WARNING amber, INFO blue), repeat count when >1, signature in mono. Default-open when ≤15 entries, collapsible above that. --- repi/api/chat.py | 10 +++ repi/retrieval/timeline_view.py | 90 +++++++++++++++++++ tests/api/test_chat_timeline.py | 46 ++++++++++ tests/retrieval/test_timeline_view.py | 125 ++++++++++++++++++++++++++ web/app/page.tsx | 1 + web/components/chat/ChatMessage.tsx | 6 ++ web/components/chat/Timeline.tsx | 110 +++++++++++++++++++++++ 7 files changed, 388 insertions(+) create mode 100644 repi/retrieval/timeline_view.py create mode 100644 tests/api/test_chat_timeline.py create mode 100644 tests/retrieval/test_timeline_view.py create mode 100644 web/components/chat/Timeline.tsx diff --git a/repi/api/chat.py b/repi/api/chat.py index 22c3194..d8fe4aa 100644 --- a/repi/api/chat.py +++ b/repi/api/chat.py @@ -43,6 +43,7 @@ from repi.models.filters import RetrievalFilters from repi.models.schema import ChatMessage, Conversation from repi.retrieval.cluster_view import cluster_chunks +from repi.retrieval.timeline_view import build_timeline logger = logging.getLogger("repi.api.chat") @@ -351,12 +352,21 @@ async def event_generator(): for v in cluster_chunks(chunks) ] + # Incident timeline — chronologically ordered, run-collapsed view + # of the same retrieved chunks. 
Reuses the in-memory list rather + # than re-fetching via investigation.tools.get_timeline (one less + # DB roundtrip per turn). Singletons stay so the user can see + # the gap between events; runs collapse so 12 identical lines + # become "x12 over 14:02–14:04". + timeline = build_timeline(chunks) + yield _sse("done", { "chunk_ids": cited_ids, "confidence": confidence, "conversation_id": str(conversation_id), "entities": entities, "clusters": clusters, + "timeline": timeline, }) except Exception as e: diff --git a/repi/retrieval/timeline_view.py b/repi/retrieval/timeline_view.py new file mode 100644 index 0000000..132e672 --- /dev/null +++ b/repi/retrieval/timeline_view.py @@ -0,0 +1,90 @@ +"""Build a user-facing timeline from a retrieved chunk set. + +The user-visible product framing: "Repi ingests logs, indexes them with +hybrid retrieval, clusters related events, **builds incident timelines**, +and can launch autonomous root-cause investigation." This module is the +**timelines** word in that sentence. + +Unlike repi.investigation.tools.get_timeline (an internal ReAct tool that +takes chunk_ids and does a SELECT), this runs over the chunks the chat +path already hydrated — no second DB roundtrip. The output is a +chronologically ordered, run-collapsed view: adjacent rows with the same +(service, level, signature) become one entry carrying `repeat_count` and +a `first_ts` / `last_ts` range, so the UI shows "auth-service ERROR x12 +14:02–14:04" instead of twelve near-identical lines. + +Signature extraction reuses cluster_view._extract_signature to pull the +templated `text` body apart — the ingestor stores rows as +`"Signature: \\nExamples: ..."` (see log_ingestor.py). 
+""" +from __future__ import annotations + +from typing import List, Optional, TypedDict + +from repi.retrieval.cluster_view import _extract_signature + + +class TimelineEntry(TypedDict): + service: Optional[str] + level: Optional[str] + signature: str + first_ts: str + last_ts: str + repeat_count: int + + +def build_timeline(chunks: List[dict]) -> List[TimelineEntry]: + """Project `chunks` to a chronological, run-collapsed timeline. + + Each input dict is expected in the shape the chat path produces: + `{chunk_id, service, level, timestamp, text, ...}`. Chunks without a + timestamp are dropped — placing them in chronological order would + require fabricating a position, and a "where exactly" question is + what a timeline answers. ISO8601 strings sort lexically correctly, + which is what `_dh.to_iso` already produces upstream. + + Collapsing is on identical (service, level, signature). Two ERROR + hits and one INFO hit with the same signature stay separate — they + are different events for the human reader. + """ + timestamped = [c for c in chunks if c.get("timestamp")] + if not timestamped: + return [] + + ordered = sorted(timestamped, key=lambda c: c["timestamp"]) + + entries: list[TimelineEntry] = [] + for c in ordered: + sig = _extract_signature(c.get("text") or "") + if not sig: + # Defensive: no signature means we can't form a run key. Skip + # rather than emit a row with no useful identity. + continue + service = c.get("service") + level = c.get("level") + ts = c["timestamp"] + + if entries: + last = entries[-1] + if ( + last["service"] == service + and last["level"] == level + and last["signature"] == sig + ): + # Same run — extend the range, bump the counter. 
+ last["last_ts"] = ts + last["repeat_count"] += 1 + continue + + entries.append( + TimelineEntry( + service=service, + level=level, + signature=sig, + first_ts=ts, + last_ts=ts, + repeat_count=1, + ) + ) + + return entries diff --git a/tests/api/test_chat_timeline.py b/tests/api/test_chat_timeline.py new file mode 100644 index 0000000..2ef3ad8 --- /dev/null +++ b/tests/api/test_chat_timeline.py @@ -0,0 +1,46 @@ +"""Verify the /chat `done` event carries a `timeline` payload. + +build_timeline itself is exercised in tests/retrieval/test_timeline_view.py. +Here we pin the wire-contract shape the UI's Timeline.tsx binds to. +""" +from __future__ import annotations + +from repi.retrieval.timeline_view import build_timeline + + +def _chunk(sig: str, service: str, level: str, ts: str): + return { + "chunk_id": f"id-{sig}-{ts}", + "service": service, + "level": level, + "timestamp": ts, + "text": f"Signature: {sig}\nExamples: x", + } + + +def test_chat_done_timeline_payload_shape(): + """Exact keys the web/components/chat/Timeline.tsx component reads.""" + chunks = [ + _chunk("jwt failed", "auth-service", "ERROR", "2026-06-09T14:02:00"), + _chunk("jwt failed", "auth-service", "ERROR", "2026-06-09T14:03:00"), + _chunk("db timeout", "payments", "ERROR", "2026-06-09T14:05:00"), + ] + + timeline = build_timeline(chunks) + + assert len(timeline) == 2 + assert set(timeline[0].keys()) == { + "service", "level", "signature", + "first_ts", "last_ts", "repeat_count", + } + assert timeline[0]["service"] == "auth-service" + assert timeline[0]["repeat_count"] == 2 + assert timeline[1]["service"] == "payments" + assert timeline[1]["repeat_count"] == 1 + + +def test_chat_done_timeline_empty_when_no_timestamps(): + chunks = [ + {"text": "Signature: x\nExamples: y", "service": "svc", "level": "ERROR", "timestamp": None} + ] + assert build_timeline(chunks) == [] diff --git a/tests/retrieval/test_timeline_view.py b/tests/retrieval/test_timeline_view.py new file mode 100644 index 
0000000..3bf4e29 --- /dev/null +++ b/tests/retrieval/test_timeline_view.py @@ -0,0 +1,125 @@ +"""Pure-function tests for the chat-path timeline view. + +Contract: + - Sort retrieved chunks by timestamp. + - Collapse runs of adjacent (service, level, signature) into one entry + with first_ts/last_ts and repeat_count. + - Drop chunks without timestamps; a timeline can't place them. + - Different log levels for the same signature stay as separate entries. +""" +from __future__ import annotations + +from repi.retrieval.timeline_view import build_timeline + + +def _chunk(text: str, service: str, level: str, timestamp: str | None): + return { + "chunk_id": f"c-{service}-{timestamp}", + "service": service, + "level": level, + "timestamp": timestamp, + "text": text, + } + + +def test_returns_empty_for_no_chunks(): + assert build_timeline([]) == [] + + +def test_returns_empty_when_all_chunks_lack_timestamp(): + chunks = [_chunk("Signature: x\nExamples: y", "svc", "ERROR", None)] + assert build_timeline(chunks) == [] + + +def test_single_chunk_produces_one_entry(): + chunks = [_chunk("Signature: jwt failed\nExamples: a", "auth", "ERROR", "2026-06-09T14:02:00")] + out = build_timeline(chunks) + assert len(out) == 1 + assert out[0]["repeat_count"] == 1 + assert out[0]["first_ts"] == out[0]["last_ts"] == "2026-06-09T14:02:00" + assert out[0]["signature"] == "jwt failed" + + +def test_consecutive_run_collapses_with_range_and_count(): + """The whole point of the run-collapse — 'x12 over 14:02–14:04'.""" + chunks = [ + _chunk("Signature: jwt failed\nExamples: a", "auth", "ERROR", "2026-06-09T14:02:00"), + _chunk("Signature: jwt failed\nExamples: b", "auth", "ERROR", "2026-06-09T14:03:00"), + _chunk("Signature: jwt failed\nExamples: c", "auth", "ERROR", "2026-06-09T14:04:00"), + ] + out = build_timeline(chunks) + assert len(out) == 1 + assert out[0]["repeat_count"] == 3 + assert out[0]["first_ts"] == "2026-06-09T14:02:00" + assert out[0]["last_ts"] == "2026-06-09T14:04:00" + + +def 
test_different_services_dont_collapse_even_with_same_signature(): + """Cross-service signature match is a coincidence, not a run. + Two services emitting "auth check failed" is two events, not one.""" + chunks = [ + _chunk("Signature: auth check failed\nExamples: x", "auth-service", "ERROR", "2026-06-09T14:02:00"), + _chunk("Signature: auth check failed\nExamples: y", "api-gateway", "ERROR", "2026-06-09T14:03:00"), + ] + out = build_timeline(chunks) + assert [e["service"] for e in out] == ["auth-service", "api-gateway"] + + +def test_different_levels_dont_collapse_even_with_same_signature(): + """An ERROR and a WARNING with the same masked template are two + different observations — INFO setup ≠ ERROR fallout.""" + chunks = [ + _chunk("Signature: token validation\nExamples: a", "auth", "INFO", "2026-06-09T14:02:00"), + _chunk("Signature: token validation\nExamples: b", "auth", "ERROR", "2026-06-09T14:03:00"), + ] + out = build_timeline(chunks) + assert len(out) == 2 + assert out[0]["level"] == "INFO" + assert out[1]["level"] == "ERROR" + + +def test_runs_break_then_resume_produce_separate_entries(): + """A B A → three entries. The two A's are not consecutive.""" + chunks = [ + _chunk("Signature: A\nExamples: 1", "svc-a", "ERROR", "2026-06-09T14:00:00"), + _chunk("Signature: A\nExamples: 2", "svc-a", "ERROR", "2026-06-09T14:01:00"), + _chunk("Signature: B\nExamples: 1", "svc-a", "ERROR", "2026-06-09T14:02:00"), + _chunk("Signature: A\nExamples: 3", "svc-a", "ERROR", "2026-06-09T14:03:00"), + ] + out = build_timeline(chunks) + sigs = [(e["signature"], e["repeat_count"]) for e in out] + assert sigs == [("A", 2), ("B", 1), ("A", 1)] + + +def test_unsorted_input_is_sorted_chronologically(): + """The chat path's chunks are RRF-ranked, not time-sorted. 
The timeline + must impose chronological order.""" + chunks = [ + _chunk("Signature: B\nExamples: y", "svc", "ERROR", "2026-06-09T14:03:00"), + _chunk("Signature: A\nExamples: x", "svc", "ERROR", "2026-06-09T14:01:00"), + _chunk("Signature: C\nExamples: z", "svc", "ERROR", "2026-06-09T14:05:00"), + ] + out = build_timeline(chunks) + assert [e["signature"] for e in out] == ["A", "B", "C"] + + +def test_chunks_without_timestamps_are_dropped(): + """Mixed input — only the timestamped ones land in the timeline.""" + chunks = [ + _chunk("Signature: orphan\nExamples: x", "svc", "ERROR", None), + _chunk("Signature: anchored\nExamples: y", "svc", "ERROR", "2026-06-09T14:00:00"), + ] + out = build_timeline(chunks) + assert len(out) == 1 + assert out[0]["signature"] == "anchored" + + +def test_chunks_without_signature_are_dropped(): + """Defensive: empty text → no signature → can't form a run key.""" + chunks = [ + {"text": "", "service": "svc", "level": "ERROR", "timestamp": "2026-06-09T14:00:00"}, + _chunk("Signature: real\nExamples: y", "svc", "ERROR", "2026-06-09T14:01:00"), + ] + out = build_timeline(chunks) + assert len(out) == 1 + assert out[0]["signature"] == "real" diff --git a/web/app/page.tsx b/web/app/page.tsx index 7e53aa7..15d0eae 100644 --- a/web/app/page.tsx +++ b/web/app/page.tsx @@ -226,6 +226,7 @@ export default function HomePage() { chunkIds: data.chunk_ids ?? [], confidence: data.confidence ?? null, clusters: data.clusters ?? [], + timeline: data.timeline ?? 
[], streaming: false, } return next diff --git a/web/components/chat/ChatMessage.tsx b/web/components/chat/ChatMessage.tsx index 4d6b7dc..f4c2f02 100644 --- a/web/components/chat/ChatMessage.tsx +++ b/web/components/chat/ChatMessage.tsx @@ -4,6 +4,7 @@ import { Badge } from "@/components/ui/badge" import { cn } from "@/lib/utils" import { AlertTriangle, Sparkles, User } from "lucide-react" import { EventClusters, type Cluster } from "@/components/chat/EventClusters" +import { Timeline, type TimelineEntry } from "@/components/chat/Timeline" export type ChatMessageProps = { role: "user" | "assistant" @@ -13,6 +14,7 @@ export type ChatMessageProps = { isClarification?: boolean streaming?: boolean clusters?: Cluster[] + timeline?: TimelineEntry[] } // Strip raw chunk citations the LLM may still inline despite the system @@ -34,6 +36,7 @@ export function ChatMessageView({ isClarification, streaming, clusters, + timeline, }: ChatMessageProps) { const isUser = role === "user" const displayed = isUser ? content : cleanContent(content) @@ -68,6 +71,9 @@ export function ChatMessageView({
)} + {!isUser && timeline && timeline.length > 0 && ( + + )} {!isUser && clusters && clusters.length > 0 && ( )} diff --git a/web/components/chat/Timeline.tsx b/web/components/chat/Timeline.tsx new file mode 100644 index 0000000..707960b --- /dev/null +++ b/web/components/chat/Timeline.tsx @@ -0,0 +1,110 @@ +"use client" + +import { useState } from "react" +import { Badge } from "@/components/ui/badge" +import { ChevronDown, ChevronRight, Clock } from "lucide-react" +import { cn } from "@/lib/utils" + +export type TimelineEntry = { + service: string | null + level: string | null + signature: string + first_ts: string + last_ts: string + repeat_count: number +} + +function formatTs(iso: string): string { + if (!iso) return "" + return iso.includes("T") ? iso.split("T")[1].slice(0, 8) : iso +} + +function formatRange(first: string, last: string): string { + if (first === last) return formatTs(first) + return `${formatTs(first)}–${formatTs(last)}` +} + +function levelTone(level: string | null): string { + switch ((level || "").toUpperCase()) { + case "ERROR": + case "CRITICAL": + case "FATAL": + return "text-red-500 border-red-500/30" + case "WARNING": + case "WARN": + return "text-amber-500 border-amber-500/30" + case "INFO": + return "text-blue-500 border-blue-500/30" + default: + return "text-muted-foreground border-border" + } +} + +export function Timeline({ entries }: { entries: TimelineEntry[] }) { + // Default-open when small enough to scan; closed beyond that so the + // chat surface doesn't drown in a 30-row timeline by default. + const [open, setOpen] = useState(entries.length > 0 && entries.length <= 15) + + if (!entries || entries.length === 0) return null + + return ( +
+ + {open && ( +
+ {entries.map((e, i) => ( +
+
+
{formatTs(e.first_ts)}
+ {e.last_ts !== e.first_ts && ( +
+ →{formatTs(e.last_ts)} +
+ )} +
+
+
+ {e.service && ( + + {e.service} + + )} + {e.level && ( + + {e.level} + + )} + {e.repeat_count > 1 && ( + + ×{e.repeat_count} + + )} +
+ + {e.signature} + +
+
+ ))} +
+ )} +
+ ) +} From dadd934d90a7ce56eaf1e66e1975a7c50cd12b9b Mon Sep 17 00:00:00 2001 From: Varun Singh Date: Tue, 9 Jun 2026 12:06:07 +0530 Subject: [PATCH 04/12] refactor(retrieval): drop underscore on cross-module extract_signature MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses PR #72 review. - Rename cluster_view._extract_signature → extract_signature. The moment timeline_view imported it, the leading underscore was no longer telling the truth — it's a shared primitive across two modules. Linters and readers were both being misled by the name. - timeline_view: import the renamed symbol; add a debug-log tally for chunks dropped because they lack a signature. A spike in that count signals dual-source state (external imports, pre-ingestor data) and matches the warning cluster_view already emits at extraction time. --- repi/retrieval/cluster_view.py | 4 ++-- repi/retrieval/timeline_view.py | 20 +++++++++++++++----- tests/retrieval/test_cluster_view.py | 10 +++++----- 3 files changed, 22 insertions(+), 12 deletions(-) diff --git a/repi/retrieval/cluster_view.py b/repi/retrieval/cluster_view.py index 5dbbce4..65938ee 100644 --- a/repi/retrieval/cluster_view.py +++ b/repi/retrieval/cluster_view.py @@ -26,7 +26,7 @@ logger = logging.getLogger(__name__) -def _extract_signature(chunk_text: str) -> str: +def extract_signature(chunk_text: str) -> str: """Pull the signature back out of the templated chunk body. The ingestor writes `"Signature: \\nExamples: ..."`. 
We @@ -82,7 +82,7 @@ def cluster_chunks( groups: dict[str, dict] = {} for c in chunks: - sig = _extract_signature(c.get("text") or "") + sig = extract_signature(c.get("text") or "") if not sig: continue svc = c.get("service") diff --git a/repi/retrieval/timeline_view.py b/repi/retrieval/timeline_view.py index 132e672..f7a34af 100644 --- a/repi/retrieval/timeline_view.py +++ b/repi/retrieval/timeline_view.py @@ -13,15 +13,18 @@ a `first_ts` / `last_ts` range, so the UI shows "auth-service ERROR x12 14:02–14:04" instead of twelve near-identical lines. -Signature extraction reuses cluster_view._extract_signature to pull the +Signature extraction reuses cluster_view.extract_signature to pull the templated `text` body apart — the ingestor stores rows as `"Signature: \\nExamples: ..."` (see log_ingestor.py). """ from __future__ import annotations +import logging from typing import List, Optional, TypedDict -from repi.retrieval.cluster_view import _extract_signature +from repi.retrieval.cluster_view import extract_signature + +logger = logging.getLogger(__name__) class TimelineEntry(TypedDict): @@ -54,11 +57,14 @@ def build_timeline(chunks: List[dict]) -> List[TimelineEntry]: ordered = sorted(timestamped, key=lambda c: c["timestamp"]) entries: list[TimelineEntry] = [] + skipped = 0 for c in ordered: - sig = _extract_signature(c.get("text") or "") + sig = extract_signature(c.get("text") or "") if not sig: - # Defensive: no signature means we can't form a run key. Skip - # rather than emit a row with no useful identity. + # No signature → can't form a run key, can't render meaningfully. + # Tally so a spike in untemplated chunks shows up in logs (signals + # ingest drift or external import contamination). 
+ skipped += 1 continue service = c.get("service") level = c.get("level") @@ -87,4 +93,8 @@ def build_timeline(chunks: List[dict]) -> List[TimelineEntry]: ) ) + if skipped: + logger.debug( + "build_timeline: skipped %d chunk(s) without a signature", skipped + ) return entries diff --git a/tests/retrieval/test_cluster_view.py b/tests/retrieval/test_cluster_view.py index df9eb7a..ad04256 100644 --- a/tests/retrieval/test_cluster_view.py +++ b/tests/retrieval/test_cluster_view.py @@ -11,7 +11,7 @@ from repi.retrieval.cluster_view import ( ClusterView, cluster_chunks, - _extract_signature, + extract_signature, ) @@ -19,14 +19,14 @@ def _chunk(text: str, service: str, timestamp: str): return {"text": text, "service": service, "timestamp": timestamp} -def test_extract_signature_from_templated_text(): +def test_extract_signature_from_templated_text(): """Ingest path stores `"Signature: ...\\nExamples: ..."` — pull the signature back out for clustering.""" body = "Signature: JWT verification failed for token \nExamples: token=1 token=2" - assert _extract_signature(body) == "JWT verification failed for token " + assert extract_signature(body) == "JWT verification failed for token " -def test_extract_signature_returns_empty_for_un_templated_text(caplog): +def test_extract_signature_returns_empty_for_un_templated_text(caplog): """A chunk without the 'Signature: ' prefix is dual-source state (external import or pre-ingestor data).
Re-running the masking regex over the whole body would silently mis-cluster — wrong signatures @@ -34,7 +34,7 @@ def test_extract_signature_returns_empty_for_un_templated_text(caplog): cluster_chunks skips the chunk and we can spot the drift in logs.""" import logging with caplog.at_level(logging.WARNING, logger="repi.retrieval.cluster_view"): - sig = _extract_signature("INFO: user 1234 logged in from 10.0.0.1") + sig = extract_signature("INFO: user 1234 logged in from 10.0.0.1") assert sig == "" assert any("without 'Signature:' prefix" in r.message for r in caplog.records) From 1e85218b6134bb8e9cceac96fe57d2ad015848b1 Mon Sep 17 00:00:00 2001 From: Varun Singh Date: Tue, 9 Jun 2026 11:17:17 +0530 Subject: [PATCH 05/12] feat(chat): followup bias + cited-chunks panel + quick actions + reframe MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stream 4 of the drift-alignment plan. Closes the polish gap so /chat feels like a product, not a debug surface. Token streaming is the only plan item explicitly deferred — making it production-quality across five provider adapters with partial-stream error handling deserves its own PR rather than ride along here. - /chat ChatRequest gains optional `previous_chunk_ids: list[str]`. The frontend passes the last assistant turn's cited IDs; the backend reads them via the existing vector_store.get_chunks_by_ids (indexed PK lookup) and uses them to default-fill service + ±5min time envelope when the current intent has no explicit filter. Soft hint — caller filters and resolver output always win. - /chat done payload gains `cited_chunks: [...]` — minimal projection (chunk_id, service, level, timestamp, 600-char text window matching the LLM prompt) so the new UI evidence panel renders without a follow-up roundtrip. - web/components/chat/CitedChunks.tsx — third inline collapsible under the assistant turn, default-closed (it's the debug-grade view; the story is in Timeline and Clusters above it). 
Stacked-collapsibles approach chosen over a tabbed roll-up after the simpler-alternative surface-up. - Timeline and EventClusters gain optional controlled-mode `open` + `onOpenChange` props. Falls through to uncontrolled internal state when callers don't pass them — no behavior change for existing call sites. - ChatMessageView gains "Show timeline", "Show clusters", and "Investigate deeper" quick-action buttons under the assistant turn. The first two open the corresponding panel and scrollIntoView it; the third invokes a parent-supplied onInvestigateDeeper(query) callback which page.tsx wires to flip the Deep Research toggle and re-run the same query through /investigate. - README and the chat empty state reframe — repi now leads with the observability framing (continuous ingestion, hybrid retrieval, event clusters, incident timelines, optional autonomous root-cause investigation) instead of "log investigation engine." --- README.md | 2 +- repi/api/chat.py | 48 +++++++++++++ web/app/page.tsx | 37 ++++++++-- web/components/chat/ChatMessage.tsx | 97 +++++++++++++++++++++++++-- web/components/chat/CitedChunks.tsx | 84 +++++++++++++++++++++++ web/components/chat/EventClusters.tsx | 20 ++++-- web/components/chat/Timeline.tsx | 24 +++++-- 7 files changed, 292 insertions(+), 20 deletions(-) create mode 100644 web/components/chat/CitedChunks.tsx diff --git a/README.md b/README.md index 4dcdbb0..8545532 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # repi -Log ingestion and LLM-based investigation engine. Ingests log files into PostgreSQL (pgvector), retrieves relevant log clusters via hybrid search (BM25 + dense vectors with RRF), and runs a ReAct loop where an LLM autonomously investigates root causes. +Local-first log observability. 
repi continuously ingests logs, indexes them into a hybrid retrieval system (pgvector HNSW + Postgres FTS with weighted tsvector + pg_trgm fuzzy match), clusters related events, builds incident timelines, and can optionally launch an autonomous root-cause investigation through a ReAct loop. Designed to run on a single machine against a local Postgres — no SaaS, no shared state. ## Architecture diff --git a/repi/api/chat.py b/repi/api/chat.py index d8fe4aa..7738232 100644 --- a/repi/api/chat.py +++ b/repi/api/chat.py @@ -70,6 +70,13 @@ class ChatRequest(BaseModel): history: List[ChatTurn] = [] filters: Optional[ChatFilters] = None conversation_id: Optional[UUID] = None + # Followup-bias hint: chunk_ids the previous assistant turn cited. When + # the current query has no explicit service / time filter (neither in + # `filters` nor from the resolver), the chat path uses these chunks to + # default the retrieval window to the previous turn's service distribution + # and a ±5min envelope around their timestamps. Soft — never overrides + # an explicit filter, and silently ignored if the IDs no longer resolve. + previous_chunk_ids: List[str] = [] # ── SSE envelope helpers ────────────────────────────────────────────────────── @@ -234,6 +241,31 @@ async def event_generator(): if caller_entity and caller_entity not in entities: entities.append(caller_entity) + # Followup bias: if neither the caller nor the resolver pinned a + # service or time window, derive defaults from the previous turn's + # cited chunks. Indexed PK lookup, so cost is negligible vs the + # LLM call. Soft: explicit filters always win. 
+ if req.previous_chunk_ids and (service is None or (time_from is None and time_to is None)): + async with container.async_session_maker() as session: + prev_meta = await container.get_retrieval_service(session).vector_store.get_chunks_by_ids( + list(req.previous_chunk_ids) + ) + if prev_meta: + prev_services = [m.get("source_service") for m in prev_meta.values() if m.get("source_service")] + if service is None and prev_services: + # Pick the dominant service from the previous turn so a + # followup like "what changed there?" keeps the focus. + from collections import Counter + service = Counter(prev_services).most_common(1)[0][0] + if time_from is None and time_to is None: + prev_ts = [m.get("timestamp_start") for m in prev_meta.values() if m.get("timestamp_start")] + if prev_ts: + from datetime import timedelta + anchor_min = min(prev_ts) + anchor_max = max(prev_ts) + time_from = anchor_min - timedelta(minutes=5) + time_to = anchor_max + timedelta(minutes=5) + async with container.async_session_maker() as session: retrieval = container.get_retrieval_service(session) rrf_filters = RetrievalFilters( @@ -360,6 +392,21 @@ async def event_generator(): # become "x12 over 14:02–14:04". timeline = build_timeline(chunks) + # Minimal projection of cited chunks for the UI's raw-evidence tab. + # Saves a follow-up GET — the chat path already has the hydrated + # list. Keep the text at the same 600-char window used in the LLM + # prompt so the UI doesn't surface content the model didn't see. 
+ cited_chunks = [ + { + "chunk_id": c["chunk_id"], + "service": c.get("service"), + "level": c.get("level"), + "timestamp": str(c.get("timestamp") or "") or None, + "text": (c.get("text") or "")[:600], + } + for c in chunks + ] + yield _sse("done", { "chunk_ids": cited_ids, "confidence": confidence, @@ -367,6 +414,7 @@ async def event_generator(): "entities": entities, "clusters": clusters, "timeline": timeline, + "cited_chunks": cited_chunks, }) except Exception as e: diff --git a/web/app/page.tsx b/web/app/page.tsx index 15d0eae..275e9ea 100644 --- a/web/app/page.tsx +++ b/web/app/page.tsx @@ -35,6 +35,20 @@ type Turn = // `/investigate` path is intentionally stateless and ignores this. const CHAT_HISTORY_TURNS = 6 +// Pull the most recent assistant chat turn's cited chunks so the next /chat +// turn can bias retrieval toward the same service + time envelope. Stream 4 +// followup awareness — the backend treats this as a soft hint and never +// overrides explicit intent. +function lastChatChunkIds(turns: Turn[]): string[] { + for (let i = turns.length - 1; i >= 0; i--) { + const t = turns[i] + if (t.mode === "chat" && t.role === "assistant" && t.chunkIds && t.chunkIds.length > 0) { + return t.chunkIds + } + } + return [] +} + function buildChatHistory(turns: Turn[]): { role: "user" | "assistant"; content: string }[] { // Treat both chat turns and completed investigation answers as conversation // history. An investigation that hasn't produced a final answer yet is @@ -148,6 +162,7 @@ export default function HomePage() { // Send last N turns as history so the server can keep the assistant // contextual across followups (lite — no DB lookup, no compaction). const history = buildChatHistory(turns) + const previousChunkIds = lastChatChunkIds(turns) try { const resp = await fetch(`${API_BASE}/chat`, { method: "POST", @@ -156,6 +171,7 @@ export default function HomePage() { query, conversation_id: conversationId ?? 
undefined, history, + previous_chunk_ids: previousChunkIds, }), }) if (!resp.ok || !resp.body) { @@ -170,7 +186,7 @@ export default function HomePage() { arr.findIndex((t) => t.mode === "chat" && (t as any).pendingId === pendingId) setTurns((prev) => [ ...prev, - { mode: "chat", role: "assistant", content: "", chunkIds: [], confidence: null, streaming: true, pendingId } as any, + { mode: "chat", role: "assistant", content: "", chunkIds: [], confidence: null, streaming: true, pendingId, query } as any, ]) const reader = resp.body.getReader() @@ -227,6 +243,7 @@ export default function HomePage() { confidence: data.confidence ?? null, clusters: data.clusters ?? [], timeline: data.timeline ?? [], + citedChunks: data.cited_chunks ?? [], streaming: false, } return next @@ -268,15 +285,25 @@ export default function HomePage() {

Chat with your logs

- Ask a quick question for a single-shot RAG answer. Toggle{" "} - Deep Research on to run a full - multi-step investigation. + Hybrid retrieval over your ingested logs surfaces a chronological{" "} + timeline and the{" "} + event clusters behind your + question. Toggle{" "} + Deep Research for a + full autonomous root-cause investigation.

) : ( turns.map((t, i) => t.mode === "chat" ? ( - + { + setDeepResearch(true) + handleSend(q, true) + }} + /> ) : ( void } // Strip raw chunk citations the LLM may still inline despite the system @@ -37,9 +43,41 @@ export function ChatMessageView({ streaming, clusters, timeline, + citedChunks, + query, + onInvestigateDeeper, }: ChatMessageProps) { const isUser = role === "user" const displayed = isUser ? content : cleanContent(content) + + // Lift open state out of the three panels so the quick-action buttons can + // open them on demand. Uncontrolled fallback inside each panel handles + // the no-button path. + const [timelineOpen, setTimelineOpen] = useState(undefined) + const [clustersOpen, setClustersOpen] = useState(undefined) + const [chunksOpen, setChunksOpen] = useState(undefined) + + const timelineRef = useRef(null) + const clustersRef = useRef(null) + const chunksRef = useRef(null) + + const showAndScroll = ( + setter: (v: boolean) => void, + ref: React.RefObject, + ) => { + setter(true) + // Defer the scroll so the panel has rendered open before measuring. + requestAnimationFrame(() => { + ref.current?.scrollIntoView({ behavior: "smooth", block: "nearest" }) + }) + } + + const hasTimeline = !!(timeline && timeline.length > 0) + const hasClusters = !!(clusters && clusters.length > 0) + const hasChunks = !!(citedChunks && citedChunks.length > 0) + const showQuickActions = + !isUser && !streaming && (hasTimeline || hasClusters || (onInvestigateDeeper && query)) + return (
{!isUser && ( @@ -64,6 +102,7 @@ export function ChatMessageView({ )} {displayed || (streaming ? : "")}
+ {!isUser && confidence && (
@@ -71,11 +110,59 @@ export function ChatMessageView({
)} - {!isUser && timeline && timeline.length > 0 && ( - + + {showQuickActions && ( +
+ {hasTimeline && ( + + )} + {hasClusters && ( + + )} + {onInvestigateDeeper && query && ( + + )} +
+ )} + + {hasTimeline && ( +
+ +
)} - {!isUser && clusters && clusters.length > 0 && ( - + {hasClusters && ( +
+ +
+ )} + {hasChunks && ( +
+ +
)} {isUser && ( diff --git a/web/components/chat/CitedChunks.tsx b/web/components/chat/CitedChunks.tsx new file mode 100644 index 0000000..df57ac7 --- /dev/null +++ b/web/components/chat/CitedChunks.tsx @@ -0,0 +1,84 @@ +"use client" + +import { useState } from "react" +import { Badge } from "@/components/ui/badge" +import { ChevronDown, ChevronRight, FileText } from "lucide-react" + +export type CitedChunk = { + chunk_id: string + service: string | null + level: string | null + timestamp: string | null + text: string +} + +function shortTs(iso: string | null): string { + if (!iso) return "" + return iso.includes("T") ? iso.split("T")[1].slice(0, 8) : iso +} + +export function CitedChunks({ + chunks, + open: openProp, + onOpenChange, +}: { + chunks: CitedChunk[] + open?: boolean + onOpenChange?: (open: boolean) => void +}) { + // Default closed — raw chunks are the debug view; users seeking them + // know to click. Timeline and clusters carry the story. + const [openUncontrolled, setOpenUncontrolled] = useState(false) + const open = openProp ?? openUncontrolled + const setOpen = (v: boolean) => { + if (onOpenChange) onOpenChange(v) + else setOpenUncontrolled(v) + } + + if (!chunks || chunks.length === 0) return null + + return ( +
+ + {open && ( +
+ {chunks.map((c) => ( +
+
+ {c.service && ( + + {c.service} + + )} + {c.level && ( + + {c.level} + + )} + {c.timestamp && ( + + {shortTs(c.timestamp)} + + )} +
+
+                {c.text}
+              
+
+ ))} +
+ )} +
+ ) +} diff --git a/web/components/chat/EventClusters.tsx b/web/components/chat/EventClusters.tsx index 5aa2bae..d7cc00f 100644 --- a/web/components/chat/EventClusters.tsx +++ b/web/components/chat/EventClusters.tsx @@ -27,9 +27,21 @@ function formatTs(iso: string | null): string { return t } -export function EventClusters({ clusters }: { clusters: Cluster[] }) { - // Default open when small enough to scan at a glance. - const [open, setOpen] = useState(clusters.length > 0 && clusters.length <= 5) +export function EventClusters({ + clusters, + open: openProp, + onOpenChange, +}: { + clusters: Cluster[] + open?: boolean + onOpenChange?: (open: boolean) => void +}) { + const [openUncontrolled, setOpenUncontrolled] = useState(clusters.length > 0 && clusters.length <= 5) + const open = openProp ?? openUncontrolled + const setOpen = (v: boolean) => { + if (onOpenChange) onOpenChange(v) + else setOpenUncontrolled(v) + } if (!clusters || clusters.length === 0) return null @@ -37,7 +49,7 @@ export function EventClusters({ clusters }: { clusters: Cluster[] }) {