Skip to content
50 changes: 50 additions & 0 deletions db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,53 @@ CREATE INDEX IF NOT EXISTS leaderboard_dataset_idx ON leaderboard (dataset);
CREATE INDEX IF NOT EXISTS leaderboard_score_idx ON leaderboard (dataset, aggregate_score DESC);
CREATE INDEX IF NOT EXISTS leaderboard_created_idx ON leaderboard (created_at DESC);
CREATE INDEX IF NOT EXISTS leaderboard_backend_idx ON leaderboard (embedding_backend);

-- ── Projects (UX redesign P1) ────────────────────────────────────────────────
-- A project is a logical system/application (e.g. "Ecommerce Platform").
-- Workers, services (via log_chunks), conversations and investigations are
-- scoped to a project. `settings` keys: default_timeline_window ("5h"),
-- auto_load_timeline (true), max_events (25) — read by /projects/{id}/overview.
-- One row per project. Safe to re-run: the table is only created when absent.
CREATE TABLE IF NOT EXISTS projects (
    -- Surrogate key, generated by the database when the writer omits it.
    id         UUID        DEFAULT gen_random_uuid() PRIMARY KEY,
    -- Project name; required, and uniqueness is enforced by the database.
    name       TEXT        NOT NULL UNIQUE,
    -- Per-project options stored as a JSON object; empty object when unset.
    settings   JSONB       NOT NULL DEFAULT '{}'::jsonb,
    -- Both timestamps default to the insert time. This statement defines no
    -- trigger for updated_at; presumably writers set it — TODO confirm.
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Give every project-scoped table an optional reference to its project.
-- The column is nullable, and deleting a project clears the reference
-- (ON DELETE SET NULL) rather than deleting the dependent rows.
ALTER TABLE log_chunks
    ADD COLUMN IF NOT EXISTS project_id UUID
    REFERENCES projects (id) ON DELETE SET NULL;

ALTER TABLE watcher_configs
    ADD COLUMN IF NOT EXISTS project_id UUID
    REFERENCES projects (id) ON DELETE SET NULL;

ALTER TABLE conversations
    ADD COLUMN IF NOT EXISTS project_id UUID
    REFERENCES projects (id) ON DELETE SET NULL;

ALTER TABLE investigations
    ADD COLUMN IF NOT EXISTS project_id UUID
    REFERENCES projects (id) ON DELETE SET NULL;

-- Composite index: project first, then chunk start time.
CREATE INDEX IF NOT EXISTS log_chunks_project_ts_idx
    ON log_chunks (project_id, timestamp_start);

-- Real signature column ("Path B" from cluster_view.py): enables corpus-wide
-- GROUP BY signature for the project overview's clusters and event feed,
-- instead of re-parsing it out of `text` per row in Python. The ingestor
-- writes it for new rows; the UPDATE below backfills pre-existing rows from
-- the templated text body ("Signature: <sig>\nExamples: ...").
-- Dedicated signature column so it can be grouped and indexed directly.
ALTER TABLE log_chunks
    ADD COLUMN IF NOT EXISTS signature TEXT;

-- Backfill rows that have no signature yet from the first line of `text`,
-- taking whatever follows the 'Signature: ' marker.
-- split_part() yields '' when the marker is absent, so rows whose text does
-- not follow the template are skipped and keep signature = NULL; otherwise
-- they would all share one bogus empty-string signature in GROUP BY results.
-- Re-runnable: rows that already carry a signature are never touched.
UPDATE log_chunks
SET signature = split_part(split_part(text, E'\n', 1), 'Signature: ', 2)
WHERE signature IS NULL
  AND split_part(split_part(text, E'\n', 1), 'Signature: ', 2) <> '';

-- Composite index: project first, then signature.
CREATE INDEX IF NOT EXISTS log_chunks_signature_idx
    ON log_chunks (project_id, signature);

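-- Seed a Default project on first run and absorb all unassigned rows into it.
-- Idempotent: the seed only fires when `projects` is empty. The backfills run
-- on every execution while a project named 'Default' exists, but only touch
-- rows whose project_id is NULL — which includes rows orphaned by a deleted
-- project, since the project_id foreign keys are ON DELETE SET NULL.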
DO $$
DECLARE
    -- id of the project named 'Default'; stays NULL when no such row exists.
    default_id UUID;
BEGIN
    -- Seed only when the table is empty. ON CONFLICT makes the insert safe
    -- when two processes apply the schema at the same time: both can pass
    -- the emptiness check, and without it the slower one would abort on the
    -- unique constraint on projects.name.
    IF NOT EXISTS (SELECT 1 FROM projects) THEN
        INSERT INTO projects (name)
        VALUES ('Default')
        ON CONFLICT (name) DO NOTHING;
    END IF;

    -- Looked up separately (not via RETURNING) so it also resolves when the
    -- row already existed or was inserted by a concurrent session.
    SELECT id INTO default_id FROM projects WHERE name = 'Default';

    -- No project named 'Default' (projects exist under other names): leave
    -- unassigned rows untouched.
    IF default_id IS NOT NULL THEN
        -- Assign every row that has no project to Default. Rows that already
        -- reference a project are never modified.
        UPDATE log_chunks      SET project_id = default_id WHERE project_id IS NULL;
        UPDATE watcher_configs SET project_id = default_id WHERE project_id IS NULL;
        UPDATE conversations   SET project_id = default_id WHERE project_id IS NULL;
        UPDATE investigations  SET project_id = default_id WHERE project_id IS NULL;
    END IF;
END $$;
2 changes: 2 additions & 0 deletions repi/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from repi.api.config import router as config_router
from repi.api.chat import router as chat_router
from repi.api.conversations import router as conversations_router
from repi.api.projects import router as projects_router

logger = logging.getLogger("repi.api")

Expand Down Expand Up @@ -44,6 +45,7 @@ async def lifespan(app: FastAPI):
app.include_router(config_router, tags=["config"])
app.include_router(chat_router, tags=["chat"])
app.include_router(conversations_router, tags=["conversations"])
app.include_router(projects_router, tags=["projects"])


@app.get("/health", tags=["health"])
Expand Down
16 changes: 12 additions & 4 deletions repi/api/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ class ChatRequest(BaseModel):
# payloads early. The legitimate caller only ever sends the last
# assistant turn's citations (≤10 in practice).
previous_chunk_ids: List[str] = Field(default_factory=list, max_length=50)
# UX P1: scopes retrieval + known-services resolution to one project. If
# omitted but the conversation has a project, that project applies.
project_id: Optional[UUID] = None


# ── Module-level constants ────────────────────────────────────────────────────
Expand Down Expand Up @@ -171,9 +174,10 @@ async def event_generator():
# Resolve or create the conversation row up front so every event the
# client sees can carry the (eventual) conversation_id.
conversation_id = req.conversation_id
project_id = req.project_id
async with container.async_session_maker() as session:
if conversation_id is None:
conv = Conversation(title=req.query[:80])
conv = Conversation(title=req.query[:80], project_id=project_id)
session.add(conv)
await session.commit()
await session.refresh(conv)
Expand All @@ -185,9 +189,12 @@ async def event_generator():
res = await session.exec(stmt)
existing = res.first()
if existing is None:
conv = Conversation(id=conversation_id, title=req.query[:80])
conv = Conversation(id=conversation_id, title=req.query[:80], project_id=project_id)
session.add(conv)
await session.commit()
elif project_id is None:
# Inherit the conversation's project when not pinned.
project_id = existing.project_id

# Persist the user turn immediately — the client gets a citation-free
# echo if the LLM call errors out mid-stream.
Expand All @@ -202,7 +209,7 @@ async def event_generator():
try:
# ── Intent resolution ────────────────────────────────────────────
now = _dh.now()
known_services = container.known_services or []
known_services = await container.get_known_services(project_id) or []
resolution = resolve_intent(req.query, known_services, now)

if isinstance(resolution, ClarificationNeeded):
Expand Down Expand Up @@ -314,6 +321,7 @@ async def event_generator():
source_service=service,
time_from=time_from,
time_to=time_to,
project_id=project_id,
)
# search_diverse over-fetches then service-stratifies the top-k
# so a noisy single service can't crowd out the cross-service
Expand Down Expand Up @@ -342,7 +350,7 @@ async def event_generator():
# Entity-bias merge: UNION RRF + find_logs_by_id, deduped by chunk_id.
if entities and container.pool is not None:
for ent in entities:
extra = await find_logs_by_id(container.pool, entity=ent, top_k=20)
extra = await find_logs_by_id(container.pool, entity=ent, top_k=20, project_id=project_id)
for c in extra:
if c["chunk_id"] in seen:
continue
Expand Down
17 changes: 16 additions & 1 deletion repi/api/conversations.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from sqlmodel import select

from repi.core.container import get_container
from repi.models.schema import ChatMessage, Conversation, Investigation
from repi.models.schema import ChatMessage, Conversation, Investigation, Project

logger = logging.getLogger("repi.api.conversations")

Expand All @@ -25,6 +25,8 @@
class ConversationSummary(BaseModel):
id: str
title: Optional[str]
project_id: Optional[str] = None
project_name: Optional[str] = None
created_at: str
updated_at: str

Expand All @@ -43,6 +45,8 @@ class TranscriptTurn(BaseModel):
class ConversationDetail(BaseModel):
id: str
title: Optional[str]
project_id: Optional[str] = None
project_name: Optional[str] = None
created_at: str
updated_at: str
turns: List[TranscriptTurn]
Expand All @@ -59,10 +63,14 @@ async def list_conversations(limit: int = 50):
)
res = await session.exec(stmt)
rows = list(res.all())
name_res = await session.exec(select(Project.id, Project.name))
project_names = {pid: name for pid, name in name_res.all()}
return [
ConversationSummary(
id=str(c.id),
title=c.title,
project_id=str(c.project_id) if c.project_id else None,
project_name=project_names.get(c.project_id),
created_at=c.created_at.isoformat(),
updated_at=c.updated_at.isoformat(),
)
Expand All @@ -84,6 +92,11 @@ async def get_conversation(conversation_id: str):
if conv is None:
raise HTTPException(status_code=404, detail="Conversation not found")

project_name = None
if conv.project_id is not None:
proj = await session.get(Project, conv.project_id)
project_name = proj.name if proj else None

msg_res = await session.exec(
select(ChatMessage)
.where(ChatMessage.conversation_id == cid)
Expand Down Expand Up @@ -125,6 +138,8 @@ async def get_conversation(conversation_id: str):
return ConversationDetail(
id=str(conv.id),
title=conv.title,
project_id=str(conv.project_id) if conv.project_id else None,
project_name=project_name,
created_at=conv.created_at.isoformat(),
updated_at=conv.updated_at.isoformat(),
turns=turns,
Expand Down
13 changes: 11 additions & 2 deletions repi/api/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

class IngestResponse(BaseModel):
service: str
project: str
chunk_count: int
lines_total: int
lines_with_timestamp: int
Expand All @@ -18,18 +19,25 @@ class IngestResponse(BaseModel):
@router.post("/ingest", response_model=IngestResponse)
async def ingest(
service: str = Form(...),
file: UploadFile = File(...)
file: UploadFile = File(...),
project: str | None = Form(None),
):
"""
Ingest logs from a file for a specific service.

`project` is a project name (get-or-create) or id (must exist); omitted
→ the Default project.
"""
from repi.api.projects import resolve_project

content = await file.read()
content_str = content.decode("utf-8")

container = get_container()
async with container.get_session() as session:
project_row = await resolve_project(session, project)
ingestor = container.get_ingestor(session)
stats = await ingestor.ingest(content_str, service)
stats = await ingestor.ingest(content_str, service, project_id=project_row.id)

# Refresh the in-memory service list so a brand-new service is visible to
# the intent resolver immediately, not only after a restart or GET /services.
Expand All @@ -44,6 +52,7 @@ async def ingest(

return IngestResponse(
service=service,
project=project_row.name,
chunk_count=stats.chunk_count,
lines_total=stats.lines_total,
lines_with_timestamp=stats.lines_with_timestamp,
Expand Down
54 changes: 43 additions & 11 deletions repi/api/investigate.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ class InvestigateRequest(BaseModel):
# conversation row is created and its id returned so the UI can attach
# subsequent /chat turns to the same thread.
conversation_id: Optional[UUID] = None
# UX P1: scopes retrieval + every ReAct tool to one project. If omitted
# but the conversation has a project, the conversation's project applies.
project_id: Optional[UUID] = None

class InvestigationStepModel(BaseModel):
step_number: int
Expand Down Expand Up @@ -93,8 +96,9 @@ async def investigate(request: InvestigateRequest):
from sqlalchemy import text as sa_text

conversation_id = request.conversation_id
project_id = request.project_id
if conversation_id is None:
conv = Conversation(title=request.query[:80])
conv = Conversation(title=request.query[:80], project_id=project_id)
session.add(conv)
await session.commit()
await session.refresh(conv)
Expand All @@ -103,12 +107,18 @@ async def investigate(request: InvestigateRequest):
# Validate it exists; if not, materialise with the caller's id.
stmt = sm_select(Conversation).where(Conversation.id == conversation_id)
res = await session.exec(stmt)
if res.first() is None:
session.add(Conversation(id=conversation_id, title=request.query[:80]))
existing = res.first()
if existing is None:
session.add(Conversation(id=conversation_id, title=request.query[:80], project_id=project_id))
await session.commit()
elif project_id is None:
# Inherit the conversation's project when the caller didn't pin one.
project_id = existing.project_id

store = container.get_investigation_store(session)
investigation = await store.get_or_create(request.query, conversation_id=conversation_id)
investigation = await store.get_or_create(
request.query, conversation_id=conversation_id, project_id=project_id
)

# Bump the conversation's updated_at so the sidebar surfaces the
# thread to the top even when activity is investigation-side rather
Expand Down Expand Up @@ -173,14 +183,24 @@ async def stream_investigation(investigation_id: str):
async def event_generator():
container = get_container()
async with container.get_session() as session:
loop = container.get_investigation_loop(session)
store = loop.store
store = container.get_investigation_store(session)
investigation = await store.get_by_id(uuid_obj)

if not investigation:
yield f"data: {json.dumps({'type': 'error', 'data': {'message': 'Investigation not found'}})}\n\n"
return

# Build the loop scoped to the investigation's project: every tool
# call carries project_id and the resolver sees only that
# project's services.
scoped_services = await container.get_known_services(investigation.project_id)
loop = container.get_investigation_loop(
session,
project_id=investigation.project_id,
known_services=scoped_services,
)
store = loop.store

steps = await store.get_steps(uuid_obj)
for s in steps:
# Persisted shape is {name, args}; the UI consumes {tool, args}
Expand All @@ -198,7 +218,7 @@ async def event_generator():
"observation": s.observation,
"kind": getattr(s, "kind", None),
}
yield f"data: {json.dumps({'type': 'step', 'data': step_data})}\n\n"
yield f"data: {json.dumps({'type': 'step', 'data': step_data}, default=str)}\n\n"

if investigation.status in ("completed", "failed"):
yield f"data: {json.dumps({'type': 'phase_change', 'data': {'phase': 'done'}})}\n\n"
Expand Down Expand Up @@ -239,14 +259,26 @@ async def on_phase_change(phase: str):

try:
event = await asyncio.wait_for(queue.get(), timeout=1.0)
yield f"data: {json.dumps(event)}\n\n"
yield f"data: {json.dumps(event, default=str)}\n\n"
except asyncio.TimeoutError:
continue

try:
result = await task
done_payload = {"answer": result.answer, "stats": result.stats}
yield f"data: {json.dumps({'type': 'done', 'data': done_payload})}\n\n"
# The loop may have paused for clarification mid-flight rather
# than finishing. In that case it persisted status
# 'awaiting_clarification' (with a question) and returned a
# placeholder answer — surface the question so the UI can prompt
# for a reply, exactly as the replay path above does. Without
# this the live stream emits a meaningless 'done' and the user
# has no way to answer until they reload the page.
refreshed = await store.get_by_id(uuid_obj)
if refreshed and refreshed.status == "awaiting_clarification":
question = refreshed.pending_question or ""
yield f"data: {json.dumps({'type': 'clarification_request', 'data': {'question': question, 'investigation_id': investigation_id}})}\n\n"
else:
done_payload = {"answer": result.answer, "stats": result.stats}
yield f"data: {json.dumps({'type': 'done', 'data': done_payload}, default=str)}\n\n"
except Exception as e:
logger.error(f"Investigation failed: {e}")
yield f"data: {json.dumps({'type': 'error', 'data': {'message': str(e)}})}\n\n"
Expand Down
Loading
Loading