From fc468630ea8c574b99e91a0011c8b0b7a75eb8b1 Mon Sep 17 00:00:00 2001 From: Casey Brooks Date: Sat, 28 Feb 2026 00:43:04 +0000 Subject: [PATCH 1/6] feat(notifications): migrate platform notifications to grpc --- .github/workflows/ci.yml | 22 +- buf.gen.yaml | 8 + docker-compose.yml | 15 ++ docs/product-spec.md | 6 +- docs/technical-overview.md | 6 +- docs/ui/graph/index.md | 5 +- package.json | 2 +- packages/notifications/Dockerfile | 22 ++ packages/notifications/eslint.config.js | 41 ++++ packages/notifications/package.json | 31 +++ packages/notifications/src/broadcaster.ts | 32 +++ packages/notifications/src/config.ts | 56 +++++ packages/notifications/src/grpc.ts | 203 ++++++++++++++++++ packages/notifications/src/logger.ts | 7 + packages/notifications/src/main.ts | 51 +++++ packages/notifications/src/socket.ts | 143 ++++++++++++ packages/notifications/src/types.ts | 13 ++ packages/notifications/src/validation.ts | 26 +++ packages/notifications/tsconfig.eslint.json | 5 + packages/notifications/tsconfig.json | 9 + packages/platform-server/.env.example | 5 + .../__tests__/graph.module.di.smoke.test.ts | 35 +++ .../graph.socket.gateway.bus.test.ts | 9 +- .../__tests__/helpers/config.ts | 2 + .../__tests__/run-events.publish.test.ts | 3 +- .../__tests__/socket.events.test.ts | 90 ++++---- .../__tests__/socket.gateway.test.ts | 9 +- .../__tests__/socket.metrics.coalesce.test.ts | 25 +-- .../socket.node_status.integration.test.ts | 14 +- .../socket.realtime.integration.test.ts | 27 ++- .../sql.threads.metrics.queries.test.ts | 9 +- packages/platform-server/package.json | 2 + .../src/core/services/config.service.ts | 19 ++ .../src/gateway/gateway.module.ts | 3 +- .../src/gateway/graph.socket.gateway.ts | 22 +- .../notifications-client.module.ts | 37 ++++ .../notifications-grpc.publisher.ts | 96 +++++++++ .../ui-notifications.publisher.ts | 12 ++ packages/platform-ui/.env.example | 3 + packages/platform-ui/README.md | 6 +- packages/platform-ui/src/config.ts | 13 +- pnpm-lock.yaml | 71 ++++++ 42 files changed, 1103 insertions(+), 112 deletions(-) create mode 100644 packages/notifications/Dockerfile create mode 100644 packages/notifications/eslint.config.js create mode 100644 packages/notifications/package.json create mode 100644 packages/notifications/src/broadcaster.ts create mode 100644 packages/notifications/src/config.ts create mode 100644 packages/notifications/src/grpc.ts create mode 100644 packages/notifications/src/logger.ts create mode 100644 packages/notifications/src/main.ts create mode 100644 packages/notifications/src/socket.ts create mode 100644 packages/notifications/src/types.ts create mode 100644 packages/notifications/src/validation.ts create mode 100644 packages/notifications/tsconfig.eslint.json create mode 100644 packages/notifications/tsconfig.json create mode 100644 packages/platform-server/src/notifications/notifications-client.module.ts create mode 100644 packages/platform-server/src/notifications/notifications-grpc.publisher.ts create mode 100644 packages/platform-server/src/notifications/ui-notifications.publisher.ts diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a79628e0e..023707390 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -26,7 +26,7 @@ jobs: uses: bufbuild/buf-setup-action@v1 - name: Generate runner protobufs - run: pnpm proto:generate + run: pnpm proto:gen - name: Get pnpm store directory id: pnpm-store @@ -56,7 +56,7 @@ jobs: run: pnpm install --frozen-lockfile - name: Generate protobufs - run: pnpm proto:generate + run: pnpm proto:gen - name: Type-check platform server run: pnpm --filter @agyn/platform-server run typecheck @@ -96,7 +96,7 @@ jobs: uses: bufbuild/buf-setup-action@v1 - name: Generate runner protobufs - run: pnpm proto:generate + run: pnpm proto:gen - name: Get pnpm store directory id: pnpm-store @@ -126,7 +126,7 @@ jobs: run: pnpm install --frozen-lockfile - name: Generate protobufs - run: pnpm proto:generate + run: pnpm proto:gen - name: Apply platform-server migrations env: @@ -159,7 +159,7 @@ jobs: uses: bufbuild/buf-setup-action@v1 - name: Generate runner protobufs - run: pnpm proto:generate + run: pnpm proto:gen - name: Get pnpm store directory id: pnpm-store @@ -189,7 +189,7 @@ jobs: run: pnpm install --frozen-lockfile - name: Generate protobufs - run: pnpm proto:generate + run: pnpm proto:gen - name: Start LiteLLM docker compose stack run: docker compose -f packages/platform-server/test/litellm/docker-compose.yml up -d @@ -228,7 +228,7 @@ jobs: uses: bufbuild/buf-setup-action@v1 - name: Generate runner protobufs - run: pnpm proto:generate + run: pnpm proto:gen - name: Get pnpm store directory id: pnpm-store @@ -258,7 +258,7 @@ jobs: run: pnpm install --frozen-lockfile - name: Generate protobufs - run: pnpm proto:generate + run: pnpm proto:gen - name: Run UI tests (@agyn/platform-ui) run: pnpm --filter @agyn/platform-ui test @@ -313,7 +313,7 @@ jobs: run: pnpm install --frozen-lockfile - name: Generate protobufs - run: pnpm proto:generate + run: pnpm proto:gen - name: Build Storybook (@agyn/platform-ui) run: pnpm --filter @agyn/platform-ui run build-storybook @@ -363,7 +363,7 @@ jobs: run: pnpm install --frozen-lockfile - name: Generate protobufs - run: pnpm proto:generate + run: pnpm proto:gen - name: Approve necessary build scripts run: pnpm approve-builds @prisma/client esbuild msw @@ -417,7 +417,7 @@ jobs: run: pnpm install --frozen-lockfile - name: Generate protobufs - run: pnpm proto:generate + run: pnpm proto:gen # Build all workspaces in topological order so library outputs exist - name: Build UI workspace (topo order) diff --git a/buf.gen.yaml b/buf.gen.yaml index 661c871eb..740569a82 100644 --- a/buf.gen.yaml +++ b/buf.gen.yaml @@ -8,6 +8,14 @@ plugins: out: packages/platform-server/src/proto/gen opt: - target=ts + - plugin: buf.build/bufbuild/es + out: packages/notifications/src/proto/gen + opt: + - target=ts + - plugin: buf.build/bufbuild/connect-es + out: packages/notifications/src/proto/gen + opt: + - target=ts - plugin: buf.build/bufbuild/es out: packages/docker-runner/src/proto/gen opt: diff --git a/docker-compose.yml b/docker-compose.yml index 0e0cb5717..afcb3623a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -246,6 +246,21 @@ services: start_period: 10s networks: - agents_net + + notifications: + build: + context: . + dockerfile: packages/notifications/Dockerfile + restart: unless-stopped + environment: + NOTIFICATIONS_HOST: 0.0.0.0 + NOTIFICATIONS_GRPC_PORT: 50051 + NOTIFICATIONS_SOCKET_PORT: 4000 + ports: + - "50051:50051" + - "4000:4000" + networks: + - agents_net volumes: vault-file: diff --git a/docs/product-spec.md b/docs/product-spec.md index 127ea8ee4..878de3216 100644 --- a/docs/product-spec.md +++ b/docs/product-spec.md @@ -20,7 +20,7 @@ Table of contents - Glossary and changelog templates (pointers) Overview and personas -- Graph-driven AI agent platform composing agents, tools, triggers, memory, and MCP servers into a live-updatable LangGraph runtime. The server exposes HTTP APIs and Socket.IO to manage the graph, provision nodes, execute tools, and observe runs. UI offers a builder to configure graphs and view checkpoints/status. +- Graph-driven AI agent platform composing agents, tools, triggers, memory, and MCP servers into a live-updatable LangGraph runtime. The platform-server exposes HTTP APIs and gRPC notifications, while a dedicated notifications service bridges those notifications to Socket.IO for the UI. UI offers a builder to configure graphs and view checkpoints/status. - Personas - Agent Builder (developer) - Platform Operator (SRE) @@ -32,7 +32,7 @@ Architecture and components - Unknown-key handling and retries: apply strips unknown config keys on schema validation errors and retries up to 3 times. - Checkpointing via Postgres (default); streaming UI integration planned. - Server - - HTTP APIs and Socket.IO for management and status streaming. + - HTTP APIs and a gRPC notifications publisher (Socket.IO bridge implemented by a separate notifications service). - Endpoints manage graph templates, graph state, node lifecycle/actions, dynamic-config schema, reminders, runs, vault proxy, and Nix proxy (when enabled). - Persistence - Graph store: filesystem dataset (format: 2) with deterministic edge IDs, dataset-level file locks, and staged working-tree swaps. Each upsert builds a full graph tree in a sibling directory, fsyncs it, and atomically swaps it into place (conflict/timeout/persist error modes preserved). @@ -109,6 +109,7 @@ Configuration matrix (server env vars) - LLM_PROVIDER (litellm | openai) - If `LLM_PROVIDER=litellm`: LITELLM_BASE_URL and LITELLM_MASTER_KEY - If `LLM_PROVIDER=openai`: OPENAI_API_KEY (OPENAI_BASE_URL optional) + - NOTIFICATIONS_GRPC_ADDR (gRPC endpoint for the notifications bridge) - Optional - GRAPH_REPO_PATH (default ./data/graph) - GRAPH_BRANCH (default main) @@ -117,6 +118,7 @@ Configuration matrix (server env vars) - VAULT_ADDR, VAULT_TOKEN - DOCKER_MIRROR_URL (default http://registry-mirror:5000) - DOCKER_RUNNER_GRPC_HOST, DOCKER_RUNNER_GRPC_PORT (or DOCKER_RUNNER_PORT), DOCKER_RUNNER_SHARED_SECRET (required for docker-runner), plus optional DOCKER_RUNNER_TIMEOUT_MS (default 30000), DOCKER_RUNNER_OPTIONAL (default true; set false to fail-fast), and DOCKER_RUNNER_CONNECT_* knobs (RETRY_BASE_DELAY_MS=500, RETRY_MAX_DELAY_MS=30000, RETRY_JITTER_MS=250, PROBE_INTERVAL_MS=30000, MAX_RETRIES=0 for unlimited background retries). + - NOTIFICATIONS_GRPC_DEADLINE_MS (default 3000) - MCP_TOOLS_STALE_TIMEOUT_MS - LANGGRAPH_CHECKPOINTER: postgres (default) - POSTGRES_URL (postgres connection string) diff --git a/docs/technical-overview.md b/docs/technical-overview.md index 1a8935389..1380a84dc 100644 --- a/docs/technical-overview.md +++ b/docs/technical-overview.md @@ -36,14 +36,15 @@ Design principles - Container isolation per thread: Tools and MCP operations run in per-thread containers to isolate state. Layers -- Application server: wires services, loads persisted graph, exposes minimal REST (templates/graph) and a Socket.IO stream for checkpoints. +- Application server: wires services, loads persisted graph, and publishes realtime graph events over gRPC. +- Notifications service: receives gRPC notifications from the platform-server and re-broadcasts them via Socket.IO to UI clients. - Graph runtime: live diff/apply engine enforcing reversible edges via ports and template registries. - Templates: declarative registration of node factories and their ports. - Triggers: external event sources (Slack, PR polling) that push messages into agents. - Nodes: graph components like LLM invocation and memory. - Tools: actions callable by the LLM (bash, GitHub clone, Slack message) and adapters. - MCP: local server inside a workspace container with transport over docker exec. -- Services: infra clients and helpers (config, docker container provision, Prisma/Postgres, Slack, GitHub, checkpointer, sockets). +- Services: infra clients and helpers (config, docker container provision, Prisma/Postgres, Slack, GitHub, checkpointer, notifications publisher). Workspace container platform - containerProvider.staticConfig.platform: Optional; enum of `linux/amd64` or `linux/arm64`. @@ -84,4 +85,5 @@ Defaults and toggles How to Develop & Test - Prereqs: Node.js 20+, pnpm 9+, Docker, Postgres - Run server: pnpm --filter @agyn/platform-server dev +- Run notifications bridge: pnpm --filter @agyn/notifications dev - Tests: pnpm --filter @agyn/platform-server test diff --git a/docs/ui/graph/index.md b/docs/ui/graph/index.md index 9a74d64e6..73368f181 100644 --- a/docs/ui/graph/index.md +++ b/docs/ui/graph/index.md @@ -3,13 +3,14 @@ Data flow - TemplatesProvider loads templates from `/graph/templates` (alias of `/api/templates`). Components consume capabilities to render controls. - Initial node status fetched via `GET /graph/nodes/:id/status`. -- Realtime updates: listen to Socket.IO on the default namespace for `node_status` events. Do not poll when sockets are available. +- Realtime updates: consume Socket.IO events from the notifications service (default namespace). Do not poll when sockets are available. - For dynamic-configurable nodes (e.g., MCP server), fetch JSON Schema via `GET /graph/nodes/:id/dynamic-config/schema` and render a dynamic form when `dynamicConfigReady` is true. - Refer to docs/graph/status-updates.md for event shapes and sequencing. Configuration - Required environment variables: - - VITE_API_BASE_URL: Agents API base URL (use the origin only; the UI appends `/api` for REST calls and `/socket.io` for websockets) + - VITE_API_BASE_URL: Agents API base URL (use the origin only; the UI appends `/api` for REST calls) + - VITE_SOCKET_BASE_URL (optional): override the notifications Socket.IO origin; defaults to the API origin - Tracing configuration has been removed; span data is no longer rendered in the builder sidebar. Related docs diff --git a/package.json b/package.json index f62a26d19..21f2fc543 100644 --- a/package.json +++ b/package.json @@ -9,7 +9,7 @@ "test": "pnpm -r --workspace-concurrency=1 run --if-present test", "convert-graphs": "pnpm --filter @agyn/graph-converter exec graph-converter", "postinstall": "pnpm -r --if-present run prepare", - "proto:generate": "buf generate buf.build/agynio/api", + "proto:gen": "buf generate buf.build/agynio/api", "deps:up:podman": "podman compose up -d" }, "keywords": [], diff --git a/packages/notifications/Dockerfile b/packages/notifications/Dockerfile new file mode 100644 index 000000000..22c574fe2 --- /dev/null +++ b/packages/notifications/Dockerfile @@ -0,0 +1,22 @@ +# syntax=docker/dockerfile:1.7 + +FROM node:20-alpine AS base +WORKDIR /app +RUN corepack enable + +FROM base AS deps +COPY package.json pnpm-lock.yaml pnpm-workspace.yaml ./ +COPY tsconfig.base.json tsconfig.json ./ +COPY packages/notifications/package.json packages/notifications/ +RUN pnpm install --filter @agyn/notifications --prod --frozen-lockfile + +FROM deps AS build +COPY packages/notifications packages/notifications +RUN pnpm --filter @agyn/notifications build + +FROM base AS runtime +ENV NODE_ENV=production +COPY --from=deps /app/node_modules node_modules +COPY --from=build /app/packages/notifications/dist dist +COPY packages/notifications/package.json package.json +CMD ["node", "dist/main.js"] diff --git a/packages/notifications/eslint.config.js b/packages/notifications/eslint.config.js new file mode 100644 index 000000000..1f0ea609a --- /dev/null +++ b/packages/notifications/eslint.config.js @@ -0,0 +1,41 @@ +import tseslint from 'typescript-eslint'; +import { fileURLToPath } from 'node:url'; +import path from 'node:path'; + +const __dirname = path.dirname(fileURLToPath(import.meta.url)); + +export default [ + { + ignores: ['dist/**', '**/dist/**', 'node_modules/**', 'src/proto/gen/**'], + }, + { + files: ['src/**/*.ts', 'src/**/*.tsx'], + languageOptions: { + parser: tseslint.parser, + parserOptions: { + project: ['./tsconfig.eslint.json'], + tsconfigRootDir: __dirname, + }, + }, + plugins: { '@typescript-eslint': tseslint.plugin }, + rules: { + '@typescript-eslint/no-explicit-any': 'error', + 'no-empty': ['error', { allowEmptyCatch: false }], + 'max-depth': ['error', 3], + 'no-useless-catch': 'error', + '@typescript-eslint/no-unused-vars': [ + 'error', + { argsIgnorePattern: '^_', varsIgnorePattern: '^_', caughtErrorsIgnorePattern: '^_' }, + ], + '@typescript-eslint/no-unsafe-assignment': 'error', + '@typescript-eslint/no-unsafe-member-access': 'error', + '@typescript-eslint/no-require-imports': 'error', + '@typescript-eslint/ban-ts-comment': 'error', + '@typescript-eslint/no-unsafe-function-type': 'error', + '@typescript-eslint/no-empty-object-type': 'error', + 'no-useless-escape': 'error', + 'prefer-const': 'error', + 'no-redeclare': 'error', + }, + }, +]; diff --git a/packages/notifications/package.json b/packages/notifications/package.json new file mode 100644 index 000000000..50381cbf7 --- /dev/null +++ b/packages/notifications/package.json @@ -0,0 +1,31 @@ +{ + "name": "@agyn/notifications", + "version": "1.0.0", + "private": true, + "type": "module", + "main": "src/main.ts", + "scripts": { + "dev": "tsx src/main.ts", + "build": "tsc -p tsconfig.json", + "start": "node dist/main.js", + "lint": "eslint .", + "test": "vitest run" + }, + "dependencies": { + "@bufbuild/protobuf": "^2.11.0", + "@connectrpc/connect": "^1.7.0", + "@connectrpc/connect-node": "^1.3.1", + "pino": "^10.1.0", + "socket.io": "^4.8.1", + "uuid": "^13.0.0", + "zod": "^4.1.9" + }, + "devDependencies": { + "@types/node": "^24.5.1", + "eslint": "^9.13.0", + "tsx": "^4.20.5", + "typescript": "^5.8.3", + "typescript-eslint": "^8.8.1", + "vitest": "^3.2.4" + } +} diff --git a/packages/notifications/src/broadcaster.ts b/packages/notifications/src/broadcaster.ts new file mode 100644 index 000000000..aa43f5da8 --- /dev/null +++ b/packages/notifications/src/broadcaster.ts @@ -0,0 +1,32 @@ +import type { Logger } from 'pino'; +import type { PublishedNotification } from './types'; + +type Listener = (notification: PublishedNotification) => void; + +export class NotificationBroadcaster { + private readonly listeners = new Set(); + + constructor(private readonly logger: Logger) {} + + publish(notification: PublishedNotification): void { + for (const listener of this.listeners) { + try { + listener(notification); + } catch (error) { + this.logger.warn( + { + error: error instanceof Error ? { name: error.name, message: error.message } : { message: String(error) }, + }, + 'notifications: listener failure', + ); + } + } + } + + subscribe(listener: Listener): () => void { + this.listeners.add(listener); + return () => { + this.listeners.delete(listener); + }; + } +} diff --git a/packages/notifications/src/config.ts b/packages/notifications/src/config.ts new file mode 100644 index 000000000..cbc334680 --- /dev/null +++ b/packages/notifications/src/config.ts @@ -0,0 +1,56 @@ +import { z } from 'zod'; + +const envSchema = z.object({ + NOTIFICATIONS_GRPC_PORT: z.coerce + .number() + .int() + .positive() + .default(50_051), + NOTIFICATIONS_SOCKET_PORT: z.coerce + .number() + .int() + .positive() + .default(4_000), + NOTIFICATIONS_HOST: z + .string() + .default('0.0.0.0') + .transform((value) => value.trim().length > 0 ? value.trim() : '0.0.0.0'), + NOTIFICATIONS_SOCKET_PATH: z + .string() + .default('/socket.io') + .transform((value) => (value.startsWith('/') ? value : `/${value}`)), + NOTIFICATIONS_SOCKET_CORS_ORIGINS: z.string().optional(), + LOG_LEVEL: z + .string() + .default('info') + .transform((value) => value.trim().length > 0 ? value.trim() : 'info'), +}); + +export type Config = { + grpcPort: number; + socketPort: number; + host: string; + socketPath: string; + socketCorsOrigins: string[]; + logLevel: string; +}; + +const parseCorsOrigins = (value: string | undefined): string[] => { + if (!value) return []; + return value + .split(',') + .map((part) => part.trim()) + .filter((part) => part.length > 0); +}; + +export const loadConfig = (): Config => { + const env = envSchema.parse(process.env); + return { + grpcPort: env.NOTIFICATIONS_GRPC_PORT, + socketPort: env.NOTIFICATIONS_SOCKET_PORT, + host: env.NOTIFICATIONS_HOST, + socketPath: env.NOTIFICATIONS_SOCKET_PATH, + socketCorsOrigins: parseCorsOrigins(env.NOTIFICATIONS_SOCKET_CORS_ORIGINS), + logLevel: env.LOG_LEVEL, + }; +}; diff --git a/packages/notifications/src/grpc.ts b/packages/notifications/src/grpc.ts new file mode 100644 index 000000000..6d1d35ccb --- /dev/null +++ b/packages/notifications/src/grpc.ts @@ -0,0 +1,203 @@ +import { randomUUID } from 'node:crypto'; +import type { Logger } from 'pino'; +import { create, type JsonObject } from '@bufbuild/protobuf'; +import { timestampFromDate } from '@bufbuild/protobuf/wkt'; +import { ConnectError, Code, type HandlerContext } from '@connectrpc/connect'; +import { connectNodeAdapter } from '@connectrpc/connect-node'; +import { createServer, type Http2Server } from 'node:http2'; +import { NotificationBroadcaster } from './broadcaster'; +import type { SocketBridge } from './socket'; +import { PublishInputSchema } from './validation'; +import type { JsonValue, PublishedNotification } from './types'; +import { NotificationsService } from './proto/gen/agynio/api/notifications/v1/notifications_connect.js'; +import { + NotificationEnvelopeSchema, + PublishResponseSchema, + SubscribeResponseSchema, + type PublishRequest, + type PublishResponse, + type SubscribeRequest, + type SubscribeResponse, + type NotificationEnvelope, +} from './proto/gen/agynio/api/notifications/v1/notifications_pb.js'; + +type GrpcServerOptions = { + host: string; + port: number; + broadcaster: NotificationBroadcaster; + socket: SocketBridge; + logger: Logger; +}; + +const isJsonObject = (value: unknown): value is Record => + value !== null && typeof value === 'object' && !Array.isArray(value); + +const isJsonValue = (value: unknown): value is JsonValue => { + if (value === null) return true; + const valueType = typeof value; + if (valueType === 'string' || valueType === 'number' || valueType === 'boolean') return true; + if (Array.isArray(value)) return value.every(isJsonValue); + if (isJsonObject(value)) { + for (const entry of Object.values(value)) { + if (!isJsonValue(entry)) return false; + } + return true; + } + return false; +}; + +const toJsonObject = (value: Record): JsonObject => { + const result: JsonObject = {}; + for (const [key, entry] of Object.entries(value)) { + if (!isJsonValue(entry)) { + throw new ConnectError('publish payload includes non-JSON value', Code.InvalidArgument); + } + result[key] = entry; + } + return result; +}; + +const toEnvelope = (notification: PublishedNotification): NotificationEnvelope => + create(NotificationEnvelopeSchema, { + id: notification.id, + ts: timestampFromDate(notification.createdAt), + source: notification.source, + event: notification.event, + rooms: notification.rooms, + payload: notification.payload, + }); + +export class GrpcServer { + private readonly logger: Logger; + private readonly server: Http2Server; + + constructor(private readonly options: GrpcServerOptions) { + this.logger = options.logger.child({ scope: 'grpc' }); + const publishImpl = async (request: PublishRequest): Promise => { + const parsed = PublishInputSchema.safeParse({ + event: request.event, + rooms: request.rooms, + source: request.source, + payload: request.payload, + }); + if (!parsed.success) { + this.logger.warn({ issues: parsed.error.issues }, 'publish request rejected'); + throw new ConnectError('invalid publish request', Code.InvalidArgument); + } + + const payload = parsed.data; + const notificationPayload = payload.payload ? toJsonObject(payload.payload) : undefined; + const notification: PublishedNotification = { + id: randomUUID(), + event: payload.event, + rooms: payload.rooms, + source: payload.source, + payload: notificationPayload, + createdAt: new Date(), + }; + + this.logger.debug({ event: notification.event, rooms: notification.rooms }, 'publish request accepted'); + this.options.socket.broadcast(notification); + this.options.broadcaster.publish(notification); + + return create(PublishResponseSchema, { + id: notification.id, + ts: timestampFromDate(notification.createdAt), + }); + }; + + const subscribeImpl = ( + _request: SubscribeRequest, + context: HandlerContext, + ): AsyncIterable => { + const queue: PublishedNotification[] = []; + let resolveQueue: ((value: PublishedNotification | null) => void) | null = null; + let finished = false; + + const push = (notification: PublishedNotification) => { + if (finished) return; + if (resolveQueue) { + const resolve = resolveQueue; + resolveQueue = null; + resolve(notification); + } else { + queue.push(notification); + } + }; + + const stop = () => { + if (finished) return; + finished = true; + if (resolveQueue) { + const resolve = resolveQueue; + resolveQueue = null; + resolve(null); + } + }; + + const unsubscribe = this.options.broadcaster.subscribe(push); + const abortHandler = (): void => { + stop(); + }; + context.signal.addEventListener('abort', abortHandler, { once: true }); + + const iterator = (async function* (this: GrpcServer) { + try { + while (true) { + if (queue.length > 0) { + const notification = queue.shift()!; + yield create(SubscribeResponseSchema, { envelope: toEnvelope(notification) }); + continue; + } + if (finished) break; + const notification = await new Promise((resolve) => { + resolveQueue = resolve; + }); + if (!notification) break; + yield create(SubscribeResponseSchema, { envelope: toEnvelope(notification) }); + } + } finally { + context.signal.removeEventListener('abort', abortHandler); + unsubscribe(); + stop(); + } + }.call(this)) as AsyncIterable; + + return iterator; + }; + + const handler = connectNodeAdapter({ + routes: (router) => { + router.service(NotificationsService as unknown as never, { + publish: publishImpl as unknown as never, + subscribe: subscribeImpl as unknown as never, + }); + }, + }); + + const server = createServer(); + server.on('stream', handler); + server.on('request', handler); + this.server = server; + } + + async start(): Promise { + const { host, port } = this.options; + await new Promise((resolve) => { + this.server.listen(port, host, () => resolve()); + }); + this.logger.info({ host, port }, 'grpc server listening'); + } + + async close(): Promise { + await new Promise((resolve, reject) => { + this.server.close((error: Error | undefined) => { + if (error) { + reject(error); + } else { + resolve(); + } + }); + }); + } +} diff --git a/packages/notifications/src/logger.ts b/packages/notifications/src/logger.ts new file mode 100644 index 000000000..549ce3423 --- /dev/null +++ b/packages/notifications/src/logger.ts @@ -0,0 +1,7 @@ +import pino from 'pino'; + +export const createLogger = (level: string) => + pino({ + name: 'notifications-service', + level, + }); diff --git a/packages/notifications/src/main.ts b/packages/notifications/src/main.ts new file mode 100644 index 000000000..cc7b8edef --- /dev/null +++ b/packages/notifications/src/main.ts @@ -0,0 +1,51 @@ +import { loadConfig } from './config'; +import { createLogger } from './logger'; +import { NotificationBroadcaster } from './broadcaster'; +import { SocketBridge } from './socket'; +import { GrpcServer } from './grpc'; + +const config = loadConfig(); +const logger = createLogger(config.logLevel); +const broadcaster = new NotificationBroadcaster(logger); +const socket = new SocketBridge({ + host: config.host, + port: config.socketPort, + path: config.socketPath, + corsOrigins: config.socketCorsOrigins, + logger, +}); +const grpc = new GrpcServer({ + host: config.host, + port: config.grpcPort, + broadcaster, + socket, + logger, +}); + +const start = async () => { + try { + await socket.start(); + await grpc.start(); + logger.info('notifications service started'); + } catch (error) { + logger.error({ error }, 'failed to start notifications service'); + process.exitCode = 1; + } +}; + +const shutdown = async (signal: string) => { + logger.info({ signal }, 'shutting down notifications service'); + try { + await Promise.all([grpc.close(), socket.close()]); + logger.info('notifications service stopped'); + process.exit(0); + } catch (error) { + logger.error({ error }, 'shutdown failed'); + process.exit(1); + } +}; + +process.once('SIGINT', () => shutdown('SIGINT')); +process.once('SIGTERM', () => shutdown('SIGTERM')); + +void start(); diff --git a/packages/notifications/src/socket.ts b/packages/notifications/src/socket.ts new file mode 100644 index 000000000..9c59270fe --- /dev/null +++ b/packages/notifications/src/socket.ts @@ -0,0 +1,143 @@ +import { createServer, type Server as HttpServer } from 'node:http'; +import type { IncomingHttpHeaders } from 'node:http'; +import { Server as SocketIOServer, type ServerOptions, type Socket } from 'socket.io'; +import type { Logger } from 'pino'; +import { SubscribePayloadSchema } from './validation'; +import type { PublishedNotification } from './types'; + +type SocketBridgeOptions = { + host: string; + port: number; + path: string; + corsOrigins: string[]; + logger: Logger; +}; + +export class SocketBridge { + private readonly httpServer: HttpServer; + private readonly io: SocketIOServer; + private readonly logger: Logger; + + constructor(private readonly options: SocketBridgeOptions) { + this.logger = options.logger.child({ scope: 'socket' }); + this.httpServer = createServer(); + const serverOptions: Partial = { + path: options.path, + transports: ['websocket'], + cors: { + origin: options.corsOrigins.length > 0 ? options.corsOrigins : '*', + methods: ['GET', 'POST', 'OPTIONS'], + credentials: false, + }, + allowRequest: (_req, callback) => { + callback(null, true); + }, + } satisfies Partial; + this.io = new SocketIOServer(this.httpServer, serverOptions); + this.io.on('connection', (socket) => this.handleConnection(socket)); + } + + async start(): Promise { + const { host, port } = this.options; + await new Promise((resolve) => { + this.httpServer.listen(port, host, () => resolve()); + }); + this.logger.info({ host, port, path: this.options.path }, 'socket server listening'); + } + + async close(): Promise { + await new Promise((resolve) => { + this.io.close(() => resolve()); + }); + await new Promise((resolve) => { + this.httpServer.close(() => resolve()); + }); + } + + broadcast(notification: PublishedNotification): void { + for (const room of notification.rooms) { + try { + this.io.to(room).emit(notification.event, notification.payload ?? {}); + } catch (error) { + this.logger.warn( + { + room, + event: notification.event, + error: error instanceof Error ? { name: error.name, message: error.message } : { message: String(error) }, + }, + 'socket emit failure', + ); + } + } + } + + private handleConnection(socket: Socket): void { + this.logger.info( + { + id: socket.id, + headers: this.sanitizeHeaders(socket.request.headers), + query: this.sanitizeQuery(socket.handshake.query as Record | undefined), + }, + 'client connected', + ); + + socket.on('subscribe', (payload: unknown, ack?: (response: unknown) => void) => { + const parsed = SubscribePayloadSchema.safeParse(payload); + if (!parsed.success) { + const issues = parsed.error.issues.map((issue) => ({ + path: issue.path, + message: issue.message, + code: issue.code, + })); + this.logger.warn({ socketId: socket.id, issues }, 'subscribe rejected'); + if (typeof ack === 'function') { + ack({ ok: false, error: 'invalid_payload', issues }); + } + return; + } + + const rooms = parsed.data.rooms ?? (parsed.data.room ? [parsed.data.room] : []); + for (const room of rooms) { + if (room.length > 0) socket.join(room); + } + if (typeof ack === 'function') { + ack({ ok: true, rooms }); + } + }); + + socket.on('error', (error) => { + this.logger.warn( + { + socketId: socket.id, + error: error instanceof Error ? { name: error.name, message: error.message } : { message: String(error) }, + }, + 'socket error', + ); + }); + + socket.on('disconnect', (reason) => { + this.logger.debug({ socketId: socket.id, reason }, 'client disconnected'); + }); + } + + private sanitizeHeaders(headers: IncomingHttpHeaders | undefined): Record { + if (!headers) return {}; + const sensitive = new Set(['authorization', 'cookie', 'set-cookie']); + const sanitized: Record = {}; + for (const [key, value] of Object.entries(headers)) { + if (!key) continue; + sanitized[key] = sensitive.has(key.toLowerCase()) ? '[REDACTED]' : value; + } + return sanitized; + } + + private sanitizeQuery(query: Record | undefined): Record { + if (!query) return {}; + const sensitive = new Set(['token', 'authorization', 'auth', 'api_key', 'access_token']); + const sanitized: Record = {}; + for (const [key, value] of Object.entries(query)) { + sanitized[key] = key && sensitive.has(key.toLowerCase()) ? '[REDACTED]' : value; + } + return sanitized; + } +} diff --git a/packages/notifications/src/types.ts b/packages/notifications/src/types.ts new file mode 100644 index 000000000..0a7c22711 --- /dev/null +++ b/packages/notifications/src/types.ts @@ -0,0 +1,13 @@ +export type JsonPrimitive = string | number | boolean | null; +export type JsonArray = JsonValue[]; +export type JsonObject = { [key: string]: JsonValue }; +export type JsonValue = JsonPrimitive | JsonArray | JsonObject; + +export type PublishedNotification = { + id: string; + event: string; + rooms: string[]; + source: string; + payload?: JsonObject; + createdAt: Date; +}; diff --git a/packages/notifications/src/validation.ts b/packages/notifications/src/validation.ts new file mode 100644 index 000000000..86c8f5748 --- /dev/null +++ b/packages/notifications/src/validation.ts @@ -0,0 +1,26 @@ +import { z } from 'zod'; + +export const RoomSchema = z.union([ + z.literal('threads'), + z.literal('graph'), + z.string().regex(/^thread:[0-9a-z-]{1,64}$/i), + z.string().regex(/^run:[0-9a-z-]{1,64}$/i), + z.string().regex(/^node:[0-9a-z-]{1,64}$/i), +]); + +export const SubscribePayloadSchema = z + .object({ + rooms: z.array(RoomSchema).min(1).optional(), + room: RoomSchema.optional(), + }) + .strict(); + +export const PublishInputSchema = z.object({ + rooms: z.array(RoomSchema).min(1), + event: z.string().min(1), + source: z.string().min(1), + payload: z.record(z.string(), z.unknown()).optional(), +}); + +export type PublishInput = z.infer; +export type SubscribePayload = z.infer; diff --git a/packages/notifications/tsconfig.eslint.json b/packages/notifications/tsconfig.eslint.json new file mode 100644 index 000000000..62a379d03 --- /dev/null +++ b/packages/notifications/tsconfig.eslint.json @@ -0,0 +1,5 @@ +{ + "extends": "./tsconfig.json", + "include": ["src"], + "compilerOptions": {} +} diff --git a/packages/notifications/tsconfig.json b/packages/notifications/tsconfig.json new file mode 100644 index 000000000..4cdf26d84 --- /dev/null +++ b/packages/notifications/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "outDir": "dist", + "rootDir": "src", + "tsBuildInfoFile": "dist/.tsbuildinfo" + }, + "include": ["src/**/*.ts"] +} diff --git a/packages/platform-server/.env.example b/packages/platform-server/.env.example index 490eaf020..b7e494dbc 100644 --- a/packages/platform-server/.env.example +++ b/packages/platform-server/.env.example @@ -48,6 +48,11 @@ DOCKER_RUNNER_SHARED_SECRET=dev-shared-secret # DOCKER_RUNNER_TIMEOUT_MS=30000 # DOCKER_RUNNER_CONNECT_MAX_RETRIES=0 +# Notifications service gRPC endpoint +NOTIFICATIONS_GRPC_ADDR=notifications:50051 +# Optional deadline override (ms) +# NOTIFICATIONS_GRPC_DEADLINE_MS=3000 + # Nix cache proxy (ncps) endpoints # NCPS_URL_SERVER=http://localhost:8501 # NCPS_URL_CONTAINER=http://ncps:8501 diff --git a/packages/platform-server/__tests__/graph.module.di.smoke.test.ts b/packages/platform-server/__tests__/graph.module.di.smoke.test.ts index 5b289f0f8..5ed56f0cc 100644 --- a/packages/platform-server/__tests__/graph.module.di.smoke.test.ts +++ b/packages/platform-server/__tests__/graph.module.di.smoke.test.ts @@ -27,6 +27,38 @@ import { GraphSocketGateway } from '../src/gateway/graph.socket.gateway'; import { GatewayModule } from '../src/gateway/gateway.module'; import { LiveGraphRuntime } from '../src/graph-core/liveGraph.manager'; import { runnerConfigDefaults } from './helpers/config'; +import { UI_NOTIFICATIONS_PUBLISHER } from '../src/notifications/ui-notifications.publisher'; + +const connectMocks = vi.hoisted(() => { + const publish = vi.fn().mockResolvedValue(undefined); + return { + publish, + createPromiseClient: vi.fn(() => ({ publish })), + }; +}); + +vi.mock('@connectrpc/connect', () => ({ + createPromiseClient: connectMocks.createPromiseClient, +})); + +const connectNodeMocks = vi.hoisted(() => ({ + createConnectTransport: vi.fn(() => ({})), +})); + +vi.mock('@connectrpc/connect-node', () => ({ + createConnectTransport: connectNodeMocks.createConnectTransport, +})); + +vi.mock('../src/proto/gen/agynio/api/notifications/v1/notifications_connect', () => ({ + NotificationsService: { + typeName: 'agynio.api.notifications.v1.NotificationsService', + methods: { + publish: { + name: 'Publish', + }, + }, + }, +})); process.env.LLM_PROVIDER = 'openai'; process.env.AGENTS_DATABASE_URL = process.env.AGENTS_DATABASE_URL || 'postgres://localhost:5432/test'; @@ -177,6 +209,8 @@ if (!shouldRunDbTests) { litellmBaseUrl: 'http://127.0.0.1:4000', litellmMasterKey: 'test-master-key', ...runnerConfigDefaults, + notificationsGrpcAddr: 'notifications:50051', + notificationsGrpcDeadlineMs: 3000, }), ); ConfigService.register(configServiceStub); @@ -218,6 +252,7 @@ if (!shouldRunDbTests) { builder.overrideProvider(RunEventsService).useFactory(() => runEventsStub as RunEventsService); builder.overrideProvider(AgentsPersistenceService).useFactory(() => agentsPersistenceStub as AgentsPersistenceService); builder.overrideProvider(ThreadsMetricsService).useFactory(() => threadsMetricsStub as ThreadsMetricsService); + builder.overrideProvider(UI_NOTIFICATIONS_PUBLISHER).useValue({ publishToRooms: vi.fn().mockResolvedValue(undefined) }); builder.overrideProvider(VaultService).useFactory(() => vaultStub as VaultService); builder.overrideProvider(SlackAdapter).useFactory(() => slackAdapterStub as SlackAdapter); builder.overrideProvider(LiveGraphRuntime).useFactory(() => liveRuntimeStub); diff --git a/packages/platform-server/__tests__/graph.socket.gateway.bus.test.ts b/packages/platform-server/__tests__/graph.socket.gateway.bus.test.ts index 0fcb7e70c..f0415fb56 100644 --- a/packages/platform-server/__tests__/graph.socket.gateway.bus.test.ts +++ b/packages/platform-server/__tests__/graph.socket.gateway.bus.test.ts @@ -115,8 +115,15 @@ function createGatewayTestContext(): GatewayTestContext { const runtime = { subscribe: vi.fn() } as any; const metrics = { getThreadsMetrics: vi.fn().mockResolvedValue({}) } as any; const prisma = { getClient: vi.fn().mockReturnValue({ $queryRaw: vi.fn().mockResolvedValue([]) }) } as any; + const notificationsPublisher = { publishToRooms: vi.fn().mockResolvedValue(undefined) }; - const gateway = new GraphSocketGateway(runtime, metrics, prisma, eventsBus as EventsBusService); + const gateway = new GraphSocketGateway( + runtime, + metrics, + prisma, + eventsBus as EventsBusService, + notificationsPublisher, + ); const internalLogger = (gateway as unknown as { logger: { warn: (...args: unknown[]) => void; error: (...args: unknown[]) => void; log: (...args: unknown[]) => void; debug: (...args: unknown[]) => void } }).logger; const logger = { warn: vi.spyOn(internalLogger, 'warn').mockImplementation(() => undefined), diff --git a/packages/platform-server/__tests__/helpers/config.ts b/packages/platform-server/__tests__/helpers/config.ts index fb0ad899a..c5426796c 100644 --- a/packages/platform-server/__tests__/helpers/config.ts +++ b/packages/platform-server/__tests__/helpers/config.ts @@ -4,6 +4,8 @@ export const runnerConfigDefaults = { dockerRunnerSharedSecret: 'test-shared-secret', dockerRunnerGrpcHost: '127.0.0.1', dockerRunnerGrpcPort: 7171, + notificationsGrpcAddr: 'notifications:50051', + notificationsGrpcDeadlineMs: 3000, } as const; const defaultConfigInput = { diff --git a/packages/platform-server/__tests__/run-events.publish.test.ts b/packages/platform-server/__tests__/run-events.publish.test.ts index 3d8bc2774..709bc81c2 100644 --- a/packages/platform-server/__tests__/run-events.publish.test.ts +++ b/packages/platform-server/__tests__/run-events.publish.test.ts @@ -37,7 +37,8 @@ maybeDescribe('RunEventsService publishEvent broadcasting', () => { const runtime = { subscribe: vi.fn() } as any; const metrics = { getThreadsMetrics: vi.fn().mockResolvedValue({}) } as any; const prismaStub = { getClient: vi.fn().mockReturnValue({ $queryRaw: vi.fn().mockResolvedValue([]) }) } as any; - gateway = new GraphSocketGateway(runtime, metrics, prismaStub, eventsBus); + const notificationsPublisher = { publishToRooms: vi.fn().mockResolvedValue(undefined) }; + gateway = new GraphSocketGateway(runtime, metrics, prismaStub, eventsBus, notificationsPublisher); emitRunEventSpy = vi.spyOn(gateway, 'emitRunEvent'); await gateway.onModuleInit(); }); diff --git a/packages/platform-server/__tests__/socket.events.test.ts b/packages/platform-server/__tests__/socket.events.test.ts index 6a2a597d8..2245d1de7 100644 --- a/packages/platform-server/__tests__/socket.events.test.ts +++ b/packages/platform-server/__tests__/socket.events.test.ts @@ -1,5 +1,5 @@ import { describe, it, expect, vi } from 'vitest'; -import { FastifyAdapter } from '@nestjs/platform-fastify'; +import { createServer } from 'http'; import { GraphSocketGateway } from '../src/gateway/graph.socket.gateway'; import { PrismaService } from '../src/core/services/prisma.service'; import { ThreadsMetricsService } from '../src/agents/threads.metrics.service'; @@ -11,9 +11,7 @@ class TestNode extends Node> { } describe('Socket events', () => { - it('emits node_status on provision/deprovision', async () => { - const adapter = new FastifyAdapter(); - const fastify = adapter.getInstance(); + it('publishes node_status on provision/deprovision', async () => { let listener: ((ev: { nodeId: string; prev: string; next: string; at: number }) => void) | undefined; const runtimeStub = { subscribe: (fn: typeof listener) => { listener = fn; return () => {}; } } as unknown as import('../src/graph-core/liveGraph.manager').LiveGraphRuntime; const prismaStub = { getClient: () => ({ $queryRaw: async () => [] }) } as unknown as PrismaService; @@ -31,15 +29,11 @@ describe('Socket events', () => { subscribeToThreadMetrics: () => () => {}, subscribeToThreadMetricsAncestors: () => () => {}, }; - const gateway = new GraphSocketGateway(runtimeStub, metrics, prismaStub, eventsBusStub as any); - gateway.init({ server: fastify.server }); - - const emitMap = new Map>(); - const toSpy = vi.fn((room: string) => { - if (!emitMap.has(room)) emitMap.set(room, vi.fn()); - return { emit: emitMap.get(room)! }; - }); - (gateway as any).io = { to: toSpy }; + const notificationsPublisher = { publishToRooms: vi.fn().mockResolvedValue(undefined) }; + const gateway = new GraphSocketGateway(runtimeStub, metrics, prismaStub, eventsBusStub as any, notificationsPublisher); + const server = createServer(); + gateway.onModuleInit(); + gateway.init({ server }); const node = new TestNode(); node.init({ nodeId: 'n1' }); @@ -50,21 +44,17 @@ describe('Socket events', () => { listener?.({ nodeId: 'n1', prev: 'ready', next: 'deprovisioning', at: now + 2 }); listener?.({ nodeId: 'n1', prev: 'deprovisioning', next: 'not_ready', at: now + 3 }); - expect(toSpy).toHaveBeenCalledWith('graph'); - expect(toSpy).toHaveBeenCalledWith('node:n1'); - const graphEmitter = emitMap.get('graph'); - const nodeEmitter = emitMap.get('node:n1'); - expect(graphEmitter).toBeTruthy(); - expect(nodeEmitter).toBeTruthy(); - expect(graphEmitter).toHaveBeenCalledTimes(4); - expect(nodeEmitter).toHaveBeenCalledTimes(4); - const payload = graphEmitter?.mock.calls[0]?.[1]; - expect(payload).toMatchObject({ nodeId: 'n1', provisionStatus: { state: 'provisioning' } }); + const calls = notificationsPublisher.publishToRooms.mock.calls.filter((call) => call[0].event === 'node_status'); + expect(calls).toHaveLength(4); + for (const call of calls) { + const [{ rooms, payload }] = call; + expect(rooms).toEqual(expect.arrayContaining(['graph', 'node:n1'])); + expect(payload).toMatchObject({ nodeId: 'n1' }); + } + server.close(); }); - it('emits node_state via NodeStateService bridge', async () => { - const adapter = new FastifyAdapter(); - const fastify = adapter.getInstance(); + it('publishes node_state via NodeStateService bridge', async () => { const runtimeStub = { subscribe: () => () => {} } as unknown as import('../src/graph-core/liveGraph.manager').LiveGraphRuntime; const prismaStub = { getClient: () => ({ $queryRaw: async () => [] }) } as unknown as PrismaService; const metrics = new ThreadsMetricsService(prismaStub as any); @@ -81,24 +71,20 @@ describe('Socket events', () => { subscribeToThreadMetrics: () => () => {}, subscribeToThreadMetricsAncestors: () => () => {}, }; - const gateway = new GraphSocketGateway(runtimeStub, metrics, prismaStub, eventsBusStub as any); - gateway.init({ server: fastify.server }); - const emitMap = new Map>(); - const toSpy = vi.fn((room: string) => { - if (!emitMap.has(room)) emitMap.set(room, vi.fn()); - return { emit: emitMap.get(room)! }; - }); - (gateway as any).io = { to: toSpy }; + const notificationsPublisher = { publishToRooms: vi.fn().mockResolvedValue(undefined) }; + const gateway = new GraphSocketGateway(runtimeStub, metrics, prismaStub, eventsBusStub as any, notificationsPublisher); + gateway.onModuleInit(); gateway.emitNodeState('n1', { k: 'v' }); - expect(toSpy).toHaveBeenCalledWith('graph'); - expect(toSpy).toHaveBeenCalledWith('node:n1'); - expect(emitMap.get('graph')).toHaveBeenCalledWith('node_state', expect.objectContaining({ nodeId: 'n1', state: { k: 'v' } })); - expect(emitMap.get('node:n1')).toHaveBeenCalledWith('node_state', expect.objectContaining({ nodeId: 'n1', state: { k: 'v' } })); + expect(notificationsPublisher.publishToRooms).toHaveBeenCalledWith( + expect.objectContaining({ + rooms: expect.arrayContaining(['graph', 'node:n1']), + event: 'node_state', + payload: expect.objectContaining({ nodeId: 'n1', state: { k: 'v' } }), + }), + ); }); - it('emits reminder count to graph and node rooms', async () => { - const adapter = new FastifyAdapter(); - const fastify = adapter.getInstance(); + it('publishes reminder count to graph and node rooms', async () => { const runtimeStub = { subscribe: () => () => {} } as unknown as import('../src/graph-core/liveGraph.manager').LiveGraphRuntime; const prismaStub = { getClient: () => ({ $queryRaw: async () => [] }) } as unknown as PrismaService; const metrics = new ThreadsMetricsService(prismaStub as any); @@ -115,18 +101,16 @@ describe('Socket events', () => { subscribeToThreadMetrics: () => () => {}, subscribeToThreadMetricsAncestors: () => () => {}, }; - const gateway = new GraphSocketGateway(runtimeStub, metrics, prismaStub, eventsBusStub as any); - gateway.init({ server: fastify.server }); - const emitMap = new Map>(); - const toSpy = vi.fn((room: string) => { - if (!emitMap.has(room)) emitMap.set(room, vi.fn()); - return { emit: emitMap.get(room)! }; - }); - (gateway as any).io = { to: toSpy }; + const notificationsPublisher = { publishToRooms: vi.fn().mockResolvedValue(undefined) }; + const gateway = new GraphSocketGateway(runtimeStub, metrics, prismaStub, eventsBusStub as any, notificationsPublisher); + gateway.onModuleInit(); gateway.emitReminderCount('n1', 3, Date.now()); - expect(toSpy).toHaveBeenCalledWith('graph'); - expect(toSpy).toHaveBeenCalledWith('node:n1'); - expect(emitMap.get('graph')).toHaveBeenCalledWith('node_reminder_count', expect.objectContaining({ nodeId: 'n1', count: 3 })); - expect(emitMap.get('node:n1')).toHaveBeenCalledWith('node_reminder_count', expect.objectContaining({ nodeId: 'n1', count: 3 })); + expect(notificationsPublisher.publishToRooms).toHaveBeenCalledWith( + expect.objectContaining({ + rooms: expect.arrayContaining(['graph', 'node:n1']), + event: 'node_reminder_count', + payload: expect.objectContaining({ nodeId: 'n1', count: 3 }), + }), + ); }); }); diff --git a/packages/platform-server/__tests__/socket.gateway.test.ts b/packages/platform-server/__tests__/socket.gateway.test.ts index cf1efbb9f..f01a6b2f1 100644 --- a/packages/platform-server/__tests__/socket.gateway.test.ts +++ b/packages/platform-server/__tests__/socket.gateway.test.ts @@ -24,7 +24,14 @@ describe('GraphSocketGateway', () => { subscribeToThreadMetrics: () => () => {}, subscribeToThreadMetricsAncestors: () => () => {}, }; - const gateway = new GraphSocketGateway(runtimeStub, metrics, prismaStub, eventsBusStub as any); + const notificationsPublisher = { publishToRooms: async () => undefined }; + const gateway = new GraphSocketGateway( + runtimeStub, + metrics, + prismaStub, + eventsBusStub as any, + notificationsPublisher, + ); expect(() => gateway.init({ server: fastify.server })).not.toThrow(); }); }); diff --git a/packages/platform-server/__tests__/socket.metrics.coalesce.test.ts b/packages/platform-server/__tests__/socket.metrics.coalesce.test.ts index ed6c42b33..1bb665bcd 100644 --- a/packages/platform-server/__tests__/socket.metrics.coalesce.test.ts +++ b/packages/platform-server/__tests__/socket.metrics.coalesce.test.ts @@ -1,12 +1,9 @@ import { describe, it, expect, vi } from 'vitest'; -import { FastifyAdapter } from '@nestjs/platform-fastify'; import { GraphSocketGateway } from '../src/gateway/graph.socket.gateway'; describe('GraphSocketGateway metrics coalescing', () => { it('coalesces multiple schedules into single batch computation', async () => { vi.useFakeTimers(); - const adapter = new FastifyAdapter(); - const fastify = adapter.getInstance(); const runtimeStub = { subscribe: () => () => {} } as unknown as import('../src/graph-core/liveGraph.manager').LiveGraphRuntime; // Stub metrics service to capture calls const getThreadsMetrics = vi.fn(async (_ids: string[]) => @@ -27,11 +24,15 @@ describe('GraphSocketGateway metrics coalescing', () => { subscribeToThreadMetrics: () => () => {}, subscribeToThreadMetricsAncestors: () => () => {}, }; - const gateway = new GraphSocketGateway(runtimeStub, metricsStub, prismaStub, eventsBusStub as any); - // Attach and stub io emit sink - gateway.init({ server: fastify.server }); - const captured: Array<{ room: string; event: string; payload: any }> = []; - (gateway as any)['io'] = { to: (room: string) => ({ emit: (event: string, payload: any) => { captured.push({ room, event, payload }); } }) }; + const notificationsPublisher = { publishToRooms: vi.fn().mockResolvedValue(undefined) }; + const gateway = new GraphSocketGateway( + runtimeStub, + metricsStub, + prismaStub, + eventsBusStub as any, + notificationsPublisher, + ); + gateway.onModuleInit(); gateway.scheduleThreadMetrics('t1'); gateway.scheduleThreadMetrics('t2'); @@ -42,10 +43,10 @@ describe('GraphSocketGateway metrics coalescing', () => { // Assert single batch computation and grouped emits to both rooms expect(getThreadsMetrics).toHaveBeenCalledTimes(1); expect(getThreadsMetrics).toHaveBeenCalledWith(['t1', 't2']); - const activityThreadsRoom = captured.filter((e) => e.event === 'thread_activity_changed' && e.room === 'threads'); - const remindersThreadsRoom = captured.filter((e) => e.event === 'thread_reminders_count' && e.room === 'threads'); - expect(activityThreadsRoom.map((e) => e.payload.threadId).sort()).toEqual(['t1', 't2']); - expect(remindersThreadsRoom.map((e) => e.payload.threadId).sort()).toEqual(['t1', 't2']); + const activityCalls = notificationsPublisher.publishToRooms.mock.calls.filter((call) => call[0].event === 'thread_activity_changed'); + const remindersCalls = notificationsPublisher.publishToRooms.mock.calls.filter((call) => call[0].event === 'thread_reminders_count'); + expect(activityCalls.map((call) => call[0].payload.threadId).sort()).toEqual(['t1', 't2']); + expect(remindersCalls.map((call) => call[0].payload.threadId).sort()).toEqual(['t1', 't2']); vi.useRealTimers(); }); }); diff --git a/packages/platform-server/__tests__/socket.node_status.integration.test.ts b/packages/platform-server/__tests__/socket.node_status.integration.test.ts index aa00be962..022f6eeaf 100644 --- a/packages/platform-server/__tests__/socket.node_status.integration.test.ts +++ b/packages/platform-server/__tests__/socket.node_status.integration.test.ts @@ -1,5 +1,4 @@ import { describe, it, expect } from 'vitest'; -import { FastifyAdapter } from '@nestjs/platform-fastify'; import { GraphSocketGateway } from '../src/gateway/graph.socket.gateway'; import Node from '../src/nodes/base/Node'; @@ -7,8 +6,6 @@ class DummyNode extends Node> { getPortConfig() { return describe('Gateway node_status integration', () => { it('broadcasts on node lifecycle changes', async () => { - const adapter = new FastifyAdapter(); - const fastify = adapter.getInstance(); const runtimeStub = { subscribe: () => () => {} } as unknown as import('../src/graph-core/liveGraph.manager').LiveGraphRuntime; const metricsStub = { getThreadsMetrics: async () => ({}) }; const prismaStub = { @@ -29,8 +26,15 @@ describe('Gateway node_status integration', () => { subscribeToThreadMetrics: () => () => {}, subscribeToThreadMetricsAncestors: () => () => {}, }; - const gateway = new GraphSocketGateway(runtimeStub, metricsStub as any, prismaStub as any, eventsBusStub as any); - gateway.init({ server: fastify.server }); + const notificationsPublisher = { publishToRooms: async () => undefined }; + const gateway = new GraphSocketGateway( + runtimeStub, + metricsStub as any, + prismaStub as any, + eventsBusStub as any, + notificationsPublisher, + ); + gateway.onModuleInit(); const node = new DummyNode(); node.init({ nodeId: 'nX' }); await node.provision(); diff --git a/packages/platform-server/__tests__/socket.realtime.integration.test.ts b/packages/platform-server/__tests__/socket.realtime.integration.test.ts index 9b5eaeef9..b4740dc6f 100644 --- a/packages/platform-server/__tests__/socket.realtime.integration.test.ts +++ b/packages/platform-server/__tests__/socket.realtime.integration.test.ts @@ -144,7 +144,14 @@ if (!shouldRunRealtimeTests) { await new Promise((resolve) => server.listen(0, resolve)); const { port } = server.address() as AddressInfo; const eventsBus = createEventsBusNoop(); - const gateway = new GraphSocketGateway(runtime, metricsDouble.service, prismaStub, eventsBus); + const notificationsPublisher = { publishToRooms: async () => undefined }; + const gateway = new GraphSocketGateway( + runtime, + metricsDouble.service, + prismaStub, + eventsBus, + notificationsPublisher, + ); gateway.onModuleInit(); gateway.init({ server }); @@ -203,7 +210,14 @@ if (!shouldRunRealtimeTests) { const prismaService = ({ getClient: () => prisma }) as PrismaService; const runEvents = new RunEventsService(prismaService); const eventsBus = new EventsBusService(runEvents); - const gateway = new GraphSocketGateway(runtime, metricsDouble.service, prismaService, eventsBus); + const notificationsPublisher = { publishToRooms: async () => undefined }; + const gateway = new GraphSocketGateway( + runtime, + metricsDouble.service, + prismaService, + eventsBus, + notificationsPublisher, + ); gateway.onModuleInit(); const server = createServer(); @@ -267,7 +281,14 @@ if (!shouldRunRealtimeTests) { const prismaService = ({ getClient: () => prisma }) as PrismaService; const runEvents = new RunEventsService(prismaService); const eventsBus = new EventsBusService(runEvents); - const gateway = new GraphSocketGateway(runtime, metricsDouble.service, prismaService, eventsBus); + const notificationsPublisher = { publishToRooms: async () => undefined }; + const gateway = new GraphSocketGateway( + runtime, + metricsDouble.service, + prismaService, + eventsBus, + notificationsPublisher, + ); gateway.onModuleInit(); const server = createServer(); diff --git a/packages/platform-server/__tests__/sql.threads.metrics.queries.test.ts b/packages/platform-server/__tests__/sql.threads.metrics.queries.test.ts index 1b9e81b59..a2c3b1b3b 100644 --- a/packages/platform-server/__tests__/sql.threads.metrics.queries.test.ts +++ b/packages/platform-server/__tests__/sql.threads.metrics.queries.test.ts @@ -64,7 +64,14 @@ describe('SQL: WITH RECURSIVE and UUID casts', () => { const metricsStub = { getThreadsMetrics: vi.fn(async () => ({})) }; const runtimeStub = { subscribe: () => () => {} } as any; const eventsBusStub = {} as any; - const gateway = new GraphSocketGateway(runtimeStub, metricsStub as any, prismaStub, eventsBusStub); + const notificationsPublisher = { publishToRooms: vi.fn().mockResolvedValue(undefined) }; + const gateway = new GraphSocketGateway( + runtimeStub, + metricsStub as any, + prismaStub, + eventsBusStub, + notificationsPublisher, + ); const scheduled: string[] = []; // Spy/override scheduleThreadMetrics to capture scheduled ids diff --git a/packages/platform-server/package.json b/packages/platform-server/package.json index 742c3a8c1..4d9bb69ae 100644 --- a/packages/platform-server/package.json +++ b/packages/platform-server/package.json @@ -25,6 +25,8 @@ "@agyn/json-schema-to-zod": "workspace:*", "@agyn/docker-runner": "workspace:*", "@agyn/llm": "workspace:*", + "@connectrpc/connect": "^1.7.0", + "@connectrpc/connect-node": "^1.7.0", "@bufbuild/protobuf": "^2.11.0", "@grpc/grpc-js": "^1.12.2", "@fastify/cors": "^11.1.0", diff --git a/packages/platform-server/src/core/services/config.service.ts b/packages/platform-server/src/core/services/config.service.ts index d31a2066a..503467c73 100644 --- a/packages/platform-server/src/core/services/config.service.ts +++ b/packages/platform-server/src/core/services/config.service.ts @@ -109,6 +109,17 @@ export const configSchema = z.object({ const num = typeof v === 'number' ? v : Number(v); return Number.isFinite(num) && num >= 0 ? num : 0; }), + notificationsGrpcAddr: z + .string() + .min(1, 'NOTIFICATIONS_GRPC_ADDR is required') + .transform((value) => value.trim()), + notificationsGrpcDeadlineMs: z + .union([z.string(), z.number()]) + .default('3000') + .transform((value) => { + const num = typeof value === 'number' ? value : Number(value); + return Number.isFinite(num) && num > 0 ? num : 3000; + }), // Nix search/proxy settings nixAllowedChannels: z .string() @@ -513,6 +524,12 @@ export class ConfigService implements Config { get agentsDatabaseUrl(): string { return this.params.agentsDatabaseUrl; } + get notificationsGrpcAddr(): string { + return this.params.notificationsGrpcAddr; + } + get notificationsGrpcDeadlineMs(): number { + return this.params.notificationsGrpcDeadlineMs; + } get corsOrigins(): string[] { return this.params.corsOrigins ?? []; } @@ -557,6 +574,8 @@ export class ConfigService implements Config { dockerRunnerConnectRetryJitterMs: process.env.DOCKER_RUNNER_CONNECT_RETRY_JITTER_MS, dockerRunnerConnectProbeIntervalMs: process.env.DOCKER_RUNNER_CONNECT_PROBE_INTERVAL_MS, dockerRunnerConnectMaxRetries: process.env.DOCKER_RUNNER_CONNECT_MAX_RETRIES, + notificationsGrpcAddr: process.env.NOTIFICATIONS_GRPC_ADDR, + notificationsGrpcDeadlineMs: process.env.NOTIFICATIONS_GRPC_DEADLINE_MS, nixAllowedChannels: process.env.NIX_ALLOWED_CHANNELS, nixHttpTimeoutMs: process.env.NIX_HTTP_TIMEOUT_MS, nixCacheTtlMs: process.env.NIX_CACHE_TTL_MS, diff --git a/packages/platform-server/src/gateway/gateway.module.ts b/packages/platform-server/src/gateway/gateway.module.ts index 57526a09a..59f783ce9 100644 --- a/packages/platform-server/src/gateway/gateway.module.ts +++ b/packages/platform-server/src/gateway/gateway.module.ts @@ -2,9 +2,10 @@ import { Module } from '@nestjs/common'; import { GraphApiModule } from '../graph/graph-api.module'; import { EventsModule } from '../events/events.module'; import { GraphSocketGateway } from './graph.socket.gateway'; +import { NotificationsClientModule } from '../notifications/notifications-client.module'; @Module({ - imports: [GraphApiModule, EventsModule], + imports: [GraphApiModule, EventsModule, NotificationsClientModule], providers: [GraphSocketGateway], exports: [GraphSocketGateway], }) diff --git a/packages/platform-server/src/gateway/graph.socket.gateway.ts b/packages/platform-server/src/gateway/graph.socket.gateway.ts index 8eb04c8b8..ba893a6e4 100644 --- a/packages/platform-server/src/gateway/graph.socket.gateway.ts +++ b/packages/platform-server/src/gateway/graph.socket.gateway.ts @@ -19,6 +19,10 @@ import { import type { ToolOutputChunkPayload, ToolOutputTerminalPayload } from '../events/run-events.service'; import { ThreadsMetricsService } from '../agents/threads.metrics.service'; import { PrismaService } from '../core/services/prisma.service'; +import { + UI_NOTIFICATIONS_PUBLISHER, + type UiNotificationsPublisher, +} from '../notifications/ui-notifications.publisher'; // Strict outbound event payloads export const NodeStatusEventSchema = z @@ -116,6 +120,7 @@ export class GraphSocketGateway implements OnModuleInit, OnModuleDestroy { @Inject(ThreadsMetricsService) private readonly metrics: ThreadsMetricsService, @Inject(PrismaService) private readonly prismaService: PrismaService, @Inject(EventsBusService) private readonly eventsBus: EventsBusService, + @Inject(UI_NOTIFICATIONS_PUBLISHER) private readonly notificationsPublisher: UiNotificationsPublisher, ) {} onModuleInit(): void { @@ -462,7 +467,6 @@ export class GraphSocketGateway implements OnModuleInit, OnModuleDestroy { payload: T, schema: z.ZodType, ) { - if (!this.io) return; const parsed = schema.safeParse(payload); if (!parsed.success) { this.logger.error( @@ -623,7 +627,7 @@ export class GraphSocketGateway implements OnModuleInit, OnModuleDestroy { const ids = Array.from(new Set(this.pendingThreads)); this.pendingThreads.clear(); this.metricsTimer = null; - if (!this.io || ids.length === 0) return; + if (ids.length === 0) return; try { const map = await this.metrics.getThreadsMetrics(ids); for (const id of ids) { @@ -688,18 +692,16 @@ export class GraphSocketGateway implements OnModuleInit, OnModuleDestroy { event: string, payload: unknown, ) { - if (!this.io || rooms.length === 0) return; - for (const room of rooms) { - try { - this.io.to(room).emit(event, payload); - } catch (error) { + if (rooms.length === 0) return; + void this.notificationsPublisher + .publishToRooms({ rooms, event, payload, source: 'platform-server' }) + .catch((error: unknown) => { const errPayload = error instanceof Error ? { name: error.name, message: error.message } : { message: String(error) }; this.logger.warn( - `GraphSocketGateway: emit error ${this.formatContext({ event, room, error: errPayload })}`, + `GraphSocketGateway: notifications publish error ${this.formatContext({ event, rooms, error: errPayload })}`, ); - } - } + }); } private formatContext(context: Record): string { diff --git a/packages/platform-server/src/notifications/notifications-client.module.ts b/packages/platform-server/src/notifications/notifications-client.module.ts new file mode 100644 index 000000000..9f9524c4a --- /dev/null +++ b/packages/platform-server/src/notifications/notifications-client.module.ts @@ -0,0 +1,37 @@ +import { Module } from '@nestjs/common'; +import { CoreModule } from '../core/core.module'; +import { ConfigService } from '../core/services/config.service'; +import { + NotificationsGrpcPublisher, + NOTIFICATIONS_GRPC_PUBLISHER_PROVIDER, + NOTIFICATIONS_PUBLISHER_CONFIG, + type NotificationsPublisherConfig, +} from './notifications-grpc.publisher'; +import { UI_NOTIFICATIONS_PUBLISHER } from './ui-notifications.publisher'; + +const toBaseUrl = (addr: string): string => { + const trimmed = addr.trim(); + if (!trimmed.includes('://')) { + return `http://${trimmed}`; + } + return trimmed; +}; + +@Module({ + imports: [CoreModule], + providers: [ + NotificationsGrpcPublisher, + { + provide: NOTIFICATIONS_PUBLISHER_CONFIG, + useFactory: (config: ConfigService): NotificationsPublisherConfig => ({ + baseUrl: toBaseUrl(config.notificationsGrpcAddr), + deadlineMs: config.notificationsGrpcDeadlineMs, + source: 'platform-server', + }), + inject: [ConfigService], + }, + NOTIFICATIONS_GRPC_PUBLISHER_PROVIDER, + ], + exports: [UI_NOTIFICATIONS_PUBLISHER], +}) +export class NotificationsClientModule {} diff --git a/packages/platform-server/src/notifications/notifications-grpc.publisher.ts b/packages/platform-server/src/notifications/notifications-grpc.publisher.ts new file mode 100644 index 000000000..296966334 --- /dev/null +++ b/packages/platform-server/src/notifications/notifications-grpc.publisher.ts @@ -0,0 +1,96 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { createPromiseClient } from '@connectrpc/connect'; +import { createConnectTransport } from '@connectrpc/connect-node'; +import { NotificationsService } from '../proto/gen/agynio/api/notifications/v1/notifications_connect'; +import type { UiNotificationPublishRequest, UiNotificationsPublisher } from './ui-notifications.publisher'; +import { UI_NOTIFICATIONS_PUBLISHER } from './ui-notifications.publisher'; + +export type NotificationsPublisherConfig = { + baseUrl: string; + deadlineMs: number; + source: string; +}; + +export const NOTIFICATIONS_PUBLISHER_CONFIG = Symbol('NOTIFICATIONS_PUBLISHER_CONFIG'); + +function isJsonRecord(value: unknown): value is Record { + return value !== null && typeof value === 'object' && !Array.isArray(value); +} + +type NotificationsPublishClient = { + publish( + request: { + event: string; + rooms: string[]; + payload?: Record | undefined; + source?: string; + }, + options?: { signal?: AbortSignal }, + ): Promise; +}; + +function isNotificationsPublishClient(value: unknown): value is NotificationsPublishClient { + return ( + typeof value === 'object' && + value !== null && + typeof (value as { publish?: unknown }).publish === 'function' + ); +} + +@Injectable() +export class NotificationsGrpcPublisher implements UiNotificationsPublisher { + private readonly client: NotificationsPublishClient; + + constructor(@Inject(NOTIFICATIONS_PUBLISHER_CONFIG) private readonly config: NotificationsPublisherConfig) { + /* + * connect-es marks generated service definitions with @ts-nocheck, so createPromiseClient() currently + * erases type information to `any`. We validate the publish method below before storing the client. + */ + const candidate = createPromiseClient( + NotificationsService, + createConnectTransport({ + baseUrl: config.baseUrl, + httpVersion: '2', + }), + ); + if (!isNotificationsPublishClient(candidate)) { + throw new Error('Notifications gRPC client missing publish implementation'); + } + this.client = candidate; + } + + async publishToRooms(request: UiNotificationPublishRequest): Promise { + const { rooms, event, payload, source } = request; + if (!Array.isArray(rooms) || rooms.length === 0) return; + + let jsonPayload: Record | undefined; + if (payload !== undefined) { + if (!isJsonRecord(payload)) { + const descriptor = payload === null ? 'null' : Array.isArray(payload) ? 'array' : typeof payload; + throw new Error(`NotificationsGrpcPublisher only supports object payloads; received ${descriptor}`); + } + jsonPayload = payload; + } + + const controller = new AbortController(); + const timer = setTimeout(() => controller.abort(), this.config.deadlineMs); + try { + await this.client.publish( + { + event, + rooms, + payload: jsonPayload, + source: source ?? this.config.source, + }, + { signal: controller.signal }, + ); + } finally { + clearTimeout(timer); + } + } +} + +export const NOTIFICATIONS_GRPC_PUBLISHER_PROVIDER = { + provide: UI_NOTIFICATIONS_PUBLISHER, + useExisting: NotificationsGrpcPublisher, +}; diff --git a/packages/platform-server/src/notifications/ui-notifications.publisher.ts b/packages/platform-server/src/notifications/ui-notifications.publisher.ts new file mode 100644 index 000000000..b6bdb2f5b --- /dev/null +++ b/packages/platform-server/src/notifications/ui-notifications.publisher.ts @@ -0,0 +1,12 @@ +export type UiNotificationPublishRequest = { + rooms: string[]; + event: string; + payload: unknown; + source?: string; +}; + +export interface UiNotificationsPublisher { + publishToRooms(request: UiNotificationPublishRequest): Promise; +} + +export const UI_NOTIFICATIONS_PUBLISHER = Symbol('UI_NOTIFICATIONS_PUBLISHER'); diff --git a/packages/platform-ui/.env.example b/packages/platform-ui/.env.example index 44755da72..390b2bbae 100644 --- a/packages/platform-ui/.env.example +++ b/packages/platform-ui/.env.example @@ -3,6 +3,9 @@ # Base URL for Agents Platform API (do not include /api) VITE_API_BASE_URL=http://localhost:3010 +# Optional: override notifications socket endpoint (defaults to VITE_API_BASE_URL) +VITE_SOCKET_BASE_URL=http://localhost:4000 + # Optional: override tracing server base URL # Defaults to ${VITE_API_BASE_URL}/tracing when not set VITE_TRACING_SERVER_URL=http://localhost:4319 diff --git a/packages/platform-ui/README.md b/packages/platform-ui/README.md index db3813731..c773e693a 100644 --- a/packages/platform-ui/README.md +++ b/packages/platform-ui/README.md @@ -7,8 +7,9 @@ Quickstart - Run tests: pnpm -w -F @agyn/platform-ui test - Dev: pnpm -w -F @agyn/platform-ui dev -Env configuration (required) -- VITE_API_BASE_URL: base URL for the Agents API used by the UI. Set to your server origin (e.g., https://agents.example.com). **Do not include `/api`;** REST requests add it automatically and websockets connect to `/socket.io` on the same origin. +Env configuration +- VITE_API_BASE_URL (required): base URL for the Agents API used by the UI. Set to your server origin (e.g., https://agents.example.com). **Do not include `/api`;** REST requests add it automatically. +- VITE_SOCKET_BASE_URL (optional): override the notifications Socket.IO origin. Defaults to `VITE_API_BASE_URL` when unset; the UI always connects to `/socket.io` on the resolved origin. - VITE_UI_MOCK_SIDEBAR (optional, default `false`): enable mock sidebar templates only for local prototyping. Requires a dev build (`import.meta.env.DEV`). Never enable in production builds. API base URL @@ -16,6 +17,7 @@ API base URL Notes - Legacy VITE_GRAPH_API_BASE has been removed. Use VITE_API_BASE_URL. +- Socket connections default to the same origin as the API unless VITE_SOCKET_BASE_URL is provided. Provider setup ```tsx diff --git a/packages/platform-ui/src/config.ts b/packages/platform-ui/src/config.ts index 2a0062b93..73093948a 100644 --- a/packages/platform-ui/src/config.ts +++ b/packages/platform-ui/src/config.ts @@ -3,14 +3,16 @@ type ViteEnv = { VITE_API_BASE_URL?: string; + VITE_SOCKET_BASE_URL?: string; STORYBOOK?: string; }; function resolveStorybookFallback(name: keyof ViteEnv): string | null { - if (name !== 'VITE_API_BASE_URL') return null; const isStorybook = import.meta.env?.STORYBOOK === 'true'; if (!isStorybook) return null; - return 'http://localhost:4173/api'; + if (name === 'VITE_API_BASE_URL') return 'http://localhost:4173/api'; + if (name === 'VITE_SOCKET_BASE_URL') return 'http://localhost:4173'; + return null; } function requireEnv(name: keyof ViteEnv): string { @@ -50,9 +52,12 @@ function deriveBase(raw: string, options: { stripApi: boolean }): string { } const rawApiBase = requireEnv('VITE_API_BASE_URL'); - const apiBaseUrl = deriveBase(rawApiBase, { stripApi: true }); -const socketBaseUrl = deriveBase(rawApiBase, { stripApi: true }); + +const socketEnv = import.meta.env?.VITE_SOCKET_BASE_URL; +const useApiFallback = !(typeof socketEnv === 'string' && socketEnv.trim().length > 0); +const rawSocketBase = useApiFallback ? rawApiBase : (socketEnv as string); +const socketBaseUrl = deriveBase(rawSocketBase, { stripApi: useApiFallback }); export const config = { apiBaseUrl, socketBaseUrl, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index fe0208086..85fa6f060 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -128,6 +128,49 @@ importers: specifier: ^4.1.12 version: 4.1.12 + packages/notifications: + dependencies: + '@bufbuild/protobuf': + specifier: ^2.11.0 + version: 2.11.0 + '@connectrpc/connect': + specifier: ^1.7.0 + version: 1.7.0(@bufbuild/protobuf@2.11.0) + '@connectrpc/connect-node': + specifier: ^1.3.1 + version: 1.7.0(@bufbuild/protobuf@2.11.0)(@connectrpc/connect@1.7.0(@bufbuild/protobuf@2.11.0)) + pino: + specifier: ^10.1.0 + version: 10.1.0 + socket.io: + specifier: ^4.8.1 + version: 4.8.1 + uuid: + specifier: ^13.0.0 + version: 13.0.0 + zod: + specifier: ^4.1.9 + version: 4.1.12 + devDependencies: + '@types/node': + specifier: ^24.5.1 + version: 24.5.2 + eslint: + specifier: ^9.13.0 + version: 9.36.0(jiti@2.5.1) + tsx: + specifier: ^4.20.5 + version: 4.20.5 + typescript: + specifier: ^5.8.3 + version: 5.8.3 + typescript-eslint: + specifier: ^8.8.1 + version: 8.44.0(eslint@9.36.0(jiti@2.5.1))(typescript@5.8.3) + vitest: + specifier: ^3.2.4 + version: 3.2.4(@types/debug@4.1.12)(@types/node@24.5.2)(jiti@2.5.1)(jsdom@27.1.0(postcss@8.5.6))(lightningcss@1.30.1)(msw@2.11.3(@types/node@24.5.2)(typescript@5.8.3))(tsx@4.20.5)(yaml@2.8.1) + packages/platform-server: dependencies: '@agyn/docker-runner': @@ -145,6 +188,12 @@ importers: '@bufbuild/protobuf': specifier: ^2.11.0 version: 2.11.0 + '@connectrpc/connect': + specifier: ^1.7.0 + version: 1.7.0(@bufbuild/protobuf@2.11.0) + '@connectrpc/connect-node': + specifier: ^1.7.0 + version: 1.7.0(@bufbuild/protobuf@2.11.0)(@connectrpc/connect@1.7.0(@bufbuild/protobuf@2.11.0)) '@fastify/cors': specifier: ^11.1.0 version: 11.1.0 @@ -953,6 +1002,18 @@ packages: peerDependencies: commander: ~13.1.0 + '@connectrpc/connect-node@1.7.0': + resolution: {integrity: sha512-6vaPIkG/NyhxlYgytLoR9KYbPhczEboFB2OYWkA9qvUz1K7efXfeGrlRxoLtpa+r8VxyIOw73w5ktNe743nD+A==} + engines: {node: '>=16.0.0'} + peerDependencies: + '@bufbuild/protobuf': ^1.10.0 + '@connectrpc/connect': 1.7.0 + + '@connectrpc/connect@1.7.0': + resolution: {integrity: sha512-iNKdJRi69YP3mq6AePRT8F/HrxWCewrhxnLMNm0vpqXAR8biwzRtO6Hjx80C6UvtKJ5sFmffQT7I4Baecz389w==} + peerDependencies: + '@bufbuild/protobuf': ^1.10.0 + '@cspotcode/source-map-support@0.8.1': resolution: {integrity: sha512-IchNf6dN4tHoMFIn/7OE8LWZ19Y6q/67Bmf6vnGREv8RSbBVb9LPJxEcnwrcwX6ixSvaiGoomAUvu4YSxXrVgw==} engines: {node: '>=12'} @@ -8805,6 +8866,16 @@ snapshots: dependencies: commander: 13.1.0 + '@connectrpc/connect-node@1.7.0(@bufbuild/protobuf@2.11.0)(@connectrpc/connect@1.7.0(@bufbuild/protobuf@2.11.0))': + dependencies: + '@bufbuild/protobuf': 2.11.0 + '@connectrpc/connect': 1.7.0(@bufbuild/protobuf@2.11.0) + undici: 6.19.8 + + '@connectrpc/connect@1.7.0(@bufbuild/protobuf@2.11.0)': + dependencies: + '@bufbuild/protobuf': 2.11.0 + '@cspotcode/source-map-support@0.8.1': dependencies: '@jridgewell/trace-mapping': 0.3.9 From f56cb012b55c743b62240c932a03f658107c2fe2 Mon Sep 17 00:00:00 2001 From: Casey Brooks Date: Sat, 28 Feb 2026 02:12:29 +0000 Subject: [PATCH 2/6] test(platform-server): stub litellm deps for e2e --- .../__e2e__/app.bootstrap.smoke.test.ts | 32 ++++++++++ .../__e2e__/bootstrap.di.test.ts | 58 ++++++++++++++++++- .../__e2e__/graph.socket.gateway.e2e.test.ts | 30 ++++++++++ .../llmSettings.adminStatus.e2e.test.ts | 7 +++ .../__e2e__/llmSettings.models.e2e.test.ts | 7 +++ .../__e2e__/llmProvisioner.bootstrap.test.ts | 1 + .../__tests__/config.service.fromEnv.test.ts | 7 +++ .../platform-server/__tests__/vitest.setup.ts | 2 + 8 files changed, 143 insertions(+), 1 deletion(-) diff --git a/packages/platform-server/__e2e__/app.bootstrap.smoke.test.ts b/packages/platform-server/__e2e__/app.bootstrap.smoke.test.ts index 3c3af549f..a633efe40 100644 --- a/packages/platform-server/__e2e__/app.bootstrap.smoke.test.ts +++ b/packages/platform-server/__e2e__/app.bootstrap.smoke.test.ts @@ -34,6 +34,10 @@ import { DockerRunnerConnectivityMonitor } from '../src/infra/container/dockerRu import { DockerRunnerStatusService } from '../src/infra/container/dockerRunnerStatus.service'; import { DockerRunnerRequestError } from '../src/infra/container/runnerGrpc.client'; import { HealthController } from '../src/infra/health/health.controller'; +import { + UI_NOTIFICATIONS_PUBLISHER, + type UiNotificationsPublisher, +} from '../src/notifications/ui-notifications.publisher'; process.env.LLM_PROVIDER = process.env.LLM_PROVIDER || 'litellm'; process.env.AGENTS_DATABASE_URL = process.env.AGENTS_DATABASE_URL || 'postgres://localhost:5432/test'; @@ -41,6 +45,27 @@ process.env.NCPS_ENABLED = process.env.NCPS_ENABLED || 'false'; process.env.CONTAINERS_CLEANUP_ENABLED = process.env.CONTAINERS_CLEANUP_ENABLED || 'false'; process.env.LITELLM_BASE_URL = process.env.LITELLM_BASE_URL || 'http://127.0.0.1:4000'; process.env.LITELLM_MASTER_KEY = process.env.LITELLM_MASTER_KEY || 'sk-dev-master-1234'; +process.env.NOTIFICATIONS_GRPC_ADDR = process.env.NOTIFICATIONS_GRPC_ADDR || 'notifications:50051'; +process.env.NOTIFICATIONS_GRPC_DEADLINE_MS = process.env.NOTIFICATIONS_GRPC_DEADLINE_MS || '3000'; + +vi.mock('../src/notifications/notifications-grpc.publisher', () => { + class NotificationsGrpcPublisher { + async publishToRooms(): Promise { + return Promise.resolve(); + } + } + + const NOTIFICATIONS_PUBLISHER_CONFIG = Symbol('NOTIFICATIONS_PUBLISHER_CONFIG'); + + return { + NotificationsGrpcPublisher, + NOTIFICATIONS_PUBLISHER_CONFIG, + NOTIFICATIONS_GRPC_PUBLISHER_PROVIDER: { + provide: UI_NOTIFICATIONS_PUBLISHER, + useExisting: NotificationsGrpcPublisher, + }, + }; +}); const TEST_TIMEOUT_MS = 15_000; const agentProbeToken = Symbol('agent_node_probe'); @@ -63,6 +88,7 @@ type BootstrapStubs = { dockerClientStub: Partial; volumeGcStub: Partial; connectivityMonitorStub: Partial; + notificationsPublisherStub: UiNotificationsPublisher; }; const createBootstrapStubs = (): BootstrapStubs => { @@ -206,6 +232,9 @@ const createBootstrapStubs = (): BootstrapStubs => { onModuleInit: vi.fn(), onModuleDestroy: vi.fn(), } satisfies Partial; + const notificationsPublisherStub: UiNotificationsPublisher = { + publishToRooms: vi.fn().mockResolvedValue(undefined), + }; return { prismaServiceStub, @@ -225,6 +254,7 @@ const createBootstrapStubs = (): BootstrapStubs => { dockerClientStub, volumeGcStub, connectivityMonitorStub, + notificationsPublisherStub, }; }; @@ -263,6 +293,8 @@ const applyBootstrapOverrides = ( .useValue(stubs.liveGraphRuntimeStub) .overrideProvider(LLMProvisioner) .useValue(stubs.llmProvisionerStub) + .overrideProvider(UI_NOTIFICATIONS_PUBLISHER) + .useValue(stubs.notificationsPublisherStub) .overrideProvider(LLMSettingsService) .useValue({}) .overrideProvider(DockerWorkspaceEventsWatcher) diff --git a/packages/platform-server/__e2e__/bootstrap.di.test.ts b/packages/platform-server/__e2e__/bootstrap.di.test.ts index 6b184e8d9..f8c0a9c89 100644 --- a/packages/platform-server/__e2e__/bootstrap.di.test.ts +++ b/packages/platform-server/__e2e__/bootstrap.di.test.ts @@ -1,6 +1,6 @@ import 'reflect-metadata'; import { FastifyAdapter } from '@nestjs/platform-fastify'; -import { describe, it, beforeEach, afterEach, expect } from 'vitest'; +import { describe, it, beforeEach, afterEach, expect, vi } from 'vitest'; import { NestFactory } from '@nestjs/core'; import { mkdtempSync, rmSync } from 'node:fs'; import { createServer, type IncomingMessage, type Server } from 'node:http'; @@ -27,10 +27,66 @@ const REQUIRED_ENV = { CONTAINERS_CLEANUP_ENABLED: 'false', VOLUME_GC_ENABLED: 'false', NCPS_ENABLED: 'false', + NOTIFICATIONS_GRPC_ADDR: 'notifications:50051', } as const; const TEST_TIMEOUT_MS = 20_000; +vi.mock('../src/notifications/notifications-grpc.publisher', async () => { + const { UI_NOTIFICATIONS_PUBLISHER } = await vi.importActual< + typeof import('../src/notifications/ui-notifications.publisher') + >('../src/notifications/ui-notifications.publisher'); + + class NotificationsGrpcPublisher { + async publishToRooms(): Promise { + return Promise.resolve(); + } + } + + const NOTIFICATIONS_PUBLISHER_CONFIG = Symbol('NOTIFICATIONS_PUBLISHER_CONFIG'); + + return { + NotificationsGrpcPublisher, + NOTIFICATIONS_PUBLISHER_CONFIG, + NOTIFICATIONS_GRPC_PUBLISHER_PROVIDER: { + provide: UI_NOTIFICATIONS_PUBLISHER, + useExisting: NotificationsGrpcPublisher, + }, + }; +}); + +vi.mock('../src/llm/provisioners/litellm.key.store', () => { + class LiteLLMKeyStore { + private readonly store = new Map(); + + async load(alias: string): Promise<{ alias: string; key: string; expiresAt: Date | null } | null> { + const record = this.store.get(alias); + if (!record) return null; + return { alias, ...record }; + } + + async save(record: { alias: string; key: string; expiresAt: Date | null }): Promise { + this.store.set(record.alias, { key: record.key, expiresAt: record.expiresAt }); + } + + async delete(alias: string): Promise { + this.store.delete(alias); + } + } + + return { LiteLLMKeyStore }; +}); + +vi.mock('../src/core/services/startupRecovery.service', () => { + class StartupRecoveryService { + async onApplicationBootstrap(): Promise { + return Promise.resolve(); + } + } + + return { StartupRecoveryService }; +}); + describe('Production bootstrap DI', () => { let savedEnv: Record = {}; let graphRepoPath: string; diff --git a/packages/platform-server/__e2e__/graph.socket.gateway.e2e.test.ts b/packages/platform-server/__e2e__/graph.socket.gateway.e2e.test.ts index 3ef36fc91..53f464862 100644 --- a/packages/platform-server/__e2e__/graph.socket.gateway.e2e.test.ts +++ b/packages/platform-server/__e2e__/graph.socket.gateway.e2e.test.ts @@ -5,6 +5,7 @@ import type { FastifyInstance } from 'fastify'; import type { AddressInfo } from 'net'; import { PassThrough } from 'node:stream'; import { io as createClient, type Socket } from 'socket.io-client'; +import type { Server as SocketIOServer } from 'socket.io'; import WebSocket, { type RawData } from 'ws'; import type { MessageKind, RunStatus } from '@prisma/client'; @@ -17,6 +18,11 @@ import { PrismaService } from '../src/core/services/prisma.service'; import { ContainerTerminalGateway } from '../src/infra/container/terminal.gateway'; import { TerminalSessionsService, type TerminalSessionRecord } from '../src/infra/container/terminal.sessions.service'; import { DockerRunnerStatusService } from '../src/infra/container/dockerRunnerStatus.service'; +import { + UI_NOTIFICATIONS_PUBLISHER, + type UiNotificationPublishRequest, + type UiNotificationsPublisher, +} from '../src/notifications/ui-notifications.publisher'; import { WorkspaceProvider, type WorkspaceKey, @@ -34,6 +40,20 @@ import type { WorkspaceLogsSession, } from '../src/workspace/runtime/workspace.runtime.provider'; +class NotificationsPublisherStub implements UiNotificationsPublisher { + private emitter: ((rooms: string[], event: string, payload: unknown) => void) | null = null; + + setEmitter(emitter: (rooms: string[], event: string, payload: unknown) => void): void { + this.emitter = emitter; + } + + async publishToRooms(request: UiNotificationPublishRequest): Promise { + if (!this.emitter) return; + const { rooms, event, payload } = request; + this.emitter(rooms, event, payload); + } +} + class LiveGraphRuntimeStub { subscribe() { return () => undefined; @@ -325,6 +345,7 @@ describe('Socket gateway real server handshakes', () => { { provide: DockerRunnerStatusService, useValue: runnerStatusStub }, EventsBusService, RunEventsService, + { provide: UI_NOTIFICATIONS_PUBLISHER, useClass: NotificationsPublisherStub }, ], }).compile(); @@ -342,6 +363,15 @@ describe('Socket gateway real server handshakes', () => { graphGateway = app.get(GraphSocketGateway); graphGateway.init({ server: fastify.server }); + const notificationsPublisher = app.get(UI_NOTIFICATIONS_PUBLISHER) as NotificationsPublisherStub; + notificationsPublisher.setEmitter((rooms, event, payload) => { + const io = (graphGateway as unknown as { io?: SocketIOServer }).io; + if (!io) throw new Error('GraphSocketGateway missing io instance'); + for (const room of rooms) { + io.to(room).emit(event, payload); + } + }); + await app.listen(0, '127.0.0.1'); const addressInfo = fastify.server.address() as AddressInfo; diff --git a/packages/platform-server/__e2e__/llmSettings.adminStatus.e2e.test.ts b/packages/platform-server/__e2e__/llmSettings.adminStatus.e2e.test.ts index 66a7adbaf..4e66dba03 100644 --- a/packages/platform-server/__e2e__/llmSettings.adminStatus.e2e.test.ts +++ b/packages/platform-server/__e2e__/llmSettings.adminStatus.e2e.test.ts @@ -15,6 +15,7 @@ describe('LLM settings controller (admin-status endpoint)', () => { agentsDbUrl: process.env.AGENTS_DATABASE_URL, litellmBaseUrl: process.env.LITELLM_BASE_URL, litellmMasterKey: process.env.LITELLM_MASTER_KEY, + notificationsGrpcAddr: process.env.NOTIFICATIONS_GRPC_ADDR, }; beforeAll(async () => { @@ -22,6 +23,7 @@ describe('LLM settings controller (admin-status endpoint)', () => { process.env.AGENTS_DATABASE_URL = 'postgres://localhost:5432/test'; process.env.LITELLM_BASE_URL = process.env.LITELLM_BASE_URL || 'http://127.0.0.1:4000'; process.env.LITELLM_MASTER_KEY = process.env.LITELLM_MASTER_KEY || 'sk-dev-master-1234'; + process.env.NOTIFICATIONS_GRPC_ADDR = process.env.NOTIFICATIONS_GRPC_ADDR || 'notifications:50051'; ConfigService.clearInstanceForTest(); ConfigService.fromEnv(); @@ -42,6 +44,11 @@ describe('LLM settings controller (admin-status endpoint)', () => { process.env.AGENTS_DATABASE_URL = previousEnv.agentsDbUrl; process.env.LITELLM_BASE_URL = previousEnv.litellmBaseUrl; process.env.LITELLM_MASTER_KEY = previousEnv.litellmMasterKey; + if (previousEnv.notificationsGrpcAddr === undefined) { + delete process.env.NOTIFICATIONS_GRPC_ADDR; + } else { + process.env.NOTIFICATIONS_GRPC_ADDR = previousEnv.notificationsGrpcAddr; + } }); it('injects ConfigService and serves admin status when LiteLLM env is configured', async () => { diff --git a/packages/platform-server/__e2e__/llmSettings.models.e2e.test.ts b/packages/platform-server/__e2e__/llmSettings.models.e2e.test.ts index dd0da50ac..84ca31d7b 100644 --- a/packages/platform-server/__e2e__/llmSettings.models.e2e.test.ts +++ b/packages/platform-server/__e2e__/llmSettings.models.e2e.test.ts @@ -15,6 +15,7 @@ describe('LLM settings controller (models endpoint)', () => { agentsDbUrl: process.env.AGENTS_DATABASE_URL, litellmBaseUrl: process.env.LITELLM_BASE_URL, litellmMasterKey: process.env.LITELLM_MASTER_KEY, + notificationsGrpcAddr: process.env.NOTIFICATIONS_GRPC_ADDR, }; beforeAll(async () => { @@ -22,6 +23,7 @@ describe('LLM settings controller (models endpoint)', () => { process.env.AGENTS_DATABASE_URL = 'postgres://localhost:5432/test'; process.env.LITELLM_BASE_URL = process.env.LITELLM_BASE_URL || 'http://127.0.0.1:4000'; process.env.LITELLM_MASTER_KEY = process.env.LITELLM_MASTER_KEY || 'sk-dev-master-1234'; + process.env.NOTIFICATIONS_GRPC_ADDR = process.env.NOTIFICATIONS_GRPC_ADDR || 'notifications:50051'; ConfigService.clearInstanceForTest(); ConfigService.fromEnv(); @@ -43,6 +45,11 @@ describe('LLM settings controller (models endpoint)', () => { process.env.AGENTS_DATABASE_URL = previousEnv.agentsDbUrl; process.env.LITELLM_BASE_URL = previousEnv.litellmBaseUrl; process.env.LITELLM_MASTER_KEY = previousEnv.litellmMasterKey; + if (previousEnv.notificationsGrpcAddr === undefined) { + delete process.env.NOTIFICATIONS_GRPC_ADDR; + } else { + process.env.NOTIFICATIONS_GRPC_ADDR = previousEnv.notificationsGrpcAddr; + } }); it('returns model list via injected service', async () => { diff --git a/packages/platform-server/__tests__/__e2e__/llmProvisioner.bootstrap.test.ts b/packages/platform-server/__tests__/__e2e__/llmProvisioner.bootstrap.test.ts index e29160bd6..b3558f43b 100644 --- a/packages/platform-server/__tests__/__e2e__/llmProvisioner.bootstrap.test.ts +++ b/packages/platform-server/__tests__/__e2e__/llmProvisioner.bootstrap.test.ts @@ -18,6 +18,7 @@ describe('LiteLLMProvisioner bootstrap (DI smoke)', () => { LITELLM_BASE_URL: 'http://127.0.0.1:4000', LITELLM_MASTER_KEY: 'sk-test', AGENTS_DATABASE_URL: 'postgresql://postgres:postgres@localhost:5432/agents_test', + NOTIFICATIONS_GRPC_ADDR: 'notifications:50051', }; beforeEach(() => { diff --git a/packages/platform-server/__tests__/config.service.fromEnv.test.ts b/packages/platform-server/__tests__/config.service.fromEnv.test.ts index d1f7adbb5..f4a15e566 100644 --- a/packages/platform-server/__tests__/config.service.fromEnv.test.ts +++ b/packages/platform-server/__tests__/config.service.fromEnv.test.ts @@ -7,6 +7,7 @@ const previousEnv: Record = { litellmBaseUrl: process.env.LITELLM_BASE_URL, litellmMasterKey: process.env.LITELLM_MASTER_KEY, agentsDbUrl: process.env.AGENTS_DATABASE_URL, + notificationsGrpcAddr: process.env.NOTIFICATIONS_GRPC_ADDR, }; describe('ConfigService.fromEnv', () => { @@ -15,6 +16,11 @@ describe('ConfigService.fromEnv', () => { process.env.LITELLM_BASE_URL = previousEnv.litellmBaseUrl; process.env.LITELLM_MASTER_KEY = previousEnv.litellmMasterKey; process.env.AGENTS_DATABASE_URL = previousEnv.agentsDbUrl; + if (previousEnv.notificationsGrpcAddr === undefined) { + delete process.env.NOTIFICATIONS_GRPC_ADDR; + } else { + process.env.NOTIFICATIONS_GRPC_ADDR = previousEnv.notificationsGrpcAddr; + } ConfigService.clearInstanceForTest(); }); @@ -23,6 +29,7 @@ describe('ConfigService.fromEnv', () => { process.env.LITELLM_BASE_URL = 'http://127.0.0.1:4000/'; process.env.LITELLM_MASTER_KEY = ' sk-dev-master-1234 '; process.env.AGENTS_DATABASE_URL = 'postgresql://agents:agents@localhost:5443/agents'; + process.env.NOTIFICATIONS_GRPC_ADDR = 'notifications:50051'; const config = ConfigService.fromEnv(); diff --git a/packages/platform-server/__tests__/vitest.setup.ts b/packages/platform-server/__tests__/vitest.setup.ts index 50ef21e8a..0ea5131d9 100644 --- a/packages/platform-server/__tests__/vitest.setup.ts +++ b/packages/platform-server/__tests__/vitest.setup.ts @@ -7,3 +7,5 @@ process.env.DOCKER_RUNNER_GRPC_HOST ||= 'docker-runner'; process.env.DOCKER_RUNNER_GRPC_PORT ||= process.env.DOCKER_RUNNER_PORT || '7171'; process.env.DOCKER_RUNNER_PORT ||= process.env.DOCKER_RUNNER_GRPC_PORT; process.env.DOCKER_RUNNER_SHARED_SECRET ||= 'test-shared-secret'; +process.env.NOTIFICATIONS_GRPC_ADDR ||= 'notifications:50051'; +process.env.NOTIFICATIONS_GRPC_DEADLINE_MS ||= '3000'; From 4016c832e99b680498bdb24053f8718891714b6d Mon Sep 17 00:00:00 2001 From: Casey Brooks Date: Sat, 28 Feb 2026 03:16:52 +0000 Subject: [PATCH 3/6] fix(grpc): align connect client to v2 --- packages/notifications/package.json | 4 +- packages/notifications/src/grpc.ts | 2 +- .../__tests__/graph.module.di.smoke.test.ts | 6 +-- packages/platform-server/package.json | 4 +- .../notifications-grpc.publisher.ts | 8 ++-- packages/platform-server/vitest.config.ts | 1 + pnpm-lock.yaml | 39 +++++++++---------- 7 files changed, 32 insertions(+), 32 deletions(-) diff --git a/packages/notifications/package.json b/packages/notifications/package.json index 50381cbf7..2fb145a5c 100644 --- a/packages/notifications/package.json +++ b/packages/notifications/package.json @@ -13,8 +13,8 @@ }, "dependencies": { "@bufbuild/protobuf": "^2.11.0", - "@connectrpc/connect": "^1.7.0", - "@connectrpc/connect-node": "^1.3.1", + "@connectrpc/connect": "^2.1.1", + "@connectrpc/connect-node": "^2.1.1", "pino": "^10.1.0", "socket.io": "^4.8.1", "uuid": "^13.0.0", diff --git a/packages/notifications/src/grpc.ts b/packages/notifications/src/grpc.ts index 6d1d35ccb..5a56a754b 100644 --- a/packages/notifications/src/grpc.ts +++ b/packages/notifications/src/grpc.ts @@ -9,7 +9,7 @@ import { NotificationBroadcaster } from './broadcaster'; import type { SocketBridge } from './socket'; import { PublishInputSchema } from './validation'; import type { JsonValue, PublishedNotification } from './types'; -import { NotificationsService } from './proto/gen/agynio/api/notifications/v1/notifications_connect.js'; +import { NotificationsService } from './proto/gen/agynio/api/notifications/v1/notifications_pb.js'; import { NotificationEnvelopeSchema, PublishResponseSchema, diff --git a/packages/platform-server/__tests__/graph.module.di.smoke.test.ts b/packages/platform-server/__tests__/graph.module.di.smoke.test.ts index 5ed56f0cc..3670a6c00 100644 --- a/packages/platform-server/__tests__/graph.module.di.smoke.test.ts +++ b/packages/platform-server/__tests__/graph.module.di.smoke.test.ts @@ -33,12 +33,12 @@ const connectMocks = vi.hoisted(() => { const publish = vi.fn().mockResolvedValue(undefined); return { publish, - createPromiseClient: vi.fn(() => ({ publish })), + createClient: vi.fn(() => ({ publish })), }; }); vi.mock('@connectrpc/connect', () => ({ - createPromiseClient: connectMocks.createPromiseClient, + createClient: connectMocks.createClient, })); const connectNodeMocks = vi.hoisted(() => ({ @@ -49,7 +49,7 @@ vi.mock('@connectrpc/connect-node', () => ({ createConnectTransport: connectNodeMocks.createConnectTransport, })); -vi.mock('../src/proto/gen/agynio/api/notifications/v1/notifications_connect', () => ({ +vi.mock('../src/proto/gen/agynio/api/notifications/v1/notifications_pb.js', () => ({ NotificationsService: { typeName: 'agynio.api.notifications.v1.NotificationsService', methods: { diff --git a/packages/platform-server/package.json b/packages/platform-server/package.json index 4d9bb69ae..23c879a0a 100644 --- a/packages/platform-server/package.json +++ b/packages/platform-server/package.json @@ -25,8 +25,8 @@ "@agyn/json-schema-to-zod": "workspace:*", "@agyn/docker-runner": "workspace:*", "@agyn/llm": "workspace:*", - "@connectrpc/connect": "^1.7.0", - "@connectrpc/connect-node": "^1.7.0", + "@connectrpc/connect": "^2.1.1", + "@connectrpc/connect-node": "^2.1.1", "@bufbuild/protobuf": "^2.11.0", "@grpc/grpc-js": "^1.12.2", "@fastify/cors": "^11.1.0", diff --git a/packages/platform-server/src/notifications/notifications-grpc.publisher.ts b/packages/platform-server/src/notifications/notifications-grpc.publisher.ts index 296966334..dc13eca04 100644 --- a/packages/platform-server/src/notifications/notifications-grpc.publisher.ts +++ b/packages/platform-server/src/notifications/notifications-grpc.publisher.ts @@ -1,7 +1,7 @@ import { Inject, Injectable } from '@nestjs/common'; -import { createPromiseClient } from '@connectrpc/connect'; +import { createClient } from '@connectrpc/connect'; import { createConnectTransport } from '@connectrpc/connect-node'; -import { NotificationsService } from '../proto/gen/agynio/api/notifications/v1/notifications_connect'; +import { NotificationsService } from '../proto/gen/agynio/api/notifications/v1/notifications_pb.js'; import type { UiNotificationPublishRequest, UiNotificationsPublisher } from './ui-notifications.publisher'; import { UI_NOTIFICATIONS_PUBLISHER } from './ui-notifications.publisher'; @@ -43,10 +43,10 @@ export class NotificationsGrpcPublisher implements UiNotificationsPublisher { constructor(@Inject(NOTIFICATIONS_PUBLISHER_CONFIG) private readonly config: NotificationsPublisherConfig) { /* - * connect-es marks generated service definitions with @ts-nocheck, so createPromiseClient() currently + * connect-es marks generated service definitions with @ts-nocheck, so createClient() currently * erases type information to `any`. We validate the publish method below before storing the client. */ - const candidate = createPromiseClient( + const candidate = createClient( NotificationsService, createConnectTransport({ baseUrl: config.baseUrl, diff --git a/packages/platform-server/vitest.config.ts b/packages/platform-server/vitest.config.ts index f7c8f8046..0ddb3f576 100644 --- a/packages/platform-server/vitest.config.ts +++ b/packages/platform-server/vitest.config.ts @@ -10,6 +10,7 @@ export default defineConfig({ DOCKER_RUNNER_GRPC_HOST: "docker-runner", DOCKER_RUNNER_GRPC_PORT: "7171", DOCKER_RUNNER_SHARED_SECRET: "test-shared-secret", + NOTIFICATIONS_GRPC_ADDR: "notifications:50051", }, fileParallelism: false, coverage: { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 85fa6f060..10c6481ea 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -134,11 +134,11 @@ importers: specifier: ^2.11.0 version: 2.11.0 '@connectrpc/connect': - specifier: ^1.7.0 - version: 1.7.0(@bufbuild/protobuf@2.11.0) + specifier: ^2.1.1 + version: 2.1.1(@bufbuild/protobuf@2.11.0) '@connectrpc/connect-node': - specifier: ^1.3.1 - version: 1.7.0(@bufbuild/protobuf@2.11.0)(@connectrpc/connect@1.7.0(@bufbuild/protobuf@2.11.0)) + specifier: ^2.1.1 + version: 2.1.1(@bufbuild/protobuf@2.11.0)(@connectrpc/connect@2.1.1(@bufbuild/protobuf@2.11.0)) pino: specifier: ^10.1.0 version: 10.1.0 @@ -189,11 +189,11 @@ importers: specifier: ^2.11.0 version: 2.11.0 '@connectrpc/connect': - specifier: ^1.7.0 - version: 1.7.0(@bufbuild/protobuf@2.11.0) + specifier: ^2.1.1 + version: 2.1.1(@bufbuild/protobuf@2.11.0) '@connectrpc/connect-node': - specifier: ^1.7.0 - version: 1.7.0(@bufbuild/protobuf@2.11.0)(@connectrpc/connect@1.7.0(@bufbuild/protobuf@2.11.0)) + specifier: ^2.1.1 + version: 2.1.1(@bufbuild/protobuf@2.11.0)(@connectrpc/connect@2.1.1(@bufbuild/protobuf@2.11.0)) '@fastify/cors': specifier: ^11.1.0 version: 11.1.0 @@ -1002,17 +1002,17 @@ packages: peerDependencies: commander: ~13.1.0 - '@connectrpc/connect-node@1.7.0': - resolution: {integrity: sha512-6vaPIkG/NyhxlYgytLoR9KYbPhczEboFB2OYWkA9qvUz1K7efXfeGrlRxoLtpa+r8VxyIOw73w5ktNe743nD+A==} - engines: {node: '>=16.0.0'} + '@connectrpc/connect-node@2.1.1': + resolution: {integrity: sha512-s3TfsI1XF+n+1z6MBS9rTnFsxxR4Rw5wmdEnkQINli81ESGxcsfaEet8duzq8LVuuCupmhUsgpRo0Nv9pZkufg==} + engines: {node: '>=20'} peerDependencies: - '@bufbuild/protobuf': ^1.10.0 - '@connectrpc/connect': 1.7.0 + '@bufbuild/protobuf': ^2.7.0 + '@connectrpc/connect': 2.1.1 - '@connectrpc/connect@1.7.0': - resolution: {integrity: sha512-iNKdJRi69YP3mq6AePRT8F/HrxWCewrhxnLMNm0vpqXAR8biwzRtO6Hjx80C6UvtKJ5sFmffQT7I4Baecz389w==} + '@connectrpc/connect@2.1.1': + resolution: {integrity: sha512-JzhkaTvM73m2K1URT6tv53k2RwngSmCXLZJgK580qNQOXRzZRR/BCMfZw3h+90JpnG6XksP5bYT+cz0rpUzUWQ==} peerDependencies: - '@bufbuild/protobuf': ^1.10.0 + '@bufbuild/protobuf': ^2.7.0 '@cspotcode/source-map-support@0.8.1': resolution: {integrity: sha512-IchNf6dN4tHoMFIn/7OE8LWZ19Y6q/67Bmf6vnGREv8RSbBVb9LPJxEcnwrcwX6ixSvaiGoomAUvu4YSxXrVgw==} @@ -8866,13 +8866,12 @@ snapshots: dependencies: commander: 13.1.0 - '@connectrpc/connect-node@1.7.0(@bufbuild/protobuf@2.11.0)(@connectrpc/connect@1.7.0(@bufbuild/protobuf@2.11.0))': + '@connectrpc/connect-node@2.1.1(@bufbuild/protobuf@2.11.0)(@connectrpc/connect@2.1.1(@bufbuild/protobuf@2.11.0))': dependencies: '@bufbuild/protobuf': 2.11.0 - '@connectrpc/connect': 1.7.0(@bufbuild/protobuf@2.11.0) - undici: 6.19.8 + '@connectrpc/connect': 2.1.1(@bufbuild/protobuf@2.11.0) - '@connectrpc/connect@1.7.0(@bufbuild/protobuf@2.11.0)': + '@connectrpc/connect@2.1.1(@bufbuild/protobuf@2.11.0)': dependencies: '@bufbuild/protobuf': 2.11.0 From ab455071c99dc828a37c15b1aea4c0eb71ea1bd5 Mon Sep 17 00:00:00 2001 From: Casey Brooks Date: Sat, 28 Feb 2026 04:12:20 +0000 Subject: [PATCH 4/6] feat(notifications): route events via redis --- docker-compose.yml | 21 +- docs/product-spec.md | 1 + docs/technical-overview.md | 2 +- .../__tests__/grpc.server.test.ts | 127 +++++++++++ .../__tests__/redis-notifications.test.ts | 132 ++++++++++++ packages/notifications/package.json | 2 +- packages/notifications/src/config.ts | 34 +-- packages/notifications/src/grpc.ts | 49 +++-- packages/notifications/src/main.ts | 19 +- .../notifications/src/redis-notifications.ts | 202 ++++++++++++++++++ packages/notifications/src/socket.ts | 143 ------------- .../notifications-grpc.publisher.ts | 44 +--- pnpm-lock.yaml | 70 +++++- 13 files changed, 610 insertions(+), 236 deletions(-) create mode 100644 packages/notifications/__tests__/grpc.server.test.ts create mode 100644 packages/notifications/__tests__/redis-notifications.test.ts create mode 100644 packages/notifications/src/redis-notifications.ts delete mode 100644 packages/notifications/src/socket.ts diff --git a/docker-compose.yml b/docker-compose.yml index afcb3623a..de12459a4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -247,18 +247,35 @@ services: networks: - agents_net + redis: + image: redis:7-alpine + container_name: redis + restart: unless-stopped + ports: + - "6379:6379" + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 5s + retries: 5 + networks: + - agents_net + notifications: build: context: . dockerfile: packages/notifications/Dockerfile restart: unless-stopped + depends_on: + redis: + condition: service_healthy environment: NOTIFICATIONS_HOST: 0.0.0.0 NOTIFICATIONS_GRPC_PORT: 50051 - NOTIFICATIONS_SOCKET_PORT: 4000 + NOTIFICATIONS_REDIS_URL: redis://redis:6379 + NOTIFICATIONS_CHANNEL: notifications.v1 ports: - "50051:50051" - - "4000:4000" networks: - agents_net diff --git a/docs/product-spec.md b/docs/product-spec.md index 878de3216..e6fa46ab3 100644 --- a/docs/product-spec.md +++ b/docs/product-spec.md @@ -141,6 +141,7 @@ Runbooks - Verify: curl http://localhost:3010/api/templates; open UI; connect socket to observe node_status when provisioning. - Docker Compose stack - Services: postgres, vault (auto-init), registry-mirror. + - Notifications: gRPC-only service publishes to Redis (`NOTIFICATIONS_REDIS_URL`, default `redis://redis:6379`) on channel `NOTIFICATIONS_CHANNEL` (default `notifications.v1`); Socket.IO fan-out is handled by the gateway. - Observability: Tracing services have been removed; follow upcoming observability docs for replacements. - Vault init: vault/auto-init.sh populates root token/unseal keys; set VAULT_ENABLED=true and VAULT_ADDR/VAULT_TOKEN. - Postgres checkpointer: LANGGRAPH_CHECKPOINTER defaults to postgres; configure POSTGRES_URL for the checkpointer connection. diff --git a/docs/technical-overview.md b/docs/technical-overview.md index 1380a84dc..92f216120 100644 --- a/docs/technical-overview.md +++ b/docs/technical-overview.md @@ -37,7 +37,7 @@ Design principles Layers - Application server: wires services, loads persisted graph, and publishes realtime graph events over gRPC. -- Notifications service: receives gRPC notifications from the platform-server and re-broadcasts them via Socket.IO to UI clients. +- Notifications service: receives gRPC notifications from the platform-server, publishes them to Redis, and the notifications gateway fan-outs the Redis channel to Socket.IO clients. - Graph runtime: live diff/apply engine enforcing reversible edges via ports and template registries. - Templates: declarative registration of node factories and their ports. - Triggers: external event sources (Slack, PR polling) that push messages into agents. diff --git a/packages/notifications/__tests__/grpc.server.test.ts b/packages/notifications/__tests__/grpc.server.test.ts new file mode 100644 index 000000000..01f06a3af --- /dev/null +++ b/packages/notifications/__tests__/grpc.server.test.ts @@ -0,0 +1,127 @@ +import { randomUUID } from 'node:crypto'; +import pino from 'pino'; +import { describe, expect, it } from 'vitest'; +import { create } from '@bufbuild/protobuf'; +import type { HandlerContext } from '@connectrpc/connect'; +import { GrpcServer } from '../src/grpc'; +import type { NotificationFanout } from '../src/redis-notifications'; +import type { PublishedNotification } from '../src/types'; +import { + PublishRequestSchema, + SubscribeRequestSchema, +} from '../src/proto/gen/agynio/api/notifications/v1/notifications_pb.js'; + +class StubFanout implements NotificationFanout { + readonly published: PublishedNotification[] = []; + private readonly listeners = new Set<(notification: PublishedNotification) => void>(); + + async publish(notification: PublishedNotification): Promise { + this.published.push(notification); + for (const listener of this.listeners) { + listener(notification); + } + } + + subscribe(listener: (notification: PublishedNotification) => void): () => void { + this.listeners.add(listener); + return () => { + this.listeners.delete(listener); + }; + } + + emit(notification: PublishedNotification): void { + for (const listener of this.listeners) { + listener(notification); + } + } +} + +const makeServer = (fanout: StubFanout) => + new GrpcServer({ + host: '127.0.0.1', + port: 0, + notifications: fanout, + logger: pino({ level: 'silent' }), + }); + +const makeContext = (): HandlerContext => ({ + signal: new AbortController().signal, + values: new Map(), + requestHeader: new Headers(), +}); + +describe('GrpcServer publish', () => { + it('rejects invalid publish requests', async () => { + const fanout = new StubFanout(); + const server = makeServer(fanout); + const request = create(PublishRequestSchema, { + event: 'agent.updated', + rooms: [], + source: 'platform-server', + }); + + await expect(server.publish(request, makeContext())).rejects.toThrowError('invalid publish request'); + await server.close(); + }); + + it('publishes notifications with generated identifiers', async () => { + const fanout = new StubFanout(); + const server = makeServer(fanout); + const request = create(PublishRequestSchema, { + event: 'agent.updated', + rooms: ['graph'], + source: 'platform-server', + payload: { status: 'ready' }, + }); + + const response = await server.publish(request, makeContext()); + + expect(response.id).toMatch(/[0-9a-f-]{36}/i); + expect(fanout.published).toHaveLength(1); + expect(fanout.published[0]).toMatchObject({ + event: 'agent.updated', + rooms: ['graph'], + source: 'platform-server', + payload: { status: 'ready' }, + }); + expect(fanout.published[0]?.id).toBe(response.id); + + await server.close(); + }); +}); + +describe('GrpcServer subscribe', () => { + it('streams redis-delivered notifications to subscribers', async () => { + const fanout = new StubFanout(); + const server = makeServer(fanout); + const abortController = new AbortController(); + const context = { signal: abortController.signal }; + const request = create(SubscribeRequestSchema, {}); + + const iterable = server.subscribe(request, context); + const iterator = iterable[Symbol.asyncIterator](); + + const pending = iterator.next(); + const notification: PublishedNotification = { + id: randomUUID(), + event: 'agent.updated', + rooms: ['graph'], + source: 'platform-server', + payload: { status: 'ready' }, + createdAt: new Date('2025-01-02T03:04:05Z'), + }; + fanout.emit(notification); + + const result = await pending; + expect(result.done).toBe(false); + expect(result.value?.envelope?.event).toBe('agent.updated'); + expect(result.value?.envelope?.rooms).toEqual(['graph']); + expect(result.value?.envelope?.source).toBe('platform-server'); + + abortController.abort(); + const final = await iterator.next(); + expect(final.done).toBe(true); + + await server.close(); + }); +}); diff --git a/packages/notifications/__tests__/redis-notifications.test.ts b/packages/notifications/__tests__/redis-notifications.test.ts new file mode 100644 index 000000000..eeb0fe916 --- /dev/null +++ b/packages/notifications/__tests__/redis-notifications.test.ts @@ -0,0 +1,132 @@ +import { EventEmitter } from 'node:events'; +import pino from 'pino'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { RedisNotificationBus } from '../src/redis-notifications'; +import type { PublishedNotification } from '../src/types'; + +class StubRedis extends EventEmitter { + published: { channel: string; message: string }[] = []; + subscribedChannel: string | null = null; + connected = false; + quitCalled = false; + disconnectCalled = false; + + async connect(): Promise { + this.connected = true; + } + + async subscribe(channel: string): Promise { + this.subscribedChannel = channel; + return 1; + } + + async unsubscribe(channel: string): Promise { + if (this.subscribedChannel === channel) { + this.subscribedChannel = null; + } + return 1; + } + + async publish(channel: string, message: string): Promise { + this.published.push({ channel, message }); + return 1; + } + + async quit(): Promise { + this.quitCalled = true; + this.connected = false; + } + + disconnect(): void { + this.disconnectCalled = true; + this.connected = false; + } + + override off(event: string | symbol, listener: (...args: unknown[]) => void): this { + return super.off(event, listener); + } +} + +describe('RedisNotificationBus', () => { + let publisher: StubRedis; + let subscriber: StubRedis; + let bus: RedisNotificationBus; + + beforeEach(() => { + publisher = new StubRedis(); + subscriber = new StubRedis(); + bus = new RedisNotificationBus({ + redisUrl: 'redis://localhost:6379', + channel: 'notifications.v1', + logger: pino({ level: 'silent' }), + createClient: (label) => (label === 'publisher' ? publisher : subscriber), + }); + }); + + afterEach(async () => { + await bus.close(); + }); + + it('publishes notifications to redis and forwards to listeners', async () => { + const received: PublishedNotification[] = []; + bus.subscribe((notification) => received.push(notification)); + + await bus.start(); + + const notification: PublishedNotification = { + id: '11111111-1111-4111-8111-111111111111', + event: 'agent.updated', + rooms: ['graph'], + source: 'platform-server', + payload: { status: 'ready' }, + createdAt: new Date('2025-01-02T03:04:05.000Z'), + }; + + await bus.publish(notification); + + expect(publisher.published).toHaveLength(1); + const messagePayload = JSON.parse(publisher.published[0]?.message ?? '{}'); + expect(messagePayload).toMatchObject({ + id: notification.id, + tsIso: notification.createdAt.toISOString(), + source: notification.source, + event: notification.event, + rooms: notification.rooms, + payload: notification.payload, + }); + + subscriber.emit('message', 'notifications.v1', publisher.published[0]?.message ?? ''); + + expect(received).toHaveLength(1); + expect(received[0]).toMatchObject({ + id: notification.id, + source: notification.source, + event: notification.event, + rooms: notification.rooms, + payload: notification.payload, + }); + expect(received[0]?.createdAt.toISOString()).toBe(notification.createdAt.toISOString()); + }); + + it('drops malformed messages without notifying listeners', async () => { + const listener = vi.fn(); + bus.subscribe(listener); + + await bus.start(); + + subscriber.emit('message', 'notifications.v1', 'not json'); + subscriber.emit('message', 'notifications.v1', JSON.stringify({ id: 'bad-id' })); + + expect(listener).not.toHaveBeenCalled(); + }); + + it('unsubscribes and closes redis clients on shutdown', async () => { + await bus.start(); + + await bus.close(); + + expect(subscriber.subscribedChannel).toBeNull(); + expect(publisher.quitCalled).toBe(true); + expect(publisher.connected).toBe(false); + }); +}); diff --git a/packages/notifications/package.json b/packages/notifications/package.json index 2fb145a5c..fad3295a1 100644 --- a/packages/notifications/package.json +++ b/packages/notifications/package.json @@ -15,8 +15,8 @@ "@bufbuild/protobuf": "^2.11.0", "@connectrpc/connect": "^2.1.1", "@connectrpc/connect-node": "^2.1.1", + "ioredis": "^5.4.2", "pino": "^10.1.0", - "socket.io": "^4.8.1", "uuid": "^13.0.0", "zod": "^4.1.9" }, diff --git a/packages/notifications/src/config.ts b/packages/notifications/src/config.ts index cbc334680..0fce38a46 100644 --- a/packages/notifications/src/config.ts +++ b/packages/notifications/src/config.ts @@ -6,20 +6,18 @@ const envSchema = z.object({ .int() .positive() .default(50_051), - NOTIFICATIONS_SOCKET_PORT: z.coerce - .number() - .int() - .positive() - .default(4_000), NOTIFICATIONS_HOST: z .string() .default('0.0.0.0') .transform((value) => value.trim().length > 0 ? value.trim() : '0.0.0.0'), - NOTIFICATIONS_SOCKET_PATH: z + NOTIFICATIONS_REDIS_URL: z .string() - .default('/socket.io') - .transform((value) => (value.startsWith('/') ? value : `/${value}`)), - NOTIFICATIONS_SOCKET_CORS_ORIGINS: z.string().optional(), + .min(1, 'NOTIFICATIONS_REDIS_URL is required') + .transform((value) => value.trim()), + NOTIFICATIONS_CHANNEL: z + .string() + .default('notifications.v1') + .transform((value) => value.trim().length > 0 ? value.trim() : 'notifications.v1'), LOG_LEVEL: z .string() .default('info') @@ -28,29 +26,19 @@ const envSchema = z.object({ export type Config = { grpcPort: number; - socketPort: number; host: string; - socketPath: string; - socketCorsOrigins: string[]; + redisUrl: string; + redisChannel: string; logLevel: string; }; -const parseCorsOrigins = (value: string | undefined): string[] => { - if (!value) return []; - return value - .split(',') - .map((part) => part.trim()) - .filter((part) => part.length > 0); -}; - export const loadConfig = (): Config => { const env = envSchema.parse(process.env); return { grpcPort: env.NOTIFICATIONS_GRPC_PORT, - socketPort: env.NOTIFICATIONS_SOCKET_PORT, host: env.NOTIFICATIONS_HOST, - socketPath: env.NOTIFICATIONS_SOCKET_PATH, - socketCorsOrigins: parseCorsOrigins(env.NOTIFICATIONS_SOCKET_CORS_ORIGINS), + redisUrl: env.NOTIFICATIONS_REDIS_URL, + redisChannel: env.NOTIFICATIONS_CHANNEL, logLevel: env.LOG_LEVEL, }; }; diff --git a/packages/notifications/src/grpc.ts b/packages/notifications/src/grpc.ts index 5a56a754b..3fbff1044 100644 --- a/packages/notifications/src/grpc.ts +++ b/packages/notifications/src/grpc.ts @@ -5,10 +5,9 @@ import { timestampFromDate } from '@bufbuild/protobuf/wkt'; import { ConnectError, Code, type HandlerContext } from '@connectrpc/connect'; import { connectNodeAdapter } from '@connectrpc/connect-node'; import { createServer, type Http2Server } from 'node:http2'; -import { NotificationBroadcaster } from './broadcaster'; -import type { SocketBridge } from './socket'; import { PublishInputSchema } from './validation'; import type { JsonValue, PublishedNotification } from './types'; +import type { NotificationFanout } from './redis-notifications'; import { NotificationsService } from './proto/gen/agynio/api/notifications/v1/notifications_pb.js'; import { NotificationEnvelopeSchema, @@ -24,8 +23,7 @@ import { type GrpcServerOptions = { host: string; port: number; - broadcaster: NotificationBroadcaster; - socket: SocketBridge; + notifications: NotificationFanout; logger: Logger; }; @@ -73,7 +71,7 @@ export class GrpcServer { constructor(private readonly options: GrpcServerOptions) { this.logger = options.logger.child({ scope: 'grpc' }); - const publishImpl = async (request: PublishRequest): Promise => { + const publishImpl = async (request: PublishRequest, _context: HandlerContext): Promise => { const parsed = PublishInputSchema.safeParse({ event: request.event, rooms: request.rooms, @@ -97,8 +95,19 @@ export class GrpcServer { }; this.logger.debug({ event: notification.event, rooms: notification.rooms }, 'publish request accepted'); - this.options.socket.broadcast(notification); - this.options.broadcaster.publish(notification); + try { + await this.options.notifications.publish(notification); + } catch (error) { + this.logger.error( + { + event: notification.event, + rooms: notification.rooms, + error: error instanceof Error ? { name: error.name, message: error.message } : { message: String(error) }, + }, + 'publish request failed', + ); + throw new ConnectError('failed to publish notification', Code.Internal); + } return create(PublishResponseSchema, { id: notification.id, @@ -135,7 +144,7 @@ export class GrpcServer { } }; - const unsubscribe = this.options.broadcaster.subscribe(push); + const unsubscribe = this.options.notifications.subscribe(push); const abortHandler = (): void => { stop(); }; @@ -166,11 +175,14 @@ export class GrpcServer { return iterator; }; + this.publish = publishImpl; + this.subscribe = subscribeImpl; + const handler = connectNodeAdapter({ routes: (router) => { - router.service(NotificationsService as unknown as never, { - publish: publishImpl as unknown as never, - subscribe: subscribeImpl as unknown as never, + router.service(NotificationsService, { + publish: this.publish, + subscribe: this.subscribe, }); }, }); @@ -181,6 +193,10 @@ export class GrpcServer { this.server = server; } + readonly publish: (request: PublishRequest, context: HandlerContext) => Promise; + + readonly subscribe: (request: SubscribeRequest, context: HandlerContext) => AsyncIterable; + async start(): Promise { const { host, port } = this.options; await new Promise((resolve) => { @@ -192,11 +208,16 @@ export class GrpcServer { async close(): Promise { await new Promise((resolve, reject) => { this.server.close((error: Error | undefined) => { - if (error) { - reject(error); - } else { + if (!error) { + resolve(); + return; + } + const errorWithCode = error as NodeJS.ErrnoException; + if (errorWithCode.code === 'ERR_SERVER_NOT_RUNNING') { resolve(); + return; } + reject(error); }); }); } diff --git a/packages/notifications/src/main.ts b/packages/notifications/src/main.ts index cc7b8edef..9dd9bc355 100644 --- a/packages/notifications/src/main.ts +++ b/packages/notifications/src/main.ts @@ -1,30 +1,25 @@ import { loadConfig } from './config'; import { createLogger } from './logger'; -import { NotificationBroadcaster } from './broadcaster'; -import { SocketBridge } from './socket'; import { GrpcServer } from './grpc'; +import { RedisNotificationBus } from './redis-notifications'; const config = loadConfig(); const logger = createLogger(config.logLevel); -const broadcaster = new NotificationBroadcaster(logger); -const socket = new SocketBridge({ - host: config.host, - port: config.socketPort, - path: config.socketPath, - corsOrigins: config.socketCorsOrigins, +const notifications = new RedisNotificationBus({ + channel: config.redisChannel, + redisUrl: config.redisUrl, logger, }); const grpc = new GrpcServer({ host: config.host, port: config.grpcPort, - broadcaster, - socket, + notifications, logger, }); const start = async () => { try { - await socket.start(); + await notifications.start(); await grpc.start(); logger.info('notifications service started'); } catch (error) { @@ -36,7 +31,7 @@ const start = async () => { const shutdown = async (signal: string) => { logger.info({ signal }, 'shutting down notifications service'); try { - await Promise.all([grpc.close(), socket.close()]); + await Promise.all([grpc.close(), notifications.close()]); logger.info('notifications service stopped'); process.exit(0); } catch (error) { diff --git a/packages/notifications/src/redis-notifications.ts b/packages/notifications/src/redis-notifications.ts new file mode 100644 index 000000000..cd064a2ba --- /dev/null +++ b/packages/notifications/src/redis-notifications.ts @@ -0,0 +1,202 @@ +import Redis from 'ioredis'; +import type { Logger } from 'pino'; +import { z } from 'zod'; +import { NotificationBroadcaster } from './broadcaster'; +import type { JsonObject, PublishedNotification } from './types'; +import { RoomSchema } from './validation'; + +export type NotificationFanout = { + publish(notification: PublishedNotification): Promise; + subscribe(listener: (notification: PublishedNotification) => void): () => void; +}; + +type RedisNotificationBusOptions = { + redisUrl: string; + channel: string; + logger: Logger; + createClient?: (label: 'publisher' | 'subscriber') => Redis; +}; + +const EnvelopeSchema = z + .object({ + id: z.string().uuid(), + tsIso: z.string().datetime(), + source: z.string().min(1), + event: z.string().min(1), + rooms: z.array(RoomSchema).min(1), + payload: z.record(z.string(), z.unknown()).optional(), + }) + .strict(); + +type Envelope = z.infer; + +const isJsonObject = (value: unknown): value is Record => + value !== null && typeof value === 'object' && !Array.isArray(value); + +const isJsonValue = (value: unknown): boolean => { + if (value === null) return true; + const valueType = typeof value; + if (valueType === 'string' || valueType === 'number' || valueType === 'boolean') return true; + if (Array.isArray(value)) return value.every(isJsonValue); + if (isJsonObject(value)) { + for (const entry of Object.values(value)) { + if (!isJsonValue(entry)) return false; + } + return true; + } + return false; +}; + +const normalizeJsonObject = (value: Record): JsonObject | null => { + const result: JsonObject = {}; + for (const [key, entry] of Object.entries(value)) { + if (!isJsonValue(entry)) { + return null; + } + result[key] = entry as JsonObject[string]; + } + return result; +}; + +export class RedisNotificationBus implements NotificationFanout { + private readonly publisher: Redis; + private readonly subscriber: Redis; + private readonly broadcaster: NotificationBroadcaster; + private readonly logger: Logger; + private subscribed = false; + private readonly handleMessageBound: (channel: string, message: string) => void; + + constructor(private readonly options: RedisNotificationBusOptions) { + const createClient = options.createClient ?? ((label: 'publisher' | 'subscriber') => new Redis(options.redisUrl, { + lazyConnect: true, + name: `notifications-${label}`, + autoResubscribe: true, + autoResendUnfulfilledCommands: true, + })); + + this.logger = options.logger.child({ scope: 'redis' }); + this.publisher = createClient('publisher'); + this.subscriber = createClient('subscriber'); + this.broadcaster = new NotificationBroadcaster(this.logger.child({ scope: 'fanout' })); + this.handleMessageBound = (channel: string, message: string) => this.handleMessage(channel, message); + + this.subscriber.on('message', this.handleMessageBound); + this.subscriber.on('ready', () => { + this.logger.info({ channel: this.options.channel }, 'redis subscriber ready'); + }); + this.subscriber.on('reconnecting', (delay: number) => { + this.logger.warn({ delay }, 'redis subscriber reconnecting'); + }); + this.subscriber.on('end', () => { + this.logger.error('redis subscriber connection closed'); + }); + this.subscriber.on('error', (error: Error) => { + this.logger.error({ error: { name: error.name, message: error.message } }, 'redis subscriber error'); + }); + this.publisher.on('error', (error: Error) => { + this.logger.error({ error: { name: error.name, message: error.message } }, 'redis publisher error'); + }); + } + + async start(): Promise { + if (this.subscribed) return; + await Promise.all([this.publisher.connect(), this.subscriber.connect()]); + await this.subscriber.subscribe(this.options.channel); + this.subscribed = true; + this.logger.info({ channel: this.options.channel }, 'redis notifications subscribed'); + } + + async close(): Promise { + this.subscriber.off('message', this.handleMessageBound); + if (this.subscribed) { + try { + await this.subscriber.unsubscribe(this.options.channel); + } catch (error) { + this.logger.warn( + { error: error instanceof Error ? { name: error.name, message: error.message } : { message: String(error) } }, + 'failed to unsubscribe from redis channel', + ); + } + } + await Promise.all([this.quitSafely(this.subscriber), this.quitSafely(this.publisher)]); + this.subscribed = false; + } + + async publish(notification: PublishedNotification): Promise { + if (!this.subscribed) { + throw new Error('redis notification bus not started'); + } + const envelope: Envelope = { + id: notification.id, + tsIso: notification.createdAt.toISOString(), + source: notification.source, + event: notification.event, + rooms: notification.rooms, + payload: notification.payload, + }; + const payload = JSON.stringify(envelope); + await this.publisher.publish(this.options.channel, payload); + } + + subscribe(listener: (notification: PublishedNotification) => void): () => void { + return this.broadcaster.subscribe(listener); + } + + private handleMessage(channel: string, message: string): void { + if (channel !== this.options.channel) { + this.logger.warn({ channel }, 'received notification for unexpected channel'); + return; + } + + let parsedJson: unknown; + try { + parsedJson = JSON.parse(message); + } catch (error) { + this.logger.warn( + { error: error instanceof Error ? { name: error.name, message: error.message } : { message: String(error) } }, + 'failed to parse notification payload', + ); + return; + } + + const parsed = EnvelopeSchema.safeParse(parsedJson); + if (!parsed.success) { + this.logger.warn({ issues: parsed.error.issues }, 'invalid notification payload'); + return; + } + + const { id, tsIso, source, event, rooms, payload } = parsed.data; + let payloadObject: JsonObject | undefined; + if (payload) { + const normalized = normalizeJsonObject(payload); + if (!normalized) { + this.logger.warn({ id }, 'invalid notification payload: contains non-JSON values'); + return; + } + payloadObject = normalized; + } + const createdAt = new Date(tsIso); + const notification: PublishedNotification = { + id, + source, + event, + rooms, + payload: payloadObject, + createdAt, + }; + + this.broadcaster.publish(notification); + } + + private async quitSafely(client: Redis): Promise { + try { + await client.quit(); + } catch (error) { + this.logger.warn( + { error: error instanceof Error ? { name: error.name, message: error.message } : { message: String(error) } }, + 'redis client quit failed, forcing disconnect', + ); + client.disconnect(false); + } + } +} diff --git a/packages/notifications/src/socket.ts b/packages/notifications/src/socket.ts deleted file mode 100644 index 9c59270fe..000000000 --- a/packages/notifications/src/socket.ts +++ /dev/null @@ -1,143 +0,0 @@ -import { createServer, type Server as HttpServer } from 'node:http'; -import type { IncomingHttpHeaders } from 'node:http'; -import { Server as SocketIOServer, type ServerOptions, type Socket } from 'socket.io'; -import type { Logger } from 'pino'; -import { SubscribePayloadSchema } from './validation'; -import type { PublishedNotification } from './types'; - -type SocketBridgeOptions = { - host: string; - port: number; - path: string; - corsOrigins: string[]; - logger: Logger; -}; - -export class SocketBridge { - private readonly httpServer: HttpServer; - private readonly io: SocketIOServer; - private readonly logger: Logger; - - constructor(private readonly options: SocketBridgeOptions) { - this.logger = options.logger.child({ scope: 'socket' }); - this.httpServer = createServer(); - const serverOptions: Partial = { - path: options.path, - transports: ['websocket'], - cors: { - origin: options.corsOrigins.length > 0 ? options.corsOrigins : '*', - methods: ['GET', 'POST', 'OPTIONS'], - credentials: false, - }, - allowRequest: (_req, callback) => { - callback(null, true); - }, - } satisfies Partial; - this.io = new SocketIOServer(this.httpServer, serverOptions); - this.io.on('connection', (socket) => this.handleConnection(socket)); - } - - async start(): Promise { - const { host, port } = this.options; - await new Promise((resolve) => { - this.httpServer.listen(port, host, () => resolve()); - }); - this.logger.info({ host, port, path: this.options.path }, 'socket server listening'); - } - - async close(): Promise { - await new Promise((resolve) => { - this.io.close(() => resolve()); - }); - await new Promise((resolve) => { - this.httpServer.close(() => resolve()); - }); - } - - broadcast(notification: PublishedNotification): void { - for (const room of notification.rooms) { - try { - this.io.to(room).emit(notification.event, notification.payload ?? {}); - } catch (error) { - this.logger.warn( - { - room, - event: notification.event, - error: error instanceof Error ? { name: error.name, message: error.message } : { message: String(error) }, - }, - 'socket emit failure', - ); - } - } - } - - private handleConnection(socket: Socket): void { - this.logger.info( - { - id: socket.id, - headers: this.sanitizeHeaders(socket.request.headers), - query: this.sanitizeQuery(socket.handshake.query as Record | undefined), - }, - 'client connected', - ); - - socket.on('subscribe', (payload: unknown, ack?: (response: unknown) => void) => { - const parsed = SubscribePayloadSchema.safeParse(payload); - if (!parsed.success) { - const issues = parsed.error.issues.map((issue) => ({ - path: issue.path, - message: issue.message, - code: issue.code, - })); - this.logger.warn({ socketId: socket.id, issues }, 'subscribe rejected'); - if (typeof ack === 'function') { - ack({ ok: false, error: 'invalid_payload', issues }); - } - return; - } - - const rooms = parsed.data.rooms ?? (parsed.data.room ? [parsed.data.room] : []); - for (const room of rooms) { - if (room.length > 0) socket.join(room); - } - if (typeof ack === 'function') { - ack({ ok: true, rooms }); - } - }); - - socket.on('error', (error) => { - this.logger.warn( - { - socketId: socket.id, - error: error instanceof Error ? { name: error.name, message: error.message } : { message: String(error) }, - }, - 'socket error', - ); - }); - - socket.on('disconnect', (reason) => { - this.logger.debug({ socketId: socket.id, reason }, 'client disconnected'); - }); - } - - private sanitizeHeaders(headers: IncomingHttpHeaders | undefined): Record { - if (!headers) return {}; - const sensitive = new Set(['authorization', 'cookie', 'set-cookie']); - const sanitized: Record = {}; - for (const [key, value] of Object.entries(headers)) { - if (!key) continue; - sanitized[key] = sensitive.has(key.toLowerCase()) ? '[REDACTED]' : value; - } - return sanitized; - } - - private sanitizeQuery(query: Record | undefined): Record { - if (!query) return {}; - const sensitive = new Set(['token', 'authorization', 'auth', 'api_key', 'access_token']); - const sanitized: Record = {}; - for (const [key, value] of Object.entries(query)) { - sanitized[key] = key && sensitive.has(key.toLowerCase()) ? '[REDACTED]' : value; - } - return sanitized; - } -} diff --git a/packages/platform-server/src/notifications/notifications-grpc.publisher.ts b/packages/platform-server/src/notifications/notifications-grpc.publisher.ts index dc13eca04..11f71390e 100644 --- a/packages/platform-server/src/notifications/notifications-grpc.publisher.ts +++ b/packages/platform-server/src/notifications/notifications-grpc.publisher.ts @@ -1,5 +1,5 @@ import { Inject, Injectable } from '@nestjs/common'; -import { createClient } from '@connectrpc/connect'; +import { createClient, type Client } from '@connectrpc/connect'; import { createConnectTransport } from '@connectrpc/connect-node'; import { NotificationsService } from '../proto/gen/agynio/api/notifications/v1/notifications_pb.js'; import type { UiNotificationPublishRequest, UiNotificationsPublisher } from './ui-notifications.publisher'; @@ -17,46 +17,16 @@ function isJsonRecord(value: unknown): value is Record { return value !== null && typeof value === 'object' && !Array.isArray(value); } -type NotificationsPublishClient = { - publish( - request: { - event: string; - rooms: string[]; - payload?: Record | undefined; - source?: string; - }, - options?: { signal?: AbortSignal }, - ): Promise; -}; - -function isNotificationsPublishClient(value: unknown): value is NotificationsPublishClient { - return ( - typeof value === 'object' && - value !== null && - typeof (value as { publish?: unknown }).publish === 'function' - ); -} - @Injectable() export class NotificationsGrpcPublisher implements UiNotificationsPublisher { - private readonly client: NotificationsPublishClient; + private readonly client: Client; constructor(@Inject(NOTIFICATIONS_PUBLISHER_CONFIG) private readonly config: NotificationsPublisherConfig) { - /* - * connect-es marks generated service definitions with @ts-nocheck, so createClient() currently - * erases type information to `any`. We validate the publish method below before storing the client. - */ - const candidate = createClient( - NotificationsService, - createConnectTransport({ - baseUrl: config.baseUrl, - httpVersion: '2', - }), - ); - if (!isNotificationsPublishClient(candidate)) { - throw new Error('Notifications gRPC client missing publish implementation'); - } - this.client = candidate; + const transport = createConnectTransport({ + baseUrl: config.baseUrl, + httpVersion: '2', + }); + this.client = createClient(NotificationsService, transport); } async publishToRooms(request: UiNotificationPublishRequest): Promise { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 10c6481ea..535bc2d8b 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -139,12 +139,12 @@ importers: '@connectrpc/connect-node': specifier: ^2.1.1 version: 2.1.1(@bufbuild/protobuf@2.11.0)(@connectrpc/connect@2.1.1(@bufbuild/protobuf@2.11.0)) + ioredis: + specifier: ^5.4.2 + version: 5.10.0 pino: specifier: ^10.1.0 version: 10.1.0 - socket.io: - specifier: ^4.8.1 - version: 4.8.1 uuid: specifier: ^13.0.0 version: 13.0.0 @@ -1582,6 +1582,9 @@ packages: '@types/node': optional: true + '@ioredis/commands@1.5.1': + resolution: {integrity: sha512-JH8ZL/ywcJyR9MmJ5BNqZllXNZQqQbnVZOqpPQqE1vHiFgAw4NHbvE0FOduNU8IX9babitBT46571OnPTT0Zcw==} + '@isaacs/cliui@8.0.2': resolution: {integrity: sha512-O8jcjabXaleOG9DQ0+ARXWZBTfnP4WNAqzuiJK7ll44AmxGKv/J2M4TPjxjY3znBCfvBXFzucm1twdyFybFqEA==} engines: {node: '>=12'} @@ -4666,6 +4669,10 @@ packages: resolution: {integrity: sha512-eYm0QWBtUrBWZWG0d386OGAw16Z995PiOVo2B7bjWSbHedGl5e0ZWaq65kOGgUSNesEIDkB9ISbTg/JK9dhCZA==} engines: {node: '>=6'} + cluster-key-slot@1.1.2: + resolution: {integrity: sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==} + engines: {node: '>=0.10.0'} + cmdk@1.1.1: resolution: {integrity: sha512-Vsv7kFaXm+ptHDMZ7izaRsP70GgrW9NBNGswt9OZaVBLlE0SNpDq8eu/VGXyF9r7M0azK3Wy7OlYXsuyYLFzHg==} peerDependencies: @@ -4975,6 +4982,10 @@ packages: resolution: {integrity: sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==} engines: {node: '>=0.4.0'} + denque@2.1.0: + resolution: {integrity: sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==} + engines: {node: '>=0.10'} + depd@2.0.0: resolution: {integrity: sha512-g7nH6P6dyDioJogAAGprGpCtVImJhpPk/roCzdb3fIh61/s/nPsfR6onyMwkCAR/OlC3yBC0lESvUoQEAssIrw==} engines: {node: '>= 0.8'} @@ -5836,6 +5847,10 @@ packages: resolution: {integrity: sha512-5Hh7Y1wQbvY5ooGgPbDaL5iYLAPzMTUrjMulskHLH6wnv/A+1q5rgEaiuqEjB+oxGXIVZs1FF+R/KPN3ZSQYYg==} engines: {node: '>=12'} + ioredis@5.10.0: + resolution: {integrity: sha512-HVBe9OFuqs+Z6n64q09PQvP1/R4Bm+30PAyyD4wIEqssh3v9L21QjCVk4kRLucMBcDokJTcLjsGeVRlq/nH6DA==} + engines: {node: '>=12.22.0'} + ipaddr.js@1.9.1: resolution: {integrity: sha512-0KI/607xoxSToH7GjN1FfSbLoU0+btTicjsQSWQlh/hZykN8KpmMf7uYwPW3R+akZ6R/w18ZlXSHBYXiYUPO3g==} engines: {node: '>= 0.10'} @@ -6439,9 +6454,15 @@ packages: lodash.camelcase@4.3.0: resolution: {integrity: sha512-TwuEnCnxbc3rAvhf/LbG7tJUDzhqXyFnv3dtzLOPgCG/hODL7WFnsbwktkD7yUV0RrreP/l1PALq/YSg6VvjlA==} + lodash.defaults@4.2.0: + resolution: {integrity: sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ==} + lodash.flattendeep@4.4.0: resolution: {integrity: sha512-uHaJFihxmJcEX3kT4I23ABqKKalJ/zDrDg0lsFtc1h+3uw49SIJ5beyhx5ExVRti3AvKoOJngIj7xz3oylPdWQ==} + lodash.isarguments@3.1.0: + resolution: {integrity: sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==} + lodash.merge@4.6.2: resolution: {integrity: sha512-0KpjqXRVvrYyCsX1swR/XTK0va6VQkQM6MNo7PqW77ByjAhoARA8EfrP1N4+KlKj8YS0ZUCtRT/YUuhyYDujIQ==} @@ -7469,6 +7490,14 @@ packages: resolution: {integrity: sha512-6tDA8g98We0zd0GvVeMT9arEOnTw9qM03L9cJXaCjrip1OO764RDBLBfrB4cwzNGDj5OA5ioymC9GkizgWJDUg==} engines: {node: '>=8'} + redis-errors@1.2.0: + resolution: {integrity: sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==} + engines: {node: '>=4'} + + redis-parser@3.0.0: + resolution: {integrity: sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==} + engines: {node: '>=4'} + redux@4.2.1: resolution: {integrity: sha512-LAUYz4lc+Do8/g7aeRa8JkyDErK6ekstQaqWQrNRW//MY1TvCEpMtpTWvlQ+FPbWCx+Xixu/6SHt5N0HR+SB4w==} @@ -7772,6 +7801,9 @@ packages: resolution: {integrity: sha512-WjlahMgHmCJpqzU8bIBy4qtsZdU9lRlcZE3Lvyej6t4tuOuv1vk57OW3MBrj6hXBFx/nNoC9MPMTcr5YA7NQbg==} engines: {node: '>=6'} + standard-as-callback@2.1.0: + resolution: {integrity: sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==} + state-local@1.0.7: resolution: {integrity: sha512-HTEHMNieakEnoe33shBYcZ7NX83ACUjCu8c40iOGEZsngj9zRnkqS9j1pqQPXwobB0ZcVTk27REb7COQ0UR59w==} @@ -9379,6 +9411,8 @@ snapshots: optionalDependencies: '@types/node': 24.5.2 + '@ioredis/commands@1.5.1': {} + '@isaacs/cliui@8.0.2': dependencies: string-width: 5.1.2 @@ -12904,6 +12938,8 @@ snapshots: clsx@2.1.1: {} + cluster-key-slot@1.1.2: {} + cmdk@1.1.1(@types/react-dom@19.1.9(@types/react@19.1.13))(@types/react@19.1.13)(react-dom@19.1.1(react@19.1.1))(react@19.1.1): dependencies: '@radix-ui/react-compose-refs': 1.1.2(@types/react@19.1.13)(react@19.1.1) @@ -13189,6 +13225,8 @@ snapshots: delayed-stream@1.0.0: {} + denque@2.1.0: {} + depd@2.0.0: {} dequal@2.0.3: {} @@ -14210,6 +14248,20 @@ snapshots: internmap@2.0.3: {} + ioredis@5.10.0: + dependencies: + '@ioredis/commands': 1.5.1 + cluster-key-slot: 1.1.2 + debug: 4.4.3 + denque: 2.1.0 + lodash.defaults: 4.2.0 + lodash.isarguments: 3.1.0 + redis-errors: 1.2.0 + redis-parser: 3.0.0 + standard-as-callback: 2.1.0 + transitivePeerDependencies: + - supports-color + ipaddr.js@1.9.1: {} ipaddr.js@2.2.0: {} @@ -14993,8 +15045,12 @@ snapshots: lodash.camelcase@4.3.0: {} + lodash.defaults@4.2.0: {} + lodash.flattendeep@4.4.0: {} + lodash.isarguments@3.1.0: {} + lodash.merge@4.6.2: {} lodash@4.17.21: {} @@ -16308,6 +16364,12 @@ snapshots: indent-string: 4.0.0 strip-indent: 3.0.0 + redis-errors@1.2.0: {} + + redis-parser@3.0.0: + dependencies: + redis-errors: 1.2.0 + redux@4.2.1: dependencies: '@babel/runtime': 7.28.4 @@ -16707,6 +16769,8 @@ snapshots: dependencies: type-fest: 0.7.1 + standard-as-callback@2.1.0: {} + state-local@1.0.7: {} statuses@2.0.1: {} From 7b38953738511b77fa54c235184c7e2a31f6c44b Mon Sep 17 00:00:00 2001 From: Casey Brooks Date: Sat, 28 Feb 2026 04:31:40 +0000 Subject: [PATCH 5/6] fix(platform-server): require rooms for notifications --- .../notifications-grpc.publisher.test.ts | 66 +++++++++++++++++++ .../notifications-grpc.publisher.ts | 4 +- 2 files changed, 69 insertions(+), 1 deletion(-) create mode 100644 packages/platform-server/__tests__/notifications-grpc.publisher.test.ts diff --git a/packages/platform-server/__tests__/notifications-grpc.publisher.test.ts b/packages/platform-server/__tests__/notifications-grpc.publisher.test.ts new file mode 100644 index 000000000..0ff45a14c --- /dev/null +++ b/packages/platform-server/__tests__/notifications-grpc.publisher.test.ts @@ -0,0 +1,66 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import { NotificationsGrpcPublisher } from '../src/notifications/notifications-grpc.publisher'; + +const { publishMock, createClientMock, createTransportMock } = vi.hoisted(() => ({ + publishMock: vi.fn(), + createClientMock: vi.fn(), + createTransportMock: vi.fn(), +})); + +vi.mock('@connectrpc/connect', () => ({ + __esModule: true, + createClient: createClientMock, +})); + +vi.mock('@connectrpc/connect-node', () => ({ + __esModule: true, + createConnectTransport: createTransportMock, +})); + +const config = { + baseUrl: 'http://notifications.local', + deadlineMs: 1_000, + source: 'platform-server', +}; + +describe('NotificationsGrpcPublisher', () => { + beforeEach(() => { + publishMock.mockReset(); + createClientMock.mockReset(); + createTransportMock.mockReset(); + createTransportMock.mockImplementation(() => ({})); + createClientMock.mockImplementation(() => ({ publish: publishMock })); + }); + + it('throws when no rooms are provided', async () => { + const publisher = new NotificationsGrpcPublisher(config); + + await expect( + publisher.publishToRooms({ rooms: [], event: 'node_status', payload: {}, source: 'test-source' }), + ).rejects.toThrowError('NotificationsGrpcPublisher requires at least one room'); + + expect(publishMock).not.toHaveBeenCalled(); + }); + + it('publishes when rooms are provided', async () => { + publishMock.mockResolvedValue(undefined); + const publisher = new NotificationsGrpcPublisher(config); + + await publisher.publishToRooms({ + rooms: ['threads'], + event: 'node_status', + payload: { ok: true }, + source: 'test-source', + }); + + expect(publishMock).toHaveBeenCalledTimes(1); + const [request, options] = publishMock.mock.calls[0] ?? []; + expect(request).toEqual({ + event: 'node_status', + rooms: ['threads'], + payload: { ok: true }, + source: 'test-source', + }); + expect(options?.signal).toBeInstanceOf(AbortSignal); + }); +}); diff --git a/packages/platform-server/src/notifications/notifications-grpc.publisher.ts b/packages/platform-server/src/notifications/notifications-grpc.publisher.ts index 11f71390e..9074c9831 100644 --- a/packages/platform-server/src/notifications/notifications-grpc.publisher.ts +++ b/packages/platform-server/src/notifications/notifications-grpc.publisher.ts @@ -31,7 +31,9 @@ export class NotificationsGrpcPublisher implements UiNotificationsPublisher { async publishToRooms(request: UiNotificationPublishRequest): Promise { const { rooms, event, payload, source } = request; - if (!Array.isArray(rooms) || rooms.length === 0) return; + if (!Array.isArray(rooms) || rooms.length === 0) { + throw new Error('NotificationsGrpcPublisher requires at least one room'); + } let jsonPayload: Record | undefined; if (payload !== undefined) { From 6fb6d513e26912e5cb3193e4a29c842261a16592 Mon Sep 17 00:00:00 2001 From: Casey Brooks Date: Sat, 28 Feb 2026 04:34:50 +0000 Subject: [PATCH 6/6] fix(platform-server): normalize notification payload --- .../notifications-grpc.publisher.test.ts | 17 +++++++++++ .../notifications-grpc.publisher.ts | 28 +++++++++++++++++-- 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/packages/platform-server/__tests__/notifications-grpc.publisher.test.ts b/packages/platform-server/__tests__/notifications-grpc.publisher.test.ts index 0ff45a14c..6bd1c9140 100644 --- a/packages/platform-server/__tests__/notifications-grpc.publisher.test.ts +++ b/packages/platform-server/__tests__/notifications-grpc.publisher.test.ts @@ -63,4 +63,21 @@ describe('NotificationsGrpcPublisher', () => { }); expect(options?.signal).toBeInstanceOf(AbortSignal); }); + + it('rejects payloads with non-JSON values', async () => { + const publisher = new NotificationsGrpcPublisher(config); + + await expect( + publisher.publishToRooms({ + rooms: ['threads'], + event: 'node_status', + payload: { invalid: () => undefined }, + source: 'test-source', + }), + ).rejects.toThrowError( + 'NotificationsGrpcPublisher payload values must be JSON-serializable; field "invalid" is function', + ); + + expect(publishMock).not.toHaveBeenCalled(); + }); }); diff --git a/packages/platform-server/src/notifications/notifications-grpc.publisher.ts b/packages/platform-server/src/notifications/notifications-grpc.publisher.ts index 9074c9831..63d588763 100644 --- a/packages/platform-server/src/notifications/notifications-grpc.publisher.ts +++ b/packages/platform-server/src/notifications/notifications-grpc.publisher.ts @@ -4,6 +4,7 @@ import { createConnectTransport } from '@connectrpc/connect-node'; import { NotificationsService } from '../proto/gen/agynio/api/notifications/v1/notifications_pb.js'; import type { UiNotificationPublishRequest, UiNotificationsPublisher } from './ui-notifications.publisher'; import { UI_NOTIFICATIONS_PUBLISHER } from './ui-notifications.publisher'; +import type { JsonObject, JsonValue } from '@bufbuild/protobuf'; export type NotificationsPublisherConfig = { baseUrl: string; @@ -17,6 +18,29 @@ function isJsonRecord(value: unknown): value is Record { return value !== null && typeof value === 'object' && !Array.isArray(value); } +function isJsonValue(value: unknown): value is JsonValue { + if (value === null) return true; + const kind = typeof value; + if (kind === 'string' || kind === 'number' || kind === 'boolean') return true; + if (Array.isArray(value)) return value.every(isJsonValue); + if (!isJsonRecord(value)) return false; + return Object.values(value).every(isJsonValue); +} + +function toJsonObject(value: Record): JsonObject { + const result: JsonObject = {}; + for (const [key, entry] of Object.entries(value)) { + if (!isJsonValue(entry)) { + const descriptor = entry === null ? 'null' : Array.isArray(entry) ? 'array' : typeof entry; + throw new Error( + `NotificationsGrpcPublisher payload values must be JSON-serializable; field "${key}" is ${descriptor}`, + ); + } + result[key] = entry; + } + return result; +} + @Injectable() export class NotificationsGrpcPublisher implements UiNotificationsPublisher { private readonly client: Client; @@ -35,13 +59,13 @@ export class NotificationsGrpcPublisher implements UiNotificationsPublisher { throw new Error('NotificationsGrpcPublisher requires at least one room'); } - let jsonPayload: Record | undefined; + let jsonPayload: JsonObject | undefined; if (payload !== undefined) { if (!isJsonRecord(payload)) { const descriptor = payload === null ? 'null' : Array.isArray(payload) ? 'array' : typeof payload; throw new Error(`NotificationsGrpcPublisher only supports object payloads; received ${descriptor}`); } - jsonPayload = payload; + jsonPayload = toJsonObject(payload); } const controller = new AbortController();