Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,53 @@ CREATE INDEX IF NOT EXISTS leaderboard_dataset_idx ON leaderboard (dataset);
CREATE INDEX IF NOT EXISTS leaderboard_score_idx ON leaderboard (dataset, aggregate_score DESC);
CREATE INDEX IF NOT EXISTS leaderboard_created_idx ON leaderboard (created_at DESC);
CREATE INDEX IF NOT EXISTS leaderboard_backend_idx ON leaderboard (embedding_backend);

-- ── Projects (UX redesign P1) ────────────────────────────────────────────────
-- A project is a logical system/application (e.g. "Ecommerce Platform").
-- Workers, services (via log_chunks), conversations and investigations are
-- scoped to a project. `settings` keys: default_timeline_window ("5h"),
-- auto_load_timeline (true), max_events (25) — read by /projects/{id}/overview.
-- One row per project. Other tables point at `id` through a nullable
-- project_id foreign key.
CREATE TABLE IF NOT EXISTS projects (
    -- Surrogate key, generated server-side when the caller supplies none.
    id         UUID        PRIMARY KEY DEFAULT gen_random_uuid(),
    -- Human-readable project name; must be unique across all projects.
    name       TEXT        NOT NULL UNIQUE,
    -- Free-form per-project settings; defaults to an empty JSON object.
    settings   JSONB       NOT NULL DEFAULT '{}'::jsonb,
    -- Both timestamps default to the insertion time. No trigger in this
    -- section maintains updated_at, so writers must bump it themselves.
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- Attach every project-scoped table to `projects` through a nullable
-- project_id column. ON DELETE SET NULL means deleting a project detaches
-- its rows rather than deleting them. ADD COLUMN IF NOT EXISTS keeps each
-- statement safe to re-run.
ALTER TABLE log_chunks
    ADD COLUMN IF NOT EXISTS project_id UUID
        REFERENCES projects (id) ON DELETE SET NULL;

ALTER TABLE watcher_configs
    ADD COLUMN IF NOT EXISTS project_id UUID
        REFERENCES projects (id) ON DELETE SET NULL;

ALTER TABLE conversations
    ADD COLUMN IF NOT EXISTS project_id UUID
        REFERENCES projects (id) ON DELETE SET NULL;

ALTER TABLE investigations
    ADD COLUMN IF NOT EXISTS project_id UUID
        REFERENCES projects (id) ON DELETE SET NULL;

-- Composite index on log_chunks: project first, then chunk start time.
CREATE INDEX IF NOT EXISTS log_chunks_project_ts_idx
    ON log_chunks (project_id, timestamp_start);

-- Real signature column ("Path B" from cluster_view.py): enables corpus-wide
-- GROUP BY signature for the project overview's clusters and event feed,
-- instead of re-parsing it out of `text` per row in Python. The ingestor
-- writes it for new rows; the UPDATE below backfills pre-existing rows from
-- the templated text body ("Signature: <sig>\nExamples: ...").
ALTER TABLE log_chunks ADD COLUMN IF NOT EXISTS signature TEXT;

-- split_part() returns '' (not NULL) when the 'Signature: ' marker is absent
-- from the first line, so only rows that actually yield a non-empty
-- signature are written. Rows with no parseable signature keep NULL, which
-- keeps them out of a bogus '' cluster. They are also never rewritten on
-- later runs, because they fail the second predicate every time.
UPDATE log_chunks
SET signature = split_part(split_part(text, E'\n', 1), 'Signature: ', 2)
WHERE signature IS NULL
  AND split_part(split_part(text, E'\n', 1), 'Signature: ', 2) <> '';

CREATE INDEX IF NOT EXISTS log_chunks_signature_idx ON log_chunks (project_id, signature);

-- Bootstrap: when no project exists yet, create one named 'Default', then
-- move every row that has no project into it. Safe to re-run: the insert is
-- skipped once `projects` has any row, and the backfills only match rows
-- whose project_id is still NULL.
DO $$
DECLARE
    default_project UUID;
BEGIN
    -- Seed only into a completely empty projects table.
    INSERT INTO projects (name)
    SELECT 'Default'
    WHERE NOT EXISTS (SELECT 1 FROM projects);

    -- `name` is UNIQUE, so this scalar subquery yields at most one id.
    default_project := (SELECT id FROM projects WHERE name = 'Default');

    -- Projects exist but none is named 'Default': leave orphan rows alone.
    IF default_project IS NULL THEN
        RETURN;
    END IF;

    UPDATE log_chunks      SET project_id = default_project WHERE project_id IS NULL;
    UPDATE watcher_configs SET project_id = default_project WHERE project_id IS NULL;
    UPDATE conversations   SET project_id = default_project WHERE project_id IS NULL;
    UPDATE investigations  SET project_id = default_project WHERE project_id IS NULL;
END $$;
2 changes: 2 additions & 0 deletions repi/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from repi.api.config import router as config_router
from repi.api.chat import router as chat_router
from repi.api.conversations import router as conversations_router
from repi.api.projects import router as projects_router

logger = logging.getLogger("repi.api")

Expand Down Expand Up @@ -44,6 +45,7 @@ async def lifespan(app: FastAPI):
app.include_router(config_router, tags=["config"])
app.include_router(chat_router, tags=["chat"])
app.include_router(conversations_router, tags=["conversations"])
app.include_router(projects_router, tags=["projects"])


@app.get("/health", tags=["health"])
Expand Down
16 changes: 12 additions & 4 deletions repi/api/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ class ChatRequest(BaseModel):
# payloads early. The legitimate caller only ever sends the last
# assistant turn's citations (≤10 in practice).
previous_chunk_ids: List[str] = Field(default_factory=list, max_length=50)
# UX P1: scopes retrieval + known-services resolution to one project. If
# omitted but the conversation has a project, that project applies.
project_id: Optional[UUID] = None


# ── Module-level constants ────────────────────────────────────────────────────
Expand Down Expand Up @@ -171,9 +174,10 @@ async def event_generator():
# Resolve or create the conversation row up front so every event the
# client sees can carry the (eventual) conversation_id.
conversation_id = req.conversation_id
project_id = req.project_id
async with container.async_session_maker() as session:
if conversation_id is None:
conv = Conversation(title=req.query[:80])
conv = Conversation(title=req.query[:80], project_id=project_id)
session.add(conv)
await session.commit()
await session.refresh(conv)
Expand All @@ -185,9 +189,12 @@ async def event_generator():
res = await session.exec(stmt)
existing = res.first()
if existing is None:
conv = Conversation(id=conversation_id, title=req.query[:80])
conv = Conversation(id=conversation_id, title=req.query[:80], project_id=project_id)
session.add(conv)
await session.commit()
elif project_id is None:
# Inherit the conversation's project when not pinned.
project_id = existing.project_id

# Persist the user turn immediately — the client gets a citation-free
# echo if the LLM call errors out mid-stream.
Expand All @@ -202,7 +209,7 @@ async def event_generator():
try:
# ── Intent resolution ────────────────────────────────────────────
now = _dh.now()
known_services = container.known_services or []
known_services = await container.get_known_services(project_id) or []
resolution = resolve_intent(req.query, known_services, now)

if isinstance(resolution, ClarificationNeeded):
Expand Down Expand Up @@ -314,6 +321,7 @@ async def event_generator():
source_service=service,
time_from=time_from,
time_to=time_to,
project_id=project_id,
)
# search_diverse over-fetches then service-stratifies the top-k
# so a noisy single service can't crowd out the cross-service
Expand Down Expand Up @@ -342,7 +350,7 @@ async def event_generator():
# Entity-bias merge: UNION RRF + find_logs_by_id, deduped by chunk_id.
if entities and container.pool is not None:
for ent in entities:
extra = await find_logs_by_id(container.pool, entity=ent, top_k=20)
extra = await find_logs_by_id(container.pool, entity=ent, top_k=20, project_id=project_id)
for c in extra:
if c["chunk_id"] in seen:
continue
Expand Down
17 changes: 16 additions & 1 deletion repi/api/conversations.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from sqlmodel import select

from repi.core.container import get_container
from repi.models.schema import ChatMessage, Conversation, Investigation
from repi.models.schema import ChatMessage, Conversation, Investigation, Project

logger = logging.getLogger("repi.api.conversations")

Expand All @@ -25,6 +25,8 @@
class ConversationSummary(BaseModel):
id: str
title: Optional[str]
project_id: Optional[str] = None
project_name: Optional[str] = None
created_at: str
updated_at: str

Expand All @@ -43,6 +45,8 @@ class TranscriptTurn(BaseModel):
class ConversationDetail(BaseModel):
id: str
title: Optional[str]
project_id: Optional[str] = None
project_name: Optional[str] = None
created_at: str
updated_at: str
turns: List[TranscriptTurn]
Expand All @@ -59,10 +63,14 @@ async def list_conversations(limit: int = 50):
)
res = await session.exec(stmt)
rows = list(res.all())
name_res = await session.exec(select(Project.id, Project.name))
project_names = {pid: name for pid, name in name_res.all()}
return [
ConversationSummary(
id=str(c.id),
title=c.title,
project_id=str(c.project_id) if c.project_id else None,
project_name=project_names.get(c.project_id),
created_at=c.created_at.isoformat(),
updated_at=c.updated_at.isoformat(),
)
Expand All @@ -84,6 +92,11 @@ async def get_conversation(conversation_id: str):
if conv is None:
raise HTTPException(status_code=404, detail="Conversation not found")

project_name = None
if conv.project_id is not None:
proj = await session.get(Project, conv.project_id)
project_name = proj.name if proj else None

msg_res = await session.exec(
select(ChatMessage)
.where(ChatMessage.conversation_id == cid)
Expand Down Expand Up @@ -125,6 +138,8 @@ async def get_conversation(conversation_id: str):
return ConversationDetail(
id=str(conv.id),
title=conv.title,
project_id=str(conv.project_id) if conv.project_id else None,
project_name=project_name,
created_at=conv.created_at.isoformat(),
updated_at=conv.updated_at.isoformat(),
turns=turns,
Expand Down
13 changes: 11 additions & 2 deletions repi/api/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

class IngestResponse(BaseModel):
service: str
project: str
chunk_count: int
lines_total: int
lines_with_timestamp: int
Expand All @@ -18,18 +19,25 @@ class IngestResponse(BaseModel):
@router.post("/ingest", response_model=IngestResponse)
async def ingest(
service: str = Form(...),
file: UploadFile = File(...)
file: UploadFile = File(...),
project: str | None = Form(None),
):
"""
Ingest logs from a file for a specific service.

`project` is a project name (get-or-create) or id (must exist); omitted
→ the Default project.
"""
from repi.api.projects import resolve_project

content = await file.read()
content_str = content.decode("utf-8")

container = get_container()
async with container.get_session() as session:
project_row = await resolve_project(session, project)
ingestor = container.get_ingestor(session)
stats = await ingestor.ingest(content_str, service)
stats = await ingestor.ingest(content_str, service, project_id=project_row.id)

# Refresh the in-memory service list so a brand-new service is visible to
# the intent resolver immediately, not only after a restart or GET /services.
Expand All @@ -44,6 +52,7 @@ async def ingest(

return IngestResponse(
service=service,
project=project_row.name,
chunk_count=stats.chunk_count,
lines_total=stats.lines_total,
lines_with_timestamp=stats.lines_with_timestamp,
Expand Down
34 changes: 27 additions & 7 deletions repi/api/investigate.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ class InvestigateRequest(BaseModel):
# conversation row is created and its id returned so the UI can attach
# subsequent /chat turns to the same thread.
conversation_id: Optional[UUID] = None
# UX P1: scopes retrieval + every ReAct tool to one project. If omitted
# but the conversation has a project, the conversation's project applies.
project_id: Optional[UUID] = None

class InvestigationStepModel(BaseModel):
step_number: int
Expand Down Expand Up @@ -93,8 +96,9 @@ async def investigate(request: InvestigateRequest):
from sqlalchemy import text as sa_text

conversation_id = request.conversation_id
project_id = request.project_id
if conversation_id is None:
conv = Conversation(title=request.query[:80])
conv = Conversation(title=request.query[:80], project_id=project_id)
session.add(conv)
await session.commit()
await session.refresh(conv)
Expand All @@ -103,12 +107,18 @@ async def investigate(request: InvestigateRequest):
# Validate it exists; if not, materialise with the caller's id.
stmt = sm_select(Conversation).where(Conversation.id == conversation_id)
res = await session.exec(stmt)
if res.first() is None:
session.add(Conversation(id=conversation_id, title=request.query[:80]))
existing = res.first()
if existing is None:
session.add(Conversation(id=conversation_id, title=request.query[:80], project_id=project_id))
await session.commit()
elif project_id is None:
# Inherit the conversation's project when the caller didn't pin one.
project_id = existing.project_id

store = container.get_investigation_store(session)
investigation = await store.get_or_create(request.query, conversation_id=conversation_id)
investigation = await store.get_or_create(
request.query, conversation_id=conversation_id, project_id=project_id
)

# Bump the conversation's updated_at so the sidebar surfaces the
# thread to the top even when activity is investigation-side rather
Expand Down Expand Up @@ -173,14 +183,24 @@ async def stream_investigation(investigation_id: str):
async def event_generator():
container = get_container()
async with container.get_session() as session:
loop = container.get_investigation_loop(session)
store = loop.store
store = container.get_investigation_store(session)
investigation = await store.get_by_id(uuid_obj)

if not investigation:
yield f"data: {json.dumps({'type': 'error', 'data': {'message': 'Investigation not found'}})}\n\n"
return

# Build the loop scoped to the investigation's project: every tool
# call carries project_id and the resolver sees only that
# project's services.
scoped_services = await container.get_known_services(investigation.project_id)
loop = container.get_investigation_loop(
session,
project_id=investigation.project_id,
known_services=scoped_services,
)
store = loop.store

steps = await store.get_steps(uuid_obj)
for s in steps:
# Persisted shape is {name, args}; the UI consumes {tool, args}
Expand Down
Loading
Loading