Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 56 additions & 5 deletions packages/client/src/client.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import WebSocket from "ws";
import { z } from "zod";
import {
AgentAttachParams,
AgentAttachResult,
AgentEventPayload,
AgentListResult,
AgentOkResult,
AgentRunParams,
Expand All @@ -18,6 +21,9 @@ import {
} from "./protocol.js";
import { decodeFrame, encodeControl, encodeFrame } from "./frame.js";

/** Length (in bytes) of the agent UUID prefix on terminal frames. */
const TERMINAL_ID_LEN = 36;

export interface ArcClientOptions {
/** WebSocket URL, e.g. `ws://127.0.0.1:7272`. */
url: string;
Expand Down Expand Up @@ -45,6 +51,7 @@ export class ArcClient {
private pending = new Map<string, Pending>();
private topicHandlers = new Map<string, Set<TopicHandler>>();
private terminalHandler: TerminalHandler | null = null;
private agentTerminalHandlers = new Map<string, TerminalHandler>();
private reconnectAttempt = 0;
private closed = false;
private idCounter = 0;
Expand Down Expand Up @@ -76,6 +83,7 @@ export class ArcClient {
this.closed = true;
this.ws?.close();
this.ws = null;
this.agentTerminalHandlers.clear();
}

/** Typed request/response RPC. */
Expand Down Expand Up @@ -150,6 +158,41 @@ export class ArcClient {
this.call(Methods.agent_send, AgentSendParams.parse(params)).then((r) =>
AgentOkResult.parse(r),
),
/**
* Subscribe to `agent:<id>` events and route terminal bytes for that id
* to `onTerminal`. Returns the initial replay + a dispose function.
*/
attach: async (
agentId: string,
handlers: {
onEvent?: (event: z.infer<typeof AgentEventPayload>) => void;
onTerminal?: TerminalHandler;
} = {},
attachParams: Omit<z.infer<typeof AgentAttachParams>, "agentId"> = {},
): Promise<{ initial: z.infer<typeof AgentAttachResult>; dispose: () => Promise<void> }> => {
const initial = AgentAttachResult.parse(
await this.call(
Methods.agent_attach,
AgentAttachParams.parse({ agentId, ...attachParams }),
),
);
const topic = `agent:${agentId}`;
const unsubEvents = await this.subscribe(topic, (payload) => {
if (handlers.onEvent) {
handlers.onEvent(payload as z.infer<typeof AgentEventPayload>);
}
});
if (handlers.onTerminal) {
this.agentTerminalHandlers.set(agentId, handlers.onTerminal);
}
return {
initial,
dispose: async () => {
await unsubEvents();
this.agentTerminalHandlers.delete(agentId);
},
};
},
};

// --- Internals -----------------------------------------------------------
Expand Down Expand Up @@ -201,11 +244,19 @@ export class ArcClient {
this.handleEnvelope(envelope);
return;
}
if (frame.channel === Channel.Terminal && this.terminalHandler) {
// Terminal frames carry agent id as the first UTF-8 line, then raw bytes.
// For Phase 1 there's no producer yet; payload is opaque — handed to the
// consumer verbatim with a placeholder agent id.
this.terminalHandler("", frame.payload);
if (frame.channel === Channel.Terminal) {
// Terminal frames are prefixed with a 36-byte ASCII UUID identifying
// the source agent, followed by raw bytes.
const payload = frame.payload;
const agentId =
payload.length >= TERMINAL_ID_LEN
? new TextDecoder().decode(payload.subarray(0, TERMINAL_ID_LEN))
: "";
const bytes =
payload.length >= TERMINAL_ID_LEN ? payload.subarray(TERMINAL_ID_LEN) : payload;
const perAgent = this.agentTerminalHandlers.get(agentId);
if (perAgent) perAgent(agentId, bytes);
if (this.terminalHandler) this.terminalHandler(agentId, bytes);
}
}

Expand Down
23 changes: 23 additions & 0 deletions packages/client/src/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ export const Methods = {
agent_run: "agent.run",
agent_stop: "agent.stop",
agent_send: "agent.send",
agent_attach: "agent.attach",
} as const;
export type MethodName = (typeof Methods)[keyof typeof Methods];

Expand Down Expand Up @@ -115,6 +116,28 @@ export const AgentStopParams = z.object({ agentId: z.string() });
export const AgentSendParams = z.object({ agentId: z.string(), text: z.string() });
export const AgentOkResult = z.object({ ok: z.literal(true) });

export const AgentAttachParams = z.object({
agentId: z.string(),
sinceEpoch: z.number().optional(),
sinceSeq: z.number().optional(),
limit: z.number().int().min(1).max(2000).optional(),
});

export const AgentEventPayload = z.object({
agentId: z.string(),
epoch: z.number(),
seq: z.number(),
ts: z.number(),
kind: z.string(),
payload: z.record(z.string(), z.unknown()).optional(),
});

export const AgentAttachResult = z.object({
agentId: z.string(),
status: z.string(),
events: z.array(AgentEventPayload),
});

// --- Topic helpers ----------------------------------------------------------

export const Topics = {
Expand Down
Loading
Loading