From 9a7bdec7e8c1d1530b67086e1a1f46657c82d1c8 Mon Sep 17 00:00:00 2001 From: Bailey Dixon Date: Sun, 19 Apr 2026 14:28:33 -0400 Subject: [PATCH] =?UTF-8?q?feat(v3):=20chat=20rooms=20primitive=20?= =?UTF-8?q?=E2=80=94=20daemon=20RPCs=20+=20CLI=20+=20client=20wrappers?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ships `arc chat rooms create/list`, `arc chat post`, `arc chat read`, and `arc chat wait` on top of the v3 daemon's `chat_rooms` + `chat_messages` tables. Messages auto-extract `@mentions` on post, support threading via `--reply-to`, and `chat.wait` blocks on room subscribers until a matching message arrives or the timeout elapses. Protocol v1 additions (all new fields optional): - chat.create, chat.list, chat.post, chat.read, chat.wait - Schemas in packages/client/src/protocol-chat.ts Hub gains a `subscribeRaw` API for in-process RPC handlers to await topic events without going through the WebSocket encoder — used by chat.wait. External WebSocket subscribers continue to work unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/cli/src/cli.ts | 85 ++++++++++- packages/cli/src/commands/chat.ts | 213 +++++++++++++++++++++++++++ packages/client/src/client.ts | 40 +++++ packages/client/src/index.ts | 1 + packages/client/src/protocol-chat.ts | 90 +++++++++++ packages/client/src/protocol.ts | 5 + packages/daemon/src/hub.ts | 59 ++++++-- packages/daemon/src/rpc/chat.ts | 202 +++++++++++++++++++++++++ packages/daemon/src/rpc/index.ts | 6 + tests/chat-rooms.test.ts | 213 +++++++++++++++++++++++++++ 10 files changed, 902 insertions(+), 12 deletions(-) create mode 100644 packages/client/src/protocol-chat.ts create mode 100644 packages/daemon/src/rpc/chat.ts create mode 100644 tests/chat-rooms.test.ts diff --git a/packages/cli/src/cli.ts b/packages/cli/src/cli.ts index b2f19df..3abc126 100644 --- a/packages/cli/src/cli.ts +++ b/packages/cli/src/cli.ts @@ -345,7 +345,7 @@ Examples: } ); - program + const chatCmd = program .command("chat") .description("Interactive chat with your active profile's agent (with ARC tool use)") .option("--profile ", "Profile to use (default: active)") @@ -363,6 +363,9 @@ Examples: $ arc chat --mode read-only (no writes) $ arc chat --session abc-123 (resume) $ arc chat --no-tools (plain chat only) + $ arc chat rooms create standup (new chat room via daemon) + $ arc chat post standup "hello @agent-1" (post a message) + $ arc chat wait standup --mentioning me (block until mentioned) REPL commands: /exit /quit End the session @@ -395,6 +398,86 @@ REPL commands: }, ); + const chatRooms = chatCmd + .command("rooms") + .description("Manage chat rooms backed by the ARC daemon"); + + chatRooms + .command("create ") + .description("Create a new chat room (idempotent by name)") + .option("--json", "Output machine-readable JSON") + .action(async (name: string, opts: { json?: boolean }) => { + const mod = await import("./commands/chat.js"); + await mod.handleChatRoomsCreate(name, opts); + }); + + chatRooms + .command("list") + .alias("ls") + .description("List all chat rooms") + .option("--json", "Output machine-readable JSON") + .action(async (opts: { json?: boolean }) => { + const mod = await import("./commands/chat.js"); + await mod.handleChatRoomsList(opts); + }); + + chatCmd + .command("post ") + .description("Post a message to a chat room (extracts @mentions from text)") + .option("--as ", "Author id (default: user)") + .option("--reply-to ", "Parent message id to thread under") + .option("--json", "Output machine-readable JSON") + .action( + async ( + room: string, + text: string, + opts: { as?: string; replyTo?: string; json?: boolean }, + ) => { + const mod = await import("./commands/chat.js"); + await mod.handleChatPost(room, text, opts); + }, + ); + + chatCmd + .command("read ") + .description("Read messages from a chat room") + .option("--since ", "Only show messages after a duration (e.g. 5m, 1h, 30s)") + .option("--mentions-only", "Only show messages that contain any mention") + .option("--mentioning ", "Only show messages that mention this id (or @everyone)") + .option("--limit ", "Maximum messages (default 200)") + .option("--json", "Output machine-readable JSON") + .action( + async ( + room: string, + opts: { + since?: string; + mentionsOnly?: boolean; + mentioning?: string; + limit?: string; + json?: boolean; + }, + ) => { + const mod = await import("./commands/chat.js"); + await mod.handleChatRead(room, opts); + }, + ); + + chatCmd + .command("wait ") + .description("Block until a new message arrives (optionally mentioning )") + .option("--mentioning ", "Only resolve on messages mentioning this id (or @everyone)") + .option("--timeout ", "Timeout duration (e.g. 30s, 5m) [default 60s]") + .option("--json", "Output machine-readable JSON") + .action( + async ( + room: string, + opts: { mentioning?: string; timeout?: string; json?: boolean }, + ) => { + const mod = await import("./commands/chat.js"); + await mod.handleChatWait(room, opts); + }, + ); + program .command("roundtable ") .description("Run a multi-agent roundtable discussion across profiles") diff --git a/packages/cli/src/commands/chat.ts b/packages/cli/src/commands/chat.ts index 511af85..a3f9b6e 100644 --- a/packages/cli/src/commands/chat.ts +++ b/packages/cli/src/commands/chat.ts @@ -11,6 +11,7 @@ * - `ChatSession` + store (Phase 4) — multi-turn persistence. */ +import fs from "node:fs"; import readline from "node:readline"; import pc from "picocolors"; import { @@ -35,6 +36,8 @@ import { type ToolContext, type AgentEvent, } from "@axiom-labs/arc-core"; +import { loadDaemonConfig } from "@axiom-labs/arc-daemon"; +import { ArcClient, type ChatMessage as WireChatMessage } from "@axiom-labs/arc-client"; import { getVersion } from "../display.js"; // --------------------------------------------------------------------------- @@ -631,3 +634,213 @@ export async function handleChat(opts: ChatOptions): Promise { printSystem("Goodbye."); } + +// --------------------------------------------------------------------------- +// Chat rooms CLI — thin wrappers over the ARC daemon's chat.* RPCs. +// --------------------------------------------------------------------------- + +async function connectToDaemon(): Promise { + const cfg = loadDaemonConfig(); + let token: string | null; + try { + const parsed = JSON.parse(fs.readFileSync(cfg.authPath, "utf8")) as { rootToken?: string }; + token = typeof parsed.rootToken === "string" ? parsed.rootToken : null; + } catch { + token = null; + } + if (!token) { + throw new Error( + `ARC daemon auth not found at ${cfg.authPath}. Start the daemon with \`arc daemon start\` first.`, + ); + } + const host = cfg.host === "0.0.0.0" ? "127.0.0.1" : cfg.host; + const url = `ws://${host}:${cfg.port}`; + const client = new ArcClient({ url, token, noReconnect: true }); + try { + await client.connect(); + } catch (err) { + throw new Error( + `failed to connect to ARC daemon on ${url} (${(err as Error).message}). Is \`arc daemon\` running?`, + ); + } + return client; +} + +async function withDaemon(fn: (client: ArcClient) => Promise): Promise { + const client = await connectToDaemon(); + try { + return await fn(client); + } finally { + try { + await client.close(); + } catch { + /* ignore — best-effort on shutdown */ + } + } +} + +/** Parse a duration string like "30s", "5m", "1h". Falsy → null. */ +export function parseDuration(input: string | undefined): number | null { + if (!input) return null; + const trimmed = input.trim(); + if (trimmed === "") return null; + const match = /^(\d+)\s*(ms|s|m|h|d)?$/.exec(trimmed); + if (!match) { + // allow bare numbers = ms + if (/^\d+$/.test(trimmed)) return Number.parseInt(trimmed, 10); + throw new Error( + `invalid duration "${input}" — expected e.g. 500ms, 30s, 5m, 1h, 2d`, + ); + } + const n = Number.parseInt(match[1]!, 10); + const unit = (match[2] ?? "ms").toLowerCase(); + switch (unit) { + case "ms": + return n; + case "s": + return n * 1000; + case "m": + return n * 60_000; + case "h": + return n * 3_600_000; + case "d": + return n * 86_400_000; + } + return n; +} + +function formatTs(ts: number): string { + try { + return new Date(ts).toISOString(); + } catch { + return String(ts); + } +} + +function printRoom(r: { id: string; name: string; createdAt: number }): void { + writeLine(` ${pc.cyan(r.name)} ${pc.dim(r.id)} ${pc.dim(formatTs(r.createdAt))}`); +} + +function printMessage(m: WireChatMessage): void { + const head = `${pc.dim("#" + m.id)} ${pc.cyan(m.author)}${m.replyTo ? pc.dim(` →#${m.replyTo}`) : ""} ${pc.dim(formatTs(m.ts))}`; + const mentions = + m.mentions && m.mentions.length > 0 + ? ` ${pc.yellow(m.mentions.map((x) => "@" + x).join(" "))}` + : ""; + writeLine(head + mentions); + writeLine(` ${m.body}`); +} + +export async function handleChatRoomsCreate( + name: string, + opts: { json?: boolean } = {}, +): Promise { + await withDaemon(async (client) => { + const { room } = await client.chat.create({ name }); + if (opts.json) { + process.stdout.write(JSON.stringify({ room }, null, 2) + "\n"); + } else { + printSystem(`Chat room ready: ${pc.cyan(room.name)} ${pc.dim(room.id)}`); + } + }); +} + +export async function handleChatRoomsList(opts: { json?: boolean } = {}): Promise { + await withDaemon(async (client) => { + const { rooms } = await client.chat.list(); + if (opts.json) { + process.stdout.write(JSON.stringify({ rooms }, null, 2) + "\n"); + return; + } + if (rooms.length === 0) { + printSystem("No chat rooms yet. Create one with `arc chat rooms create `."); + return; + } + writeLine(""); + for (const r of rooms) printRoom(r); + writeLine(""); + }); +} + +export async function handleChatPost( + room: string, + text: string, + opts: { as?: string; replyTo?: string; json?: boolean } = {}, +): Promise { + const author = opts.as ?? "user"; + let replyTo: number | undefined; + if (opts.replyTo !== undefined) { + replyTo = Number.parseInt(opts.replyTo, 10); + if (!Number.isFinite(replyTo)) { + throw new Error(`--reply-to must be a number (got "${opts.replyTo}")`); + } + } + await withDaemon(async (client) => { + const { message } = await client.chat.post({ + room, + author, + body: text, + ...(replyTo !== undefined ? { replyTo } : {}), + }); + if (opts.json) { + process.stdout.write(JSON.stringify({ message }, null, 2) + "\n"); + } else { + printMessage(message); + } + }); +} + +export async function handleChatRead( + room: string, + opts: { + since?: string; + mentionsOnly?: boolean; + mentioning?: string; + limit?: string; + json?: boolean; + } = {}, +): Promise { + const sinceDur = parseDuration(opts.since); + const sinceMs = sinceDur !== null && sinceDur > 0 ? Date.now() - sinceDur : undefined; + const limit = opts.limit !== undefined ? Number.parseInt(opts.limit, 10) : undefined; + await withDaemon(async (client) => { + const params: Parameters[0] = { room }; + if (sinceMs !== undefined) params.sinceMs = sinceMs; + if (opts.mentionsOnly) params.mentionsOnly = true; + if (opts.mentioning) params.mentioning = opts.mentioning; + if (limit !== undefined && Number.isFinite(limit)) params.limit = limit; + const { messages } = await client.chat.read(params); + if (opts.json) { + process.stdout.write(JSON.stringify({ messages }, null, 2) + "\n"); + return; + } + if (messages.length === 0) { + printSystem(`No messages in ${room}.`); + return; + } + writeLine(""); + for (const m of messages) printMessage(m); + writeLine(""); + }); +} + +export async function handleChatWait( + room: string, + opts: { mentioning?: string; timeout?: string; json?: boolean } = {}, +): Promise { + const timeoutMs = parseDuration(opts.timeout) ?? 60_000; + await withDaemon(async (client) => { + const params: Parameters[0] = { room, timeoutMs }; + if (opts.mentioning) params.mentioning = opts.mentioning; + const result = await client.chat.wait(params); + if (opts.json) { + process.stdout.write(JSON.stringify(result, null, 2) + "\n"); + } else if (result.timedOut) { + printSystem(`No matching message within ${timeoutMs}ms.`); + process.exitCode = 2; + } else if (result.message) { + printMessage(result.message); + } + }); +} + diff --git a/packages/client/src/client.ts b/packages/client/src/client.ts index 0cbb10b..4c7e604 100644 --- a/packages/client/src/client.ts +++ b/packages/client/src/client.ts @@ -16,6 +16,17 @@ import { type ChannelId, type Envelope as EnvelopeT, } from "./protocol.js"; +import { + ChatCreateParams, + ChatCreateResult, + ChatListResult, + ChatPostParams, + ChatPostResult, + ChatReadParams, + ChatReadResult, + ChatWaitParams, + ChatWaitResult, +} from "./protocol-chat.js"; import { decodeFrame, encodeControl, encodeFrame } from "./frame.js"; export interface ArcClientOptions { @@ -152,6 +163,35 @@ export class ArcClient { ), }; + chat = { + create: ( + params: z.infer, + ): Promise> => + this.call(Methods.chat_create, ChatCreateParams.parse(params)).then((r) => + ChatCreateResult.parse(r), + ), + list: (): Promise> => + this.call(Methods.chat_list).then((r) => ChatListResult.parse(r)), + post: ( + params: z.infer, + ): Promise> => + this.call(Methods.chat_post, ChatPostParams.parse(params)).then((r) => + ChatPostResult.parse(r), + ), + read: ( + params: z.infer, + ): Promise> => + this.call(Methods.chat_read, ChatReadParams.parse(params)).then((r) => + ChatReadResult.parse(r), + ), + wait: ( + params: z.infer, + ): Promise> => + this.call(Methods.chat_wait, ChatWaitParams.parse(params)).then((r) => + ChatWaitResult.parse(r), + ), + }; + // --- Internals ----------------------------------------------------------- private nextId(): string { diff --git a/packages/client/src/index.ts b/packages/client/src/index.ts index a7e6421..169eabf 100644 --- a/packages/client/src/index.ts +++ b/packages/client/src/index.ts @@ -1,3 +1,4 @@ export * from "./protocol.js"; +export * from "./protocol-chat.js"; export * from "./frame.js"; export * from "./client.js"; diff --git a/packages/client/src/protocol-chat.ts b/packages/client/src/protocol-chat.ts new file mode 100644 index 0000000..5acc894 --- /dev/null +++ b/packages/client/src/protocol-chat.ts @@ -0,0 +1,90 @@ +import { z } from "zod"; + +/** + * Protocol v1 chat room schemas. All new fields MUST be `.optional()` with a + * default on the daemon side to keep the wire format forward-compatible. + */ + +export const ChatCreateParams = z.object({ + name: z.string().min(1), + metadata: z.record(z.unknown()).optional(), +}); + +export const ChatRoomSummary = z.object({ + id: z.string(), + name: z.string(), + createdAt: z.number(), + metadata: z.record(z.unknown()).nullable().optional(), +}); + +export const ChatCreateResult = z.object({ room: ChatRoomSummary }); + +export const ChatListResult = z.object({ rooms: z.array(ChatRoomSummary) }); + +export const ChatPostParams = z.object({ + room: z.string(), + author: z.string(), + body: z.string(), + replyTo: z.number().int().nonnegative().optional(), + /** Optional explicit mentions; overrides body extraction when provided. */ + mentions: z.array(z.string()).optional(), +}); + +export const ChatMessage = z.object({ + id: z.number(), + roomId: z.string(), + author: z.string(), + body: z.string(), + ts: z.number(), + replyTo: z.number().nullable().optional(), + mentions: z.array(z.string()).optional(), +}); +export type ChatMessage = z.infer; + +export const ChatPostResult = z.object({ message: ChatMessage }); + +export const ChatReadParams = z.object({ + room: z.string(), + sinceMs: z.number().int().nonnegative().optional(), + mentionsOnly: z.boolean().optional(), + mentioning: z.string().optional(), + limit: z.number().int().positive().max(1000).optional(), +}); + +export const ChatReadResult = z.object({ messages: z.array(ChatMessage) }); + +export const ChatWaitParams = z.object({ + room: z.string(), + mentioning: z.string().optional(), + timeoutMs: z.number().int().positive().max(10 * 60 * 1000).optional(), +}); + +export const ChatWaitResult = z.object({ + message: ChatMessage.nullable(), + timedOut: z.boolean(), +}); + +/** Extract `@token` mentions from a message body. */ +export function extractMentions(body: string): string[] { + const out = new Set(); + // Match @word, allow letters, digits, _, - and . (for agent ids like @agent-abc.1) + const re = /(^|[\s.,;:!?()[\]{}"'])@([A-Za-z0-9_.-]+)/g; + let m: RegExpExecArray | null; + while ((m = re.exec(body)) !== null) { + const id = m[2]; + if (id) out.add(id); + } + return Array.from(out); +} + +/** Returns true if `mentions` matches the caller `id` (or `@everyone`). */ +export function mentionMatches(mentions: string[] | undefined, id: string): boolean { + if (!mentions || mentions.length === 0) return false; + const target = id.startsWith("@") ? id.slice(1) : id; + for (const raw of mentions) { + const m = raw.startsWith("@") ? raw.slice(1) : raw; + if (m === target) return true; + if (m === "everyone") return true; + } + return false; +} diff --git a/packages/client/src/protocol.ts b/packages/client/src/protocol.ts index 5d9594e..1d39d67 100644 --- a/packages/client/src/protocol.ts +++ b/packages/client/src/protocol.ts @@ -59,6 +59,11 @@ export const Methods = { agent_run: "agent.run", agent_stop: "agent.stop", agent_send: "agent.send", + chat_create: "chat.create", + chat_list: "chat.list", + chat_post: "chat.post", + chat_read: "chat.read", + chat_wait: "chat.wait", } as const; export type MethodName = (typeof Methods)[keyof typeof Methods]; diff --git a/packages/daemon/src/hub.ts b/packages/daemon/src/hub.ts index 5b9bbef..abe551e 100644 --- a/packages/daemon/src/hub.ts +++ b/packages/daemon/src/hub.ts @@ -1,13 +1,20 @@ import type { Session } from "./ws/session.js"; import type { Envelope } from "@axiom-labs/arc-client"; +export type RawSubscriber = (payload: unknown, topic: string) => void; + /** * Subscription fan-out. Keeps a set of sessions per topic; `publish` pushes * an event envelope to every subscribed session. + * + * Also supports "raw" in-process subscribers (callbacks not bound to a Session) + * so that server-side RPC handlers like `chat.wait` can await topic events + * without going through the WebSocket encoder. */ export class Hub { private byTopic = new Map>(); private bySession = new WeakMap>(); + private rawByTopic = new Map>(); subscribe(session: Session, topic: string): void { let set = this.byTopic.get(topic); @@ -40,19 +47,49 @@ export class Hub { this.bySession.delete(session); } + /** Subscribe a raw in-process callback. Returns an unsubscribe fn. */ + subscribeRaw(topic: string, fn: RawSubscriber): () => void { + let set = this.rawByTopic.get(topic); + if (!set) { + set = new Set(); + this.rawByTopic.set(topic, set); + } + set.add(fn); + return () => this.unsubscribeRaw(topic, fn); + } + + unsubscribeRaw(topic: string, fn: RawSubscriber): void { + const set = this.rawByTopic.get(topic); + if (!set) return; + set.delete(fn); + if (set.size === 0) this.rawByTopic.delete(topic); + } + publish(topic: string, payload: unknown): void { const set = this.byTopic.get(topic); - if (!set) return; - const envelope: Envelope = { - v: 1, - id: randomEventId(), - type: "event", - topic, - payload, - }; - for (const session of set) { - if (!session.conn.alive) continue; - session.sendControl(envelope); + if (set) { + const envelope: Envelope = { + v: 1, + id: randomEventId(), + type: "event", + topic, + payload, + }; + for (const session of set) { + if (!session.conn.alive) continue; + session.sendControl(envelope); + } + } + const rawSet = this.rawByTopic.get(topic); + if (rawSet) { + // Snapshot before calling to allow unsubscribe during iteration. + for (const fn of [...rawSet]) { + try { + fn(payload, topic); + } catch { + // Subscribers own their error handling; never let one take out others. + } + } } } } diff --git a/packages/daemon/src/rpc/chat.ts b/packages/daemon/src/rpc/chat.ts new file mode 100644 index 0000000..4933321 --- /dev/null +++ b/packages/daemon/src/rpc/chat.ts @@ -0,0 +1,202 @@ +import crypto from "node:crypto"; +import { + ChatCreateParams, + ChatPostParams, + ChatReadParams, + ChatWaitParams, + Topics, + extractMentions, + mentionMatches, + type ChatMessage, +} from "@axiom-labs/arc-client"; +import type { RpcHandler } from "./types.js"; +import type { DB } from "../db.js"; + +/** + * Chat-room RPCs — rooms + append-only messages with mention extraction. + * Schemas live in `@axiom-labs/arc-client/protocol-chat.ts`. + */ + +interface ChatRoomRow { + id: string; + name: string; + created_at: number; + metadata: string | null; +} + +interface ChatMessageRow { + id: number; + room_id: string; + author: string; + reply_to: number | null; + mentions: string | null; + body: string; + ts: number; +} + +function roomRowToSummary(row: ChatRoomRow): { + id: string; + name: string; + createdAt: number; + metadata: Record | null; +} { + return { + id: row.id, + name: row.name, + createdAt: row.created_at, + metadata: row.metadata ? (JSON.parse(row.metadata) as Record) : null, + }; +} + +function messageRowToWire(row: ChatMessageRow): ChatMessage { + const mentions = row.mentions ? (JSON.parse(row.mentions) as string[]) : []; + return { + id: row.id, + roomId: row.room_id, + author: row.author, + body: row.body, + ts: row.ts, + replyTo: row.reply_to, + mentions, + }; +} + +function badRequest(message: string): Error & { code: string } { + const err = new Error(message) as Error & { code: string }; + err.code = "bad_request"; + return err; +} + +function notFound(message: string): Error & { code: string } { + const err = new Error(message) as Error & { code: string }; + err.code = "not_found"; + return err; +} + +/** Resolve a room by id or by name. */ +function findRoom(db: DB, key: string): ChatRoomRow | null { + const byId = db.prepare("SELECT * FROM chat_rooms WHERE id = ?").get(key) as + | ChatRoomRow + | undefined; + if (byId) return byId; + const byName = db.prepare("SELECT * FROM chat_rooms WHERE name = ?").get(key) as + | ChatRoomRow + | undefined; + return byName ?? null; +} + +export const chatCreate: RpcHandler = (raw, ctx) => { + const { name, metadata } = ChatCreateParams.parse(raw); + const existing = ctx.db.prepare("SELECT * FROM chat_rooms WHERE name = ?").get(name) as + | ChatRoomRow + | undefined; + if (existing) return { room: roomRowToSummary(existing) }; + + const id = crypto.randomUUID(); + const createdAt = Date.now(); + const metaStr = metadata ? JSON.stringify(metadata) : null; + ctx.db + .prepare("INSERT INTO chat_rooms (id, name, created_at, metadata) VALUES (?, ?, ?, ?)") + .run(id, name, createdAt, metaStr); + return { room: { id, name, createdAt, metadata: metadata ?? null } }; +}; + +export const chatList: RpcHandler = (_params, ctx) => { + const rows = ctx.db + .prepare("SELECT * FROM chat_rooms ORDER BY created_at ASC") + .all() as ChatRoomRow[]; + return { rooms: rows.map(roomRowToSummary) }; +}; + +export const chatPost: RpcHandler = (raw, ctx) => { + const params = ChatPostParams.parse(raw); + const room = findRoom(ctx.db, params.room); + if (!room) throw notFound(`chat room not found: ${params.room}`); + + if (params.replyTo !== undefined) { + const parent = ctx.db + .prepare("SELECT id, room_id FROM chat_messages WHERE id = ?") + .get(params.replyTo) as { id: number; room_id: string } | undefined; + if (!parent) throw badRequest(`reply_to message not found: ${params.replyTo}`); + if (parent.room_id !== room.id) { + throw badRequest("reply_to message is in a different room"); + } + } + + const mentions = params.mentions ?? extractMentions(params.body); + const mentionsStr = mentions.length > 0 ? JSON.stringify(mentions) : null; + const ts = Date.now(); + const result = ctx.db + .prepare( + "INSERT INTO chat_messages (room_id, author, reply_to, mentions, body, ts) VALUES (?, ?, ?, ?, ?, ?)", + ) + .run(room.id, params.author, params.replyTo ?? null, mentionsStr, params.body, ts); + const id = Number(result.lastInsertRowid); + const message: ChatMessage = { + id, + roomId: room.id, + author: params.author, + body: params.body, + ts, + replyTo: params.replyTo ?? null, + mentions, + }; + + ctx.hub.publish(Topics.chatRoom(room.id), message); + return { message }; +}; + +export const chatRead: RpcHandler = (raw, ctx) => { + const params = ChatReadParams.parse(raw); + const room = findRoom(ctx.db, params.room); + if (!room) throw notFound(`chat room not found: ${params.room}`); + const limit = Math.min(params.limit ?? 200, 1000); + const sinceMs = params.sinceMs ?? 0; + const rows = ctx.db + .prepare( + "SELECT * FROM chat_messages WHERE room_id = ? AND ts > ? ORDER BY ts ASC, id ASC LIMIT ?", + ) + .all(room.id, sinceMs, limit) as ChatMessageRow[]; + let messages = rows.map(messageRowToWire); + if (params.mentionsOnly) { + messages = messages.filter((m) => (m.mentions?.length ?? 0) > 0); + } + if (params.mentioning) { + messages = messages.filter((m) => mentionMatches(m.mentions, params.mentioning!)); + } + return { messages }; +}; + +/** + * chat.wait — blocks until a new message arrives in `room` that matches the + * optional `mentioning` filter, or until `timeoutMs` elapses. + */ +export const chatWait: RpcHandler = (raw, ctx) => { + const params = ChatWaitParams.parse(raw); + const room = findRoom(ctx.db, params.room); + if (!room) throw notFound(`chat room not found: ${params.room}`); + const timeoutMs = params.timeoutMs ?? 60_000; + const topic = Topics.chatRoom(room.id); + + return new Promise<{ message: ChatMessage | null; timedOut: boolean }>((resolve) => { + let resolved = false; + let unsubscribe: () => void = () => {}; + let timer: NodeJS.Timeout | null = null; + + const finish = (value: { message: ChatMessage | null; timedOut: boolean }): void => { + if (resolved) return; + resolved = true; + if (timer) clearTimeout(timer); + unsubscribe(); + resolve(value); + }; + + unsubscribe = ctx.hub.subscribeRaw(topic, (payload) => { + const message = payload as ChatMessage; + if (params.mentioning && !mentionMatches(message.mentions, params.mentioning)) return; + finish({ message, timedOut: false }); + }); + + timer = setTimeout(() => finish({ message: null, timedOut: true }), timeoutMs); + }); +}; diff --git a/packages/daemon/src/rpc/index.ts b/packages/daemon/src/rpc/index.ts index 5341c04..18449d8 100644 --- a/packages/daemon/src/rpc/index.ts +++ b/packages/daemon/src/rpc/index.ts @@ -3,6 +3,7 @@ import { authLogin } from "./auth.js"; import { healthGet } from "./health.js"; import { profileGet, profileList } from "./profile.js"; import { agentList, agentRun, agentSend, agentStop } from "./agent.js"; +import { chatCreate, chatList, chatPost, chatRead, chatWait } from "./chat.js"; import type { RpcHandler } from "./types.js"; /** Methods that do NOT require an authenticated session. */ @@ -17,6 +18,11 @@ export const handlers: Record = { [Methods.agent_run]: agentRun, [Methods.agent_stop]: agentStop, [Methods.agent_send]: agentSend, + [Methods.chat_create]: chatCreate, + [Methods.chat_list]: chatList, + [Methods.chat_post]: chatPost, + [Methods.chat_read]: chatRead, + [Methods.chat_wait]: chatWait, }; export type { RpcContext } from "./types.js"; diff --git a/tests/chat-rooms.test.ts b/tests/chat-rooms.test.ts new file mode 100644 index 0000000..d48032c --- /dev/null +++ b/tests/chat-rooms.test.ts @@ -0,0 +1,213 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { startDaemon, type DaemonHandle } from "@axiom-labs/arc-daemon"; +import { ArcClient, extractMentions, mentionMatches } from "@axiom-labs/arc-client"; + +interface TestCtx { + tmp: string; + port: number; + handle: DaemonHandle & { stop: () => Promise }; + client: ArcClient; + token: string; +} + +describe("chat rooms primitive — daemon + client", () => { + let ctx: TestCtx | null = null; + + beforeEach(async () => { + const tmp = fs.mkdtempSync(path.join(os.tmpdir(), "arc-chat-test-")); + process.env["ARC_DIR"] = tmp; + const port = 18200 + Math.floor(Math.random() * 800); + const handle = await startDaemon({ port, version: "1.0.0-alpha.0-chat-test" }); + const authFile = JSON.parse( + fs.readFileSync(path.join(tmp, "auth.json"), "utf8"), + ) as { rootToken: string }; + const client = new ArcClient({ + url: `ws://127.0.0.1:${port}`, + token: authFile.rootToken, + noReconnect: true, + }); + await client.connect(); + ctx = { tmp, port, handle, client, token: authFile.rootToken }; + }); + + afterEach(async () => { + if (!ctx) return; + await ctx.client.close(); + await ctx.handle.stop(); + fs.rmSync(ctx.tmp, { recursive: true, force: true }); + ctx = null; + }); + + it("extractMentions parses @tokens from body", () => { + expect(extractMentions("hey @agent-1 and @you").sort()).toEqual( + ["agent-1", "you"].sort(), + ); + expect(extractMentions("no mentions here")).toEqual([]); + expect(extractMentions("end of line @tail")).toEqual(["tail"]); + expect(extractMentions("punct:@a, @b!")).toEqual(["a", "b"]); + // Email addresses should not match because `@` is not preceded by a boundary. + expect(extractMentions("contact me@example.com")).toEqual([]); + }); + + it("mentionMatches honours @everyone", () => { + expect(mentionMatches(["agent-1"], "agent-1")).toBe(true); + expect(mentionMatches(["everyone"], "someone-else")).toBe(true); + expect(mentionMatches(["other"], "agent-1")).toBe(false); + expect(mentionMatches([], "x")).toBe(false); + expect(mentionMatches(undefined, "x")).toBe(false); + }); + + it("creates a room and lists it", async () => { + const created = await ctx!.client.chat.create({ name: "standup" }); + expect(created.room.name).toBe("standup"); + expect(typeof created.room.id).toBe("string"); + + const again = await ctx!.client.chat.create({ name: "standup" }); + expect(again.room.id).toBe(created.room.id); + + const list = await ctx!.client.chat.list(); + expect(list.rooms.map((r) => r.name)).toContain("standup"); + }); + + it("posts messages by name and id, reads them back, and threads replies", async () => { + await ctx!.client.chat.create({ name: "general" }); + + const first = await ctx!.client.chat.post({ + room: "general", + author: "user", + body: "hello world", + }); + expect(first.message.body).toBe("hello world"); + expect(first.message.author).toBe("user"); + expect(first.message.mentions).toEqual([]); + + const second = await ctx!.client.chat.post({ + room: "general", + author: "agent-bot", + body: "reply", + replyTo: first.message.id, + }); + expect(second.message.replyTo).toBe(first.message.id); + + const read = await ctx!.client.chat.read({ room: "general" }); + expect(read.messages).toHaveLength(2); + expect(read.messages[0]!.id).toBe(first.message.id); + expect(read.messages[1]!.replyTo).toBe(first.message.id); + }); + + it("extracts @mentions on post and filters by mentioning", async () => { + await ctx!.client.chat.create({ name: "triage" }); + await ctx!.client.chat.post({ + room: "triage", + author: "user", + body: "plain message", + }); + await ctx!.client.chat.post({ + room: "triage", + author: "user", + body: "hey @agent-1 can you take this?", + }); + await ctx!.client.chat.post({ + room: "triage", + author: "user", + body: "FYI @everyone", + }); + + const all = await ctx!.client.chat.read({ room: "triage" }); + expect(all.messages).toHaveLength(3); + expect(all.messages[0]!.mentions).toEqual([]); + expect(all.messages[1]!.mentions).toEqual(["agent-1"]); + expect(all.messages[2]!.mentions).toEqual(["everyone"]); + + const mentionsOnly = await ctx!.client.chat.read({ + room: "triage", + mentionsOnly: true, + }); + expect(mentionsOnly.messages).toHaveLength(2); + + const mentioningAgent1 = await ctx!.client.chat.read({ + room: "triage", + mentioning: "agent-1", + }); + // agent-1 + @everyone both match + expect(mentioningAgent1.messages).toHaveLength(2); + }); + + it("chat.wait resolves when a new mentioning message arrives", async () => { + await ctx!.client.chat.create({ name: "watch" }); + + // schedule a post ~50ms in the future + const scheduler = setTimeout(() => { + ctx!.client.chat + .post({ room: "watch", author: "sender", body: "yo @me" }) + .catch(() => {}); + }, 50); + + const result = await ctx!.client.chat.wait({ + room: "watch", + mentioning: "me", + timeoutMs: 5000, + }); + clearTimeout(scheduler); + + expect(result.timedOut).toBe(false); + expect(result.message).not.toBeNull(); + expect(result.message!.body).toContain("@me"); + expect(result.message!.mentions).toContain("me"); + }); + + it("chat.wait times out if no matching message arrives", async () => { + await ctx!.client.chat.create({ name: "quiet" }); + const result = await ctx!.client.chat.wait({ + room: "quiet", + mentioning: "me", + timeoutMs: 150, + }); + expect(result.timedOut).toBe(true); + expect(result.message).toBeNull(); + }); + + it("chat.read honours sinceMs", async () => { + await ctx!.client.chat.create({ name: "recent" }); + await ctx!.client.chat.post({ room: "recent", author: "a", body: "first" }); + // ensure ts spacing + await new Promise((r) => setTimeout(r, 10)); + const cutoff = Date.now(); + await new Promise((r) => setTimeout(r, 10)); + await ctx!.client.chat.post({ room: "recent", author: "b", body: "second" }); + + const after = await ctx!.client.chat.read({ + room: "recent", + sinceMs: cutoff, + }); + expect(after.messages).toHaveLength(1); + expect(after.messages[0]!.body).toBe("second"); + }); + + it("chat.post rejects reply_to pointing to another room", async () => { + const a = await ctx!.client.chat.create({ name: "a" }); + const b = await ctx!.client.chat.create({ name: "b" }); + const first = await ctx!.client.chat.post({ + room: a.room.name, + author: "user", + body: "hi", + }); + await expect( + ctx!.client.chat.post({ + room: b.room.name, + author: "user", + body: "cross", + replyTo: first.message.id, + }), + ).rejects.toThrow(/different room/); + }); + + it("chat.post returns not_found for an unknown room", async () => { + await expect( + ctx!.client.chat.post({ room: "ghost", author: "u", body: "hi" }), + ).rejects.toThrow(/chat room not found/); + }); +});