diff --git a/backend/.env.example b/backend/.env.example index 8afdc41bb..623187b33 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -90,4 +90,10 @@ RATE_LIMIT_ENABLED=true # Controls which proxy headers are trusted for IP extraction in rate limiting. # Set to 'cloudflare' to trust CF-Connecting-IP, 'akamai' for True-Client-IP. # Leave empty to use the direct socket IP only (all proxy headers are ignored). -TRUSTED_PROXY= \ No newline at end of file +TRUSTED_PROXY= + +# Agents +# Comma-separated list of enabled agent IDs. Empty = all enabled. (default: '') +ENABLED_AGENTS= +# Allow users to register custom agents with arbitrary URLs. (default: false) +ALLOW_CUSTOM_AGENTS=false \ No newline at end of file diff --git a/backend/drizzle/0013_next_ulik.sql b/backend/drizzle/0013_next_ulik.sql new file mode 100644 index 000000000..98b0de8fc --- /dev/null +++ b/backend/drizzle/0013_next_ulik.sql @@ -0,0 +1,28 @@ +CREATE TABLE "powersync"."agents" ( + "id" text NOT NULL, + "name" text, + "type" text, + "transport" text, + "command" text, + "args" text, + "url" text, + "auth_method" text, + "icon" text, + "is_system" integer DEFAULT 0, + "enabled" integer DEFAULT 1, + "registry_id" text, + "installed_version" text, + "registry_version" text, + "distribution_type" text, + "install_path" text, + "package_name" text, + "description" text, + "default_hash" text, + "deleted_at" timestamp, + "user_id" text NOT NULL, + CONSTRAINT "agents_id_user_id_pk" PRIMARY KEY("id","user_id") +); +--> statement-breakpoint +ALTER TABLE "powersync"."chat_threads" ADD COLUMN "agent_id" text;--> statement-breakpoint +ALTER TABLE "powersync"."agents" ADD CONSTRAINT "agents_user_id_user_id_fk" FOREIGN KEY ("user_id") REFERENCES "public"."user"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint +CREATE INDEX "idx_agents_user_id" ON "powersync"."agents" USING btree ("user_id"); \ No newline at end of file diff --git a/backend/drizzle/meta/0013_snapshot.json 
b/backend/drizzle/meta/0013_snapshot.json new file mode 100644 index 000000000..fec463410 --- /dev/null +++ b/backend/drizzle/meta/0013_snapshot.json @@ -0,0 +1,2178 @@ +{ + "id": "b4cb00cf-4826-45e1-bfe5-461b088a7b64", + "prevId": "5801d64a-0298-464d-b220-736beb9ff7e9", + "version": "7", + "dialect": "postgresql", + "tables": { + "public.account": { + "name": "account", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "account_id": { + "name": "account_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "provider_id": { + "name": "provider_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "user_id": { + "name": "user_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "access_token": { + "name": "access_token", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "refresh_token": { + "name": "refresh_token", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "id_token": { + "name": "id_token", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "access_token_expires_at": { + "name": "access_token_expires_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "refresh_token_expires_at": { + "name": "refresh_token_expires_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "scope": { + "name": "scope", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "password": { + "name": "password", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true + } + }, + "indexes": { + "account_userId_idx": { + "name": "account_userId_idx", + "columns": [ + { + "expression": "user_id", + "isExpression": 
false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "account_user_id_user_id_fk": { + "name": "account_user_id_user_id_fk", + "tableFrom": "account", + "tableTo": "user", + "columnsFrom": [ + "user_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.session": { + "name": "session", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "expires_at": { + "name": "expires_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true + }, + "token": { + "name": "token", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true + }, + "ip_address": { + "name": "ip_address", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "user_agent": { + "name": "user_agent", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "user_id": { + "name": "user_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "device_id": { + "name": "device_id", + "type": "text", + "primaryKey": false, + "notNull": false + } + }, + "indexes": { + "session_userId_idx": { + "name": "session_userId_idx", + "columns": [ + { + "expression": "user_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "session_deviceId_idx": { + "name": "session_deviceId_idx", + "columns": [ + { + "expression": "device_id", + "isExpression": false, + "asc": true, + 
"nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "session_user_id_user_id_fk": { + "name": "session_user_id_user_id_fk", + "tableFrom": "session", + "tableTo": "user", + "columnsFrom": [ + "user_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "session_token_unique": { + "name": "session_token_unique", + "nullsNotDistinct": false, + "columns": [ + "token" + ] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.user": { + "name": "user", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "email": { + "name": "email", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "email_verified": { + "name": "email_verified", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + }, + "image": { + "name": "image", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "is_new": { + "name": "is_new", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "user_email_unique": { + "name": "user_email_unique", + "nullsNotDistinct": false, + "columns": [ + "email" + ] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.verification": { + "name": "verification", + "schema": "", + "columns": { + "id": { + "name": 
"id", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "identifier": { + "name": "identifier", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "value": { + "name": "value", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "expires_at": { + "name": "expires_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": { + "verification_identifier_idx": { + "name": "verification_identifier_idx", + "columns": [ + { + "expression": "identifier", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.waitlist": { + "name": "waitlist", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "email": { + "name": "email", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "status": { + "name": "status", + "type": "text", + "primaryKey": false, + "notNull": true, + "default": "'pending'" + }, + "batch_id": { + "name": "batch_id", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": { + "waitlist_status_idx": { + "name": "waitlist_status_idx", + "columns": [ + { + "expression": "status", + "isExpression": 
false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "waitlist_batch_id_idx": { + "name": "waitlist_batch_id_idx", + "columns": [ + { + "expression": "batch_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "waitlist_email_unique": { + "name": "waitlist_email_unique", + "nullsNotDistinct": false, + "columns": [ + "email" + ] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "powersync.agents": { + "name": "agents", + "schema": "powersync", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "type": { + "name": "type", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "transport": { + "name": "transport", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "command": { + "name": "command", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "args": { + "name": "args", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "url": { + "name": "url", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "auth_method": { + "name": "auth_method", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "icon": { + "name": "icon", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "is_system": { + "name": "is_system", + "type": "integer", + "primaryKey": false, + "notNull": false, + "default": 0 + }, + "enabled": { + "name": "enabled", + "type": "integer", + "primaryKey": false, + "notNull": false, + "default": 1 + }, + "registry_id": { + "name": "registry_id", + "type": "text", + "primaryKey": false, + "notNull": false + }, + 
"installed_version": { + "name": "installed_version", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "registry_version": { + "name": "registry_version", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "distribution_type": { + "name": "distribution_type", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "install_path": { + "name": "install_path", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "package_name": { + "name": "package_name", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "description": { + "name": "description", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "default_hash": { + "name": "default_hash", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "deleted_at": { + "name": "deleted_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "user_id": { + "name": "user_id", + "type": "text", + "primaryKey": false, + "notNull": true + } + }, + "indexes": { + "idx_agents_user_id": { + "name": "idx_agents_user_id", + "columns": [ + { + "expression": "user_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "agents_user_id_user_id_fk": { + "name": "agents_user_id_user_id_fk", + "tableFrom": "agents", + "tableTo": "user", + "columnsFrom": [ + "user_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": { + "agents_id_user_id_pk": { + "name": "agents_id_user_id_pk", + "columns": [ + "id", + "user_id" + ] + } + }, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "powersync.chat_messages": { + "name": "chat_messages", + "schema": "powersync", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "content": { + 
"name": "content", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "role": { + "name": "role", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "parts": { + "name": "parts", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "chat_thread_id": { + "name": "chat_thread_id", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "model_id": { + "name": "model_id", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "parent_id": { + "name": "parent_id", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "cache": { + "name": "cache", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "metadata": { + "name": "metadata", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "deleted_at": { + "name": "deleted_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "user_id": { + "name": "user_id", + "type": "text", + "primaryKey": false, + "notNull": true + } + }, + "indexes": { + "idx_chat_messages_user_id": { + "name": "idx_chat_messages_user_id", + "columns": [ + { + "expression": "user_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "chat_messages_user_id_user_id_fk": { + "name": "chat_messages_user_id_user_id_fk", + "tableFrom": "chat_messages", + "tableTo": "user", + "columnsFrom": [ + "user_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "powersync.chat_threads": { + "name": "chat_threads", + "schema": "powersync", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "title": { + "name": "title", + "type": "text", + "primaryKey": false, + "notNull": false + }, 
+ "is_encrypted": { + "name": "is_encrypted", + "type": "integer", + "primaryKey": false, + "notNull": false, + "default": 0 + }, + "triggered_by": { + "name": "triggered_by", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "was_triggered_by_automation": { + "name": "was_triggered_by_automation", + "type": "integer", + "primaryKey": false, + "notNull": false, + "default": 0 + }, + "context_size": { + "name": "context_size", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "mode_id": { + "name": "mode_id", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "agent_id": { + "name": "agent_id", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "deleted_at": { + "name": "deleted_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "user_id": { + "name": "user_id", + "type": "text", + "primaryKey": false, + "notNull": true + } + }, + "indexes": { + "idx_chat_threads_user_id": { + "name": "idx_chat_threads_user_id", + "columns": [ + { + "expression": "user_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "chat_threads_user_id_user_id_fk": { + "name": "chat_threads_user_id_user_id_fk", + "tableFrom": "chat_threads", + "tableTo": "user", + "columnsFrom": [ + "user_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "powersync.devices": { + "name": "devices", + "schema": "powersync", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "user_id": { + "name": "user_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": false + 
}, + "trusted": { + "name": "trusted", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + }, + "approval_pending": { + "name": "approval_pending", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + }, + "public_key": { + "name": "public_key", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "mlkem_public_key": { + "name": "mlkem_public_key", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "last_seen": { + "name": "last_seen", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "default": "now()" + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "default": "now()" + }, + "revoked_at": { + "name": "revoked_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + } + }, + "indexes": { + "idx_devices_user_id": { + "name": "idx_devices_user_id", + "columns": [ + { + "expression": "user_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "devices_user_id_user_id_fk": { + "name": "devices_user_id_user_id_fk", + "tableFrom": "devices", + "tableTo": "user", + "columnsFrom": [ + "user_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "powersync.mcp_servers": { + "name": "mcp_servers", + "schema": "powersync", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "type": { + "name": "type", + "type": "text", + "primaryKey": false, + "notNull": false, + "default": "'http'" + }, + "url": { + "name": "url", + "type": "text", + 
"primaryKey": false, + "notNull": false + }, + "command": { + "name": "command", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "args": { + "name": "args", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "enabled": { + "name": "enabled", + "type": "integer", + "primaryKey": false, + "notNull": false, + "default": 1 + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "default": "now()" + }, + "deleted_at": { + "name": "deleted_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "user_id": { + "name": "user_id", + "type": "text", + "primaryKey": false, + "notNull": true + } + }, + "indexes": { + "idx_mcp_servers_user_id": { + "name": "idx_mcp_servers_user_id", + "columns": [ + { + "expression": "user_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "mcp_servers_user_id_user_id_fk": { + "name": "mcp_servers_user_id_user_id_fk", + "tableFrom": "mcp_servers", + "tableTo": "user", + "columnsFrom": [ + "user_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "powersync.model_profiles": { + "name": "model_profiles", + "schema": "powersync", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "temperature": { + "name": "temperature", + "type": "real", + "primaryKey": false, + "notNull": false + }, + "max_steps": { + "name": "max_steps", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "max_attempts": { + "name": "max_attempts", + "type": 
"integer", + "primaryKey": false, + "notNull": false + }, + "nudge_threshold": { + "name": "nudge_threshold", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "use_system_message_mode_developer": { + "name": "use_system_message_mode_developer", + "type": "integer", + "primaryKey": false, + "notNull": false, + "default": 0 + }, + "tools_override": { + "name": "tools_override", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "link_previews_override": { + "name": "link_previews_override", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "chat_mode_addendum": { + "name": "chat_mode_addendum", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "search_mode_addendum": { + "name": "search_mode_addendum", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "research_mode_addendum": { + "name": "research_mode_addendum", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "citation_reinforcement_enabled": { + "name": "citation_reinforcement_enabled", + "type": "integer", + "primaryKey": false, + "notNull": false, + "default": 0 + }, + "citation_reinforcement_prompt": { + "name": "citation_reinforcement_prompt", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "nudge_final_step": { + "name": "nudge_final_step", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "nudge_preventive": { + "name": "nudge_preventive", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "nudge_retry": { + "name": "nudge_retry", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "nudge_search_final_step": { + "name": "nudge_search_final_step", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "nudge_search_preventive": { + "name": "nudge_search_preventive", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "nudge_search_retry": { + "name": "nudge_search_retry", + "type": "text", + "primaryKey": false, 
+ "notNull": false + }, + "provider_options": { + "name": "provider_options", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "default_hash": { + "name": "default_hash", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "deleted_at": { + "name": "deleted_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "user_id": { + "name": "user_id", + "type": "text", + "primaryKey": false, + "notNull": true + } + }, + "indexes": { + "idx_model_profiles_user_id": { + "name": "idx_model_profiles_user_id", + "columns": [ + { + "expression": "user_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "model_profiles_user_id_user_id_fk": { + "name": "model_profiles_user_id_user_id_fk", + "tableFrom": "model_profiles", + "tableTo": "user", + "columnsFrom": [ + "user_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": { + "model_profiles_id_user_id_pk": { + "name": "model_profiles_id_user_id_pk", + "columns": [ + "id", + "user_id" + ] + } + }, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "powersync.models": { + "name": "models", + "schema": "powersync", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "provider": { + "name": "provider", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "model": { + "name": "model", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "url": { + "name": "url", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "api_key": { + "name": "api_key", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "is_system": { + "name": 
"is_system", + "type": "integer", + "primaryKey": false, + "notNull": false, + "default": 0 + }, + "enabled": { + "name": "enabled", + "type": "integer", + "primaryKey": false, + "notNull": false, + "default": 1 + }, + "tool_usage": { + "name": "tool_usage", + "type": "integer", + "primaryKey": false, + "notNull": false, + "default": 1 + }, + "is_confidential": { + "name": "is_confidential", + "type": "integer", + "primaryKey": false, + "notNull": false, + "default": 0 + }, + "start_with_reasoning": { + "name": "start_with_reasoning", + "type": "integer", + "primaryKey": false, + "notNull": false, + "default": 0 + }, + "supports_parallel_tool_calls": { + "name": "supports_parallel_tool_calls", + "type": "integer", + "primaryKey": false, + "notNull": false, + "default": 1 + }, + "context_window": { + "name": "context_window", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "deleted_at": { + "name": "deleted_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "default_hash": { + "name": "default_hash", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "vendor": { + "name": "vendor", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "description": { + "name": "description", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "user_id": { + "name": "user_id", + "type": "text", + "primaryKey": false, + "notNull": true + } + }, + "indexes": { + "idx_models_user_id": { + "name": "idx_models_user_id", + "columns": [ + { + "expression": "user_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "models_user_id_user_id_fk": { + "name": "models_user_id_user_id_fk", + "tableFrom": "models", + "tableTo": "user", + "columnsFrom": [ + "user_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + 
"compositePrimaryKeys": { + "models_id_user_id_pk": { + "name": "models_id_user_id_pk", + "columns": [ + "id", + "user_id" + ] + } + }, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "powersync.modes": { + "name": "modes", + "schema": "powersync", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "label": { + "name": "label", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "icon": { + "name": "icon", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "system_prompt": { + "name": "system_prompt", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "is_default": { + "name": "is_default", + "type": "integer", + "primaryKey": false, + "notNull": false, + "default": 0 + }, + "order": { + "name": "order", + "type": "integer", + "primaryKey": false, + "notNull": false, + "default": 0 + }, + "default_hash": { + "name": "default_hash", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "deleted_at": { + "name": "deleted_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "user_id": { + "name": "user_id", + "type": "text", + "primaryKey": false, + "notNull": true + } + }, + "indexes": { + "idx_modes_user_id": { + "name": "idx_modes_user_id", + "columns": [ + { + "expression": "user_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "modes_user_id_user_id_fk": { + "name": "modes_user_id_user_id_fk", + "tableFrom": "modes", + "tableTo": "user", + "columnsFrom": [ + "user_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": { + "modes_id_user_id_pk": { + "name": "modes_id_user_id_pk", + 
"columns": [ + "id", + "user_id" + ] + } + }, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "powersync.prompts": { + "name": "prompts", + "schema": "powersync", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "title": { + "name": "title", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "prompt": { + "name": "prompt", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "model_id": { + "name": "model_id", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "deleted_at": { + "name": "deleted_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "default_hash": { + "name": "default_hash", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "user_id": { + "name": "user_id", + "type": "text", + "primaryKey": false, + "notNull": true + } + }, + "indexes": { + "idx_prompts_user_id": { + "name": "idx_prompts_user_id", + "columns": [ + { + "expression": "user_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "prompts_user_id_user_id_fk": { + "name": "prompts_user_id_user_id_fk", + "tableFrom": "prompts", + "tableTo": "user", + "columnsFrom": [ + "user_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": { + "prompts_id_user_id_pk": { + "name": "prompts_id_user_id_pk", + "columns": [ + "id", + "user_id" + ] + } + }, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "powersync.settings": { + "name": "settings", + "schema": "powersync", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "value": { + "name": "value", + "type": "text", + "primaryKey": false, + "notNull": false 
+ }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false, + "default": "now()" + }, + "default_hash": { + "name": "default_hash", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "user_id": { + "name": "user_id", + "type": "text", + "primaryKey": false, + "notNull": true + } + }, + "indexes": { + "idx_settings_user_id": { + "name": "idx_settings_user_id", + "columns": [ + { + "expression": "user_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "settings_user_id_user_id_fk": { + "name": "settings_user_id_user_id_fk", + "tableFrom": "settings", + "tableTo": "user", + "columnsFrom": [ + "user_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": { + "settings_id_user_id_pk": { + "name": "settings_id_user_id_pk", + "columns": [ + "id", + "user_id" + ] + } + }, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "powersync.tasks": { + "name": "tasks", + "schema": "powersync", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "item": { + "name": "item", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "order": { + "name": "order", + "type": "integer", + "primaryKey": false, + "notNull": false, + "default": 0 + }, + "is_complete": { + "name": "is_complete", + "type": "integer", + "primaryKey": false, + "notNull": false, + "default": 0 + }, + "default_hash": { + "name": "default_hash", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "deleted_at": { + "name": "deleted_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "user_id": { + "name": "user_id", + "type": "text", + "primaryKey": false, + "notNull": true + } + }, + "indexes": { + 
"idx_tasks_user_id": { + "name": "idx_tasks_user_id", + "columns": [ + { + "expression": "user_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "tasks_user_id_user_id_fk": { + "name": "tasks_user_id_user_id_fk", + "tableFrom": "tasks", + "tableTo": "user", + "columnsFrom": [ + "user_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": { + "tasks_id_user_id_pk": { + "name": "tasks_id_user_id_pk", + "columns": [ + "id", + "user_id" + ] + } + }, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "powersync.triggers": { + "name": "triggers", + "schema": "powersync", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "trigger_type": { + "name": "trigger_type", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "trigger_time": { + "name": "trigger_time", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "prompt_id": { + "name": "prompt_id", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "is_enabled": { + "name": "is_enabled", + "type": "integer", + "primaryKey": false, + "notNull": false, + "default": 1 + }, + "deleted_at": { + "name": "deleted_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "user_id": { + "name": "user_id", + "type": "text", + "primaryKey": false, + "notNull": true + } + }, + "indexes": { + "idx_triggers_user_id": { + "name": "idx_triggers_user_id", + "columns": [ + { + "expression": "user_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "triggers_user_id_user_id_fk": { + "name": "triggers_user_id_user_id_fk", + "tableFrom": "triggers", + 
"tableTo": "user", + "columnsFrom": [ + "user_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.rate_limits": { + "name": "rate_limits", + "schema": "", + "columns": { + "key": { + "name": "key", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "points": { + "name": "points", + "type": "integer", + "primaryKey": false, + "notNull": true, + "default": 0 + }, + "expire": { + "name": "expire", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": false + } + }, + "indexes": { + "rate_limits_expire_idx": { + "name": "rate_limits_expire_idx", + "columns": [ + { + "expression": "expire", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.encryption_metadata": { + "name": "encryption_metadata", + "schema": "", + "columns": { + "user_id": { + "name": "user_id", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "canary_iv": { + "name": "canary_iv", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "canary_ctext": { + "name": "canary_ctext", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "canary_secret_hash": { + "name": "canary_secret_hash", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": {}, + "foreignKeys": { + "encryption_metadata_user_id_user_id_fk": { + "name": "encryption_metadata_user_id_user_id_fk", + "tableFrom": "encryption_metadata", + "tableTo": "user", + 
"columnsFrom": [ + "user_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.envelopes": { + "name": "envelopes", + "schema": "", + "columns": { + "device_id": { + "name": "device_id", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "user_id": { + "name": "user_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "wrapped_ck": { + "name": "wrapped_ck", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": { + "idx_envelopes_user_id": { + "name": "idx_envelopes_user_id", + "columns": [ + { + "expression": "user_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "envelopes_device_id_devices_id_fk": { + "name": "envelopes_device_id_devices_id_fk", + "tableFrom": "envelopes", + "tableTo": "devices", + "schemaTo": "powersync", + "columnsFrom": [ + "device_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + }, + "envelopes_user_id_user_id_fk": { + "name": "envelopes_user_id_user_id_fk", + "tableFrom": "envelopes", + "tableTo": "user", + "columnsFrom": [ + "user_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.otp_challenge": { + "name": "otp_challenge", + "schema": "", + "columns": { + "id": { + "name": "id", + 
"type": "text", + "primaryKey": true, + "notNull": true + }, + "email": { + "name": "email", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "challenge_token": { + "name": "challenge_token", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "expires_at": { + "name": "expires_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "otp_challenge_email_unique": { + "name": "otp_challenge_email_unique", + "nullsNotDistinct": false, + "columns": [ + "email" + ] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + } + }, + "enums": {}, + "schemas": {}, + "sequences": {}, + "roles": {}, + "policies": {}, + "views": {}, + "_meta": { + "columns": {}, + "schemas": {}, + "tables": {} + } +} \ No newline at end of file diff --git a/backend/drizzle/meta/_journal.json b/backend/drizzle/meta/_journal.json index 8306658e0..be6c642cd 100644 --- a/backend/drizzle/meta/_journal.json +++ b/backend/drizzle/meta/_journal.json @@ -92,6 +92,13 @@ "when": 1776165528264, "tag": "0012_furry_thor_girl", "breakpoints": true + }, + { + "idx": 13, + "version": "7", + "when": 1776298245962, + "tag": "0013_next_ulik", + "breakpoints": true } ] } \ No newline at end of file diff --git a/backend/src/agent-proxy/routes.test.ts b/backend/src/agent-proxy/routes.test.ts new file mode 100644 index 000000000..340c81ada --- /dev/null +++ b/backend/src/agent-proxy/routes.test.ts @@ -0,0 +1,879 @@ +import type { ConsoleSpies } from '@/test-utils/console-spies' +import { setupConsoleSpy } from '@/test-utils/console-spies' +import { afterAll, beforeAll, beforeEach, describe, expect, it, spyOn } from 'bun:test' +import { + classifyMessage, + clearConnections, + createAgentProxyRoutes, + handleHttpMessage, 
+ handleWsMessage, + type HttpConnectionState, + maxPendingBytes, + maxPendingMessages, + openWsRelay, + parseApiKey, + parseClientMessage, + parseSSEStream, + type WsConnectionState, +} from './routes' +import { clearTickets, consumeWsTicket, createWsTicket } from '@/auth/ws-ticket' + +let consoleSpies: ConsoleSpies +beforeAll(() => { + consoleSpies = setupConsoleSpy() +}) +afterAll(() => { + consoleSpies.restore() +}) + +beforeEach(() => { + clearTickets() + clearConnections() +}) + +describe('parseApiKey', () => { + it('extracts apiKey from valid JSON', () => { + expect(parseApiKey('{"apiKey":"sk-abc123"}')).toBe('sk-abc123') + }) + + it('returns null for null input', () => { + expect(parseApiKey(null)).toBeNull() + }) + + it('returns null for empty string', () => { + expect(parseApiKey('')).toBeNull() + }) + + it('returns null when apiKey field is missing', () => { + expect(parseApiKey('{"other":"value"}')).toBeNull() + }) + + it('returns null for malformed JSON', () => { + expect(parseApiKey('not json')).toBeNull() + }) + + it('returns null for JSON without object shape', () => { + expect(parseApiKey('"just a string"')).toBeNull() + }) + + it('returns null cleanly for JSON "null" without throwing', () => { + consoleSpies.warn.mockClear() + expect(parseApiKey('null')).toBeNull() + expect(consoleSpies.warn).toHaveBeenCalledWith( + '[agent-proxy] authMethod JSON is not an object — credentials will not be sent', + ) + }) + + it('returns null for JSON array', () => { + expect(parseApiKey('[]')).toBeNull() + }) + + it('returns null for empty string apiKey', () => { + expect(parseApiKey('{"apiKey":""}')).toBeNull() + }) + + it('returns null when apiKey is non-string', () => { + expect(parseApiKey('{"apiKey":123}')).toBeNull() + }) +}) + +describe('parseClientMessage', () => { + it('parses valid JSON strings', () => { + expect(parseClientMessage('{"method":"ping","id":1}')).toEqual({ method: 'ping', id: 1 }) + }) + + it('returns null for non-JSON strings', () => { + 
expect(parseClientMessage('not json')).toBeNull() + }) + + it('passes through non-string objects as-is', () => { + const obj = { method: 'ping', id: 1 } + expect(parseClientMessage(obj)).toBe(obj) + }) + + it('returns null for Uint8Array (binary frame)', () => { + expect(parseClientMessage(new Uint8Array([1, 2, 3]))).toBeNull() + }) + + it('returns null for Buffer (binary frame)', () => { + expect(parseClientMessage(Buffer.from([1, 2, 3]))).toBeNull() + }) + + it('returns null for null', () => { + expect(parseClientMessage(null)).toBeNull() + }) + + it('returns null for arrays', () => { + expect(parseClientMessage([1, 2, 3])).toBeNull() + }) + + it('returns null for primitives', () => { + expect(parseClientMessage(42)).toBeNull() + expect(parseClientMessage(true)).toBeNull() + }) + + it('returns null for JSON primitive string', () => { + expect(parseClientMessage('"hello"')).toBeNull() + }) + + it('returns null for JSON primitive number', () => { + expect(parseClientMessage('42')).toBeNull() + }) + + it('returns null for JSON primitive boolean', () => { + expect(parseClientMessage('true')).toBeNull() + }) + + it('returns null for JSON null', () => { + expect(parseClientMessage('null')).toBeNull() + }) + + it('returns null for JSON array string', () => { + expect(parseClientMessage('[1,2,3]')).toBeNull() + }) +}) + +describe('WS ticket integration', () => { + it('creates and consumes a ticket with agent payload', () => { + const payload = { url: 'wss://agent.example.com/ws', authMethod: '{"apiKey":"test-key"}' } + const ticketId = createWsTicket('user-test', payload) + + const result = consumeWsTicket(ticketId) + expect(result).not.toBeNull() + expect(result!.userId).toBe('user-test') + expect(result!.payload).toEqual(payload) + + const apiKey = parseApiKey(result!.payload!.authMethod as string) + expect(apiKey).toBe('test-key') + }) + + it('prevents ticket reuse', () => { + const ticketId = createWsTicket('user-test') + 
expect(consumeWsTicket(ticketId)).not.toBeNull() + expect(consumeWsTicket(ticketId)).toBeNull() + }) +}) + +// ── parseSSEStream tests ──────────────────────────────────────────────────── + +const encode = (text: string): ReadableStream => { + const encoder = new TextEncoder() + return new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode(text)) + controller.close() + }, + }) +} + +const collect = async (stream: AsyncGenerator): Promise => { + const items: unknown[] = [] + for await (const item of stream) { + items.push(item) + } + return items +} + +describe('parseSSEStream', () => { + it('parses a single JSON SSE event', async () => { + const body = encode('data: {"id":1}\n\n') + const events = await collect(parseSSEStream(body)) + expect(events).toEqual([{ id: 1 }]) + }) + + it('parses multiple JSON SSE events', async () => { + const body = encode('data: {"a":1}\n\ndata: {"b":2}\n\n') + const events = await collect(parseSSEStream(body)) + expect(events).toEqual([{ a: 1 }, { b: 2 }]) + }) + + it('skips and logs non-JSON SSE events', async () => { + const body = encode('data: not json\n\ndata: {"ok":true}\n\n') + const events = await collect(parseSSEStream(body)) + expect(events).toEqual([{ ok: true }]) + }) + + it('returns empty array for empty stream', async () => { + const body = encode('') + const events = await collect(parseSSEStream(body)) + expect(events).toEqual([]) + }) + + it('handles data: without space prefix', async () => { + const body = encode('data:{"no":"space"}\n\n') + const events = await collect(parseSSEStream(body)) + expect(events).toEqual([{ no: 'space' }]) + }) + + it('handles data: with space prefix', async () => { + const body = encode('data: {"with":"space"}\n\n') + const events = await collect(parseSSEStream(body)) + expect(events).toEqual([{ with: 'space' }]) + }) + + it('joins multi-line data fields', async () => { + const body = encode('data: {"multi":\ndata: "line"}\n\n') + const events = await 
collect(parseSSEStream(body)) + expect(events).toEqual([{ multi: 'line' }]) + }) + + it('throws on buffer overflow (>10M chars)', async () => { + const huge = 'data: ' + 'x'.repeat(11 * 1024 * 1024) + '\n\n' + const body = encode(huge) + await expect(collect(parseSSEStream(body))).rejects.toThrow('SSE buffer exceeded size limit') + }) + + it('handles multi-chunk streams where frames span multiple reads', async () => { + const encoder = new TextEncoder() + const body = new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode('data: {"a":1}\n')) + controller.enqueue(encoder.encode('\ndata: {"b":2}\n\n')) + controller.close() + }, + }) + const events = await collect(parseSSEStream(body)) + expect(events).toEqual([{ a: 1 }, { b: 2 }]) + }) +}) + +// ── classifyMessage tests ─────────────────────────────────────────────────── + +describe('classifyMessage', () => { + it('classifies request (has method and id)', () => { + expect(classifyMessage({ method: 'tools/call', id: 1 })).toBe('request') + }) + + it('classifies notification (has method only)', () => { + expect(classifyMessage({ method: 'notifications/progress' })).toBe('notification') + }) + + it('classifies response (has neither method)', () => { + expect(classifyMessage({ result: {}, id: 1 })).toBe('response') + }) +}) + +// ── Route-level open-handler tests ────────────────────────────────────────── +// +// Elysia's ws() handler cannot easily be invoked end-to-end through app.handle() +// (which is HTTP-only). Instead we reach into the router's recorded hooks and +// invoke the `open` handler directly with a mock ws object. This covers the +// authentication and SSRF-validation branches of the open handler. 
+ +type MockWs = { + id: string + data: { query: { ticket?: string } } + closeCalls: Array<{ code?: number; reason?: string }> + sentMessages: string[] + send: (data: string | ArrayBuffer) => void + close: (code?: number, reason?: string) => void +} + +const createMockWs = (ticket?: string): MockWs => { + const ws: MockWs = { + id: `mock-ws-${Math.random().toString(36).slice(2)}`, + data: { query: ticket !== undefined ? { ticket } : {} }, + closeCalls: [], + sentMessages: [], + send(data) { + this.sentMessages.push(typeof data === 'string' ? data : String(data)) + }, + close(code, reason) { + this.closeCalls.push({ code, reason }) + }, + } + return ws +} + +type WsRoute = { + method: string + path: string + hooks: { open: (ws: MockWs) => void | Promise } +} + +const getOpenHandler = () => { + const app = createAgentProxyRoutes() + const route = (app.router.history as WsRoute[]).find((r) => r.method === 'WS' && r.path === '/agent-proxy/ws') + if (!route) throw new Error('WS route not registered') + return route.hooks.open +} + +describe('createAgentProxyRoutes (open handler)', () => { + it('registers the agent-proxy ws route', () => { + const app = createAgentProxyRoutes() + const route = (app.router.history as WsRoute[]).find((r) => r.method === 'WS' && r.path === '/agent-proxy/ws') + expect(route).toBeDefined() + }) + + it('closes with 4001 when ticket query param is missing', async () => { + const open = getOpenHandler() + const ws = createMockWs() + await open(ws) + expect(ws.closeCalls).toHaveLength(1) + expect(ws.closeCalls[0]!.code).toBe(4001) + }) + + it('closes with 4001 when ticket is invalid or already consumed', async () => { + const open = getOpenHandler() + const ws = createMockWs('bogus-ticket-id-that-does-not-exist') + await open(ws) + expect(ws.closeCalls).toHaveLength(1) + expect(ws.closeCalls[0]!.code).toBe(4001) + }) + + it('closes with 4003 when ticket payload has a private-IP URL (SSRF)', async () => { + const open = getOpenHandler() + const 
ticketId = createWsTicket('user-ssrf', { url: 'http://127.0.0.1/ws' }) + const ws = createMockWs(ticketId) + await open(ws) + expect(ws.closeCalls).toHaveLength(1) + expect(ws.closeCalls[0]!.code).toBe(4003) + }) + + it('closes with 4004 when ticket has no url in payload', async () => { + const open = getOpenHandler() + const ticketId = createWsTicket('user-no-url') + const ws = createMockWs(ticketId) + await open(ws) + expect(ws.closeCalls).toHaveLength(1) + expect(ws.closeCalls[0]!.code).toBe(4004) + }) + + it('closes with 4003 when ws:// is paired with an apiKey (cleartext credential)', async () => { + const open = getOpenHandler() + const ticketId = createWsTicket('user-ws-apikey', { + url: 'ws://agent.example.com/ws', + authMethod: '{"apiKey":"test-key"}', + }) + const ws = createMockWs(ticketId) + await open(ws) + expect(ws.closeCalls).toHaveLength(1) + expect(ws.closeCalls[0]!.code).toBe(4003) + }) + + it('closes with 4003 when http:// is paired with an apiKey (cleartext credential)', async () => { + const open = getOpenHandler() + const ticketId = createWsTicket('user-http-apikey', { + url: 'http://agent.example.com/acp', + authMethod: '{"apiKey":"test-key"}', + }) + const ws = createMockWs(ticketId) + await open(ws) + expect(ws.closeCalls).toHaveLength(1) + expect(ws.closeCalls[0]!.code).toBe(4003) + }) + + it('consumes a valid ticket and opens an upstream connection (http scheme)', async () => { + const open = getOpenHandler() + // Use a public-looking hostname so SSRF validation passes. The handler will + // create an HttpConnectionState without issuing any network request until a + // message arrives, so this is safe to invoke in unit tests. + const ticketId = createWsTicket('user-valid', { url: 'https://agent.example.com/acp' }) + const ws = createMockWs(ticketId) + await open(ws) + + // No close should have happened — connection is considered open. + expect(ws.closeCalls).toEqual([]) + // Ticket must have been consumed (one-time use). 
+ expect(consumeWsTicket(ticketId)).toBeNull() + }) +}) + +// ── handleHttpMessage tests ───────────────────────────────────────────────── + +const createHttpState = (): HttpConnectionState => ({ + type: 'http', + agentUrl: 'https://agent.example.com/acp', + apiKey: null, + connectionId: null, + sessionId: null, + activeAborts: new Set(), + closed: false, + bootstrapPromise: null, +}) + +describe('handleHttpMessage', () => { + it('preserves the JSON-RPC request id in the error response when upstream returns non-JSON', async () => { + const ws = createMockWs() + const state = createHttpState() + const fakeFetch = async () => + new Response('oops', { + status: 200, + headers: { 'Content-Type': 'text/html' }, + }) + + await handleHttpMessage(ws, JSON.stringify({ jsonrpc: '2.0', method: 'ping', id: 42 }), state, fakeFetch) + + expect(ws.sentMessages).toHaveLength(1) + const payload = JSON.parse(ws.sentMessages[0]!) as { id: unknown; error: { code: number } } + expect(payload.id).toBe(42) + expect(payload.error.code).toBe(-32603) + }) + + it('preserves a string request id in the error response', async () => { + const ws = createMockWs() + const state = createHttpState() + const fakeFetch = async () => new Response('not json', { status: 200, headers: { 'Content-Type': 'text/plain' } }) + + await handleHttpMessage(ws, JSON.stringify({ jsonrpc: '2.0', method: 'ping', id: 'abc-123' }), state, fakeFetch) + + const payload = JSON.parse(ws.sentMessages[0]!) as { id: unknown } + expect(payload.id).toBe('abc-123') + }) + + it('clears the initial connect timeout once response headers arrive (long SSE streams not aborted)', async () => { + const ws = createFakeDownstream() + const state = createHttpState() + + // Build a ReadableStream that emits SSE events across multiple turns of the + // microtask queue. 
Without FIX 1, the 30s initial connect timeout would still + // be armed during streaming and fire on slow upstreams; after FIX 1 it is + // cleared the moment fetchImpl resolves (headers received). + const encoder = new TextEncoder() + let enqueuedDuringStreaming = false + const body = new ReadableStream({ + async start(controller) { + controller.enqueue(encoder.encode('data: {"chunk":1}\n\n')) + await new Promise((resolve) => queueMicrotask(() => resolve(undefined))) + enqueuedDuringStreaming = true + controller.enqueue(encoder.encode('data: {"chunk":2}\n\n')) + controller.close() + }, + }) + + let capturedSignal: AbortSignal | undefined + const fakeFetch: (input: string | URL | Request, init?: RequestInit) => Promise = async (_url, init) => { + capturedSignal = init?.signal as AbortSignal + return new Response(body, { status: 200, headers: { 'Content-Type': 'text/event-stream' } }) + } + + // Spy on clearTimeout to confirm FIX 1: the initial timer is cleared as + // soon as fetchImpl resolves, not only in the `finally` block. + const clearSpy = spyOn(globalThis, 'clearTimeout') + const before = clearSpy.mock.calls.length + + await handleHttpMessage( + asElysiaWs(ws), + JSON.stringify({ jsonrpc: '2.0', method: 'session/prompt', id: 1 }), + state, + fakeFetch, + ) + + const clearCalls = clearSpy.mock.calls.length - before + clearSpy.mockRestore() + + // Both chunks streamed through without an AbortError surfacing from the loop. + const messages = ws.sentMessages.map((m) => JSON.parse(m)) + expect(messages).toEqual([{ chunk: 1 }, { chunk: 2 }]) + expect(enqueuedDuringStreaming).toBe(true) + // The signal was never aborted — the initial timeout did not fire. + expect(capturedSignal?.aborted).toBe(false) + // clearTimeout fires twice: once after fetchImpl resolves (the FIX 1 call) + // and once in the finally block. If FIX 1 regressed, we'd see only 1 call. + expect(clearCalls).toBe(2) + // activeAborts was cleaned up. 
+ expect(state.activeAborts.size).toBe(0) + }) + + it('aborts in-flight notification POSTs when close aborts activeAborts', async () => { + const ws = createMockWs() + const state = createHttpState() + let capturedSignal: AbortSignal | undefined + const fakeFetch: (input: string | URL | Request, init?: RequestInit) => Promise = (_url, init) => { + capturedSignal = init?.signal as AbortSignal + return new Promise((_resolve, reject) => { + capturedSignal!.addEventListener('abort', () => reject(new DOMException('aborted', 'AbortError'))) + }) + } + + const promise = handleHttpMessage( + ws, + JSON.stringify({ jsonrpc: '2.0', method: 'notifications/progress' }), + state, + fakeFetch, + ) + + // Simulate close: abort all active controllers (mirrors the close handler). + for (const ac of state.activeAborts) ac.abort() + + await promise + + expect(capturedSignal?.aborted).toBe(true) + expect(state.activeAborts.size).toBe(0) + }) + + it('pipelines subsequent messages with Acp-Session-Id from bootstrap response', async () => { + const ws = createFakeDownstream() + const state = createHttpState() + + // Control when the first (bootstrap) fetch resolves so we can enqueue a second message + // while bootstrap is still in flight. + let resolveFirst: ((response: Response) => void) | null = null + const capturedHeaders: Array> = [] + let call = 0 + + const fakeFetch: (input: string | URL | Request, init?: RequestInit) => Promise = async (_url, init) => { + const headers = init?.headers as Record + capturedHeaders.push({ ...headers }) + call += 1 + if (call === 1) { + return new Promise((resolve) => { + resolveFirst = resolve + }) + } + return new Response(JSON.stringify({ result: 'second', id: 2 }), { + status: 200, + headers: { 'Content-Type': 'application/json' }, + }) + } + + // Fire both messages in quick succession. 
Single-threaded JS means both synchronous + // entries run before any `await` can yield, so both enter handleHttpMessage before + // the second can observe the first's state changes across an await. + const p1 = handleHttpMessage( + asElysiaWs(ws), + JSON.stringify({ jsonrpc: '2.0', method: 'session/new', id: 1 }), + state, + fakeFetch, + ) + const p2 = handleHttpMessage( + asElysiaWs(ws), + JSON.stringify({ jsonrpc: '2.0', method: 'session/prompt', id: 2 }), + state, + fakeFetch, + ) + + // Wait for the first fetch to be called. + while (call < 1) await new Promise((r) => queueMicrotask(() => r(undefined))) + + // Resolve the bootstrap with session headers populated. + resolveFirst!( + new Response(JSON.stringify({ result: 'first', id: 1 }), { + status: 200, + headers: { 'Content-Type': 'application/json', 'Acp-Session-Id': 'sess-123' }, + }), + ) + + await Promise.all([p1, p2]) + + // First call had no session header (bootstrap); second call must carry the Acp-Session-Id + // returned by the bootstrap response. + expect(capturedHeaders).toHaveLength(2) + expect(capturedHeaders[0]!['Acp-Session-Id']).toBeUndefined() + expect(capturedHeaders[1]!['Acp-Session-Id']).toBe('sess-123') + expect(state.sessionId).toBe('sess-123') + }) + + it('allows bootstrap retry after the first bootstrap attempt fails', async () => { + const ws = createFakeDownstream() + const state = createHttpState() + + let call = 0 + const fakeFetch: (input: string | URL | Request, init?: RequestInit) => Promise = async () => { + call += 1 + if (call === 1) throw new Error('network down') + return new Response(JSON.stringify({ result: 'ok', id: 2 }), { + status: 200, + headers: { 'Content-Type': 'application/json', 'Acp-Session-Id': 'sess-retry' }, + }) + } + + // First call: bootstrap fails. Must not leave bootstrapPromise set so the next caller retries. 
+ await handleHttpMessage( + asElysiaWs(ws), + JSON.stringify({ jsonrpc: '2.0', method: 'session/new', id: 1 }), + state, + fakeFetch, + ).catch(() => { + // handleHttpMessage surfaces the error on the request path — swallow here. + }) + + expect(state.bootstrapPromise).toBeNull() + expect(state.sessionId).toBeNull() + + // Second call: must be able to bootstrap (not deadlock) and populate the session. + await handleHttpMessage( + asElysiaWs(ws), + JSON.stringify({ jsonrpc: '2.0', method: 'session/new', id: 2 }), + state, + fakeFetch, + ) + + expect(state.sessionId).toBe('sess-retry') + expect(call).toBe(2) + }) + + it('resets bootstrapPromise when upstream returns success but no Acp-Session-Id header', async () => { + const ws = createFakeDownstream() + const state = createHttpState() + + const fakeFetch: (input: string | URL | Request, init?: RequestInit) => Promise = async () => { + // Success response with no session header — upstream protocol violation / degraded mode. + return new Response(JSON.stringify({ result: 'ok', id: 1 }), { + status: 200, + headers: { 'Content-Type': 'application/json' }, + }) + } + + await handleHttpMessage( + asElysiaWs(ws), + JSON.stringify({ jsonrpc: '2.0', method: 'session/new', id: 1 }), + state, + fakeFetch, + ) + + // sessionId was never populated → releaseBootstrap must have reset bootstrapPromise + // to null so the next arriving message can claim the bootstrap role and retry. 
+ expect(state.sessionId).toBeNull() + expect(state.bootstrapPromise).toBeNull() + }) +}) + +// ── WS relay queue/flush tests ────────────────────────────────────────────── + +type FakeUpstreamListeners = { + open: Array<() => void> + message: Array<(event: { data: string | ArrayBuffer }) => void> + close: Array<(event: { code?: number; reason?: string }) => void> + error: Array<(event: ErrorEvent) => void> +} + +type FakeUpstreamWs = { + readyState: number + sentMessages: string[] + closeCalls: number + listeners: FakeUpstreamListeners + addEventListener: (type: 'open' | 'message' | 'close' | 'error', listener: (event: never) => void) => void + send: (data: string) => void + close: () => void + fireOpen: () => void + fireClose: (code?: number, reason?: string) => void + fireError: (message?: string) => void +} + +const createFakeUpstream = (): FakeUpstreamWs => { + const listeners: FakeUpstreamListeners = { open: [], message: [], close: [], error: [] } + const fake: FakeUpstreamWs = { + readyState: WebSocket.CONNECTING, + sentMessages: [], + closeCalls: 0, + listeners, + addEventListener(type, listener) { + // Narrow to the right listener list by event type. + if (type === 'open') listeners.open.push(listener as () => void) + else if (type === 'message') listeners.message.push(listener as (event: { data: string | ArrayBuffer }) => void) + else if (type === 'close') listeners.close.push(listener as (event: { code?: number; reason?: string }) => void) + else if (type === 'error') listeners.error.push(listener as (event: ErrorEvent) => void) + }, + send(data) { + this.sentMessages.push(data) + }, + close() { + this.closeCalls += 1 + this.readyState = WebSocket.CLOSED + }, + fireOpen() { + this.readyState = WebSocket.OPEN + for (const l of listeners.open) l() + }, + fireClose(code, reason) { + this.readyState = WebSocket.CLOSED + for (const l of listeners.close) l({ code, reason }) + }, + fireError(message) { + for (const l of listeners.error) l({ message: message ?? 
'upstream error' } as ErrorEvent) + }, + } + return fake +} + +type FakeDownstreamWs = { + id: string + sentMessages: string[] + closeCalls: Array<{ code?: number; reason?: string }> + send: (data: string | ArrayBuffer) => void + close: (code?: number, reason?: string) => void +} + +const createFakeDownstream = (): FakeDownstreamWs => ({ + id: `ds-${Math.random().toString(36).slice(2)}`, + sentMessages: [], + closeCalls: [], + send(data) { + this.sentMessages.push(typeof data === 'string' ? data : String(data)) + }, + close(code, reason) { + this.closeCalls.push({ code, reason }) + }, +}) + +const createWsState = (upstream: FakeUpstreamWs): WsConnectionState => ({ + type: 'websocket', + upstream: upstream as unknown as WebSocket, + closed: false, + pendingMessages: [], + pendingBytes: 0, +}) + +type WsFactory = Parameters[3] +type ElysiaWsArg = Parameters[0] + +const asElysiaWs = (ws: FakeDownstreamWs): ElysiaWsArg => ws as unknown as ElysiaWsArg +const asWebSocketFactory = (fake: FakeUpstreamWs): WsFactory => (() => fake as unknown as WebSocket) as WsFactory + +describe('handleWsMessage (queue and flush)', () => { + it('forwards immediately when upstream is OPEN', () => { + const upstream = createFakeUpstream() + upstream.readyState = WebSocket.OPEN + const ws = createFakeDownstream() + const state = createWsState(upstream) + + handleWsMessage(asElysiaWs(ws), '{"method":"ping"}', state) + + expect(upstream.sentMessages).toEqual(['{"method":"ping"}']) + expect(state.pendingMessages).toEqual([]) + expect(state.pendingBytes).toBe(0) + }) + + it('queues messages while upstream is CONNECTING', () => { + const upstream = createFakeUpstream() + const ws = createFakeDownstream() + const state = createWsState(upstream) + + handleWsMessage(asElysiaWs(ws), '{"id":1}', state) + handleWsMessage(asElysiaWs(ws), '{"id":2}', state) + + expect(upstream.sentMessages).toEqual([]) + expect(state.pendingMessages).toEqual(['{"id":1}', '{"id":2}']) + 
expect(state.pendingBytes).toBe(Buffer.byteLength('{"id":1}') + Buffer.byteLength('{"id":2}')) + expect(ws.closeCalls).toEqual([]) + }) + + it('stringifies non-string messages before queueing', () => { + const upstream = createFakeUpstream() + const ws = createFakeDownstream() + const state = createWsState(upstream) + + handleWsMessage(asElysiaWs(ws), { method: 'init', id: 7 }, state) + + expect(state.pendingMessages).toEqual(['{"method":"init","id":7}']) + }) + + it('silently drops messages when upstream is CLOSING or CLOSED', () => { + const upstream = createFakeUpstream() + upstream.readyState = WebSocket.CLOSED + const ws = createFakeDownstream() + const state = createWsState(upstream) + + handleWsMessage(asElysiaWs(ws), '{"id":1}', state) + + expect(upstream.sentMessages).toEqual([]) + expect(state.pendingMessages).toEqual([]) + expect(ws.closeCalls).toEqual([]) + }) + + it('closes the downstream with 4005 when pending message count exceeds bound', () => { + const upstream = createFakeUpstream() + const ws = createFakeDownstream() + const state = createWsState(upstream) + + // Fill to the bound. Each msg is tiny so byte bound is not hit. + for (let i = 0; i < maxPendingMessages; i++) { + handleWsMessage(asElysiaWs(ws), `{"i":${i}}`, state) + } + expect(state.pendingMessages).toHaveLength(maxPendingMessages) + expect(ws.closeCalls).toEqual([]) + + // One more — should overflow and close. + handleWsMessage(asElysiaWs(ws), '{"overflow":true}', state) + + expect(ws.closeCalls).toEqual([{ code: 4005, reason: 'Upstream connection backlog exceeded' }]) + expect(state.closed).toBe(true) + expect(state.pendingMessages).toEqual([]) + expect(state.pendingBytes).toBe(0) + // Upstream handshake must be cancelled to avoid leaking a connecting socket. 
+ expect(upstream.closeCalls).toBe(1) + }) + + it('closes the downstream with 4005 when pending bytes exceed bound', () => { + const upstream = createFakeUpstream() + const ws = createFakeDownstream() + const state = createWsState(upstream) + + // One payload that alone exceeds the byte bound. + const big = 'x'.repeat(maxPendingBytes + 1) + handleWsMessage(asElysiaWs(ws), big, state) + + expect(ws.closeCalls).toEqual([{ code: 4005, reason: 'Upstream connection backlog exceeded' }]) + expect(state.closed).toBe(true) + expect(state.pendingMessages).toEqual([]) + expect(state.pendingBytes).toBe(0) + }) + + it('closes the upstream WS when backlog overflow tears down the relay', () => { + const upstream = createFakeUpstream() + // Upstream is CONNECTING — without closing it, the handshake would continue after teardown. + const ws = createFakeDownstream() + const state = createWsState(upstream) + + const big = 'x'.repeat(maxPendingBytes + 1) + handleWsMessage(asElysiaWs(ws), big, state) + + expect(upstream.closeCalls).toBe(1) + expect(ws.closeCalls).toEqual([{ code: 4005, reason: 'Upstream connection backlog exceeded' }]) + }) +}) + +describe('openWsRelay (lifecycle)', () => { + it('flushes queued messages in order when upstream fires open', () => { + const fake = createFakeUpstream() + const ws = createFakeDownstream() + + const state = openWsRelay(asElysiaWs(ws), 'wss://agent.example.com/ws', null, asWebSocketFactory(fake)) + + handleWsMessage(asElysiaWs(ws), '{"id":1}', state) + handleWsMessage(asElysiaWs(ws), '{"id":2}', state) + expect(fake.sentMessages).toEqual([]) + + fake.fireOpen() + + expect(fake.sentMessages).toEqual(['{"id":1}', '{"id":2}']) + expect(state.pendingMessages).toEqual([]) + expect(state.pendingBytes).toBe(0) + }) + + it('drains pending queue when upstream closes before opening', () => { + const fake = createFakeUpstream() + const ws = createFakeDownstream() + + const state = openWsRelay(asElysiaWs(ws), 'wss://agent.example.com/ws', null, 
asWebSocketFactory(fake)) + + handleWsMessage(asElysiaWs(ws), '{"id":1}', state) + handleWsMessage(asElysiaWs(ws), '{"id":2}', state) + expect(state.pendingMessages).toHaveLength(2) + + fake.fireClose(1006, 'upstream gone') + + expect(state.pendingMessages).toEqual([]) + expect(state.pendingBytes).toBe(0) + expect(state.closed).toBe(true) + expect(ws.closeCalls).toEqual([{ code: 1006, reason: 'upstream gone' }]) + }) + + it('drains pending queue when upstream errors before opening', () => { + const fake = createFakeUpstream() + const ws = createFakeDownstream() + + const state = openWsRelay(asElysiaWs(ws), 'wss://agent.example.com/ws', null, asWebSocketFactory(fake)) + + handleWsMessage(asElysiaWs(ws), '{"id":1}', state) + expect(state.pendingMessages).toHaveLength(1) + + fake.fireError('handshake failed') + + expect(state.pendingMessages).toEqual([]) + expect(state.pendingBytes).toBe(0) + expect(state.closed).toBe(true) + expect(ws.closeCalls).toEqual([{ code: 4005, reason: 'Upstream agent connection error' }]) + }) +}) diff --git a/backend/src/agent-proxy/routes.ts b/backend/src/agent-proxy/routes.ts new file mode 100644 index 000000000..bfcf95af9 --- /dev/null +++ b/backend/src/agent-proxy/routes.ts @@ -0,0 +1,510 @@ +import { consumeWsTicket } from '@/auth/ws-ticket' +import { createSafeFetch, validateSafeUrl } from '@/utils/url-validation' +import { Elysia, t } from 'elysia' + +const proxyTimeoutMs = 30_000 +const maxSseBufferChars = 10 * 1024 * 1024 + +/** Max messages queued while upstream WS is CONNECTING. Exceeding closes the client conn with 4005. */ +export const maxPendingMessages = 64 +/** Max total bytes queued while upstream WS is CONNECTING. Exceeding closes the client conn with 4005. 
*/ +export const maxPendingBytes = 256 * 1024 + +type ElysiaWS = { + readonly id: string + send: (data: string | ArrayBuffer) => void + close: (code?: number, reason?: string) => void + [key: string]: unknown +} + +// ── Shared utilities ───────────────────────────────────────────────────────── + +/** Parse apiKey from the agent's authMethod JSON column. */ +export const parseApiKey = (authMethod: string | null): string | null => { + if (!authMethod) return null + let parsed: unknown + try { + parsed = JSON.parse(authMethod) + } catch { + console.error('[agent-proxy] authMethod is not valid JSON — agent credentials will not be sent') + return null + } + if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) { + console.warn('[agent-proxy] authMethod JSON is not an object — credentials will not be sent') + return null + } + const apiKey = (parsed as { apiKey?: unknown }).apiKey + if (typeof apiKey !== 'string' || apiKey.length === 0) { + console.warn('[agent-proxy] authMethod object has no string apiKey field') + return null + } + return apiKey +} + +const isPlainObject = (value: unknown): value is Record => + typeof value === 'object' && value !== null && !Array.isArray(value) && !(value instanceof Uint8Array) + +/** + * Parses an incoming WS client message into a JSON-RPC object, or `null` if invalid. + * Accepts strings (parsed as JSON), already-deserialized plain objects (passed through), + * and rejects binary frames (Uint8Array/Buffer), arrays, primitives, and malformed JSON. + */ +export const parseClientMessage = (message: unknown): Record | null => { + if (isPlainObject(message)) return message + if (typeof message !== 'string') return null + try { + const parsed = JSON.parse(message) as unknown + return isPlainObject(parsed) ? 
parsed : null + } catch { + return null + } +} + +// ── WebSocket relay ────────────────────────────────────────────────────────── + +export type WsConnectionState = { + type: 'websocket' + upstream: WebSocket + closed: boolean + pendingMessages: string[] + pendingBytes: number +} + +type WebSocketFactory = (url: string, protocols?: string | string[]) => WebSocket + +const defaultWebSocketFactory: WebSocketFactory = (url, protocols) => new WebSocket(url, protocols) + +/** + * Opens an upstream WebSocket relay bound to the given downstream client `ws`. + * Messages sent by the client while the upstream is still CONNECTING are queued + * and flushed on `open`; the queue is bounded by {@link maxPendingMessages} and + * {@link maxPendingBytes} to prevent unbounded memory growth. + * + * The `webSocketFactory` parameter allows tests to inject a fake upstream without + * touching globals or making real network connections. + */ +export const openWsRelay = ( + ws: ElysiaWS, + url: string, + apiKey: string | null, + webSocketFactory: WebSocketFactory = defaultWebSocketFactory, +): WsConnectionState => { + // WS auth via subprotocol header — avoids leaking credentials in URL query params + const protocols = apiKey ? ['acp', `Bearer.${apiKey}`] : undefined + // KNOWN LIMITATION: WebSocket upstream connections lack DNS-pinning. + // validateSafeUrl provides synchronous hostname-only SSRF protection (blocks + // private IPs and loopback), but a DNS rebinding attack where the hostname + // first resolves public then changes to an internal IP between validation + // and connection could bypass this. HTTP path uses safeFetch with resolveAndValidate + // for full DNS pinning. TODO: implement createSafeWebSocket for parity. 
+ const upstream = webSocketFactory(url, protocols) + const state: WsConnectionState = { + type: 'websocket', + upstream, + closed: false, + pendingMessages: [], + pendingBytes: 0, + } + + upstream.addEventListener('open', () => { + if (state.closed) return + for (const msg of state.pendingMessages) { + if (state.closed) break + upstream.send(msg) + } + state.pendingMessages.length = 0 + state.pendingBytes = 0 + }) + + upstream.addEventListener('message', (event) => { + if (state.closed) return + ws.send(typeof event.data === 'string' ? event.data : String(event.data)) + }) + + upstream.addEventListener('close', (event) => { + if (state.closed) return + state.closed = true + connections.delete(ws.id) + ws.close(event.code ?? 1000, event.reason ?? '') + state.pendingMessages.length = 0 + state.pendingBytes = 0 + }) + + upstream.addEventListener('error', (event) => { + const detail = (event as ErrorEvent).message ?? 'unknown' + console.error(`[agent-proxy] Upstream WS error for url=${url}: ${detail}`) + if (state.closed) return + state.closed = true + connections.delete(ws.id) + ws.close(4005, 'Upstream agent connection error') + state.pendingMessages.length = 0 + state.pendingBytes = 0 + }) + + return state +} + +/** + * Handles a client message destined for an upstream WebSocket relay. + * If the upstream is OPEN, forwards immediately. If CONNECTING, queues the message + * within {@link maxPendingMessages}/{@link maxPendingBytes} bounds; exceeding the + * bounds closes the downstream client with code 4005. CLOSING/CLOSED is a silent drop. + */ +export const handleWsMessage = (ws: ElysiaWS, message: unknown, state: WsConnectionState): void => { + const data = typeof message === 'string' ? 
message : JSON.stringify(message) + if (state.upstream.readyState === WebSocket.OPEN) { + state.upstream.send(data) + return + } + if (state.upstream.readyState !== WebSocket.CONNECTING) { + // CLOSING or CLOSED — message has no destination, drop silently + return + } + // CONNECTING — queue with bounds + const byteLen = Buffer.byteLength(data) + if (state.pendingMessages.length >= maxPendingMessages || state.pendingBytes + byteLen > maxPendingBytes) { + console.warn('[agent-proxy] Upstream connection backlog exceeded, closing') + state.closed = true + connections.delete(ws.id) + state.pendingMessages.length = 0 + state.pendingBytes = 0 + // Close upstream BEFORE downstream to avoid leaking the in-flight handshake. + // readyState is CONNECTING at this point (checked above); without closing, the + // upstream WS continues connecting (and eventually opens) even though the relay + // is torn down, burning a socket until the upstream peer's idle timeout. + state.upstream.close() + ws.close(4005, 'Upstream connection backlog exceeded') + return + } + state.pendingMessages.push(data) + state.pendingBytes += byteLen +} + +// ── HTTP/SSE relay ─────────────────────────────────────────────────────────── + +export type HttpConnectionState = { + type: 'http' + agentUrl: string + apiKey: string | null + connectionId: string | null + sessionId: string | null + activeAborts: Set + closed: boolean + /** + * Gate that serializes ACP session bootstrap. The first message with no sessionId + * takes ownership (sets this to a pending promise); concurrent messages await it + * so they see the `Acp-Session-Id` / `Acp-Connection-Id` headers returned by the + * bootstrap response. Reset to null after a failed bootstrap so the next message + * can retry. Stays null throughout the connection if no bootstrap is needed. + */ + bootstrapPromise: Promise | null +} + +/** + * Parses an SSE (Server-Sent Events) response body into a stream of JSON-decoded events. 
+ * Yields each successfully parsed event; drops non-JSON `data:` lines with a warning. + * Throws if the buffer exceeds {@link maxSseBufferChars} characters to prevent unbounded memory growth. + */ +export async function* parseSSEStream(body: ReadableStream): AsyncGenerator { + const reader = body.getReader() + const decoder = new TextDecoder() + let buffer = '' + + try { + while (true) { + const { done, value } = await reader.read() + if (done) break + + buffer += decoder.decode(value, { stream: true }) + + if (buffer.length > maxSseBufferChars) { + throw new Error('SSE buffer exceeded size limit') + } + + const events = buffer.split('\n\n') + buffer = events.pop() || '' + + for (const event of events) { + const dataLines = event + .split('\n') + .filter((line) => line.startsWith('data:')) + .map((line) => line.slice(line.startsWith('data: ') ? 6 : 5)) + + if (dataLines.length > 0) { + const data = dataLines.join('\n') + try { + yield JSON.parse(data) + } catch { + console.warn('[agent-proxy] Dropped non-JSON SSE event:', data.slice(0, 200)) + } + } + } + } + } finally { + reader.releaseLock() + } +} + +/** + * Classifies a JSON-RPC message by its shape: + * - `request`: has both `method` and `id` (expects a response) + * - `notification`: has `method` but no `id` (fire-and-forget) + * - `response`: has neither (a reply to an earlier request) + */ +export const classifyMessage = (msg: Record): 'request' | 'notification' | 'response' => { + if ('method' in msg && 'id' in msg) return 'request' + if ('method' in msg) return 'notification' + return 'response' +} + +const openHttpRelay = (url: string, apiKey: string | null): HttpConnectionState => ({ + type: 'http', + agentUrl: url, + apiKey, + connectionId: null, + sessionId: null, + activeAborts: new Set(), + closed: false, + bootstrapPromise: null, +}) + +type FetchImpl = (input: string | URL | Request, init?: RequestInit) => Promise + +const defaultSafeFetch: FetchImpl = createSafeFetch(globalThis.fetch) + +/** + 
* Handles a JSON-RPC message from the downstream WS client over an HTTP/SSE upstream. + * The `fetchImpl` parameter allows tests to inject a fake fetch without touching globals. + */ +export const handleHttpMessage = async ( + ws: ElysiaWS, + message: unknown, + state: HttpConnectionState, + fetchImpl: FetchImpl = defaultSafeFetch, +) => { + const msg = parseClientMessage(message) + if (msg === null) { + console.warn('[agent-proxy] Dropped non-JSON client message:', String(message).slice(0, 200)) + ws.send(JSON.stringify({ jsonrpc: '2.0', error: { code: -32700, message: 'Parse error' }, id: null })) + return + } + const msgType = classifyMessage(msg) + + // ACP session bootstrap gate: serialize the first in-flight request so subsequent + // messages see the `Acp-Session-Id` / `Acp-Connection-Id` headers returned by the + // bootstrap response. Single-threaded JS makes the null-check + assignment atomic + // (no `await` between them), so at most one message claims the bootstrap role. + // Callers that arrive while bootstrap is in flight wait on the existing promise. + const needsBootstrap = state.bootstrapPromise === null && state.sessionId === null + const bootstrapResolvers = needsBootstrap ? Promise.withResolvers() : null + if (bootstrapResolvers) { + state.bootstrapPromise = bootstrapResolvers.promise + } else if (state.bootstrapPromise) { + await state.bootstrapPromise + } + + const headers: Record = { + 'Content-Type': 'application/json', + Accept: 'application/json, text/event-stream', + } + if (state.connectionId) headers['Acp-Connection-Id'] = state.connectionId + if (state.sessionId) headers['Acp-Session-Id'] = state.sessionId + if (state.apiKey) headers['Authorization'] = `Bearer ${state.apiKey}` + + // Releases the bootstrap gate. If the bootstrap attempt failed to establish a session + // (no sessionId populated), reset `bootstrapPromise` to null so the next caller can + // retry the bootstrap itself. 
Safe to call multiple times: `Promise.resolve()` on an + // already-resolved promise is a no-op, and the `sessionId === null` reset is idempotent. + const releaseBootstrap = () => { + if (!bootstrapResolvers) return + if (state.sessionId === null) state.bootstrapPromise = null + bootstrapResolvers.resolve() + } + + if (msgType === 'notification' || msgType === 'response') { + const ac = new AbortController() + state.activeAborts.add(ac) + const timeout = setTimeout(() => ac.abort(), proxyTimeoutMs) + try { + await fetchImpl(state.agentUrl, { + method: 'POST', + headers, + body: JSON.stringify(msg), + signal: ac.signal, + }) + } catch (err) { + console.warn('[agent-proxy] Fire-and-forget POST failed (session preserved):', err) + } finally { + clearTimeout(timeout) + state.activeAborts.delete(ac) + releaseBootstrap() + } + return + } + + const ac = new AbortController() + state.activeAborts.add(ac) + + const timeout = setTimeout(() => ac.abort(), proxyTimeoutMs) + + try { + const response = await fetchImpl(state.agentUrl, { + method: 'POST', + headers, + body: JSON.stringify(msg), + signal: ac.signal, + }) + // Headers have arrived — the initial timeout's job (bounding connect + headers time) + // is done. Clearing here prevents a long-running SSE stream from being aborted mid-stream + // at the 30s mark. Stream cancellation still works via `state.closed` checks in the loop + // and via `activeAborts` iteration on WS close. + clearTimeout(timeout) + + const connId = response.headers.get('Acp-Connection-Id') + if (connId) state.connectionId = connId + const sessId = response.headers.get('Acp-Session-Id') + if (sessId) state.sessionId = sessId + + // Release the bootstrap gate as soon as session headers are captured — waiters + // should not block on the SSE stream body of the bootstrap request. 
+ releaseBootstrap() + + const contentType = response.headers.get('Content-Type') || '' + + if (contentType.includes('text/event-stream') && response.body) { + for await (const event of parseSSEStream(response.body)) { + if (state.closed) break + ws.send(JSON.stringify(event)) + } + } else { + const text = await response.text() + if (state.closed) return + try { + const result = JSON.parse(text) + ws.send(JSON.stringify(result)) + } catch { + console.error(`[agent-proxy] Non-JSON response from upstream (status ${response.status}):`, text.slice(0, 500)) + if (state.closed) return + ws.send( + JSON.stringify({ + jsonrpc: '2.0', + error: { code: -32603, message: 'Upstream returned non-JSON response' }, + id: (msg.id as string | number | null) ?? null, + }), + ) + } + } + } finally { + clearTimeout(timeout) + state.activeAborts.delete(ac) + // No-op if already released after headers arrived; this covers the error path where + // fetch threw before we could capture session headers (so waiters can retry bootstrap). + releaseBootstrap() + } +} + +// ── Connection state ───────────────────────────────────────────────────────── + +type ConnectionState = WsConnectionState | HttpConnectionState +const connections = new Map() + +/** Clears the in-memory connection store. Used by tests to isolate state between test runs. 
*/ +export const clearConnections = (): void => { + connections.clear() +} + +// ── Routes ─────────────────────────────────────────────────────────────────── + +export const createAgentProxyRoutes = () => { + return new Elysia({ prefix: '/agent-proxy' }).ws('/ws', { + query: t.Object({ ticket: t.Optional(t.String()) }), + + open: (ws: ElysiaWS) => { + try { + const ticketId = (ws.data as { query?: { ticket?: string } }).query?.ticket + if (!ticketId) { + ws.close(4001, 'Unauthorized') + return + } + + const ticket = consumeWsTicket(ticketId) + if (!ticket) { + ws.close(4001, 'Unauthorized') + return + } + + const agentUrl = ticket.payload?.url as string | undefined + const authMethod = ticket.payload?.authMethod as string | undefined + + if (!agentUrl) { + ws.close(4004, 'Agent configuration missing') + return + } + + // SSRF protection — validateSafeUrl checks hostname synchronously; + // safeFetch does DNS-pinned validation for the HTTP path. + const validationUrl = agentUrl.replace(/^wss:\/\//, 'https://').replace(/^ws:\/\//, 'http://') + const validation = validateSafeUrl(validationUrl) + if (!validation.valid) { + ws.close(4003, 'Connection refused') + return + } + + const apiKey = parseApiKey(authMethod ?? null) + const isWebSocket = agentUrl.startsWith('ws://') || agentUrl.startsWith('wss://') + + // Block API keys over unencrypted transports — ws:// leaks `Bearer.{apiKey}` + // in the Sec-WebSocket-Protocol header, and http:// leaks the Authorization + // header in cleartext. Require encrypted upstream (wss:// or https://). + if (apiKey && (agentUrl.startsWith('ws://') || agentUrl.startsWith('http://'))) { + ws.close(4003, 'API keys require encrypted upstream (wss:// or https://)') + return + } + + const state = isWebSocket ? 
openWsRelay(ws, agentUrl, apiKey) : openHttpRelay(agentUrl, apiKey) + + connections.set(ws.id, state) + } catch (err) { + console.error('[agent-proxy] Error in open handler:', err) + ws.close(4005, 'Internal proxy error') + } + }, + + message: (ws: ElysiaWS, message: unknown) => { + const state = connections.get(ws.id) + if (!state || state.closed) return + + if (state.type === 'websocket') { + handleWsMessage(ws, message, state) + return + } + + handleHttpMessage(ws, message, state).catch((err) => { + if (state.closed) { + console.warn('[agent-proxy] HTTP relay error after close (suppressed):', err) + return + } + console.error('[agent-proxy] HTTP relay error:', err) + state.closed = true + connections.delete(ws.id) + ws.close(4005, 'Upstream agent error') + }) + }, + + close: (ws: ElysiaWS) => { + const state = connections.get(ws.id) + if (!state) return + + state.closed = true + connections.delete(ws.id) + + if (state.type === 'websocket') { + if (state.upstream.readyState === WebSocket.OPEN || state.upstream.readyState === WebSocket.CONNECTING) { + state.upstream.close() + } + } else { + for (const ac of state.activeAborts) ac.abort() + } + }, + }) +} diff --git a/backend/src/agents/routes.test.ts b/backend/src/agents/routes.test.ts new file mode 100644 index 000000000..3282b8da4 --- /dev/null +++ b/backend/src/agents/routes.test.ts @@ -0,0 +1,116 @@ +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from 'bun:test' +import { clearSettingsCache } from '@/config/settings' +import type { ConsoleSpies } from '@/test-utils/console-spies' +import { setupConsoleSpy } from '@/test-utils/console-spies' +import type { AgentDescriptor, AgentProvider } from './types' + +// We test the route logic by calling the Elysia app directly via .handle() +import { createAgentsRoutes } from './routes' + +const makeProvider = (agents: AgentDescriptor[]): AgentProvider => ({ + getAgents: async () => agents, +}) + +const agent1: AgentDescriptor = { id: 'a1', 
name: 'Agent 1', type: 'remote', transport: 'websocket', url: 'wss://a1' } +const agent2: AgentDescriptor = { id: 'a2', name: 'Agent 2', type: 'built-in', transport: 'in-process' } +const agent3: AgentDescriptor = { id: 'a3', name: 'Agent 3', type: 'remote', transport: 'websocket', url: 'wss://a3' } + +const ENV_KEYS = ['ENABLED_AGENTS'] as const +let savedEnv: Partial> +let consoleSpies: ConsoleSpies + +beforeAll(() => { + consoleSpies = setupConsoleSpy() +}) + +afterAll(() => { + consoleSpies.restore() +}) + +beforeEach(() => { + clearSettingsCache() + savedEnv = {} + for (const key of ENV_KEYS) { + savedEnv[key] = process.env[key] + } +}) + +afterEach(() => { + for (const key of ENV_KEYS) { + if (savedEnv[key] !== undefined) { + process.env[key] = savedEnv[key] + } else { + delete process.env[key] + } + } + clearSettingsCache() +}) + +describe('Agent routes', () => { + it('returns empty array when no providers are registered', async () => { + const app = createAgentsRoutes([]) + const res = await app.handle(new Request('http://localhost/agents')) + const json = (await res.json()) as { agents: AgentDescriptor[] } + expect(json.agents).toEqual([]) + }) + + it('merges results from multiple providers', async () => { + const app = createAgentsRoutes([makeProvider([agent1]), makeProvider([agent2, agent3])]) + const res = await app.handle(new Request('http://localhost/agents')) + const json = (await res.json()) as { agents: AgentDescriptor[] } + expect(json.agents).toHaveLength(3) + expect(json.agents.map((a) => a.id)).toEqual(['a1', 'a2', 'a3']) + }) + + it('filters by type query parameter', async () => { + const app = createAgentsRoutes([makeProvider([agent1, agent2, agent3])]) + const res = await app.handle(new Request('http://localhost/agents?type=remote')) + const json = (await res.json()) as { agents: AgentDescriptor[] } + expect(json.agents).toHaveLength(2) + expect(json.agents.every((a) => a.type === 'remote')).toBe(true) + }) + + it('filters by id query 
parameter', async () => { + const app = createAgentsRoutes([makeProvider([agent1, agent2, agent3])]) + const res = await app.handle(new Request('http://localhost/agents?id=a2')) + const json = (await res.json()) as { agents: AgentDescriptor[] } + expect(json.agents).toHaveLength(1) + expect(json.agents[0].id).toBe('a2') + }) + + it('filters by enabledAgents setting', async () => { + process.env.ENABLED_AGENTS = 'a1,a3' + clearSettingsCache() + + const app = createAgentsRoutes([makeProvider([agent1, agent2, agent3])]) + const res = await app.handle(new Request('http://localhost/agents')) + const json = (await res.json()) as { agents: AgentDescriptor[] } + expect(json.agents).toHaveLength(2) + expect(json.agents.map((a) => a.id)).toEqual(['a1', 'a3']) + }) + + it('returns all agents when enabledAgents is empty', async () => { + delete process.env.ENABLED_AGENTS + clearSettingsCache() + + const app = createAgentsRoutes([makeProvider([agent1, agent2])]) + const res = await app.handle(new Request('http://localhost/agents')) + const json = (await res.json()) as { agents: AgentDescriptor[] } + expect(json.agents).toHaveLength(2) + }) + + it('returns fulfilled provider agents when another provider rejects', async () => { + consoleSpies.error.mockClear() + const rejectingProvider: AgentProvider = { + getAgents: async () => { + throw new Error('boom') + }, + } + const app = createAgentsRoutes([rejectingProvider, makeProvider([agent1, agent2])]) + const res = await app.handle(new Request('http://localhost/agents')) + expect(res.status).toBe(200) + const json = (await res.json()) as { agents: AgentDescriptor[] } + expect(json.agents.map((a) => a.id)).toEqual(['a1', 'a2']) + expect(consoleSpies.error).toHaveBeenCalled() + }) +}) diff --git a/backend/src/agents/routes.ts b/backend/src/agents/routes.ts new file mode 100644 index 000000000..da3968530 --- /dev/null +++ b/backend/src/agents/routes.ts @@ -0,0 +1,43 @@ +import { getEnabledAgentIds, getSettings } from 
'@/config/settings' +import { safeErrorHandler } from '@/middleware/error-handling' +import { Elysia, t } from 'elysia' +import type { AgentDescriptor, AgentProvider } from './types' + +/** + * Creates the public agent discovery endpoint. + * + * Intentionally unauthenticated — returns metadata about enabled agents only + * (no user-specific data). Clients need to discover available agents before + * authenticating to use any of them. Reviewed and approved in PR #531. + * + * GET /agents — returns available agents, optionally filtered by type or id. + */ +export const createAgentsRoutes = (providers: AgentProvider[] = []) => { + return new Elysia({ prefix: '/agents' }).onError(safeErrorHandler).get( + '/', + async ({ query }) => { + const results = await Promise.allSettled(providers.map((p) => p.getAgents())) + results + .filter((r) => r.status === 'rejected') + .forEach((r) => { + console.error('[agents] Provider failed:', (r as PromiseRejectedResult).reason) + }) + + const enabledIds = getEnabledAgentIds(getSettings()) + const agents = results + .filter((r): r is PromiseFulfilledResult => r.status === 'fulfilled') + .flatMap((r) => r.value) + .filter((a) => !enabledIds || enabledIds.includes(a.id)) + .filter((a) => !query.type || a.type === query.type) + .filter((a) => !query.id || a.id === query.id) + + return { agents } + }, + { + query: t.Object({ + type: t.Optional(t.String()), + id: t.Optional(t.String()), + }), + }, + ) +} diff --git a/backend/src/agents/types.ts b/backend/src/agents/types.ts new file mode 100644 index 000000000..6b2a1d723 --- /dev/null +++ b/backend/src/agents/types.ts @@ -0,0 +1,13 @@ +export type AgentDescriptor = { + id: string + name: string + type: 'built-in' | 'local' | 'remote' + transport: 'in-process' | 'stdio' | 'websocket' + url?: string + icon?: string + isSystem?: number +} + +export type AgentProvider = { + getAgents: () => Promise +} diff --git a/backend/src/api/powersync.test.ts b/backend/src/api/powersync.test.ts index 
485284df9..567343cbe 100644 --- a/backend/src/api/powersync.test.ts +++ b/backend/src/api/powersync.test.ts @@ -52,6 +52,8 @@ const powersyncSettings: Settings = { oidcIssuer: '', betterAuthUrl: 'http://localhost:8000', betterAuthSecret, + enabledAgents: '', + allowCustomAgents: false, rateLimitEnabled: false, swaggerEnabled: false, trustedProxy: '', diff --git a/backend/src/auth/oidc-integration.test.ts b/backend/src/auth/oidc-integration.test.ts index 3af876167..164a28f92 100644 --- a/backend/src/auth/oidc-integration.test.ts +++ b/backend/src/auth/oidc-integration.test.ts @@ -61,6 +61,8 @@ const baseSettings: Settings = { oidcIssuer: '', // set per-suite once mock server is up betterAuthUrl: 'http://localhost:8000', betterAuthSecret: 'test-secret-at-least-32-chars-long!!', + enabledAgents: '', + allowCustomAgents: false, rateLimitEnabled: false, swaggerEnabled: false, trustedProxy: '', diff --git a/backend/src/auth/routes.test.ts b/backend/src/auth/routes.test.ts index 048eb9735..23b9344cf 100644 --- a/backend/src/auth/routes.test.ts +++ b/backend/src/auth/routes.test.ts @@ -57,6 +57,8 @@ describe('Authentication Routes', () => { oidcIssuer: '', betterAuthUrl: 'http://localhost:8000', betterAuthSecret: 'test-secret-at-least-32-chars-long!!', + enabledAgents: '', + allowCustomAgents: false, rateLimitEnabled: false, swaggerEnabled: false, trustedProxy: '', diff --git a/backend/src/auth/ws-ticket-routes.test.ts b/backend/src/auth/ws-ticket-routes.test.ts new file mode 100644 index 000000000..f5e5e4040 --- /dev/null +++ b/backend/src/auth/ws-ticket-routes.test.ts @@ -0,0 +1,200 @@ +import * as settingsModule from '@/config/settings' +import { clearSettingsCache } from '@/config/settings' +import type { ConsoleSpies } from '@/test-utils/console-spies' +import { setupConsoleSpy } from '@/test-utils/console-spies' +import { mockAuth, mockAuthUnauthenticated } from '@/test-utils/mock-auth' +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, spyOn 
} from 'bun:test' +import { Elysia } from 'elysia' +import { clearTickets, createWsTicket } from './ws-ticket' +import { createWsTicketRoutes } from './ws-ticket-routes' + +const ENV_KEYS = ['ALLOW_CUSTOM_AGENTS'] as const +let savedEnv: Partial> + +let app: { handle: Elysia['handle'] } +let consoleSpies: ConsoleSpies +let getSettingsSpy: ReturnType + +const defaultSettings = { + fireworksApiKey: '', + mistralApiKey: '', + anthropicApiKey: '', + exaApiKey: '', + thunderboltInferenceUrl: '', + thunderboltInferenceApiKey: '', + monitoringToken: '', + googleClientId: '', + googleClientSecret: '', + microsoftClientId: '', + microsoftClientSecret: '', + logLevel: 'INFO' as const, + port: 8000, + appUrl: 'http://localhost:1420', + posthogHost: '', + posthogApiKey: '', + corsOrigins: '', + corsAllowCredentials: true, + corsAllowMethods: '', + corsAllowHeaders: '', + corsExposeHeaders: '', + waitlistEnabled: false, + waitlistAutoApproveDomains: '', + powersyncUrl: '', + powersyncJwtKid: '', + powersyncJwtSecret: '', + powersyncTokenExpirySeconds: 3600, + authMode: 'consumer' as const, + oidcClientId: '', + oidcClientSecret: '', + oidcIssuer: '', + betterAuthUrl: 'http://localhost:8000', + betterAuthSecret: 'test-secret-at-least-32-chars-long!!', + rateLimitEnabled: false, + swaggerEnabled: false, + trustedProxy: '' as const, + enabledAgents: '', + allowCustomAgents: false, +} + +const post = (body?: unknown) => { + const init: RequestInit = { method: 'POST' } + if (body !== undefined) { + init.headers = { 'Content-Type': 'application/json' } + init.body = JSON.stringify(body) + } + return app.handle(new Request('http://localhost/ws-ticket', init)) +} + +const postRaw = (rawBody: string) => + app.handle( + new Request('http://localhost/ws-ticket', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: rawBody, + }), + ) + +beforeAll(() => { + consoleSpies = setupConsoleSpy() +}) + +afterAll(() => { + consoleSpies.restore() +}) + +beforeEach(() => { + 
clearTickets() + clearSettingsCache() + savedEnv = {} + for (const key of ENV_KEYS) { + savedEnv[key] = process.env[key] + } + getSettingsSpy = spyOn(settingsModule, 'getSettings').mockReturnValue({ + ...defaultSettings, + allowCustomAgents: false, + }) + app = new Elysia().use(createWsTicketRoutes(mockAuth)) +}) + +afterEach(() => { + getSettingsSpy.mockRestore() + for (const key of ENV_KEYS) { + if (savedEnv[key] !== undefined) { + process.env[key] = savedEnv[key] + } else { + delete process.env[key] + } + } + clearSettingsCache() +}) + +describe('ws-ticket-routes', () => { + it('returns a ticket for POST with valid payload and allowCustomAgents=true', async () => { + getSettingsSpy.mockReturnValue({ ...defaultSettings, allowCustomAgents: true }) + + const res = await post({ payload: { url: 'wss://agent.example.com/ws' } }) + expect(res.status).toBe(200) + const json = (await res.json()) as { ticket: string } + expect(json.ticket).toBeDefined() + expect(typeof json.ticket).toBe('string') + expect(json.ticket.length).toBeGreaterThan(0) + }) + + it('returns 400 for malformed JSON body', async () => { + const res = await postRaw('not valid json{') + expect(res.status).toBe(400) + const json = (await res.json()) as { error: string } + expect(json.error).toBe('Invalid JSON body') + }) + + it('returns 400 when URL fails regex validation', async () => { + getSettingsSpy.mockReturnValue({ ...defaultSettings, allowCustomAgents: true }) + + const res = await post({ payload: { url: 'ftp://invalid.example.com' } }) + expect(res.status).toBe(400) + const json = (await res.json()) as { error: string } + expect(json.error).toBe('Invalid or oversized URL') + }) + + it('returns 400 when URL is oversized', async () => { + getSettingsSpy.mockReturnValue({ ...defaultSettings, allowCustomAgents: true }) + + const longUrl = 'https://example.com/' + 'a'.repeat(2100) + const res = await post({ payload: { url: longUrl } }) + expect(res.status).toBe(400) + const json = (await res.json()) 
as { error: string } + expect(json.error).toBe('Invalid or oversized URL') + }) + + it('returns 400 when authMethod is oversized', async () => { + getSettingsSpy.mockReturnValue({ ...defaultSettings, allowCustomAgents: true }) + + const res = await post({ + payload: { url: 'wss://agent.example.com/ws', authMethod: 'x'.repeat(5000) }, + }) + expect(res.status).toBe(400) + const json = (await res.json()) as { error: string } + expect(json.error).toBe('Oversized authMethod') + }) + + it('returns 403 when allowCustomAgents=false and URL is present', async () => { + const res = await post({ payload: { url: 'wss://agent.example.com/ws' } }) + expect(res.status).toBe(403) + const json = (await res.json()) as { error: string } + expect(json.error).toBe('Custom agents are not allowed') + }) + + it('returns a ticket when no payload is provided', async () => { + const res = await post({}) + expect(res.status).toBe(200) + const json = (await res.json()) as { ticket: string } + expect(json.ticket).toBeDefined() + expect(typeof json.ticket).toBe('string') + }) + + it('returns 401 when no valid session is present', async () => { + const unauthApp = new Elysia().use(createWsTicketRoutes(mockAuthUnauthenticated)) + const res = await unauthApp.handle( + new Request('http://localhost/ws-ticket', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({}), + }), + ) + expect(res.status).toBe(401) + }) + + it('returns 429 with Retry-After when TicketQuotaError is thrown', async () => { + const warnSpy = spyOn(console, 'warn').mockImplementation(() => {}) + // Fill the per-user quota for the mockAuth user id + for (let i = 0; i < 20; i++) { + createWsTicket('test-user') + } + const res = await post({}) + expect(res.status).toBe(429) + expect(res.headers.get('Retry-After')).toBe('30') + const json = (await res.json()) as { error: string } + expect(json.error).toBe('Ticket quota exceeded') + warnSpy.mockRestore() + }) +}) diff --git 
a/backend/src/auth/ws-ticket-routes.ts b/backend/src/auth/ws-ticket-routes.ts new file mode 100644 index 000000000..a9e3967fa --- /dev/null +++ b/backend/src/auth/ws-ticket-routes.ts @@ -0,0 +1,92 @@
import type { Auth } from '@/auth/elysia-plugin'
import { createAuthMacro } from '@/auth/elysia-plugin'
import { getSettings } from '@/config/settings'
import { safeErrorHandler } from '@/middleware/error-handling'
import { type AnyElysia, Elysia } from 'elysia'
import { createWsTicket, TicketQuotaError } from './ws-ticket'

type RawPayload = { url?: string; authMethod?: string } | undefined

type RequestBody = { payload?: RawPayload }

type PayloadResult =
  | { ok: true; payload: Record<string, string> | undefined }
  | { ok: false; status: 400 | 403; error: string }

/** Parses an untrusted JSON request body, resolving to `null` on malformed input. */
const parseJsonBody = (request: Request): Promise<RequestBody | null> =>
  request.json().then(
    (parsed): RequestBody | null => parsed as RequestBody,
    () => null,
  )

/**
 * Validates and normalises the raw `payload` field from a ws-ticket request body.
 * Enforces URL protocol/size limits, authMethod size, and the `allowCustomAgents` flag.
 * Check order is significant: URL shape (400) → feature flag (403) → authMethod size (400).
 */
const buildTicketPayload = (raw: RawPayload): PayloadResult => {
  const url = raw?.url
  // No URL at all is a valid request: the ticket simply carries no payload.
  if (typeof url !== 'string') {
    return { ok: true, payload: undefined }
  }

  // Only ws(s)/http(s) schemes, capped at 2048 chars, make it past this point.
  const urlAcceptable = /^(wss?|https?):\/\//.test(url) && url.length <= 2048
  if (!urlAcceptable) {
    return { ok: false, status: 400, error: 'Invalid or oversized URL' }
  }

  // A URL payload means "connect to a custom agent" — gated by deployment config.
  if (!getSettings().allowCustomAgents) {
    return { ok: false, status: 403, error: 'Custom agents are not allowed' }
  }

  const payload: Record<string, string> = { url }

  const authMethod = raw?.authMethod
  if (typeof authMethod === 'string') {
    if (authMethod.length > 4096) {
      return { ok: false, status: 400, error: 'Oversized authMethod' }
    }
    payload.authMethod = authMethod
  }

  return { ok: true, payload }
}

/**
 * Creates the WebSocket ticket endpoint.
+ * POST /ws-ticket — returns a short-lived ticket for authenticating WebSocket connections. + * Requires a valid session (cookie-based auth). Optionally IP rate-limited via `ipRateLimit`. + */ +export const createWsTicketRoutes = (auth: Auth, ipRateLimit?: AnyElysia) => { + const app = new Elysia({ prefix: '/ws-ticket' }).onError(safeErrorHandler).use(createAuthMacro(auth)) + if (ipRateLimit) { + app.use(ipRateLimit) + } + return app.post( + '/', + async ({ user, request, set }) => { + const body = await parseJsonBody(request) + if (body === null) { + set.status = 400 + return { error: 'Invalid JSON body' } + } + + const result = buildTicketPayload(body.payload) + if (!result.ok) { + set.status = result.status + return { error: result.error } + } + + try { + const ticket = createWsTicket(user.id, result.payload) + return { ticket } + } catch (err) { + if (err instanceof TicketQuotaError) { + set.status = 429 + set.headers['Retry-After'] = String(err.retryAfterSecs) + return { error: 'Ticket quota exceeded' } + } + throw err + } + }, + { auth: true }, + ) +} diff --git a/backend/src/auth/ws-ticket.test.ts b/backend/src/auth/ws-ticket.test.ts new file mode 100644 index 000000000..633c766f5 --- /dev/null +++ b/backend/src/auth/ws-ticket.test.ts @@ -0,0 +1,136 @@ +import { beforeEach, describe, expect, it, spyOn } from 'bun:test' +import { clearTickets, consumeWsTicket, createWsTicket, TicketQuotaError } from './ws-ticket' + +describe('ws-ticket', () => { + beforeEach(() => { + clearTickets() + }) + + it('returns null when the ticket has expired', () => { + const nowSpy = spyOn(Date, 'now') + nowSpy.mockReturnValue(1_000_000) + const ticket = createWsTicket('user-1') + nowSpy.mockReturnValue(1_000_000 + 30_001) // past TTL + expect(consumeWsTicket(ticket)).toBeNull() + nowSpy.mockRestore() + }) + + it('throws when ticket store is at capacity', () => { + const nowSpy = spyOn(Date, 'now') + // Freeze time so the eviction pass (triggered at MAX_TICKETS/2) finds + // 
nothing expired and the store fills to exactly MAX_TICKETS. + nowSpy.mockReturnValue(1_000_000) + for (let i = 0; i < 10_000; i++) { + createWsTicket(`user-${i}`) + } + expect(() => createWsTicket('overflow')).toThrow('Ticket store at capacity') + nowSpy.mockRestore() + }) + + it('generates a 64-character hex ticket', () => { + const ticket = createWsTicket('user-1') + expect(ticket).toHaveLength(64) + expect(/^[0-9a-f]{64}$/.test(ticket)).toBe(true) + }) + + it('generates unique tickets', () => { + const tickets = new Set(Array.from({ length: 100 }, (_, i) => createWsTicket(`user-${i}`))) + expect(tickets.size).toBe(100) + }) + + it('consumes a valid ticket and returns userId', () => { + const ticket = createWsTicket('user-42') + const result = consumeWsTicket(ticket) + expect(result).not.toBeNull() + expect(result!.userId).toBe('user-42') + }) + + it('returns null on second consumption (one-time use)', () => { + const ticket = createWsTicket('user-1') + const first = consumeWsTicket(ticket) + expect(first).not.toBeNull() + const second = consumeWsTicket(ticket) + expect(second).toBeNull() + }) + + it('returns null for unknown ticket', () => { + const result = consumeWsTicket('0'.repeat(64)) + expect(result).toBeNull() + }) + + it('roundtrips payload', () => { + const payload = { url: 'wss://example.com', authMethod: '{"apiKey":"abc"}' } + const ticket = createWsTicket('user-1', payload) + const result = consumeWsTicket(ticket) + expect(result).not.toBeNull() + expect(result!.payload).toEqual(payload) + }) + + it('omits payload when not provided', () => { + const ticket = createWsTicket('user-1') + const result = consumeWsTicket(ticket) + expect(result).not.toBeNull() + expect(result!.payload).toBeUndefined() + }) + + it('handles high-volume ticket creation without errors', () => { + // Create more than MAX_TICKETS/2 to trigger eviction + const tickets: string[] = [] + for (let i = 0; i < 6000; i++) { + tickets.push(createWsTicket(`user-${i}`)) + } + // The most 
recent ticket should still be consumable + const lastTicket = tickets[tickets.length - 1] + const result = consumeWsTicket(lastTicket) + expect(result).not.toBeNull() + expect(result!.userId).toBe('user-5999') + }) + + it('throws TicketQuotaError after 20 tickets for same user', () => { + const warnSpy = spyOn(console, 'warn').mockImplementation(() => {}) + const userId = 'user-quota' + const tickets: string[] = [] + for (let i = 0; i < 20; i++) { + tickets.push(createWsTicket(userId)) + } + expect(tickets).toHaveLength(20) + expect(() => createWsTicket(userId)).toThrow(TicketQuotaError) + warnSpy.mockRestore() + }) + + it('TicketQuotaError has retryAfterSecs = 30', () => { + const warnSpy = spyOn(console, 'warn').mockImplementation(() => {}) + const userId = 'user-retry' + for (let i = 0; i < 20; i++) { + createWsTicket(userId) + } + try { + createWsTicket(userId) + throw new Error('expected TicketQuotaError') + } catch (err) { + expect(err).toBeInstanceOf(TicketQuotaError) + expect((err as TicketQuotaError).retryAfterSecs).toBe(30) + } + warnSpy.mockRestore() + }) + + it('different users have independent quotas', () => { + for (let i = 0; i < 20; i++) { + createWsTicket('user-a') + } + // user-a is at quota, user-b should still be able to create + expect(() => createWsTicket('user-b')).not.toThrow() + }) + + it('expired tickets do not count toward quota', () => { + const nowSpy = spyOn(Date, 'now').mockReturnValue(1_000_000) + for (let i = 0; i < 20; i++) { + createWsTicket('user-expired') + } + // Advance past TTL + nowSpy.mockReturnValue(1_000_000 + 31_000) + // Old tickets expired → should be able to create more + expect(() => createWsTicket('user-expired')).not.toThrow() + nowSpy.mockRestore() + }) +}) diff --git a/backend/src/auth/ws-ticket.ts b/backend/src/auth/ws-ticket.ts new file mode 100644 index 000000000..54b617798 --- /dev/null +++ b/backend/src/auth/ws-ticket.ts @@ -0,0 +1,96 @@ +type WsTicket = { + userId: string + expiresAt: number + payload?: 
Record +} + +const TICKET_TTL_MS = 30_000 // 30 seconds +const MAX_TICKETS = 10_000 +const maxTicketsPerUser = 20 + +// In-memory store — single-process only. For multi-process, use Redis. +const tickets = new Map() + +/** Thrown when a user has reached their per-session ticket quota. Maps to HTTP 429. */ +export class TicketQuotaError extends Error { + readonly retryAfterSecs = 30 + constructor(userId: string) { + super(`Ticket quota exceeded for user ${userId}`) + this.name = 'TicketQuotaError' + } +} + +const generateTicketId = (): string => { + const bytes = new Uint8Array(32) + crypto.getRandomValues(bytes) + return Array.from(bytes, (b) => b.toString(16).padStart(2, '0')).join('') +} + +const evictExpired = () => { + const now = Date.now() + for (const [id, ticket] of tickets) { + if (ticket.expiresAt <= now) { + tickets.delete(id) + } + } +} + +/** Count active (unexpired, unconsumed) tickets for a given userId. */ +const countActiveTicketsForUser = (userId: string, now: number): number => { + let count = 0 + for (const ticket of tickets.values()) { + if (ticket.userId === userId && ticket.expiresAt > now) { + count++ + } + } + return count +} + +/** + * Creates a short-lived, one-time-use ticket for WebSocket authentication. + * The ticket is consumed on first use and expires after TICKET_TTL_MS. + * Throws {@link TicketQuotaError} when the per-user quota is exceeded. 
+ */ +export const createWsTicket = (userId: string, payload?: Record): string => { + if (tickets.size >= MAX_TICKETS / 2) { + evictExpired() + } + if (tickets.size >= MAX_TICKETS) { + throw new Error('Ticket store at capacity') + } + + const now = Date.now() + const userTicketCount = countActiveTicketsForUser(userId, now) + if (userTicketCount >= maxTicketsPerUser) { + console.warn(`[ws-ticket] User ${userId} hit per-user quota (active=${userTicketCount}, max=${maxTicketsPerUser})`) + throw new TicketQuotaError(userId) + } + + const ticketId = generateTicketId() + tickets.set(ticketId, { + userId, + expiresAt: now + TICKET_TTL_MS, + ...(payload ? { payload } : {}), + }) + return ticketId +} + +/** + * Validates and consumes a WebSocket ticket. Returns the userId and optional payload + * if valid, null if the ticket is invalid, expired, or already consumed. + */ +export const consumeWsTicket = (ticketId: string): { userId: string; payload?: Record } | null => { + const ticket = tickets.get(ticketId) + if (!ticket) return null + + tickets.delete(ticketId) + + if (Date.now() > ticket.expiresAt) return null + + return { userId: ticket.userId, payload: ticket.payload } +} + +/** Clears the in-memory ticket store. Used by tests to isolate state between test runs. 
*/ +export const clearTickets = (): void => { + tickets.clear() +} diff --git a/backend/src/config/settings.test.ts b/backend/src/config/settings.test.ts index f076900d1..1b5932a9e 100644 --- a/backend/src/config/settings.test.ts +++ b/backend/src/config/settings.test.ts @@ -1,6 +1,7 @@ import { afterEach, beforeEach, describe, expect, it } from 'bun:test' import { clearSettingsCache, + getEnabledAgentIds, getWaitlistAutoApproveDomains, getCorsMethodsList, getCorsOriginsList, @@ -474,6 +475,87 @@ describe('Config Settings', () => { }) }) + describe('getEnabledAgentIds', () => { + it('returns null for empty string (all enabled)', () => { + expect(getEnabledAgentIds({ enabledAgents: '' })).toBeNull() + }) + + it('splits comma-separated agent IDs', () => { + expect(getEnabledAgentIds({ enabledAgents: 'agent-1,agent-2,agent-3' })).toEqual([ + 'agent-1', + 'agent-2', + 'agent-3', + ]) + }) + + it('trims whitespace from IDs', () => { + expect(getEnabledAgentIds({ enabledAgents: ' agent-1 , agent-2 ' })).toEqual(['agent-1', 'agent-2']) + }) + + it('filters out empty segments', () => { + expect(getEnabledAgentIds({ enabledAgents: 'agent-1,,agent-2,' })).toEqual(['agent-1', 'agent-2']) + }) + + it('returns null for whitespace-only string', () => { + expect(getEnabledAgentIds({ enabledAgents: ' , , ' })).toBeNull() + }) + }) + + describe('Agent settings', () => { + const AGENT_ENV_KEYS = ['ENABLED_AGENTS', 'ALLOW_CUSTOM_AGENTS'] as const + + let savedEnv: Partial> + + beforeEach(() => { + clearSettingsCache() + savedEnv = {} + for (const key of AGENT_ENV_KEYS) { + savedEnv[key] = process.env[key] + } + }) + + afterEach(() => { + for (const key of AGENT_ENV_KEYS) { + if (savedEnv[key] !== undefined) { + process.env[key] = savedEnv[key] + } else { + delete process.env[key] + } + } + clearSettingsCache() + }) + + it('should default enabledAgents to empty string', () => { + delete process.env.ENABLED_AGENTS + const settings = getSettings() + 
expect(settings.enabledAgents).toBe('') + }) + + it('should read ENABLED_AGENTS from env', () => { + process.env.ENABLED_AGENTS = 'haystack-default,custom-agent' + const settings = getSettings() + expect(settings.enabledAgents).toBe('haystack-default,custom-agent') + }) + + it('should default allowCustomAgents to false', () => { + delete process.env.ALLOW_CUSTOM_AGENTS + const settings = getSettings() + expect(settings.allowCustomAgents).toBe(false) + }) + + it('should enable allowCustomAgents when ALLOW_CUSTOM_AGENTS is "true"', () => { + process.env.ALLOW_CUSTOM_AGENTS = 'true' + const settings = getSettings() + expect(settings.allowCustomAgents).toBe(true) + }) + + it('should keep allowCustomAgents disabled for any value other than "true"', () => { + process.env.ALLOW_CUSTOM_AGENTS = 'false' + const settings = getSettings() + expect(settings.allowCustomAgents).toBe(false) + }) + }) + describe('isOAuthRedirectUriAllowed', () => { const settings = { corsOrigins: 'http://localhost:1420,tauri://localhost,http://tauri.localhost' } diff --git a/backend/src/config/settings.ts b/backend/src/config/settings.ts index 709e3d5b0..0ac29997d 100644 --- a/backend/src/config/settings.ts +++ b/backend/src/config/settings.ts @@ -71,6 +71,10 @@ const settingsSchema = z // Set to 'cloudflare' to trust CF-Connecting-IP, 'akamai' for True-Client-IP, // or leave empty to use only the direct socket IP (proxy headers are NOT trusted) trustedProxy: z.enum(['', 'cloudflare', 'akamai']).default(''), + + // Agent settings + enabledAgents: z.string().default(''), + allowCustomAgents: z.boolean().default(false), }) .superRefine((data, ctx) => { if (data.powersyncUrl && data.powersyncJwtSecret.length < 32) { @@ -133,6 +137,8 @@ const parseSettings = (): Settings => { swaggerEnabled: process.env.SWAGGER_ENABLED === 'true', rateLimitEnabled: process.env.RATE_LIMIT_ENABLED !== 'false', trustedProxy: (process.env.TRUSTED_PROXY || '').toLowerCase(), + enabledAgents: process.env.ENABLED_AGENTS || 
'', + allowCustomAgents: process.env.ALLOW_CUSTOM_AGENTS === 'true', } return settingsSchema.parse(env) @@ -205,3 +211,18 @@ export const getWaitlistAutoApproveDomains = (settings: Settings): string[] => { .map((domain) => domain.trim().toLowerCase()) .filter((domain) => domain.length > 0) } + +/** + * Parse the comma-separated ENABLED_AGENTS setting into a list of agent IDs. + * Returns null when unset (meaning "all agents enabled"). + */ +export const getEnabledAgentIds = (settings: Pick): string[] | null => { + if (!settings.enabledAgents.trim()) { + return null + } + const ids = settings.enabledAgents + .split(',') + .map((id) => id.trim()) + .filter((id) => id.length > 0) + return ids.length > 0 ? ids : null +} diff --git a/backend/src/db/powersync-schema.ts b/backend/src/db/powersync-schema.ts index 17c7cc283..0ea28a866 100644 --- a/backend/src/db/powersync-schema.ts +++ b/backend/src/db/powersync-schema.ts @@ -46,6 +46,7 @@ export const chatThreadsTable = powersyncSchema.table( wasTriggeredByAutomation: integer('was_triggered_by_automation').default(0), contextSize: integer('context_size'), modeId: text('mode_id'), + agentId: text('agent_id'), deletedAt: timestamp('deleted_at'), userId: text('user_id') .notNull() @@ -242,6 +243,36 @@ export const devicesTable = powersyncSchema.table( (table) => [index('idx_devices_user_id').on(table.userId)], ) +export const agentsTable = powersyncSchema.table( + 'agents', + { + id: text('id').notNull(), + name: text('name'), + type: text('type', { enum: ['built-in', 'local', 'remote'] }), + transport: text('transport', { enum: ['in-process', 'stdio', 'websocket'] }), + command: text('command'), + args: text('args'), + url: text('url'), + authMethod: text('auth_method'), + icon: text('icon'), + isSystem: integer('is_system').default(0), + enabled: integer('enabled').default(1), + registryId: text('registry_id'), + installedVersion: text('installed_version'), + registryVersion: text('registry_version'), + distributionType: 
text('distribution_type'), + installPath: text('install_path'), + packageName: text('package_name'), + description: text('description'), + defaultHash: text('default_hash'), + deletedAt: timestamp('deleted_at'), + userId: text('user_id') + .notNull() + .references(() => user.id, { onDelete: 'cascade' }), + }, + (table) => [primaryKey({ columns: [table.id, table.userId] }), index('idx_agents_user_id').on(table.userId)], +) + /** * Map of PowerSync table names to Drizzle tables for account delete. * Must have an entry for every PowerSyncTableName (type-checked). @@ -258,6 +289,7 @@ export const powersyncTablesByName = { modes: modesTable, model_profiles: modelProfilesTable, devices: devicesTable, + agents: agentsTable, } satisfies Record /** @@ -286,6 +318,7 @@ export const powersyncPkColumn: Record = { modes: modesTable.id, model_profiles: modelProfilesTable.id, devices: devicesTable.id, + agents: agentsTable.id, } /** @@ -305,4 +338,5 @@ export const powersyncConflictTarget: Record modes: [modesTable.id, modesTable.userId], model_profiles: [modelProfilesTable.id, modelProfilesTable.userId], devices: [devicesTable.id], + agents: [agentsTable.id, agentsTable.userId], } diff --git a/backend/src/index.ts b/backend/src/index.ts index dfc28110b..c7b03e35b 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -9,6 +9,9 @@ import { createInferenceRoutes } from '@/inference/routes' import { createErrorHandlingMiddleware } from '@/middleware/error-handling' import { createHttpLoggingMiddleware } from '@/middleware/http-logging' import { createAuthIpRateLimit, createInferenceRateLimit, createProRateLimit } from '@/middleware/rate-limit' +import { createAgentProxyRoutes } from '@/agent-proxy/routes' +import { createAgentsRoutes } from '@/agents/routes' +import { createWsTicketRoutes } from '@/auth/ws-ticket-routes' import { createMcpProxyRoutes } from '@/mcp-proxy/routes' import { createPostHogRoutes } from '@/posthog/routes' import { createProToolsRoutes } from 
'@/pro/routes' @@ -104,6 +107,9 @@ export const createApp = async (deps?: AppDeps) => { .use(createPowerSyncRoutes(auth, settings, database)) .use(createEncryptionRoutes(auth, database)) .use(createAccountRoutes(auth, database)) + .use(createAgentsRoutes()) + .use(createWsTicketRoutes(auth, createAuthIpRateLimit(database, ipRateLimitSettings))) + .use(createAgentProxyRoutes()) ) } diff --git a/backend/src/pro/link-preview.test.ts b/backend/src/pro/link-preview.test.ts index 29ad8d866..ce5d52a79 100644 --- a/backend/src/pro/link-preview.test.ts +++ b/backend/src/pro/link-preview.test.ts @@ -67,6 +67,8 @@ describe('Link Preview Routes', () => { oidcIssuer: '', betterAuthUrl: 'http://localhost:8000', betterAuthSecret: 'test-secret-at-least-32-chars-long!!', + enabledAgents: '', + allowCustomAgents: false, rateLimitEnabled: false, swaggerEnabled: false, trustedProxy: '', diff --git a/backend/src/pro/proxy.test.ts b/backend/src/pro/proxy.test.ts index acb0e0c64..44ca919f3 100644 --- a/backend/src/pro/proxy.test.ts +++ b/backend/src/pro/proxy.test.ts @@ -75,6 +75,8 @@ describe('Proxy Routes', () => { oidcIssuer: '', betterAuthUrl: 'http://localhost:8000', betterAuthSecret: 'test-secret-at-least-32-chars-long!!', + enabledAgents: '', + allowCustomAgents: false, rateLimitEnabled: false, swaggerEnabled: false, trustedProxy: '', diff --git a/deploy/config/powersync-config.yaml b/deploy/config/powersync-config.yaml index c18f8e978..862fb7a8c 100644 --- a/deploy/config/powersync-config.yaml +++ b/deploy/config/powersync-config.yaml @@ -27,6 +27,7 @@ sync_rules: - SELECT * FROM powersync.modes WHERE user_id = bucket.user_id - SELECT * FROM powersync.model_profiles WHERE user_id = bucket.user_id - SELECT * FROM powersync.devices WHERE user_id = bucket.user_id + - SELECT * FROM powersync.agents WHERE user_id = bucket.user_id client_auth: supabase: false diff --git a/powersync-service/config/config.yaml b/powersync-service/config/config.yaml index 2fa79a442..9491bba6d 100644 --- 
a/powersync-service/config/config.yaml +++ b/powersync-service/config/config.yaml @@ -28,6 +28,7 @@ sync_rules: - SELECT * FROM powersync.modes WHERE user_id = bucket.user_id - SELECT * FROM powersync.model_profiles WHERE user_id = bucket.user_id - SELECT * FROM powersync.devices WHERE user_id = bucket.user_id + - SELECT * FROM powersync.agents WHERE user_id = bucket.user_id client_auth: supabase: false diff --git a/shared/powersync-tables.ts b/shared/powersync-tables.ts index bb0b6543f..f28530f21 100644 --- a/shared/powersync-tables.ts +++ b/shared/powersync-tables.ts @@ -5,6 +5,15 @@ * src/db/powersync/schema.ts, and powersync-service/config/config.yaml. */ +// NOTE: Adding a new table here requires the corresponding entry in the +// frontend drizzleSchema (src/db/powersync/schema.ts) and tables definition +// (src/db/tables.ts) at compile time because the `satisfies` constraint +// below references the frontend schema type. This means the two-PR deploy +// pattern (backend migration first, then frontend) must still be followed +// at the DEPLOYMENT level (run migration → update PowerSync Cloud rules → +// deploy frontend), but the frontend schema files themselves must be +// co-located in the backend PR to keep the type system honest. 
+ export const powersyncTableNames = [ 'settings', 'chat_threads', @@ -17,6 +26,7 @@ export const powersyncTableNames = [ 'modes', 'model_profiles', 'devices', + 'agents', ] as const export type PowerSyncTableName = (typeof powersyncTableNames)[number] @@ -40,4 +50,5 @@ export const powersyncTableToQueryKeys: { modes: [['modes']], model_profiles: [['modelProfiles']], devices: [['devices']], + agents: [['agents']], } diff --git a/src/db/encryption/config.ts b/src/db/encryption/config.ts index 3f06bb28a..c8d0517f5 100644 --- a/src/db/encryption/config.ts +++ b/src/db/encryption/config.ts @@ -16,6 +16,7 @@ export const encryptedColumnsMap: Readonly> = tasks: ['item'], models: ['name', 'model', 'url', 'api_key', 'vendor', 'description'], mcp_servers: ['name', 'url', 'command', 'args'], + agents: ['name', 'url', 'command', 'args', 'auth_method', 'install_path', 'package_name', 'description'], prompts: ['title', 'prompt'], triggers: ['trigger_time'], model_profiles: [ diff --git a/src/db/powersync/schema.ts b/src/db/powersync/schema.ts index a9306e46c..a684f8fba 100644 --- a/src/db/powersync/schema.ts +++ b/src/db/powersync/schema.ts @@ -19,6 +19,7 @@ export const drizzleSchema = { modes: tables.modesTable, model_profiles: tables.modelProfilesTable, devices: tables.devicesTable, + agents: tables.agentsTable, } satisfies Record /** diff --git a/src/db/tables.ts b/src/db/tables.ts index cc4e4b3d2..1511257a4 100644 --- a/src/db/tables.ts +++ b/src/db/tables.ts @@ -23,6 +23,7 @@ export const chatThreadsTable = sqliteTable( wasTriggeredByAutomation: integer('was_triggered_by_automation').default(0), contextSize: integer('context_size'), modeId: text('mode_id'), + agentId: text('agent_id'), deletedAt: text('deleted_at'), userId: text('user_id'), }, @@ -219,6 +220,38 @@ export const modesTable = sqliteTable( ], ) +export const agentsTable = sqliteTable( + 'agents', + { + id: text('id').primaryKey(), + name: text('name'), + type: text('type', { enum: ['built-in', 'local', 
'remote'] }), + transport: text('transport', { enum: ['in-process', 'stdio', 'websocket'] }), + command: text('command'), + args: text('args'), + url: text('url'), + authMethod: text('auth_method'), + icon: text('icon'), + isSystem: integer('is_system').default(0), + enabled: integer('enabled').default(1), + registryId: text('registry_id'), + installedVersion: text('installed_version'), + registryVersion: text('registry_version'), + distributionType: text('distribution_type'), + installPath: text('install_path'), + packageName: text('package_name'), + description: text('description'), + defaultHash: text('default_hash'), + deletedAt: text('deleted_at'), + userId: text('user_id'), + }, + (table) => [ + index('idx_agents_active') + .on(table.id) + .where(sql`${table.deletedAt} IS NULL`), + ], +) + /** Synced via PowerSync. No token. Used for device list and revoke access. */ export const devicesTable = sqliteTable('devices', { id: text('id').primaryKey(),