Merged
25 changes: 25 additions & 0 deletions .github/workflows/tests.yml
@@ -0,0 +1,25 @@
name: tests

on:
  push:
    branches: [main]
  pull_request:

jobs:
  pytest:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout
        uses: actions/checkout@v4

      - name: Set up uv
        uses: astral-sh/setup-uv@v5
        with:
          enable-cache: true

      - name: Install dependencies
        run: uv sync --frozen

      - name: Run tests
        run: uv run pytest tests/ -q
2 changes: 2 additions & 0 deletions .gitignore
@@ -40,3 +40,5 @@ eval/results/
# Closure / launch tracker (private; not for public repo)
todo/

tmp-ui-tests/
assets/
4 changes: 3 additions & 1 deletion CLAUDE.md
@@ -53,10 +53,12 @@ make reset-investigations # TRUNCATE investigations CASCADE
# Ingest a log file (via HTTP API)
curl -X POST -F "service=my-svc" -F "file=@/path/to/app.log" http://localhost:8000/ingest

# Run an investigation (via HTTP API)
# Run an investigation (via HTTP API). POST only registers it and returns an
# id — execution happens while a client is attached to the SSE stream.
curl -X POST http://localhost:8000/investigate \
-H "Content-Type: application/json" \
-d '{"query": "why did checkout fail last friday night"}'
curl -N http://localhost:8000/investigations/{id}/stream

# Background worker for continuous ingestion
uv run python -m repi.worker
15 changes: 8 additions & 7 deletions README.md
@@ -1,6 +1,6 @@
# repi

Log ingestion and LLM-based investigation engine. Ingests log files into PostgreSQL (pgvector), retrieves relevant log clusters via hybrid search (BM25 + dense vectors with RRF), and runs a ReAct loop where an LLM autonomously investigates root causes.
Local-first log observability. repi continuously ingests logs, indexes them into a hybrid retrieval system (pgvector HNSW + Postgres FTS with weighted tsvector + pg_trgm fuzzy match), clusters related events, builds incident timelines, and can optionally launch an autonomous root-cause investigation through a ReAct loop. Designed to run on a single machine against a local Postgres — no SaaS, no shared state.

## Architecture

@@ -45,7 +45,7 @@ On first start, the entrypoint seeds `/app/.repi/config.json` from a baked-in de
- Visit the **Config** page in the UI, pick a provider, paste your API key, save. The API hot-reloads.
- Your config persists across `docker compose down` (lost only on `down -v`).

Pin a release via `REPI_IMAGE=ghcr.io/varungitgood/repi:v0.1.0 docker compose up -d`.
Pin a release via `REPI_IMAGE=ghcr.io/varungitgood/repi:v0.2.0 docker compose up -d`.

### Option 1b — Hack on it (contributor / dev path)

@@ -88,16 +88,17 @@ curl -X POST \

### Investigate

Starting an investigation is a two-step flow: `POST /investigate` registers it and returns an `id`; attaching to the SSE stream is what actually executes the ReAct loop (the web UI does this for you). A `POST` with no stream consumer stays in `started` and never runs.

```bash
# 1. Register the investigation — returns {"id": "...", ...}
curl -X POST http://localhost:8000/investigate \
-H "Content-Type: application/json" \
-d '{"query": "why did checkout fail last friday night"}'
```

Stream the ReAct steps live:

```bash
curl -N http://localhost:8000/investigate/{id}/stream
# 2. Attach to the stream to execute it and watch the ReAct steps live.
# Reconnecting replays persisted steps, then continues.
curl -N http://localhost:8000/investigations/{id}/stream
```
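
The same two-step flow can be driven from a script. The sketch below is a minimal illustration using only the Python standard library, not an official client. It assumes the stream frames each event as a single `data: <json>` line and that the `POST` response is JSON carrying the `id` key shown above; `iter_sse_payloads` and `run_investigation` are hypothetical names that do not exist in repi.

```python
import json
import urllib.request
from typing import Iterable, Iterator


def iter_sse_payloads(lines: Iterable[bytes]) -> Iterator[dict]:
    """Yield the decoded JSON body of every `data:` line in an SSE byte stream."""
    for raw in lines:
        line = raw.decode("utf-8").strip()
        # Blank separators and `:` comment lines are skipped.
        if line.startswith("data:"):
            yield json.loads(line[len("data:"):].strip())


def run_investigation(base_url: str, query: str) -> Iterator[dict]:
    """Register an investigation, then attach to its stream so it executes."""
    # Step 1: register. Assumes the response body is JSON with an "id" key.
    request = urllib.request.Request(
        f"{base_url}/investigate",
        data=json.dumps({"query": query}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        investigation_id = json.load(response)["id"]

    # Step 2: attach. Without a stream consumer the investigation never runs.
    stream_url = f"{base_url}/investigations/{investigation_id}/stream"
    with urllib.request.urlopen(stream_url) as stream:
        yield from iter_sse_payloads(stream)
```

Against a running instance, `for event in run_investigation("http://localhost:8000", "why did checkout fail last friday night"): print(event)` would print each streamed payload. Events that span several `data:` lines are not handled by this sketch.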

### Continuous ingestion with the Worker
2 changes: 1 addition & 1 deletion eval/dataset_1_cascading_inventory_migration/seed.py
@@ -54,7 +54,7 @@ async def main() -> None:
print(f" SKIP {service}: {path} not found")
continue
content = path.read_text().replace(STORY_DATE, anchor_str)
count = await ingestor.ingest(content, source_service=service, source_env="eval")
count = (await ingestor.ingest(content, source_service=service, source_env="eval")).chunk_count
print(f" {service:20s} {count:4d} chunks ({filename})")
total += count

2 changes: 1 addition & 1 deletion eval/dataset_2_insufficient_logging/seed.py
@@ -52,7 +52,7 @@ async def main() -> None:
print(f" SKIP {service}: {path} not found")
continue
content = path.read_text().replace(STORY_DATE, anchor_str)
count = await ingestor.ingest(content, source_service=service, source_env="eval")
count = (await ingestor.ingest(content, source_service=service, source_env="eval")).chunk_count
print(f" {service:20s} {count:4d} chunks ({filename})")
total += count

2 changes: 1 addition & 1 deletion eval/dataset_3_jwt_key_rotation_noise/seed.py
@@ -55,7 +55,7 @@ async def main() -> None:
print(f" SKIP {service}: {path} not found")
continue
content = path.read_text().replace(STORY_DATE, anchor_str)
count = await ingestor.ingest(content, source_service=service, source_env="eval")
count = (await ingestor.ingest(content, source_service=service, source_env="eval")).chunk_count
print(f" {service:22s} {count:4d} chunks ({filename})")
total += count

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "repi"
version = "0.1.0"
version = "0.2.0"
description = "repi: Log Investigation Engine"
readme = "README.md"
requires-python = ">=3.11,<4"
148 changes: 141 additions & 7 deletions repi/api/chat.py
@@ -22,16 +22,18 @@
import asyncio
import json
import logging
from datetime import datetime, timezone
from collections import Counter
from datetime import datetime, timedelta, timezone
from typing import List, Literal, Optional
from uuid import UUID, uuid4

from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from pydantic import BaseModel, Field
from sqlalchemy import select, text as sa_text

from repi.core.container import get_container
from repi.core.config import get_settings
from repi.core.dates import default_date_handler as _dh
from repi.intent.resolver import (
ClarificationNeeded,
@@ -42,6 +44,8 @@
from repi.llm.provider import Message
from repi.models.filters import RetrievalFilters
from repi.models.schema import ChatMessage, Conversation
from repi.retrieval.cluster_view import cluster_chunks
from repi.retrieval.timeline_view import build_timeline

logger = logging.getLogger("repi.api.chat")

@@ -68,6 +72,32 @@ class ChatRequest(BaseModel):
history: List[ChatTurn] = []
filters: Optional[ChatFilters] = None
conversation_id: Optional[UUID] = None
# Followup-bias hint: chunk_ids the previous assistant turn cited. When
# the current query is missing EITHER an explicit service or an explicit
# time window, the chat path fills in just the missing dimension from
# the previous turn's chunks — service via dominant-source check, time
# via a `Settings.FOLLOWUP_BIAS_WINDOW_MINUTES` envelope around their
# timestamps. Soft — never overrides an explicit filter, silently
# ignored if the IDs no longer resolve.
#
# Capped at 50 to bound the indexed-PK fetch and reject malformed
# payloads early. The legitimate caller only ever sends the last
# assistant turn's citations (≤10 in practice).
previous_chunk_ids: List[str] = Field(default_factory=list, max_length=50)
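
The time half of the followup bias described in the comment above can be sketched in isolation. This is an illustration under stated assumptions, not the code in this PR: `followup_window` is a hypothetical helper, the 30-minute default is a placeholder for `Settings.FOLLOWUP_BIAS_WINDOW_MINUTES` (whose real value is not shown in this diff), and timestamps are assumed to be `datetime` objects.

```python
from datetime import datetime, timedelta
from typing import Optional, Sequence, Tuple


def followup_window(
    timestamps: Sequence[Optional[datetime]],
    window_minutes: int = 30,  # placeholder for FOLLOWUP_BIAS_WINDOW_MINUTES
) -> Optional[Tuple[datetime, datetime]]:
    """Return (time_from, time_to) padded around the cited chunks, or None."""
    known = [ts for ts in timestamps if ts is not None]
    if not known:
        return None  # nothing to anchor on, so the query stays unfiltered
    envelope = timedelta(minutes=window_minutes)
    return min(known) - envelope, max(known) + envelope
```

The window only widens outward from the earliest and latest cited chunk, so every chunk from the previous turn stays inside it.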


# ── Module-level constants ────────────────────────────────────────────────────

# Caller-visible window on cited-chunk `text` in the SSE done payload. Locked
# to the same length the LLM prompt's evidence block uses so the UI never
# surfaces content the model didn't see.
CHUNK_TEXT_WINDOW = 600

# When the dominant service's count is at least this multiple of the
# runner-up's, treat it as "this is the conversation's service" and bias
# retrieval toward it. Below this ratio the previous turn straddled
# services (cross-service incident) — we let the resolver fan out instead.
SERVICE_DOMINANCE_RATIO = 2.0
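
The dominance rule in the comment above is easy to check by hand with a standalone version. The sketch below restates it outside the request handler; `pick_dominant_service` is a hypothetical name introduced for illustration only.

```python
from collections import Counter
from typing import Iterable, Optional

SERVICE_DOMINANCE_RATIO = 2.0  # same value as the constant in this diff


def pick_dominant_service(
    services: Iterable[str],
    ratio: float = SERVICE_DOMINANCE_RATIO,
) -> Optional[str]:
    """Return the service to pin, or None when no single service dominates."""
    counts = Counter(services).most_common()
    if not counts:
        return None
    top_service, top_count = counts[0]
    # With only one service present there is no runner-up to compare against.
    runner_up = counts[1][1] if len(counts) > 1 else 0
    if runner_up == 0 or top_count >= ratio * runner_up:
        return top_service
    return None
```

With four `auth` chunks and two `db` chunks the ratio is exactly 2.0, so `auth` is pinned; with three and two it is 1.5, so nothing is pinned and retrieval stays cross-service.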


# ── SSE envelope helpers ──────────────────────────────────────────────────────
@@ -77,6 +107,23 @@ def _sse(event_type: str, data: dict) -> str:
return f"data: {json.dumps({'type': event_type, 'data': data})}\n\n"


def _normalize_ts(value):
    """Canonicalise a `timestamp_start` field for the chat path's chunks list.

    The chunks the LLM, the cluster_view, and the timeline_view all read share
    one rule: `timestamp` is an ISO 8601 string or None. Downstream comparisons
    (`<`, `>`, `sorted(...)`) rely on this: mixing `datetime` and `str` in
    one list would raise TypeError. Two source paths (RRF + entity-bias merge)
    feed this list; both run their `timestamp_start` through here so a future
    change to either source can't reintroduce mixed types.
    """
    if value is None:
        return None
    if hasattr(value, "isoformat"):
        return _dh.to_iso(value)
    return value  # already string-ish
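
The failure mode the docstring describes can be reproduced in a few lines. The sketch below is a self-contained stand-in: `normalize_ts` mirrors the helper above but calls `datetime.isoformat()` in place of `_dh.to_iso`, whose exact output format is not shown in this diff, so treat the formatting as an assumption.

```python
from datetime import datetime, timezone


def normalize_ts(value):
    """Return an ISO 8601 string or None, whatever the input type was."""
    if value is None:
        return None
    if hasattr(value, "isoformat"):
        # Stand-in for `_dh.to_iso`; the real helper may format differently.
        return value.isoformat()
    return value


# One timestamp arrives as a datetime, the other as an ISO string.
mixed = [
    datetime(2024, 1, 5, 14, 4, tzinfo=timezone.utc),
    "2024-01-05T14:02:00+00:00",
]

try:
    sorted(mixed)
    mixed_sort_failed = False
except TypeError:
    # datetime and str cannot be ordered against each other.
    mixed_sort_failed = True

# After normalisation every element is a string, so sorting works.
ordered = sorted(normalize_ts(v) for v in mixed)
```

Sorting ISO strings lexicographically matches chronological order only when every value uses the same offset and precision, which is one more reason to route all sources through a single normaliser.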


# ── Confidence heuristic (chat path) ──────────────────────────────────────────
# Compiler floors don't apply — /chat has no compile step. Deterministic rules:
# - 0 chunks gathered → low
@@ -215,6 +262,52 @@ async def event_generator():
if caller_entity and caller_entity not in entities:
entities.append(caller_entity)

# Followup bias: when the current query is missing EITHER an
# explicit service or an explicit time window, fill in just the
# missing dimension from the previous turn's cited chunks.
# Indexed PK lookup, so cost is negligible vs the LLM call. Soft:
# explicit filters always win.
if req.previous_chunk_ids and (service is None or (time_from is None and time_to is None)):
    async with container.async_session_maker() as session:
        prev_meta = await container.get_retrieval_service(session).vector_store.get_chunks_by_ids(
            list(req.previous_chunk_ids)
        )
    if prev_meta:
        prev_services = [m.get("source_service") for m in prev_meta.values() if m.get("source_service")]
        if service is None and prev_services:
            # Narrow to the dominant service only if it's clearly
            # dominant — top count >= SERVICE_DOMINANCE_RATIO ×
            # runner-up. Otherwise the previous turn straddled
            # services (a cross-service incident), and pinning
            # one would hide the other half on the followup.
            counts = Counter(prev_services).most_common()
            top_svc, top_n = counts[0]
            runner_up = counts[1][1] if len(counts) > 1 else 0
            if runner_up == 0 or top_n >= SERVICE_DOMINANCE_RATIO * runner_up:
                service = top_svc
                logger.debug(
                    "chat followup-bias: pinned service=%s (top=%d, runner-up=%d)",
                    top_svc, top_n, runner_up,
                )
            else:
                logger.debug(
                    "chat followup-bias: skipped service pin — "
                    "no dominant service (top=%d, runner-up=%d)",
                    top_n, runner_up,
                )
        if time_from is None and time_to is None:
            prev_ts = [m.get("timestamp_start") for m in prev_meta.values() if m.get("timestamp_start")]
            if prev_ts:
                envelope = timedelta(minutes=get_settings().FOLLOWUP_BIAS_WINDOW_MINUTES)
                anchor_min = min(prev_ts)
                anchor_max = max(prev_ts)
                time_from = anchor_min - envelope
                time_to = anchor_max + envelope
                logger.debug(
                    "chat followup-bias: time window %s → %s",
                    time_from, time_to,
                )

async with container.async_session_maker() as session:
retrieval = container.get_retrieval_service(session)
rrf_filters = RetrievalFilters(
@@ -241,9 +334,7 @@ async def event_generator():
"chunk_id": cid,
"service": data.get("source_service"),
"level": data.get("log_level"),
"timestamp": _dh.to_iso(data.get("timestamp_start"))
if hasattr(data.get("timestamp_start"), "isoformat")
else data.get("timestamp_start"),
"timestamp": _normalize_ts(data.get("timestamp_start")),
"text": data.get("text") or "",
"score": float(score),
})
@@ -261,7 +352,7 @@ async def event_generator():
"chunk_id": c["chunk_id"],
"service": c.get("service"),
"level": c.get("level"),
"timestamp": c.get("timestamp_start"),
"timestamp": _normalize_ts(c.get("timestamp_start")),
"text": c.get("text") or "",
"score": 0.0, # ILIKE has no score; use sentinel
})
@@ -277,7 +368,7 @@ async def event_generator():
"service": c.get("service"),
"level": c.get("level"),
"timestamp": str(c.get("timestamp") or ""),
"text": (c.get("text") or "")[:600],
"text": (c.get("text") or "")[:CHUNK_TEXT_WINDOW],
}
for c in chunks
], indent=2, default=str)
@@ -318,11 +409,54 @@ async def event_generator():
)
await session.commit()

# Event clusters across the retrieved top-K. Singletons are
# dropped (they're already in the per-turn timeline); the panel
# gives the user the "1842x JWT failures, 347x DB timeouts"
# compression rather than a raw chunk list. Caveat the UI must
# carry: this is *per-turn* over the retrieved chunks, not a
# corpus-wide aggregate.
clusters = [
    {
        "signature": v.signature,
        "count": v.count,
        "services": v.services,
        "first_ts": v.first_ts,
        "last_ts": v.last_ts,
    }
    for v in cluster_chunks(chunks)
]

# Incident timeline — chronologically ordered, run-collapsed view
# of the same retrieved chunks. Reuses the in-memory list rather
# than re-fetching via investigation.tools.get_timeline (one less
# DB roundtrip per turn). Singletons stay so the user can see
# the gap between events; runs collapse so 12 identical lines
# become "x12 over 14:02–14:04".
timeline = build_timeline(chunks)

# Minimal projection of cited chunks for the UI's raw-evidence tab.
# Saves a follow-up GET because the chat path already has the hydrated
# list. Text is cut to CHUNK_TEXT_WINDOW, the same window the LLM prompt
# uses, so the UI doesn't surface content the model didn't see.
cited_chunks = [
    {
        "chunk_id": c["chunk_id"],
        "service": c.get("service"),
        "level": c.get("level"),
        "timestamp": str(c.get("timestamp") or "") or None,
        "text": (c.get("text") or "")[:CHUNK_TEXT_WINDOW],
    }
    for c in chunks
]

yield _sse("done", {
    "chunk_ids": cited_ids,
    "confidence": confidence,
    "conversation_id": str(conversation_id),
    "entities": entities,
    "clusters": clusters,
    "timeline": timeline,
    "cited_chunks": cited_chunks,
})

except Exception as e:
27 changes: 22 additions & 5 deletions repi/api/ingest.py
@@ -10,6 +10,9 @@
class IngestResponse(BaseModel):
    service: str
    chunk_count: int
    lines_total: int
    lines_with_timestamp: int
    level_counts: dict[str, int]
    message: str

@router.post("/ingest", response_model=IngestResponse)
@@ -22,14 +25,28 @@ async def ingest(
"""
content = await file.read()
content_str = content.decode("utf-8")

container = get_container()
async with container.get_session() as session:
ingestor = container.get_ingestor(session)
count = await ingestor.ingest(content_str, service)

stats = await ingestor.ingest(content_str, service)

# Refresh the in-memory service list so a brand-new service is visible to
# the intent resolver immediately, not only after a restart or GET /services.
await container.init_known_services()

message = f"Successfully ingested {stats.chunk_count} chunks for {service}"
if stats.lines_total and stats.lines_with_timestamp == 0:
    message += (
        " (warning: no timestamps could be parsed from these logs — "
        "time-based filters will not match them)"
    )

return IngestResponse(
service=service,
chunk_count=count,
message=f"Successfully ingested {count} chunks for {service}"
chunk_count=stats.chunk_count,
lines_total=stats.lines_total,
lines_with_timestamp=stats.lines_with_timestamp,
level_counts=stats.level_counts,
message=message,
)
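
The warning rule above can be exercised without a database. In the sketch below, `IngestStats` is a hypothetical stand-in for whatever `ingestor.ingest` returns; only its four attribute names come from this diff, and the class name, the dataclass shape, and the `ingest_message` helper are assumptions made for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class IngestStats:
    """Hypothetical stand-in for the object returned by `ingestor.ingest`."""
    chunk_count: int
    lines_total: int
    lines_with_timestamp: int
    level_counts: dict[str, int] = field(default_factory=dict)


def ingest_message(stats: IngestStats, service: str) -> str:
    """Build the response message, warning when no line had a timestamp."""
    message = f"Successfully ingested {stats.chunk_count} chunks for {service}"
    # Warn only when there were lines to parse and none of them had a timestamp.
    if stats.lines_total and stats.lines_with_timestamp == 0:
        message += (
            " (warning: no timestamps could be parsed from these logs; "
            "time-based filters will not match them)"
        )
    return message
```

An empty upload (`lines_total == 0`) produces no warning, because there was nothing to parse in the first place.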