From 048eddd468e8d2e44001196e183dffe3150f28a7 Mon Sep 17 00:00:00 2001 From: Waleed Date: Tue, 13 Jan 2026 08:38:19 -0800 Subject: [PATCH 1/7] fix(ff): add back condition for isHosted FF (#2789) * fix(ff): add back condition for isHosted FF * updated callers to use getBaseUrl() --- .../components/deploy-modal/components/chat/chat.tsx | 4 ++-- .../components/deploy-modal/components/form/form.tsx | 3 +-- .../deploy/components/deploy-modal/deploy-modal.tsx | 10 +++++----- apps/sim/lib/core/config/feature-flags.ts | 6 ++++-- 4 files changed, 12 insertions(+), 11 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/deploy/components/deploy-modal/components/chat/chat.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/deploy/components/deploy-modal/components/chat/chat.tsx index c150ad3ab6..e254c7ce1c 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/deploy/components/deploy-modal/components/chat/chat.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/deploy/components/deploy-modal/components/chat/chat.tsx @@ -23,7 +23,7 @@ import { Alert, AlertDescription, Skeleton } from '@/components/ui' import { getEnv, isTruthy } from '@/lib/core/config/env' import { generatePassword } from '@/lib/core/security/encryption' import { cn } from '@/lib/core/utils/cn' -import { getEmailDomain } from '@/lib/core/utils/urls' +import { getBaseUrl, getEmailDomain } from '@/lib/core/utils/urls' import { quickValidateEmail } from '@/lib/messaging/email/validation' import { OutputSelect } from '@/app/workspace/[workspaceId]/w/[workflowId]/components/chat/components/output-select/output-select' import { @@ -493,7 +493,7 @@ function IdentifierInput({ onChange(lowercaseValue) } - const fullUrl = `${getEnv('NEXT_PUBLIC_APP_URL')}/chat/${value}` + const fullUrl = `${getBaseUrl()}/chat/${value}` const displayUrl = fullUrl.replace(/^https?:\/\//, '') return ( diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/deploy/components/deploy-modal/components/form/form.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/deploy/components/deploy-modal/components/form/form.tsx index 614950c91f..3f0fd9ea50 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/deploy/components/deploy-modal/components/form/form.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/deploy/components/deploy-modal/components/form/form.tsx @@ -14,7 +14,6 @@ import { Tooltip, } from '@/components/emcn' import { Skeleton } from '@/components/ui' -import { getEnv } from '@/lib/core/config/env' import { isDev } from '@/lib/core/config/feature-flags' import { cn } from '@/lib/core/utils/cn' import { getBaseUrl, getEmailDomain } from '@/lib/core/utils/urls' @@ -392,7 +391,7 @@ export function FormDeploy({ ) } - const fullUrl = `${getEnv('NEXT_PUBLIC_APP_URL')}/form/${identifier}` + const fullUrl = `${getBaseUrl()}/form/${identifier}` const displayUrl = fullUrl.replace(/^https?:\/\//, '') return ( diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/deploy/components/deploy-modal/deploy-modal.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/deploy/components/deploy-modal/deploy-modal.tsx index 1ca73f5fb3..38d3d10651 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/deploy/components/deploy-modal/deploy-modal.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/deploy/components/deploy-modal/deploy-modal.tsx @@ -15,7 +15,7 @@ import { ModalTabsList, ModalTabsTrigger, } from '@/components/emcn' -import { getEnv } from '@/lib/core/config/env' +import { getBaseUrl } from '@/lib/core/utils/urls' import { getInputFormatExample as getInputFormatExampleUtil } from '@/lib/workflows/operations/deployment-utils' import type { WorkflowDeploymentVersionResponse } from '@/lib/workflows/persistence/utils' import { useUserPermissionsContext } from '@/app/workspace/[workspaceId]/providers/workspace-permissions-provider' @@ -209,7 +209,7 @@ export function DeployModal({ } const data = await response.json() - const endpoint = `${getEnv('NEXT_PUBLIC_APP_URL')}/api/workflows/${workflowId}/execute` + const endpoint = `${getBaseUrl()}/api/workflows/${workflowId}/execute` const inputFormatExample = getInputFormatExample(selectedStreamingOutputs.length > 0) const placeholderKey = workflowWorkspaceId ? 'YOUR_WORKSPACE_API_KEY' : 'YOUR_API_KEY' @@ -270,7 +270,7 @@ export function DeployModal({ const deploymentInfoResponse = await fetch(`/api/workflows/${workflowId}/deploy`) if (deploymentInfoResponse.ok) { const deploymentData = await deploymentInfoResponse.json() - const apiEndpoint = `${getEnv('NEXT_PUBLIC_APP_URL')}/api/workflows/${workflowId}/execute` + const apiEndpoint = `${getBaseUrl()}/api/workflows/${workflowId}/execute` const inputFormatExample = getInputFormatExample(selectedStreamingOutputs.length > 0) const placeholderKey = getApiHeaderPlaceholder() @@ -409,7 +409,7 @@ export function DeployModal({ const deploymentInfoResponse = await fetch(`/api/workflows/${workflowId}/deploy`) if (deploymentInfoResponse.ok) { const deploymentData = await deploymentInfoResponse.json() - const apiEndpoint = `${getEnv('NEXT_PUBLIC_APP_URL')}/api/workflows/${workflowId}/execute` + const apiEndpoint = `${getBaseUrl()}/api/workflows/${workflowId}/execute` const inputFormatExample = getInputFormatExample(selectedStreamingOutputs.length > 0) const placeholderKey = getApiHeaderPlaceholder() @@ -526,7 +526,7 @@ export function DeployModal({ const deploymentInfoResponse = await fetch(`/api/workflows/${workflowId}/deploy`) if (deploymentInfoResponse.ok) { const deploymentData = await deploymentInfoResponse.json() - const apiEndpoint = `${getEnv('NEXT_PUBLIC_APP_URL')}/api/workflows/${workflowId}/execute` + const apiEndpoint = `${getBaseUrl()}/api/workflows/${workflowId}/execute` const inputFormatExample = getInputFormatExample(selectedStreamingOutputs.length > 0) const placeholderKey = getApiHeaderPlaceholder() diff --git a/apps/sim/lib/core/config/feature-flags.ts b/apps/sim/lib/core/config/feature-flags.ts index 2a57e569da..8b8bf75411 100644 --- a/apps/sim/lib/core/config/feature-flags.ts +++ b/apps/sim/lib/core/config/feature-flags.ts @@ -1,7 +1,7 @@ /** * Environment utility functions for consistent environment detection across the application */ -import { env, isFalsy, isTruthy } from './env' +import { env, getEnv, isFalsy, isTruthy } from './env' /** * Is the application running in production mode @@ -21,7 +21,9 @@ export const isTest = env.NODE_ENV === 'test' /** * Is this the hosted version of the application */ -export const isHosted = true +export const isHosted = + getEnv('NEXT_PUBLIC_APP_URL') === 'https://www.sim.ai' || + getEnv('NEXT_PUBLIC_APP_URL') === 'https://www.staging.sim.ai' /** * Is billing enforcement enabled From c9068d043ebb6bf9692eaed8628f86ed0b765bda Mon Sep 17 00:00:00 2001 From: Waleed Date: Tue, 13 Jan 2026 10:31:25 -0800 Subject: [PATCH 2/7] chore(readme): trim readme, add more envvar info (#2791) --- README.md | 117 ++++-------------- .../content/docs/en/self-hosting/index.mdx | 7 ++ 2 files changed, 33 insertions(+), 91 deletions(-) diff --git a/README.md b/README.md index abd3ed66fb..23c10de618 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,10 @@ Documentation

+

+ Set Up with Cursor +

+ ### Build Workflows with Ease Design agent workflows visually on a canvas—connect agents, tools, and blocks, then run them instantly. @@ -60,17 +64,11 @@ Docker must be installed and running on your machine. ### Self-hosted: Docker Compose ```bash -# Clone the repository -git clone https://github.com/simstudioai/sim.git - -# Navigate to the project directory -cd sim - -# Start Sim +git clone https://github.com/simstudioai/sim.git && cd sim docker compose -f docker-compose.prod.yml up -d ``` -Access the application at [http://localhost:3000/](http://localhost:3000/) +Open [http://localhost:3000](http://localhost:3000) #### Using Local Models with Ollama @@ -91,33 +89,17 @@ docker compose -f docker-compose.ollama.yml exec ollama ollama pull llama3.1:8b #### Using an External Ollama Instance -If you already have Ollama running on your host machine (outside Docker), you need to configure the `OLLAMA_URL` to use `host.docker.internal` instead of `localhost`: +If Ollama is running on your host machine, use `host.docker.internal` instead of `localhost`: ```bash -# Docker Desktop (macOS/Windows) OLLAMA_URL=http://host.docker.internal:11434 docker compose -f docker-compose.prod.yml up -d - -# Linux (add extra_hosts or use host IP) -docker compose -f docker-compose.prod.yml up -d # Then set OLLAMA_URL to your host's IP ``` -**Why?** When running inside Docker, `localhost` refers to the container itself, not your host machine. `host.docker.internal` is a special DNS name that resolves to the host. - -For Linux users, you can either: -- Use your host machine's actual IP address (e.g., `http://192.168.1.100:11434`) -- Add `extra_hosts: ["host.docker.internal:host-gateway"]` to the simstudio service in your compose file +On Linux, use your host's IP address or add `extra_hosts: ["host.docker.internal:host-gateway"]` to the compose file. #### Using vLLM -Sim also supports [vLLM](https://docs.vllm.ai/) for self-hosted models with OpenAI-compatible API: - -```bash -# Set these environment variables -VLLM_BASE_URL=http://your-vllm-server:8000 -VLLM_API_KEY=your_optional_api_key # Only if your vLLM instance requires auth -``` - -When running with Docker, use `host.docker.internal` if vLLM is on your host machine (same as Ollama above). +Sim supports [vLLM](https://docs.vllm.ai/) for self-hosted models. Set `VLLM_BASE_URL` and optionally `VLLM_API_KEY` in your environment. ### Self-hosted: Dev Containers @@ -128,14 +110,9 @@ When running with Docker, use `host.docker.internal` if vLLM is on your host mac ### Self-hosted: Manual Setup -**Requirements:** -- [Bun](https://bun.sh/) runtime -- [Node.js](https://nodejs.org/) v20+ (required for sandboxed code execution) -- PostgreSQL 12+ with [pgvector extension](https://github.com/pgvector/pgvector) (required for AI embeddings) - -**Note:** Sim uses vector embeddings for AI features like knowledge bases and semantic search, which requires the `pgvector` PostgreSQL extension. +**Requirements:** [Bun](https://bun.sh/), [Node.js](https://nodejs.org/) v20+, PostgreSQL 12+ with [pgvector](https://github.com/pgvector/pgvector) -1. Clone and install dependencies: +1. Clone and install: ```bash git clone https://github.com/simstudioai/sim.git @@ -145,75 +122,33 @@ bun install 2. Set up PostgreSQL with pgvector: -You need PostgreSQL with the `vector` extension for embedding support. Choose one option: - -**Option A: Using Docker (Recommended)** -```bash -# Start PostgreSQL with pgvector extension -docker run --name simstudio-db \ - -e POSTGRES_PASSWORD=your_password \ - -e POSTGRES_DB=simstudio \ - -p 5432:5432 -d \ - pgvector/pgvector:pg17 -``` - -**Option B: Manual Installation** -- Install PostgreSQL 12+ and the pgvector extension -- See [pgvector installation guide](https://github.com/pgvector/pgvector#installation) - -3. Set up environment: - ```bash -cd apps/sim -cp .env.example .env # Configure with required variables (DATABASE_URL, BETTER_AUTH_SECRET, BETTER_AUTH_URL) +docker run --name simstudio-db -e POSTGRES_PASSWORD=your_password -e POSTGRES_DB=simstudio -p 5432:5432 -d pgvector/pgvector:pg17 ``` -Update your `.env` file with the database URL: -```bash -DATABASE_URL="postgresql://postgres:your_password@localhost:5432/simstudio" -``` - -4. Set up the database: - -First, configure the database package environment: -```bash -cd packages/db -cp .env.example .env -``` +Or install manually via the [pgvector guide](https://github.com/pgvector/pgvector#installation). -Update your `packages/db/.env` file with the database URL: -```bash -DATABASE_URL="postgresql://postgres:your_password@localhost:5432/simstudio" -``` +3. Configure environment: -Then run the migrations: ```bash -cd packages/db # Required so drizzle picks correct .env file -bunx drizzle-kit migrate --config=./drizzle.config.ts +cp apps/sim/.env.example apps/sim/.env +cp packages/db/.env.example packages/db/.env +# Edit both .env files to set DATABASE_URL="postgresql://postgres:your_password@localhost:5432/simstudio" ``` -5. Start the development servers: - -**Recommended approach - run both servers together (from project root):** +4. Run migrations: ```bash -bun run dev:full +cd packages/db && bunx drizzle-kit migrate --config=./drizzle.config.ts ``` -This starts both the main Next.js application and the realtime socket server required for full functionality. - -**Alternative - run servers separately:** +5. Start development servers: -Next.js app (from project root): ```bash -bun run dev +bun run dev:full # Starts both Next.js app and realtime socket server ``` -Realtime socket server (from `apps/sim` directory in a separate terminal): -```bash -cd apps/sim -bun run dev:sockets -``` +Or run separately: `bun run dev` (Next.js) and `cd apps/sim && bun run dev:sockets` (realtime). ## Copilot API Keys @@ -224,7 +159,7 @@ Copilot is a Sim-managed service. To use Copilot on a self-hosted instance: ## Environment Variables -Key environment variables for self-hosted deployments (see `apps/sim/.env.example` for full list): +Key environment variables for self-hosted deployments. See [`.env.example`](apps/sim/.env.example) for defaults or [`env.ts`](apps/sim/lib/core/config/env.ts) for the full list. | Variable | Required | Description | |----------|----------|-------------| @@ -232,9 +167,9 @@ Key environment variables for self-hosted deployments (see `apps/sim/.env.exampl | `BETTER_AUTH_SECRET` | Yes | Auth secret (`openssl rand -hex 32`) | | `BETTER_AUTH_URL` | Yes | Your app URL (e.g., `http://localhost:3000`) | | `NEXT_PUBLIC_APP_URL` | Yes | Public app URL (same as above) | -| `ENCRYPTION_KEY` | Yes | Encryption key (`openssl rand -hex 32`) | -| `OLLAMA_URL` | No | Ollama server URL (default: `http://localhost:11434`) | -| `VLLM_BASE_URL` | No | vLLM server URL for self-hosted models | +| `ENCRYPTION_KEY` | Yes | Encrypts environment variables (`openssl rand -hex 32`) | +| `INTERNAL_API_SECRET` | Yes | Encrypts internal API routes (`openssl rand -hex 32`) | +| `API_ENCRYPTION_KEY` | Yes | Encrypts API keys (`openssl rand -hex 32`) | | `COPILOT_API_KEY` | No | API key from sim.ai for Copilot features | ## Troubleshooting diff --git a/apps/docs/content/docs/en/self-hosting/index.mdx b/apps/docs/content/docs/en/self-hosting/index.mdx index 63ef741914..f86c94f896 100644 --- a/apps/docs/content/docs/en/self-hosting/index.mdx +++ b/apps/docs/content/docs/en/self-hosting/index.mdx @@ -8,6 +8,12 @@ import { Callout } from 'fumadocs-ui/components/callout' Deploy Sim Studio on your own infrastructure with Docker or Kubernetes. +
+ + Set Up with Cursor + +
+ ## Requirements | Resource | Minimum | Recommended | @@ -48,3 +54,4 @@ Open [http://localhost:3000](http://localhost:3000) | realtime | 3002 | WebSocket server | | db | 5432 | PostgreSQL with pgvector | | migrations | - | Database migrations (runs once) | + From 40a066f39cfa8036792a80709e7e921e9c6ab3f8 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Tue, 13 Jan 2026 10:47:32 -0800 Subject: [PATCH 3/7] improvement(FF): CI check to prevent hardcoding of FFs (#2790) * improvement(FF): CI check to prevent hardcoding of FFs * revert test change * add FF lint checks --- .github/workflows/test-build.yml | 35 ++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/.github/workflows/test-build.yml b/.github/workflows/test-build.yml index cd9f480b68..fdf21d76fe 100644 --- a/.github/workflows/test-build.yml +++ b/.github/workflows/test-build.yml @@ -38,6 +38,41 @@ jobs: - name: Install dependencies run: bun install --frozen-lockfile + - name: Validate feature flags + run: | + FILE="apps/sim/lib/core/config/feature-flags.ts" + ERRORS="" + + echo "Checking for hardcoded boolean feature flags..." + + # Use perl for multiline matching to catch both: + # export const isHosted = true + # export const isHosted = + # true + HARDCODED=$(perl -0777 -ne 'while (/export const (is[A-Za-z]+)\s*=\s*\n?\s*(true|false)\b/g) { print " $1 = $2\n" }' "$FILE") + + if [ -n "$HARDCODED" ]; then + ERRORS="${ERRORS}\n❌ Feature flags must not be hardcoded to boolean literals!\n\nFound hardcoded flags:\n${HARDCODED}\n\nFeature flags should derive their values from environment variables.\n" + fi + + echo "Checking feature flag naming conventions..." + + # Check that all export const (except functions) start with 'is' + # This finds exports like "export const someFlag" that don't start with "is" or "get" + BAD_NAMES=$(grep -E "^export const [a-z]" "$FILE" | grep -vE "^export const (is|get)" | sed 's/export const \([a-zA-Z]*\).*/ \1/') + + if [ -n "$BAD_NAMES" ]; then + ERRORS="${ERRORS}\n❌ Feature flags must use 'is' prefix for boolean flags!\n\nFound incorrectly named flags:\n${BAD_NAMES}\n\nExample: 'hostedMode' should be 'isHostedMode'\n" + fi + + if [ -n "$ERRORS" ]; then + echo "" + echo -e "$ERRORS" + exit 1 + fi + + echo "✅ All feature flags are properly configured" + - name: Lint code run: bun run lint:check From 2bc403972cfbfb8756c4334ce933ad533a2361ff Mon Sep 17 00:00:00 2001 From: Waleed Date: Tue, 13 Jan 2026 11:43:02 -0800 Subject: [PATCH 4/7] feat(a2a): added a2a protocol (#2784) * feat(a2a): a2a added * feat(a2a): added a2a protocol * remove migrations * readd migrations * consolidated permissions utils * consolidated tag-input, output select -> combobox, added tags for A2A * cleanup up utils, share same deployed state as other tabs * ack PR comments * more * updated code examples * solely rely on tanstack query to vend data and invalidate query key's, remove custom caching --------- Co-authored-by: Emir Karabeg --- apps/docs/components/icons.tsx | 25 + apps/docs/components/ui/icon-mapping.ts | 2 + apps/docs/content/docs/en/tools/a2a.mdx | 215 + apps/docs/content/docs/en/tools/meta.json | 1 + .../sim/app/api/a2a/agents/[agentId]/route.ts | 289 + apps/sim/app/api/a2a/agents/route.ts | 186 + apps/sim/app/api/a2a/serve/[agentId]/route.ts | 1263 ++ apps/sim/app/api/a2a/serve/[agentId]/utils.ts | 176 + apps/sim/app/api/memory/[id]/route.ts | 49 +- apps/sim/app/api/memory/route.ts | 77 +- .../app/api/tools/a2a/cancel-task/route.ts | 84 + .../a2a/delete-push-notification/route.ts | 94 + .../app/api/tools/a2a/get-agent-card/route.ts | 92 + .../tools/a2a/get-push-notification/route.ts | 115 + apps/sim/app/api/tools/a2a/get-task/route.ts | 95 + .../app/api/tools/a2a/resubscribe/route.ts | 119 + .../tools/a2a/send-message-stream/route.ts | 150 + .../app/api/tools/a2a/send-message/route.ts | 126 + .../tools/a2a/set-push-notification/route.ts | 93 + .../app/api/workflows/[id]/execute/route.ts | 5 +- apps/sim/app/api/workflows/route.ts | 12 +- .../app/api/workspaces/[id]/api-keys/route.ts | 8 +- .../api/workspaces/[id]/byok-keys/route.ts | 8 +- .../api/workspaces/[id]/environment/route.ts | 8 +- apps/sim/app/playground/page.tsx | 18 + .../w/[workflowId]/components/chat/chat.tsx | 2 +- .../output-select/output-select.tsx | 339 +- .../deploy-modal/components/a2a/a2a.tsx | 935 ++ .../deploy-modal/components/api/api.tsx | 45 +- .../deploy-modal/components/mcp/mcp.tsx | 41 +- .../components/template/template.tsx | 23 +- .../components/deploy-modal/deploy-modal.tsx | 229 +- .../a2a-push-notification-delivery.ts | 33 + apps/sim/blocks/blocks/a2a.ts | 306 + apps/sim/blocks/registry.ts | 2 + .../emcn/components/combobox/combobox.tsx | 14 +- .../emcn/components/tag-input/tag-input.tsx | 13 +- apps/sim/components/icons.tsx | 25 + apps/sim/hooks/queries/a2a/agents.ts | 307 + apps/sim/hooks/queries/a2a/tasks.ts | 262 + apps/sim/lib/a2a/agent-card.ts | 138 + apps/sim/lib/a2a/constants.ts | 28 + apps/sim/lib/a2a/index.ts | 83 + apps/sim/lib/a2a/push-notifications.ts | 114 + apps/sim/lib/a2a/types.ts | 142 + apps/sim/lib/a2a/utils.ts | 282 + apps/sim/lib/auth/hybrid.ts | 2 +- .../lib/webhooks/provider-subscriptions.ts | 1 - .../workflows/operations/deployment-utils.ts | 48 +- apps/sim/lib/workflows/utils.ts | 14 +- .../lib/workspaces/permissions/utils.test.ts | 209 + apps/sim/lib/workspaces/permissions/utils.ts | 116 +- apps/sim/package.json | 1 + apps/sim/tools/a2a/cancel_task.ts | 55 + .../sim/tools/a2a/delete_push_notification.ts | 60 + apps/sim/tools/a2a/get_agent_card.ts | 73 + apps/sim/tools/a2a/get_push_notification.ts | 77 + apps/sim/tools/a2a/get_task.ts | 72 + apps/sim/tools/a2a/index.ts | 21 + apps/sim/tools/a2a/resubscribe.ts | 87 + apps/sim/tools/a2a/send_message.ts | 72 + apps/sim/tools/a2a/send_message_stream.ts | 81 + apps/sim/tools/a2a/set_push_notification.ts | 92 + apps/sim/tools/a2a/types.ts | 135 + apps/sim/tools/registry.ts | 20 + bun.lock | 3 + packages/db/migrations/0139_late_cargill.sql | 61 + .../db/migrations/meta/0139_snapshot.json | 10245 ++++++++++++++++ packages/db/migrations/meta/_journal.json | 7 + packages/db/schema.ts | 147 + 70 files changed, 17889 insertions(+), 483 deletions(-) create mode 100644 apps/docs/content/docs/en/tools/a2a.mdx create mode 100644 apps/sim/app/api/a2a/agents/[agentId]/route.ts create mode 100644 apps/sim/app/api/a2a/agents/route.ts create mode 100644 apps/sim/app/api/a2a/serve/[agentId]/route.ts create mode 100644 apps/sim/app/api/a2a/serve/[agentId]/utils.ts create mode 100644 apps/sim/app/api/tools/a2a/cancel-task/route.ts create mode 100644 apps/sim/app/api/tools/a2a/delete-push-notification/route.ts create mode 100644 apps/sim/app/api/tools/a2a/get-agent-card/route.ts create mode 100644 apps/sim/app/api/tools/a2a/get-push-notification/route.ts create mode 100644 apps/sim/app/api/tools/a2a/get-task/route.ts create mode 100644 apps/sim/app/api/tools/a2a/resubscribe/route.ts create mode 100644 apps/sim/app/api/tools/a2a/send-message-stream/route.ts create mode 100644 apps/sim/app/api/tools/a2a/send-message/route.ts create mode 100644 apps/sim/app/api/tools/a2a/set-push-notification/route.ts create mode 100644 apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/deploy/components/deploy-modal/components/a2a/a2a.tsx create mode 100644 apps/sim/background/a2a-push-notification-delivery.ts create mode 100644 apps/sim/blocks/blocks/a2a.ts create mode 100644 apps/sim/hooks/queries/a2a/agents.ts create mode 100644 apps/sim/hooks/queries/a2a/tasks.ts create mode 100644 apps/sim/lib/a2a/agent-card.ts create mode 100644 apps/sim/lib/a2a/constants.ts create mode 100644 apps/sim/lib/a2a/index.ts create mode 100644 apps/sim/lib/a2a/push-notifications.ts create mode 100644 apps/sim/lib/a2a/types.ts create mode 100644 apps/sim/lib/a2a/utils.ts create mode 100644 apps/sim/tools/a2a/cancel_task.ts create mode 100644 apps/sim/tools/a2a/delete_push_notification.ts create mode 100644 apps/sim/tools/a2a/get_agent_card.ts create mode 100644 apps/sim/tools/a2a/get_push_notification.ts create mode 100644 apps/sim/tools/a2a/get_task.ts create mode 100644 apps/sim/tools/a2a/index.ts create mode 100644 apps/sim/tools/a2a/resubscribe.ts create mode 100644 apps/sim/tools/a2a/send_message.ts create mode 100644 apps/sim/tools/a2a/send_message_stream.ts create mode 100644 apps/sim/tools/a2a/set_push_notification.ts create mode 100644 apps/sim/tools/a2a/types.ts create mode 100644 packages/db/migrations/0139_late_cargill.sql create mode 100644 packages/db/migrations/meta/0139_snapshot.json diff --git a/apps/docs/components/icons.tsx b/apps/docs/components/icons.tsx index 2c46036dbf..7addb30eaa 100644 --- a/apps/docs/components/icons.tsx +++ b/apps/docs/components/icons.tsx @@ -4078,6 +4078,31 @@ export function McpIcon(props: SVGProps) { ) } +export function A2AIcon(props: SVGProps) { + return ( + + + + + + + + + + ) +} + export function WordpressIcon(props: SVGProps) { return ( diff --git a/apps/docs/components/ui/icon-mapping.ts b/apps/docs/components/ui/icon-mapping.ts index 2258f7189e..fe03d578c1 100644 --- a/apps/docs/components/ui/icon-mapping.ts +++ b/apps/docs/components/ui/icon-mapping.ts @@ -4,6 +4,7 @@ import type { ComponentType, SVGProps } from 'react' import { + A2AIcon, AhrefsIcon, AirtableIcon, ApifyIcon, @@ -127,6 +128,7 @@ import { type IconComponent = ComponentType> export const blockTypeToIconMap: Record = { + a2a: A2AIcon, ahrefs: AhrefsIcon, airtable: AirtableIcon, apify: ApifyIcon, diff --git a/apps/docs/content/docs/en/tools/a2a.mdx b/apps/docs/content/docs/en/tools/a2a.mdx new file mode 100644 index 0000000000..558f1f907e --- /dev/null +++ b/apps/docs/content/docs/en/tools/a2a.mdx @@ -0,0 +1,215 @@ +--- +title: A2A +description: Interact with external A2A-compatible agents +--- + +import { BlockInfoCard } from "@/components/ui/block-info-card" + + + +{/* MANUAL-CONTENT-START:intro */} +The A2A (Agent-to-Agent) protocol enables Sim to interact with external AI agents and systems that implement A2A-compatible APIs. With A2A, you can connect Sim’s automations and workflows to remote agents—such as LLM-powered bots, microservices, and other AI-based tools—using a standardized messaging format. + +Using the A2A tools in Sim, you can: + +- **Send Messages to External Agents**: Communicate directly with remote agents, providing prompts, commands, or data. +- **Receive and Stream Responses**: Get structured responses, artifacts, or real-time updates from the agent as the task progresses. +- **Continue Conversations or Tasks**: Carry on multi-turn conversations or workflows by referencing task and context IDs. +- **Integrate Third-Party AI and Automation**: Leverage external A2A-compatible services as part of your Sim workflows. + +These features allow you to build advanced workflows that combine Sim’s native capabilities with the intelligence and automation of external AIs or custom agents. To use A2A integrations, you’ll need the external agent’s endpoint URL and, if required, an API key or credentials. +{/* MANUAL-CONTENT-END */} + + +## Usage Instructions + +Use the A2A (Agent-to-Agent) protocol to interact with external AI agents. + + + +## Tools + +### `a2a_send_message` + +Send a message to an external A2A-compatible agent. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `agentUrl` | string | Yes | The A2A agent endpoint URL | +| `message` | string | Yes | Message to send to the agent | +| `taskId` | string | No | Task ID for continuing an existing task | +| `contextId` | string | No | Context ID for conversation continuity | +| `apiKey` | string | No | API key for authentication | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `content` | string | The text response from the agent | +| `taskId` | string | Task ID for follow-up interactions | +| `contextId` | string | Context ID for conversation continuity | +| `state` | string | Task state | +| `artifacts` | array | Structured output artifacts | +| `history` | array | Full message history | + +### `a2a_get_task` + +Query the status of an existing A2A task. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `agentUrl` | string | Yes | The A2A agent endpoint URL | +| `taskId` | string | Yes | Task ID to query | +| `apiKey` | string | No | API key for authentication | +| `historyLength` | number | No | Number of history messages to include | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `taskId` | string | Task ID | +| `contextId` | string | Context ID | +| `state` | string | Task state | +| `artifacts` | array | Output artifacts | +| `history` | array | Message history | + +### `a2a_cancel_task` + +Cancel a running A2A task. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `agentUrl` | string | Yes | The A2A agent endpoint URL | +| `taskId` | string | Yes | Task ID to cancel | +| `apiKey` | string | No | API key for authentication | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `cancelled` | boolean | Whether cancellation was successful | +| `state` | string | Task state after cancellation | + +### `a2a_get_agent_card` + +Fetch the Agent Card (discovery document) for an A2A agent. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `agentUrl` | string | Yes | The A2A agent endpoint URL | +| `apiKey` | string | No | API key for authentication \(if required\) | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `name` | string | Agent name | +| `description` | string | Agent description | +| `url` | string | Agent endpoint URL | +| `version` | string | Agent version | +| `capabilities` | object | Agent capabilities \(streaming, pushNotifications, etc.\) | +| `skills` | array | Skills the agent can perform | +| `defaultInputModes` | array | Default input modes \(text, file, data\) | +| `defaultOutputModes` | array | Default output modes \(text, file, data\) | + +### `a2a_resubscribe` + +Reconnect to an ongoing A2A task stream after connection interruption. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `agentUrl` | string | Yes | The A2A agent endpoint URL | +| `taskId` | string | Yes | Task ID to resubscribe to | +| `apiKey` | string | No | API key for authentication | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `taskId` | string | Task ID | +| `contextId` | string | Context ID | +| `state` | string | Current task state | +| `isRunning` | boolean | Whether the task is still running | +| `artifacts` | array | Output artifacts | +| `history` | array | Message history | + +### `a2a_set_push_notification` + +Configure a webhook to receive task update notifications. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `agentUrl` | string | Yes | The A2A agent endpoint URL | +| `taskId` | string | Yes | Task ID to configure notifications for | +| `webhookUrl` | string | Yes | HTTPS webhook URL to receive notifications | +| `token` | string | No | Token for webhook validation | +| `apiKey` | string | No | API key for authentication | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `url` | string | Configured webhook URL | +| `token` | string | Token for webhook validation | +| `success` | boolean | Whether configuration was successful | + +### `a2a_get_push_notification` + +Get the push notification webhook configuration for a task. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `agentUrl` | string | Yes | The A2A agent endpoint URL | +| `taskId` | string | Yes | Task ID to get notification config for | +| `apiKey` | string | No | API key for authentication | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `url` | string | Configured webhook URL | +| `token` | string | Token for webhook validation | +| `exists` | boolean | Whether a push notification config exists | + +### `a2a_delete_push_notification` + +Delete the push notification webhook configuration for a task. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `agentUrl` | string | Yes | The A2A agent endpoint URL | +| `taskId` | string | Yes | Task ID to delete notification config for | +| `pushNotificationConfigId` | string | No | Push notification configuration ID to delete \(optional - server can derive from taskId\) | +| `apiKey` | string | No | API key for authentication | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `success` | boolean | Whether deletion was successful | + + + +## Notes + +- Category: `tools` +- Type: `a2a` diff --git a/apps/docs/content/docs/en/tools/meta.json b/apps/docs/content/docs/en/tools/meta.json index e489efd205..ea445d4488 100644 --- a/apps/docs/content/docs/en/tools/meta.json +++ b/apps/docs/content/docs/en/tools/meta.json @@ -1,6 +1,7 @@ { "pages": [ "index", + "a2a", "ahrefs", "airtable", "apify", diff --git a/apps/sim/app/api/a2a/agents/[agentId]/route.ts b/apps/sim/app/api/a2a/agents/[agentId]/route.ts new file mode 100644 index 0000000000..74c13af879 --- /dev/null +++ b/apps/sim/app/api/a2a/agents/[agentId]/route.ts @@ -0,0 +1,289 @@ +import { db } from '@sim/db' +import { a2aAgent, workflow } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { eq } from 'drizzle-orm' +import { type NextRequest, NextResponse } from 'next/server' +import { generateAgentCard, generateSkillsFromWorkflow } from '@/lib/a2a/agent-card' +import type { AgentCapabilities, AgentSkill } from '@/lib/a2a/types' +import { checkHybridAuth } from '@/lib/auth/hybrid' +import { getRedisClient } from '@/lib/core/config/redis' +import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils' + +const logger = createLogger('A2AAgentCardAPI') + +export const dynamic = 'force-dynamic' + +interface RouteParams { + agentId: string +} + +/** + * GET - Returns the Agent Card for discovery + */ +export async function GET(request: NextRequest, { params }: { params: Promise }) { + const { agentId } = await params + + try { + const [agent] = await db + .select({ + agent: a2aAgent, + workflow: workflow, + }) + .from(a2aAgent) + .innerJoin(workflow, eq(a2aAgent.workflowId, workflow.id)) + .where(eq(a2aAgent.id, agentId)) + .limit(1) + + if (!agent) { + return NextResponse.json({ error: 'Agent not found' }, { status: 404 }) + } + + if (!agent.agent.isPublished) { + const auth = await checkHybridAuth(request, { requireWorkflowId: false }) + if (!auth.success) { + return NextResponse.json({ error: 'Agent not published' }, { status: 404 }) + } + } + + const agentCard = generateAgentCard( + { + id: agent.agent.id, + name: agent.agent.name, + description: agent.agent.description, + version: agent.agent.version, + capabilities: agent.agent.capabilities as AgentCapabilities, + skills: agent.agent.skills as AgentSkill[], + }, + { + id: agent.workflow.id, + name: agent.workflow.name, + description: agent.workflow.description, + } + ) + + return NextResponse.json(agentCard, { + headers: { + 'Content-Type': 'application/json', + 'Cache-Control': agent.agent.isPublished ? 'public, max-age=3600' : 'private, no-cache', + }, + }) + } catch (error) { + logger.error('Error getting Agent Card:', error) + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) + } +} + +/** + * PUT - Update an agent + */ +export async function PUT(request: NextRequest, { params }: { params: Promise }) { + const { agentId } = await params + + try { + const auth = await checkHybridAuth(request, { requireWorkflowId: false }) + if (!auth.success || !auth.userId) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const [existingAgent] = await db + .select() + .from(a2aAgent) + .where(eq(a2aAgent.id, agentId)) + .limit(1) + + if (!existingAgent) { + return NextResponse.json({ error: 'Agent not found' }, { status: 404 }) + } + + const body = await request.json() + + if ( + body.skillTags !== undefined && + (!Array.isArray(body.skillTags) || + !body.skillTags.every((tag: unknown): tag is string => typeof tag === 'string')) + ) { + return NextResponse.json({ error: 'skillTags must be an array of strings' }, { status: 400 }) + } + + let skills = body.skills ?? existingAgent.skills + if (body.skillTags !== undefined) { + const agentName = body.name ?? existingAgent.name + const agentDescription = body.description ?? existingAgent.description + skills = generateSkillsFromWorkflow(agentName, agentDescription, body.skillTags) + } + + const [updatedAgent] = await db + .update(a2aAgent) + .set({ + name: body.name ?? existingAgent.name, + description: body.description ?? existingAgent.description, + version: body.version ?? existingAgent.version, + capabilities: body.capabilities ?? existingAgent.capabilities, + skills, + authentication: body.authentication ?? existingAgent.authentication, + isPublished: body.isPublished ?? existingAgent.isPublished, + publishedAt: + body.isPublished && !existingAgent.isPublished ? new Date() : existingAgent.publishedAt, + updatedAt: new Date(), + }) + .where(eq(a2aAgent.id, agentId)) + .returning() + + logger.info(`Updated A2A agent: ${agentId}`) + + return NextResponse.json({ success: true, agent: updatedAgent }) + } catch (error) { + logger.error('Error updating agent:', error) + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) + } +} + +/** + * DELETE - Delete an agent + */ +export async function DELETE(request: NextRequest, { params }: { params: Promise }) { + const { agentId } = await params + + try { + const auth = await checkHybridAuth(request, { requireWorkflowId: false }) + if (!auth.success || !auth.userId) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const [existingAgent] = await db + .select() + .from(a2aAgent) + .where(eq(a2aAgent.id, agentId)) + .limit(1) + + if (!existingAgent) { + return NextResponse.json({ error: 'Agent not found' }, { status: 404 }) + } + + await db.delete(a2aAgent).where(eq(a2aAgent.id, agentId)) + + logger.info(`Deleted A2A agent: ${agentId}`) + + return NextResponse.json({ success: true }) + } catch (error) { + logger.error('Error deleting agent:', error) + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) + } +} + +/** + * POST - Publish/unpublish an agent + */ +export async function POST(request: NextRequest, { params }: { params: Promise }) { + const { agentId } = await params + + try { + const auth = await checkHybridAuth(request, { requireWorkflowId: false }) + if (!auth.success || !auth.userId) { + logger.warn('A2A agent publish auth failed:', { error: auth.error, hasUserId: !!auth.userId }) + return NextResponse.json({ error: auth.error || 'Unauthorized' }, { status: 401 }) + } + + const [existingAgent] = await db + .select() + .from(a2aAgent) + .where(eq(a2aAgent.id, agentId)) + .limit(1) + + if (!existingAgent) { + return NextResponse.json({ error: 'Agent not found' }, { status: 404 }) + } + + const body = await request.json() + const action = body.action as 'publish' | 'unpublish' | 'refresh' + + if (action === 'publish') { + const [wf] = await db + .select({ isDeployed: workflow.isDeployed }) + .from(workflow) + .where(eq(workflow.id, existingAgent.workflowId)) + .limit(1) + + if (!wf?.isDeployed) { + return NextResponse.json( + { error: 'Workflow must be deployed before publishing agent' }, + { status: 400 } + ) + } + + await db + .update(a2aAgent) + .set({ + isPublished: true, + publishedAt: new Date(), + updatedAt: new Date(), + }) + .where(eq(a2aAgent.id, agentId)) + + const redis = getRedisClient() + if (redis) { + try { + await redis.del(`a2a:agent:${agentId}:card`) + } catch (err) { + logger.warn('Failed to invalidate agent card cache', { agentId, error: err }) + } + } + + logger.info(`Published A2A agent: ${agentId}`) + return NextResponse.json({ success: true, isPublished: true }) + } + + if (action === 'unpublish') { + await db + .update(a2aAgent) + .set({ + isPublished: false, + updatedAt: new Date(), + }) + .where(eq(a2aAgent.id, agentId)) + + const redis = getRedisClient() + if (redis) { + try { + await redis.del(`a2a:agent:${agentId}:card`) + } catch (err) { + logger.warn('Failed to invalidate agent card cache', { agentId, error: err }) + } + } + + logger.info(`Unpublished A2A agent: ${agentId}`) + return NextResponse.json({ success: true, isPublished: false }) + } + + if (action === 'refresh') { + const workflowData = await loadWorkflowFromNormalizedTables(existingAgent.workflowId) + if (!workflowData) { + return NextResponse.json({ error: 'Failed to load workflow' }, { status: 500 }) + } + + const [wf] = await db + .select({ name: workflow.name, description: workflow.description }) + .from(workflow) + .where(eq(workflow.id, existingAgent.workflowId)) + .limit(1) + + const skills = generateSkillsFromWorkflow(wf?.name || existingAgent.name, wf?.description) + + await db + .update(a2aAgent) + .set({ + skills, + updatedAt: new Date(), + }) + .where(eq(a2aAgent.id, agentId)) + + logger.info(`Refreshed skills for A2A agent: ${agentId}`) + return NextResponse.json({ success: true, skills }) + } + + return NextResponse.json({ error: 'Invalid action' }, { status: 400 }) + } catch (error) { + logger.error('Error with agent action:', error) + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) + } +} diff --git a/apps/sim/app/api/a2a/agents/route.ts b/apps/sim/app/api/a2a/agents/route.ts new file mode 100644 index 0000000000..e4229ea1e4 --- /dev/null +++ b/apps/sim/app/api/a2a/agents/route.ts @@ -0,0 +1,186 @@ +/** + * A2A Agents List Endpoint + * + * List and create A2A agents for a workspace. + */ + +import { db } from '@sim/db' +import { a2aAgent, workflow } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { and, eq, sql } from 'drizzle-orm' +import { type NextRequest, NextResponse } from 'next/server' +import { v4 as uuidv4 } from 'uuid' +import { generateSkillsFromWorkflow } from '@/lib/a2a/agent-card' +import { A2A_DEFAULT_CAPABILITIES } from '@/lib/a2a/constants' +import { sanitizeAgentName } from '@/lib/a2a/utils' +import { checkHybridAuth } from '@/lib/auth/hybrid' +import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils' +import { hasValidStartBlockInState } from '@/lib/workflows/triggers/trigger-utils' +import { getWorkspaceById } from '@/lib/workspaces/permissions/utils' + +const logger = createLogger('A2AAgentsAPI') + +export const dynamic = 'force-dynamic' + +/** + * GET - List all A2A agents for a workspace + */ +export async function GET(request: NextRequest) { + try { + const auth = await checkHybridAuth(request, { requireWorkflowId: false }) + if (!auth.success || !auth.userId) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const { searchParams } = new URL(request.url) + const workspaceId = searchParams.get('workspaceId') + + if (!workspaceId) { + return NextResponse.json({ error: 'workspaceId is required' }, { status: 400 }) + } + + const ws = await getWorkspaceById(workspaceId) + if (!ws) { + return NextResponse.json({ error: 'Workspace not found' }, { status: 404 }) + } + + const agents = await db + .select({ + id: a2aAgent.id, + workspaceId: a2aAgent.workspaceId, + workflowId: a2aAgent.workflowId, + name: a2aAgent.name, + description: a2aAgent.description, + version: a2aAgent.version, + capabilities: a2aAgent.capabilities, + skills: a2aAgent.skills, + authentication: a2aAgent.authentication, + isPublished: a2aAgent.isPublished, + publishedAt: a2aAgent.publishedAt, + createdAt: a2aAgent.createdAt, + updatedAt: a2aAgent.updatedAt, + workflowName: workflow.name, + workflowDescription: workflow.description, + isDeployed: workflow.isDeployed, + taskCount: sql`( + SELECT COUNT(*)::int + FROM "a2a_task" + WHERE "a2a_task"."agent_id" = "a2a_agent"."id" + )`.as('task_count'), + }) + .from(a2aAgent) + .leftJoin(workflow, eq(a2aAgent.workflowId, workflow.id)) + .where(eq(a2aAgent.workspaceId, workspaceId)) + .orderBy(a2aAgent.createdAt) + + logger.info(`Listed ${agents.length} A2A agents for workspace ${workspaceId}`) + + return NextResponse.json({ success: true, agents }) + } catch (error) { + logger.error('Error listing agents:', error) + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) + } +} + +/** + * POST - Create a new A2A agent from a workflow + */ +export async function POST(request: NextRequest) { + try { + const auth = await checkHybridAuth(request, { requireWorkflowId: false }) + if (!auth.success || !auth.userId) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const body = await request.json() + const { workspaceId, workflowId, name, description, capabilities, authentication, skillTags } = + body + + if (!workspaceId || !workflowId) { + return NextResponse.json( + { error: 'workspaceId and workflowId are required' }, + { status: 400 } + ) + } + + const [wf] = await db + .select({ + id: workflow.id, + name: workflow.name, + description: workflow.description, + workspaceId: workflow.workspaceId, + isDeployed: workflow.isDeployed, + }) + .from(workflow) + .where(and(eq(workflow.id, workflowId), eq(workflow.workspaceId, workspaceId))) + .limit(1) + + if (!wf) { + return NextResponse.json( + { error: 'Workflow not found or does not belong to workspace' }, + { status: 404 } + ) + } + + const workflowData = await loadWorkflowFromNormalizedTables(workflowId) + if (!workflowData || !hasValidStartBlockInState(workflowData)) { + return NextResponse.json( + { error: 'Workflow must have a Start block to be exposed as an A2A agent' }, + { status: 400 } + ) + } + + const [existing] = await db + .select({ id: a2aAgent.id }) + .from(a2aAgent) + .where(and(eq(a2aAgent.workspaceId, workspaceId), eq(a2aAgent.workflowId, workflowId))) + .limit(1) + + if (existing) { + return NextResponse.json( + { error: 'An agent already exists for this workflow' }, + { status: 409 } + ) + } + + const skills = generateSkillsFromWorkflow( + name || wf.name, + description || wf.description, + skillTags + ) + + const agentId = uuidv4() + const agentName = name || sanitizeAgentName(wf.name) + + const [agent] = await db + .insert(a2aAgent) + .values({ + id: agentId, + workspaceId, + workflowId, + createdBy: auth.userId, + name: agentName, + description: description || wf.description, + version: '1.0.0', + capabilities: { + ...A2A_DEFAULT_CAPABILITIES, + ...capabilities, + }, + skills, + authentication: authentication || { + schemes: ['bearer', 'apiKey'], + }, + isPublished: false, + createdAt: new Date(), + updatedAt: new Date(), + }) + .returning() + + logger.info(`Created A2A agent ${agentId} for workflow ${workflowId}`) + + return NextResponse.json({ success: true, agent }, { status: 201 }) + } catch (error) { + logger.error('Error creating agent:', error) + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) + } +} diff --git a/apps/sim/app/api/a2a/serve/[agentId]/route.ts b/apps/sim/app/api/a2a/serve/[agentId]/route.ts new file mode 100644 index 0000000000..dfd1f4e8f2 --- /dev/null +++ b/apps/sim/app/api/a2a/serve/[agentId]/route.ts @@ -0,0 +1,1263 @@ +import type { Artifact, Message, PushNotificationConfig, TaskState } from '@a2a-js/sdk' +import { db } from '@sim/db' +import { a2aAgent, a2aPushNotificationConfig, a2aTask, workflow } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { eq } from 'drizzle-orm' +import { type NextRequest, NextResponse } from 'next/server' +import { v4 as uuidv4 } from 'uuid' +import { A2A_DEFAULT_TIMEOUT, A2A_MAX_HISTORY_LENGTH } from '@/lib/a2a/constants' +import { notifyTaskStateChange } from '@/lib/a2a/push-notifications' +import { + createAgentMessage, + extractWorkflowInput, + isTerminalState, + parseWorkflowSSEChunk, +} from '@/lib/a2a/utils' +import { checkHybridAuth } from '@/lib/auth/hybrid' +import { getBrandConfig } from '@/lib/branding/branding' +import { acquireLock, getRedisClient, releaseLock } from '@/lib/core/config/redis' +import { SSE_HEADERS } from '@/lib/core/utils/sse' +import { getBaseUrl } from '@/lib/core/utils/urls' +import { markExecutionCancelled } from '@/lib/execution/cancellation' +import { + A2A_ERROR_CODES, + A2A_METHODS, + buildExecuteRequest, + buildTaskResponse, + createError, + createResponse, + extractAgentContent, + formatTaskResponse, + generateTaskId, + isJSONRPCRequest, + type MessageSendParams, + type PushNotificationSetParams, + type TaskIdParams, +} from '@/app/api/a2a/serve/[agentId]/utils' + +const logger = createLogger('A2AServeAPI') + +export const dynamic = 'force-dynamic' +export const runtime = 'nodejs' + +interface RouteParams { + agentId: string +} + +/** + * GET - Returns the Agent Card (discovery document) + */ +export async function GET(_request: NextRequest, { params }: { params: Promise }) { + const { agentId } = await params + + const redis = getRedisClient() + const cacheKey = `a2a:agent:${agentId}:card` + + if (redis) { + try { + const cached = await redis.get(cacheKey) + if (cached) { + return NextResponse.json(JSON.parse(cached), { + headers: { + 'Content-Type': 'application/json', + 'Cache-Control': 'private, max-age=60', + 'X-Cache': 'HIT', + }, + }) + } + } catch (err) { + logger.warn('Redis cache read failed', { agentId, error: err }) + } + } + + try { + const [agent] = await db + .select({ + id: a2aAgent.id, + name: a2aAgent.name, + description: a2aAgent.description, + version: a2aAgent.version, + capabilities: a2aAgent.capabilities, + skills: a2aAgent.skills, + authentication: a2aAgent.authentication, + isPublished: a2aAgent.isPublished, + }) + .from(a2aAgent) + .where(eq(a2aAgent.id, agentId)) + .limit(1) + + if (!agent) { + return NextResponse.json({ error: 'Agent not found' }, { status: 404 }) + } + + if (!agent.isPublished) { + return NextResponse.json({ error: 'Agent not published' }, { status: 404 }) + } + + const baseUrl = getBaseUrl() + const brandConfig = getBrandConfig() + + const authConfig = agent.authentication as { schemes?: string[] } | undefined + const schemes = authConfig?.schemes || [] + const isPublic = schemes.includes('none') + + const agentCard = { + protocolVersion: '0.3.0', + name: agent.name, + description: agent.description || '', + url: `${baseUrl}/api/a2a/serve/${agent.id}`, + version: agent.version, + preferredTransport: 'JSONRPC', + documentationUrl: `${baseUrl}/docs/a2a`, + provider: { + organization: brandConfig.name, + url: baseUrl, + }, + capabilities: agent.capabilities, + skills: agent.skills || [], + ...(isPublic + ? {} + : { + securitySchemes: { + apiKey: { + type: 'apiKey' as const, + name: 'X-API-Key', + in: 'header' as const, + description: 'API key authentication', + }, + }, + security: [{ apiKey: [] }], + }), + defaultInputModes: ['text/plain', 'application/json'], + defaultOutputModes: ['text/plain', 'application/json'], + } + + if (redis) { + try { + await redis.set(cacheKey, JSON.stringify(agentCard), 'EX', 60) + } catch (err) { + logger.warn('Redis cache write failed', { agentId, error: err }) + } + } + + return NextResponse.json(agentCard, { + headers: { + 'Content-Type': 'application/json', + 'Cache-Control': 'private, max-age=60', + 'X-Cache': 'MISS', + }, + }) + } catch (error) { + logger.error('Error getting Agent Card:', error) + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) + } +} + +/** + * POST - Handle JSON-RPC requests + */ +export async function POST(request: NextRequest, { params }: { params: Promise }) { + const { agentId } = await params + + try { + const [agent] = await db + .select({ + id: a2aAgent.id, + name: a2aAgent.name, + workflowId: a2aAgent.workflowId, + workspaceId: a2aAgent.workspaceId, + isPublished: a2aAgent.isPublished, + capabilities: a2aAgent.capabilities, + authentication: a2aAgent.authentication, + }) + .from(a2aAgent) + .where(eq(a2aAgent.id, agentId)) + .limit(1) + + if (!agent) { + return NextResponse.json( + createError(null, A2A_ERROR_CODES.AGENT_UNAVAILABLE, 'Agent not found'), + { status: 404 } + ) + } + + if (!agent.isPublished) { + return NextResponse.json( + createError(null, A2A_ERROR_CODES.AGENT_UNAVAILABLE, 'Agent not published'), + { status: 404 } + ) + } + + const authSchemes = (agent.authentication as { schemes?: string[] })?.schemes || [] + const requiresAuth = !authSchemes.includes('none') + + if (requiresAuth) { + const auth = await checkHybridAuth(request, { requireWorkflowId: false }) + if (!auth.success || !auth.userId) { + return NextResponse.json( + createError(null, A2A_ERROR_CODES.AUTHENTICATION_REQUIRED, 'Unauthorized'), + { status: 401 } + ) + } + } + + const [wf] = await db + .select({ isDeployed: workflow.isDeployed }) + .from(workflow) + .where(eq(workflow.id, agent.workflowId)) + .limit(1) + + if (!wf?.isDeployed) { + return NextResponse.json( + createError(null, A2A_ERROR_CODES.AGENT_UNAVAILABLE, 'Workflow is not deployed'), + { status: 400 } + ) + } + + const body = await request.json() + + if (!isJSONRPCRequest(body)) { + return NextResponse.json( + createError(null, A2A_ERROR_CODES.INVALID_REQUEST, 'Invalid JSON-RPC request'), + { status: 400 } + ) + } + + const { id, method, params: rpcParams } = body + const apiKey = request.headers.get('X-API-Key') + + logger.info(`A2A request: ${method} for agent ${agentId}`) + + switch (method) { + case A2A_METHODS.MESSAGE_SEND: + return handleMessageSend(id, agent, rpcParams as MessageSendParams, apiKey) + + case A2A_METHODS.MESSAGE_STREAM: + return handleMessageStream(request, id, agent, rpcParams as MessageSendParams, apiKey) + + case A2A_METHODS.TASKS_GET: + return handleTaskGet(id, rpcParams as TaskIdParams) + + case A2A_METHODS.TASKS_CANCEL: + return handleTaskCancel(id, rpcParams as TaskIdParams) + + case A2A_METHODS.TASKS_RESUBSCRIBE: + return handleTaskResubscribe(request, id, rpcParams as TaskIdParams) + + case A2A_METHODS.PUSH_NOTIFICATION_SET: + return handlePushNotificationSet(id, rpcParams as PushNotificationSetParams) + + case A2A_METHODS.PUSH_NOTIFICATION_GET: + return handlePushNotificationGet(id, rpcParams as TaskIdParams) + + case A2A_METHODS.PUSH_NOTIFICATION_DELETE: + return handlePushNotificationDelete(id, rpcParams as TaskIdParams) + + default: + return NextResponse.json( + createError(id, A2A_ERROR_CODES.METHOD_NOT_FOUND, `Method not found: ${method}`), + { status: 404 } + ) + } + } catch (error) { + logger.error('Error handling A2A request:', error) + return NextResponse.json(createError(null, A2A_ERROR_CODES.INTERNAL_ERROR, 'Internal error'), { + status: 500, + }) + } +} + +/** + * Handle message/send - Send a message (v0.3) + */ +async function handleMessageSend( + id: string | number, + agent: { + id: string + name: string + workflowId: string + workspaceId: string + }, + params: MessageSendParams, + apiKey?: string | null +): Promise { + if (!params?.message) { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.INVALID_PARAMS, 'Message is required'), + { status: 400 } + ) + } + + const message = params.message + const taskId = message.taskId || generateTaskId() + const contextId = message.contextId || uuidv4() + + // Distributed lock to prevent concurrent task processing + const lockKey = `a2a:task:${taskId}:lock` + const lockValue = uuidv4() + const acquired = await acquireLock(lockKey, lockValue, 60) + + if (!acquired) { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.INTERNAL_ERROR, 'Task is currently being processed'), + { status: 409 } + ) + } + + try { + let existingTask: typeof a2aTask.$inferSelect | null = null + if (message.taskId) { + const [found] = await db.select().from(a2aTask).where(eq(a2aTask.id, message.taskId)).limit(1) + existingTask = found || null + + if (!existingTask) { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.TASK_NOT_FOUND, 'Task not found'), + { status: 404 } + ) + } + + if (isTerminalState(existingTask.status as TaskState)) { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.TASK_ALREADY_COMPLETE, 'Task already in terminal state'), + { status: 400 } + ) + } + } + + const history: Message[] = existingTask?.messages ? (existingTask.messages as Message[]) : [] + + history.push(message) + + if (history.length > A2A_MAX_HISTORY_LENGTH) { + history.splice(0, history.length - A2A_MAX_HISTORY_LENGTH) + } + + if (existingTask) { + await db + .update(a2aTask) + .set({ + status: 'working', + messages: history, + updatedAt: new Date(), + }) + .where(eq(a2aTask.id, taskId)) + } else { + await db.insert(a2aTask).values({ + id: taskId, + agentId: agent.id, + sessionId: contextId || null, + status: 'working', + messages: history, + metadata: {}, + createdAt: new Date(), + updatedAt: new Date(), + }) + } + + const { + url: executeUrl, + headers, + useInternalAuth, + } = await buildExecuteRequest({ + workflowId: agent.workflowId, + apiKey, + }) + + logger.info(`Executing workflow ${agent.workflowId} for A2A task ${taskId}`) + + try { + const workflowInput = extractWorkflowInput(message) + if (!workflowInput) { + return NextResponse.json( + createError( + id, + A2A_ERROR_CODES.INVALID_PARAMS, + 'Message must contain at least one part with content' + ), + { status: 400 } + ) + } + + const response = await fetch(executeUrl, { + method: 'POST', + headers, + body: JSON.stringify({ + ...workflowInput, + triggerType: 'api', + ...(useInternalAuth && { workflowId: agent.workflowId }), + }), + signal: AbortSignal.timeout(A2A_DEFAULT_TIMEOUT), + }) + + const executeResult = await response.json() + + const finalState: TaskState = response.ok ? 'completed' : 'failed' + + const agentContent = extractAgentContent(executeResult) + const agentMessage = createAgentMessage(agentContent) + agentMessage.taskId = taskId + if (contextId) agentMessage.contextId = contextId + history.push(agentMessage) + + const artifacts = executeResult.output?.artifacts || [] + + await db + .update(a2aTask) + .set({ + status: finalState, + messages: history, + artifacts, + executionId: executeResult.metadata?.executionId, + completedAt: new Date(), + updatedAt: new Date(), + }) + .where(eq(a2aTask.id, taskId)) + + if (isTerminalState(finalState)) { + notifyTaskStateChange(taskId, finalState).catch((err) => { + logger.error('Failed to trigger push notification', { taskId, error: err }) + }) + } + + const task = buildTaskResponse({ + taskId, + contextId, + state: finalState, + history, + artifacts, + }) + + return NextResponse.json(createResponse(id, task)) + } catch (error) { + const isTimeout = error instanceof Error && error.name === 'TimeoutError' + logger.error(`Error executing workflow for task ${taskId}:`, { error, isTimeout }) + + const errorMessage = isTimeout + ? `Workflow execution timed out after ${A2A_DEFAULT_TIMEOUT}ms` + : error instanceof Error + ? error.message + : 'Workflow execution failed' + + await db + .update(a2aTask) + .set({ + status: 'failed', + updatedAt: new Date(), + completedAt: new Date(), + }) + .where(eq(a2aTask.id, taskId)) + + notifyTaskStateChange(taskId, 'failed').catch((err) => { + logger.error('Failed to trigger push notification for failure', { taskId, error: err }) + }) + + return NextResponse.json(createError(id, A2A_ERROR_CODES.INTERNAL_ERROR, errorMessage), { + status: 500, + }) + } + } finally { + await releaseLock(lockKey, lockValue) + } +} + +/** + * Handle message/stream - Stream a message response (v0.3) + */ +async function handleMessageStream( + _request: NextRequest, + id: string | number, + agent: { + id: string + name: string + workflowId: string + workspaceId: string + }, + params: MessageSendParams, + apiKey?: string | null +): Promise { + if (!params?.message) { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.INVALID_PARAMS, 'Message is required'), + { status: 400 } + ) + } + + const message = params.message + const contextId = message.contextId || uuidv4() + const taskId = message.taskId || generateTaskId() + + // Distributed lock to prevent concurrent task processing + const lockKey = `a2a:task:${taskId}:lock` + const lockValue = uuidv4() + const acquired = await acquireLock(lockKey, lockValue, 300) + + if (!acquired) { + const encoder = new TextEncoder() + const errorStream = new ReadableStream({ + start(controller) { + controller.enqueue( + encoder.encode( + `event: error\ndata: ${JSON.stringify({ code: A2A_ERROR_CODES.INTERNAL_ERROR, message: 'Task is currently being processed' })}\n\n` + ) + ) + controller.close() + }, + }) + return new NextResponse(errorStream, { headers: SSE_HEADERS }) + } + + let history: Message[] = [] + let existingTask: typeof a2aTask.$inferSelect | null = null + + if (message.taskId) { + const [found] = await db.select().from(a2aTask).where(eq(a2aTask.id, message.taskId)).limit(1) + existingTask = found || null + + if (!existingTask) { + await releaseLock(lockKey, lockValue) + return NextResponse.json(createError(id, A2A_ERROR_CODES.TASK_NOT_FOUND, 'Task not found'), { + status: 404, + }) + } + + if (isTerminalState(existingTask.status as TaskState)) { + await releaseLock(lockKey, lockValue) + return NextResponse.json( + createError(id, A2A_ERROR_CODES.TASK_ALREADY_COMPLETE, 'Task already in terminal state'), + { status: 400 } + ) + } + + history = existingTask.messages as Message[] + } + + history.push(message) + + if (history.length > A2A_MAX_HISTORY_LENGTH) { + history.splice(0, history.length - A2A_MAX_HISTORY_LENGTH) + } + + if (existingTask) { + await db + .update(a2aTask) + .set({ + status: 'working', + messages: history, + updatedAt: new Date(), + }) + .where(eq(a2aTask.id, taskId)) + } else { + await db.insert(a2aTask).values({ + id: taskId, + agentId: agent.id, + sessionId: contextId || null, + status: 'working', + messages: history, + metadata: {}, + createdAt: new Date(), + updatedAt: new Date(), + }) + } + + const encoder = new TextEncoder() + + const stream = new ReadableStream({ + async start(controller) { + const sendEvent = (event: string, data: unknown) => { + try { + const jsonRpcResponse = { + jsonrpc: '2.0' as const, + id, + result: data, + } + controller.enqueue( + encoder.encode(`event: ${event}\ndata: ${JSON.stringify(jsonRpcResponse)}\n\n`) + ) + } catch (error) { + logger.error('Error sending SSE event:', error) + } + } + + sendEvent('status', { + kind: 'status', + taskId, + contextId, + status: { state: 'working', timestamp: new Date().toISOString() }, + }) + + try { + const { + url: executeUrl, + headers, + useInternalAuth, + } = await buildExecuteRequest({ + workflowId: agent.workflowId, + apiKey, + stream: true, + }) + + const workflowInput = extractWorkflowInput(message) + if (!workflowInput) { + sendEvent('error', { + code: A2A_ERROR_CODES.INVALID_PARAMS, + message: 'Message must contain at least one part with content', + }) + await releaseLock(lockKey, lockValue) + controller.close() + return + } + + const response = await fetch(executeUrl, { + method: 'POST', + headers, + body: JSON.stringify({ + ...workflowInput, + triggerType: 'api', + stream: true, + ...(useInternalAuth && { workflowId: agent.workflowId }), + }), + signal: AbortSignal.timeout(A2A_DEFAULT_TIMEOUT), + }) + + if (!response.ok) { + let errorMessage = 'Workflow execution failed' + try { + const errorResult = await response.json() + errorMessage = errorResult.error || errorMessage + } catch { + // Response may not be JSON + } + throw new Error(errorMessage) + } + + const contentType = response.headers.get('content-type') || '' + const isStreamingResponse = + contentType.includes('text/event-stream') || contentType.includes('text/plain') + + if (response.body && isStreamingResponse) { + const reader = response.body.getReader() + const decoder = new TextDecoder() + let accumulatedContent = '' + let finalContent: string | undefined + + while (true) { + const { done, value } = await reader.read() + if (done) break + + const rawChunk = decoder.decode(value, { stream: true }) + const parsed = parseWorkflowSSEChunk(rawChunk) + + if (parsed.content) { + accumulatedContent += parsed.content + sendEvent('message', { + kind: 'message', + taskId, + contextId, + role: 'agent', + parts: [{ kind: 'text', text: parsed.content }], + final: false, + }) + } + + if (parsed.finalContent) { + finalContent = parsed.finalContent + } + } + + const messageContent = + (finalContent !== undefined && finalContent.length > 0 + ? finalContent + : accumulatedContent) || 'Task completed' + const agentMessage = createAgentMessage(messageContent) + agentMessage.taskId = taskId + if (contextId) agentMessage.contextId = contextId + history.push(agentMessage) + + await db + .update(a2aTask) + .set({ + status: 'completed', + messages: history, + completedAt: new Date(), + updatedAt: new Date(), + }) + .where(eq(a2aTask.id, taskId)) + + notifyTaskStateChange(taskId, 'completed').catch((err) => { + logger.error('Failed to trigger push notification', { taskId, error: err }) + }) + + sendEvent('task', { + kind: 'task', + id: taskId, + contextId, + status: { state: 'completed', timestamp: new Date().toISOString() }, + history, + artifacts: [], + }) + } else { + const result = await response.json() + + const content = extractAgentContent(result) + + sendEvent('message', { + kind: 'message', + taskId, + contextId, + role: 'agent', + parts: [{ kind: 'text', text: content }], + final: true, + }) + + const agentMessage = createAgentMessage(content) + agentMessage.taskId = taskId + if (contextId) agentMessage.contextId = contextId + history.push(agentMessage) + + const artifacts = (result.output?.artifacts as Artifact[]) || [] + + await db + .update(a2aTask) + .set({ + status: 'completed', + messages: history, + artifacts, + executionId: result.metadata?.executionId, + completedAt: new Date(), + updatedAt: new Date(), + }) + .where(eq(a2aTask.id, taskId)) + + notifyTaskStateChange(taskId, 'completed').catch((err) => { + logger.error('Failed to trigger push notification', { taskId, error: err }) + }) + + sendEvent('task', { + kind: 'task', + id: taskId, + contextId, + status: { state: 'completed', timestamp: new Date().toISOString() }, + history, + artifacts, + }) + } + } catch (error) { + const isTimeout = error instanceof Error && error.name === 'TimeoutError' + logger.error(`Streaming error for task ${taskId}:`, { error, isTimeout }) + + const errorMessage = isTimeout + ? `Workflow execution timed out after ${A2A_DEFAULT_TIMEOUT}ms` + : error instanceof Error + ? error.message + : 'Streaming failed' + + await db + .update(a2aTask) + .set({ + status: 'failed', + completedAt: new Date(), + updatedAt: new Date(), + }) + .where(eq(a2aTask.id, taskId)) + + notifyTaskStateChange(taskId, 'failed').catch((err) => { + logger.error('Failed to trigger push notification for failure', { taskId, error: err }) + }) + + sendEvent('error', { + code: A2A_ERROR_CODES.INTERNAL_ERROR, + message: errorMessage, + }) + } finally { + await releaseLock(lockKey, lockValue) + controller.close() + } + }, + }) + + return new NextResponse(stream, { + headers: { + ...SSE_HEADERS, + 'X-Task-Id': taskId, + }, + }) +} + +/** + * Handle tasks/get - Query task status + */ +async function handleTaskGet(id: string | number, params: TaskIdParams): Promise { + if (!params?.id) { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.INVALID_PARAMS, 'Task ID is required'), + { status: 400 } + ) + } + + const historyLength = + params.historyLength !== undefined && params.historyLength >= 0 + ? params.historyLength + : undefined + + const [task] = await db.select().from(a2aTask).where(eq(a2aTask.id, params.id)).limit(1) + + if (!task) { + return NextResponse.json(createError(id, A2A_ERROR_CODES.TASK_NOT_FOUND, 'Task not found'), { + status: 404, + }) + } + + const taskResponse = buildTaskResponse({ + taskId: task.id, + contextId: task.sessionId || task.id, + state: task.status as TaskState, + history: task.messages as Message[], + artifacts: (task.artifacts as Artifact[]) || [], + }) + + const result = formatTaskResponse(taskResponse, historyLength) + + return NextResponse.json(createResponse(id, result)) +} + +/** + * Handle tasks/cancel - Cancel a running task + */ +async function handleTaskCancel(id: string | number, params: TaskIdParams): Promise { + if (!params?.id) { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.INVALID_PARAMS, 'Task ID is required'), + { status: 400 } + ) + } + + const [task] = await db.select().from(a2aTask).where(eq(a2aTask.id, params.id)).limit(1) + + if (!task) { + return NextResponse.json(createError(id, A2A_ERROR_CODES.TASK_NOT_FOUND, 'Task not found'), { + status: 404, + }) + } + + if (isTerminalState(task.status as TaskState)) { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.TASK_ALREADY_COMPLETE, 'Task already in terminal state'), + { status: 400 } + ) + } + + if (task.executionId) { + try { + await markExecutionCancelled(task.executionId) + logger.info('Cancelled workflow execution', { + taskId: task.id, + executionId: task.executionId, + }) + } catch (error) { + logger.warn('Failed to cancel workflow execution', { + taskId: task.id, + executionId: task.executionId, + error, + }) + } + } + + await db + .update(a2aTask) + .set({ + status: 'canceled', + updatedAt: new Date(), + completedAt: new Date(), + }) + .where(eq(a2aTask.id, params.id)) + + notifyTaskStateChange(params.id, 'canceled').catch((err) => { + logger.error('Failed to trigger push notification for cancellation', { + taskId: params.id, + error: err, + }) + }) + + const canceledTask = buildTaskResponse({ + taskId: task.id, + contextId: task.sessionId || task.id, + state: 'canceled', + history: task.messages as Message[], + artifacts: (task.artifacts as Artifact[]) || [], + }) + + return NextResponse.json(createResponse(id, canceledTask)) +} + +/** + * Handle tasks/resubscribe - Reconnect to SSE stream for an ongoing task + */ +async function handleTaskResubscribe( + request: NextRequest, + id: string | number, + params: TaskIdParams +): Promise { + if (!params?.id) { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.INVALID_PARAMS, 'Task ID is required'), + { status: 400 } + ) + } + + const [task] = await db.select().from(a2aTask).where(eq(a2aTask.id, params.id)).limit(1) + + if (!task) { + return NextResponse.json(createError(id, A2A_ERROR_CODES.TASK_NOT_FOUND, 'Task not found'), { + status: 404, + }) + } + + const encoder = new TextEncoder() + + if (isTerminalState(task.status as TaskState)) { + const completedTask = buildTaskResponse({ + taskId: task.id, + contextId: task.sessionId || task.id, + state: task.status as TaskState, + history: task.messages as Message[], + artifacts: (task.artifacts as Artifact[]) || [], + }) + const jsonRpcResponse = { jsonrpc: '2.0' as const, id, result: completedTask } + const sseData = `event: task\ndata: ${JSON.stringify(jsonRpcResponse)}\n\n` + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode(sseData)) + controller.close() + }, + }) + return new NextResponse(stream, { headers: SSE_HEADERS }) + } + let isCancelled = false + let pollTimeoutId: ReturnType | null = null + + const abortSignal = request.signal + abortSignal.addEventListener('abort', () => { + isCancelled = true + if (pollTimeoutId) { + clearTimeout(pollTimeoutId) + pollTimeoutId = null + } + }) + + const stream = new ReadableStream({ + async start(controller) { + const sendEvent = (event: string, data: unknown): boolean => { + if (isCancelled || abortSignal.aborted) return false + try { + const jsonRpcResponse = { jsonrpc: '2.0' as const, id, result: data } + controller.enqueue( + encoder.encode(`event: ${event}\ndata: ${JSON.stringify(jsonRpcResponse)}\n\n`) + ) + return true + } catch (error) { + logger.error('Error sending SSE event:', error) + isCancelled = true + return false + } + } + + const cleanup = () => { + isCancelled = true + if (pollTimeoutId) { + clearTimeout(pollTimeoutId) + pollTimeoutId = null + } + } + + if ( + !sendEvent('status', { + kind: 'status', + taskId: task.id, + contextId: task.sessionId, + status: { state: task.status, timestamp: new Date().toISOString() }, + }) + ) { + cleanup() + return + } + + const pollInterval = 3000 // 3 seconds + const maxPolls = 100 // 5 minutes max + + let polls = 0 + const poll = async () => { + if (isCancelled || abortSignal.aborted) { + cleanup() + return + } + + polls++ + if (polls > maxPolls) { + cleanup() + try { + controller.close() + } catch { + // Already closed + } + return + } + + try { + const [updatedTask] = await db + .select() + .from(a2aTask) + .where(eq(a2aTask.id, params.id)) + .limit(1) + + if (isCancelled) { + cleanup() + return + } + + if (!updatedTask) { + sendEvent('error', { code: A2A_ERROR_CODES.TASK_NOT_FOUND, message: 'Task not found' }) + cleanup() + try { + controller.close() + } catch { + // Already closed + } + return + } + + if (updatedTask.status !== task.status) { + if ( + !sendEvent('status', { + kind: 'status', + taskId: updatedTask.id, + contextId: updatedTask.sessionId, + status: { state: updatedTask.status, timestamp: new Date().toISOString() }, + final: isTerminalState(updatedTask.status as TaskState), + }) + ) { + cleanup() + return + } + } + + if (isTerminalState(updatedTask.status as TaskState)) { + const messages = updatedTask.messages as Message[] + const lastMessage = messages[messages.length - 1] + if (lastMessage && lastMessage.role === 'agent') { + sendEvent('message', { + ...lastMessage, + taskId: updatedTask.id, + contextId: updatedTask.sessionId || updatedTask.id, + final: true, + }) + } + + cleanup() + try { + controller.close() + } catch { + // Already closed + } + return + } + + pollTimeoutId = setTimeout(poll, pollInterval) + } catch (error) { + logger.error('Error during SSE poll:', error) + sendEvent('error', { + code: A2A_ERROR_CODES.INTERNAL_ERROR, + message: error instanceof Error ? error.message : 'Polling failed', + }) + cleanup() + try { + controller.close() + } catch { + // Already closed + } + } + } + + poll() + }, + cancel() { + isCancelled = true + if (pollTimeoutId) { + clearTimeout(pollTimeoutId) + pollTimeoutId = null + } + }, + }) + + return new NextResponse(stream, { + headers: { + ...SSE_HEADERS, + 'X-Task-Id': params.id, + }, + }) +} + +/** + * Handle tasks/pushNotificationConfig/set - Set webhook for task updates + */ +async function handlePushNotificationSet( + id: string | number, + params: PushNotificationSetParams +): Promise { + if (!params?.id) { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.INVALID_PARAMS, 'Task ID is required'), + { status: 400 } + ) + } + + if (!params?.pushNotificationConfig?.url) { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.INVALID_PARAMS, 'Push notification URL is required'), + { status: 400 } + ) + } + + try { + const url = new URL(params.pushNotificationConfig.url) + if (url.protocol !== 'https:') { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.INVALID_PARAMS, 'Push notification URL must use HTTPS'), + { status: 400 } + ) + } + } catch { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.INVALID_PARAMS, 'Invalid push notification URL'), + { status: 400 } + ) + } + + const [task] = await db.select().from(a2aTask).where(eq(a2aTask.id, params.id)).limit(1) + + if (!task) { + return NextResponse.json(createError(id, A2A_ERROR_CODES.TASK_NOT_FOUND, 'Task not found'), { + status: 404, + }) + } + + const [existingConfig] = await db + .select() + .from(a2aPushNotificationConfig) + .where(eq(a2aPushNotificationConfig.taskId, params.id)) + .limit(1) + + const config = params.pushNotificationConfig + + if (existingConfig) { + await db + .update(a2aPushNotificationConfig) + .set({ + url: config.url, + token: config.token || null, + isActive: true, + updatedAt: new Date(), + }) + .where(eq(a2aPushNotificationConfig.id, existingConfig.id)) + } else { + await db.insert(a2aPushNotificationConfig).values({ + id: uuidv4(), + taskId: params.id, + url: config.url, + token: config.token || null, + isActive: true, + createdAt: new Date(), + updatedAt: new Date(), + }) + } + + const result: PushNotificationConfig = { + url: config.url, + token: config.token, + } + + return NextResponse.json(createResponse(id, result)) +} + +/** + * Handle tasks/pushNotificationConfig/get - Get webhook config for a task + */ +async function handlePushNotificationGet( + id: string | number, + params: TaskIdParams +): Promise { + if (!params?.id) { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.INVALID_PARAMS, 'Task ID is required'), + { status: 400 } + ) + } + + const [task] = await db.select().from(a2aTask).where(eq(a2aTask.id, params.id)).limit(1) + + if (!task) { + return NextResponse.json(createError(id, A2A_ERROR_CODES.TASK_NOT_FOUND, 'Task not found'), { + status: 404, + }) + } + + const [config] = await db + .select() + .from(a2aPushNotificationConfig) + .where(eq(a2aPushNotificationConfig.taskId, params.id)) + .limit(1) + + if (!config) { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.TASK_NOT_FOUND, 'Push notification config not found'), + { status: 404 } + ) + } + + const result: PushNotificationConfig = { + url: config.url, + token: config.token || undefined, + } + + return NextResponse.json(createResponse(id, result)) +} + +/** + * Handle tasks/pushNotificationConfig/delete - Delete webhook config for a task + */ +async function handlePushNotificationDelete( + id: string | number, + params: TaskIdParams +): Promise { + if (!params?.id) { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.INVALID_PARAMS, 'Task ID is required'), + { status: 400 } + ) + } + + const [task] = await db.select().from(a2aTask).where(eq(a2aTask.id, params.id)).limit(1) + + if (!task) { + return NextResponse.json(createError(id, A2A_ERROR_CODES.TASK_NOT_FOUND, 'Task not found'), { + status: 404, + }) + } + + const [config] = await db + .select() + .from(a2aPushNotificationConfig) + .where(eq(a2aPushNotificationConfig.taskId, params.id)) + .limit(1) + + if (!config) { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.TASK_NOT_FOUND, 'Push notification config not found'), + { status: 404 } + ) + } + + await db.delete(a2aPushNotificationConfig).where(eq(a2aPushNotificationConfig.id, config.id)) + + return NextResponse.json(createResponse(id, { success: true })) +} diff --git a/apps/sim/app/api/a2a/serve/[agentId]/utils.ts b/apps/sim/app/api/a2a/serve/[agentId]/utils.ts new file mode 100644 index 0000000000..f157d1efb3 --- /dev/null +++ b/apps/sim/app/api/a2a/serve/[agentId]/utils.ts @@ -0,0 +1,176 @@ +import type { Artifact, Message, PushNotificationConfig, Task, TaskState } from '@a2a-js/sdk' +import { v4 as uuidv4 } from 'uuid' +import { generateInternalToken } from '@/lib/auth/internal' +import { getBaseUrl } from '@/lib/core/utils/urls' + +/** A2A v0.3 JSON-RPC method names */ +export const A2A_METHODS = { + MESSAGE_SEND: 'message/send', + MESSAGE_STREAM: 'message/stream', + TASKS_GET: 'tasks/get', + TASKS_CANCEL: 'tasks/cancel', + TASKS_RESUBSCRIBE: 'tasks/resubscribe', + PUSH_NOTIFICATION_SET: 'tasks/pushNotificationConfig/set', + PUSH_NOTIFICATION_GET: 'tasks/pushNotificationConfig/get', + PUSH_NOTIFICATION_DELETE: 'tasks/pushNotificationConfig/delete', +} as const + +/** A2A v0.3 error codes */ +export const A2A_ERROR_CODES = { + PARSE_ERROR: -32700, + INVALID_REQUEST: -32600, + METHOD_NOT_FOUND: -32601, + INVALID_PARAMS: -32602, + INTERNAL_ERROR: -32603, + TASK_NOT_FOUND: -32001, + TASK_ALREADY_COMPLETE: -32002, + AGENT_UNAVAILABLE: -32003, + AUTHENTICATION_REQUIRED: -32004, +} as const + +export interface JSONRPCRequest { + jsonrpc: '2.0' + id: string | number + method: string + params?: unknown +} + +export interface JSONRPCResponse { + jsonrpc: '2.0' + id: string | number | null + result?: unknown + error?: { + code: number + message: string + data?: unknown + } +} + +export interface MessageSendParams { + message: Message + configuration?: { + acceptedOutputModes?: string[] + historyLength?: number + pushNotificationConfig?: PushNotificationConfig + } +} + +export interface TaskIdParams { + id: string + historyLength?: number +} + +export interface PushNotificationSetParams { + id: string + pushNotificationConfig: PushNotificationConfig +} + +export function createResponse(id: string | number | null, result: unknown): JSONRPCResponse { + return { jsonrpc: '2.0', id, result } +} + +export function createError( + id: string | number | null, + code: number, + message: string, + data?: unknown +): JSONRPCResponse { + return { jsonrpc: '2.0', id, error: { code, message, data } } +} + +export function isJSONRPCRequest(obj: unknown): obj is JSONRPCRequest { + if (!obj || typeof obj !== 'object') return false + const r = obj as Record + return r.jsonrpc === '2.0' && typeof r.method === 'string' && r.id !== undefined +} + +export function generateTaskId(): string { + return uuidv4() +} + +export function createTaskStatus(state: TaskState): { state: TaskState; timestamp: string } { + return { state, timestamp: new Date().toISOString() } +} + +export function formatTaskResponse(task: Task, historyLength?: number): Task { + if (historyLength !== undefined && task.history) { + return { + ...task, + history: task.history.slice(-historyLength), + } + } + return task +} + +export interface ExecuteRequestConfig { + workflowId: string + apiKey?: string | null + stream?: boolean +} + +export interface ExecuteRequestResult { + url: string + headers: Record + useInternalAuth: boolean +} + +export async function buildExecuteRequest( + config: ExecuteRequestConfig +): Promise { + const url = `${getBaseUrl()}/api/workflows/${config.workflowId}/execute` + const headers: Record = { 'Content-Type': 'application/json' } + let useInternalAuth = false + + if (config.apiKey) { + headers['X-API-Key'] = config.apiKey + } else { + const internalToken = await generateInternalToken() + headers.Authorization = `Bearer ${internalToken}` + useInternalAuth = true + } + + if (config.stream) { + headers['X-Stream-Response'] = 'true' + } + + return { url, headers, useInternalAuth } +} + +export function extractAgentContent(executeResult: { + output?: { content?: string; [key: string]: unknown } + error?: string +}): string { + // Prefer explicit content field + if (executeResult.output?.content) { + return executeResult.output.content + } + + // If output is an object with meaningful data, stringify it + if (typeof executeResult.output === 'object' && executeResult.output !== null) { + const keys = Object.keys(executeResult.output) + // Skip empty objects or objects with only undefined values + if (keys.length > 0 && keys.some((k) => executeResult.output![k] !== undefined)) { + return JSON.stringify(executeResult.output) + } + } + + // Fallback to error message or default + return executeResult.error || 'Task completed' +} + +export function buildTaskResponse(params: { + taskId: string + contextId: string + state: TaskState + history: Message[] + artifacts?: Artifact[] +}): Task { + return { + kind: 'task', + id: params.taskId, + contextId: params.contextId, + status: createTaskStatus(params.state), + history: params.history, + artifacts: params.artifacts || [], + } +} diff --git a/apps/sim/app/api/memory/[id]/route.ts b/apps/sim/app/api/memory/[id]/route.ts index 617979ef16..2f5b5ae1cc 100644 --- a/apps/sim/app/api/memory/[id]/route.ts +++ b/apps/sim/app/api/memory/[id]/route.ts @@ -1,11 +1,12 @@ import { db } from '@sim/db' -import { memory, permissions, workspace } from '@sim/db/schema' +import { memory } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' import { checkHybridAuth } from '@/lib/auth/hybrid' import { generateRequestId } from '@/lib/core/utils/request' +import { checkWorkspaceAccess } from '@/lib/workspaces/permissions/utils' const logger = createLogger('MemoryByIdAPI') @@ -29,46 +30,6 @@ const memoryPutBodySchema = z.object({ workspaceId: z.string().uuid('Invalid workspace ID format'), }) -async function checkWorkspaceAccess( - workspaceId: string, - userId: string -): Promise<{ hasAccess: boolean; canWrite: boolean }> { - const [workspaceRow] = await db - .select({ ownerId: workspace.ownerId }) - .from(workspace) - .where(eq(workspace.id, workspaceId)) - .limit(1) - - if (!workspaceRow) { - return { hasAccess: false, canWrite: false } - } - - if (workspaceRow.ownerId === userId) { - return { hasAccess: true, canWrite: true } - } - - const [permissionRow] = await db - .select({ permissionType: permissions.permissionType }) - .from(permissions) - .where( - and( - eq(permissions.userId, userId), - eq(permissions.entityType, 'workspace'), - eq(permissions.entityId, workspaceId) - ) - ) - .limit(1) - - if (!permissionRow) { - return { hasAccess: false, canWrite: false } - } - - return { - hasAccess: true, - canWrite: permissionRow.permissionType === 'write' || permissionRow.permissionType === 'admin', - } -} - async function validateMemoryAccess( request: NextRequest, workspaceId: string, @@ -86,8 +47,8 @@ async function validateMemoryAccess( } } - const { hasAccess, canWrite } = await checkWorkspaceAccess(workspaceId, authResult.userId) - if (!hasAccess) { + const access = await checkWorkspaceAccess(workspaceId, authResult.userId) + if (!access.exists || !access.hasAccess) { return { error: NextResponse.json( { success: false, error: { message: 'Workspace not found' } }, @@ -96,7 +57,7 @@ async function validateMemoryAccess( } } - if (action === 'write' && !canWrite) { + if (action === 'write' && !access.canWrite) { return { error: NextResponse.json( { success: false, error: { message: 'Write access denied' } }, diff --git a/apps/sim/app/api/memory/route.ts b/apps/sim/app/api/memory/route.ts index fe159b9664..072756c7a6 100644 --- a/apps/sim/app/api/memory/route.ts +++ b/apps/sim/app/api/memory/route.ts @@ -1,56 +1,17 @@ import { db } from '@sim/db' -import { memory, permissions, workspace } from '@sim/db/schema' +import { memory } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, eq, isNull, like } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { checkHybridAuth } from '@/lib/auth/hybrid' import { generateRequestId } from '@/lib/core/utils/request' +import { checkWorkspaceAccess } from '@/lib/workspaces/permissions/utils' const logger = createLogger('MemoryAPI') export const dynamic = 'force-dynamic' export const runtime = 'nodejs' -async function checkWorkspaceAccess( - workspaceId: string, - userId: string -): Promise<{ hasAccess: boolean; canWrite: boolean }> { - const [workspaceRow] = await db - .select({ ownerId: workspace.ownerId }) - .from(workspace) - .where(eq(workspace.id, workspaceId)) - .limit(1) - - if (!workspaceRow) { - return { hasAccess: false, canWrite: false } - } - - if (workspaceRow.ownerId === userId) { - return { hasAccess: true, canWrite: true } - } - - const [permissionRow] = await db - .select({ permissionType: permissions.permissionType }) - .from(permissions) - .where( - and( - eq(permissions.userId, userId), - eq(permissions.entityType, 'workspace'), - eq(permissions.entityId, workspaceId) - ) - ) - .limit(1) - - if (!permissionRow) { - return { hasAccess: false, canWrite: false } - } - - return { - hasAccess: true, - canWrite: permissionRow.permissionType === 'write' || permissionRow.permissionType === 'admin', - } -} - export async function GET(request: NextRequest) { const requestId = generateRequestId() @@ -76,8 +37,14 @@ export async function GET(request: NextRequest) { ) } - const { hasAccess } = await checkWorkspaceAccess(workspaceId, authResult.userId) - if (!hasAccess) { + const access = await checkWorkspaceAccess(workspaceId, authResult.userId) + if (!access.exists) { + return NextResponse.json( + { success: false, error: { message: 'Workspace not found' } }, + { status: 404 } + ) + } + if (!access.hasAccess) { return NextResponse.json( { success: false, error: { message: 'Access denied to this workspace' } }, { status: 403 } @@ -155,15 +122,21 @@ export async function POST(request: NextRequest) { ) } - const { hasAccess, canWrite } = await checkWorkspaceAccess(workspaceId, authResult.userId) - if (!hasAccess) { + const access = await checkWorkspaceAccess(workspaceId, authResult.userId) + if (!access.exists) { return NextResponse.json( { success: false, error: { message: 'Workspace not found' } }, { status: 404 } ) } + if (!access.hasAccess) { + return NextResponse.json( + { success: false, error: { message: 'Access denied to this workspace' } }, + { status: 403 } + ) + } - if (!canWrite) { + if (!access.canWrite) { return NextResponse.json( { success: false, error: { message: 'Write access denied to this workspace' } }, { status: 403 } @@ -282,15 +255,21 @@ export async function DELETE(request: NextRequest) { ) } - const { hasAccess, canWrite } = await checkWorkspaceAccess(workspaceId, authResult.userId) - if (!hasAccess) { + const access = await checkWorkspaceAccess(workspaceId, authResult.userId) + if (!access.exists) { return NextResponse.json( { success: false, error: { message: 'Workspace not found' } }, { status: 404 } ) } + if (!access.hasAccess) { + return NextResponse.json( + { success: false, error: { message: 'Access denied to this workspace' } }, + { status: 403 } + ) + } - if (!canWrite) { + if (!access.canWrite) { return NextResponse.json( { success: false, error: { message: 'Write access denied to this workspace' } }, { status: 403 } diff --git a/apps/sim/app/api/tools/a2a/cancel-task/route.ts b/apps/sim/app/api/tools/a2a/cancel-task/route.ts new file mode 100644 index 0000000000..9298273cee --- /dev/null +++ b/apps/sim/app/api/tools/a2a/cancel-task/route.ts @@ -0,0 +1,84 @@ +import type { Task } from '@a2a-js/sdk' +import { createLogger } from '@sim/logger' +import { type NextRequest, NextResponse } from 'next/server' +import { z } from 'zod' +import { createA2AClient } from '@/lib/a2a/utils' +import { checkHybridAuth } from '@/lib/auth/hybrid' +import { generateRequestId } from '@/lib/core/utils/request' + +const logger = createLogger('A2ACancelTaskAPI') + +export const dynamic = 'force-dynamic' + +const A2ACancelTaskSchema = z.object({ + agentUrl: z.string().min(1, 'Agent URL is required'), + taskId: z.string().min(1, 'Task ID is required'), + apiKey: z.string().optional(), +}) + +export async function POST(request: NextRequest) { + const requestId = generateRequestId() + + try { + const authResult = await checkHybridAuth(request, { requireWorkflowId: false }) + + if (!authResult.success) { + logger.warn(`[${requestId}] Unauthorized A2A cancel task attempt`) + return NextResponse.json( + { + success: false, + error: authResult.error || 'Authentication required', + }, + { status: 401 } + ) + } + + const body = await request.json() + const validatedData = A2ACancelTaskSchema.parse(body) + + logger.info(`[${requestId}] Canceling A2A task`, { + agentUrl: validatedData.agentUrl, + taskId: validatedData.taskId, + }) + + const client = await createA2AClient(validatedData.agentUrl, validatedData.apiKey) + + const task = (await client.cancelTask({ id: validatedData.taskId })) as Task + + logger.info(`[${requestId}] Successfully canceled A2A task`, { + taskId: validatedData.taskId, + state: task.status.state, + }) + + return NextResponse.json({ + success: true, + output: { + cancelled: true, + state: task.status.state, + }, + }) + } catch (error) { + if (error instanceof z.ZodError) { + logger.warn(`[${requestId}] Invalid A2A cancel task request`, { + errors: error.errors, + }) + return NextResponse.json( + { + success: false, + error: 'Invalid request data', + details: error.errors, + }, + { status: 400 } + ) + } + + logger.error(`[${requestId}] Error canceling A2A task:`, error) + return NextResponse.json( + { + success: false, + error: error instanceof Error ? error.message : 'Failed to cancel task', + }, + { status: 500 } + ) + } +} diff --git a/apps/sim/app/api/tools/a2a/delete-push-notification/route.ts b/apps/sim/app/api/tools/a2a/delete-push-notification/route.ts new file mode 100644 index 0000000000..f222ef8830 --- /dev/null +++ b/apps/sim/app/api/tools/a2a/delete-push-notification/route.ts @@ -0,0 +1,94 @@ +import { createLogger } from '@sim/logger' +import { type NextRequest, NextResponse } from 'next/server' +import { z } from 'zod' +import { createA2AClient } from '@/lib/a2a/utils' +import { checkHybridAuth } from '@/lib/auth/hybrid' +import { generateRequestId } from '@/lib/core/utils/request' + +export const dynamic = 'force-dynamic' + +const logger = createLogger('A2ADeletePushNotificationAPI') + +const A2ADeletePushNotificationSchema = z.object({ + agentUrl: z.string().min(1, 'Agent URL is required'), + taskId: z.string().min(1, 'Task ID is required'), + pushNotificationConfigId: z.string().optional(), + apiKey: z.string().optional(), +}) + +export async function POST(request: NextRequest) { + const requestId = generateRequestId() + + try { + const authResult = await checkHybridAuth(request, { requireWorkflowId: false }) + + if (!authResult.success) { + logger.warn( + `[${requestId}] Unauthorized A2A delete push notification attempt: ${authResult.error}` + ) + return NextResponse.json( + { + success: false, + error: authResult.error || 'Authentication required', + }, + { status: 401 } + ) + } + + logger.info( + `[${requestId}] Authenticated A2A delete push notification request via ${authResult.authType}`, + { + userId: authResult.userId, + } + ) + + const body = await request.json() + const validatedData = A2ADeletePushNotificationSchema.parse(body) + + logger.info(`[${requestId}] Deleting A2A push notification config`, { + agentUrl: validatedData.agentUrl, + taskId: validatedData.taskId, + pushNotificationConfigId: validatedData.pushNotificationConfigId, + }) + + const client = await createA2AClient(validatedData.agentUrl, validatedData.apiKey) + + await client.deleteTaskPushNotificationConfig({ + id: validatedData.taskId, + pushNotificationConfigId: validatedData.pushNotificationConfigId || validatedData.taskId, + }) + + logger.info(`[${requestId}] Push notification config deleted successfully`, { + taskId: validatedData.taskId, + }) + + return NextResponse.json({ + success: true, + output: { + success: true, + }, + }) + } catch (error) { + if (error instanceof z.ZodError) { + logger.warn(`[${requestId}] Invalid request data`, { errors: error.errors }) + return NextResponse.json( + { + success: false, + error: 'Invalid request data', + details: error.errors, + }, + { status: 400 } + ) + } + + logger.error(`[${requestId}] Error deleting A2A push notification:`, error) + + return NextResponse.json( + { + success: false, + error: error instanceof Error ? error.message : 'Failed to delete push notification', + }, + { status: 500 } + ) + } +} diff --git a/apps/sim/app/api/tools/a2a/get-agent-card/route.ts b/apps/sim/app/api/tools/a2a/get-agent-card/route.ts new file mode 100644 index 0000000000..c26ed764b6 --- /dev/null +++ b/apps/sim/app/api/tools/a2a/get-agent-card/route.ts @@ -0,0 +1,92 @@ +import { createLogger } from '@sim/logger' +import { type NextRequest, NextResponse } from 'next/server' +import { z } from 'zod' +import { createA2AClient } from '@/lib/a2a/utils' +import { checkHybridAuth } from '@/lib/auth/hybrid' +import { generateRequestId } from '@/lib/core/utils/request' + +export const dynamic = 'force-dynamic' + +const logger = createLogger('A2AGetAgentCardAPI') + +const A2AGetAgentCardSchema = z.object({ + agentUrl: z.string().min(1, 'Agent URL is required'), + apiKey: z.string().optional(), +}) + +export async function POST(request: NextRequest) { + const requestId = generateRequestId() + + try { + const authResult = await checkHybridAuth(request, { requireWorkflowId: false }) + + if (!authResult.success) { + logger.warn(`[${requestId}] Unauthorized A2A get agent card attempt: ${authResult.error}`) + return NextResponse.json( + { + success: false, + error: authResult.error || 'Authentication required', + }, + { status: 401 } + ) + } + + logger.info( + `[${requestId}] Authenticated A2A get agent card request via ${authResult.authType}`, + { + userId: authResult.userId, + } + ) + + const body = await request.json() + const validatedData = A2AGetAgentCardSchema.parse(body) + + logger.info(`[${requestId}] Fetching Agent Card`, { + agentUrl: validatedData.agentUrl, + }) + + const client = await createA2AClient(validatedData.agentUrl, validatedData.apiKey) + + const agentCard = await client.getAgentCard() + + logger.info(`[${requestId}] Agent Card fetched successfully`, { + agentName: agentCard.name, + }) + + return NextResponse.json({ + success: true, + output: { + name: agentCard.name, + description: agentCard.description, + url: agentCard.url, + version: agentCard.protocolVersion, + capabilities: agentCard.capabilities, + skills: agentCard.skills, + defaultInputModes: agentCard.defaultInputModes, + defaultOutputModes: agentCard.defaultOutputModes, + }, + }) + } catch (error) { + if (error instanceof z.ZodError) { + logger.warn(`[${requestId}] Invalid request data`, { errors: error.errors }) + return NextResponse.json( + { + success: false, + error: 'Invalid request data', + details: error.errors, + }, + { status: 400 } + ) + } + + logger.error(`[${requestId}] Error fetching Agent Card:`, error) + + return NextResponse.json( + { + success: false, + error: error instanceof Error ? error.message : 'Failed to fetch Agent Card', + }, + { status: 500 } + ) + } +} diff --git a/apps/sim/app/api/tools/a2a/get-push-notification/route.ts b/apps/sim/app/api/tools/a2a/get-push-notification/route.ts new file mode 100644 index 0000000000..5feedf4de1 --- /dev/null +++ b/apps/sim/app/api/tools/a2a/get-push-notification/route.ts @@ -0,0 +1,115 @@ +import { createLogger } from '@sim/logger' +import { type NextRequest, NextResponse } from 'next/server' +import { z } from 'zod' +import { createA2AClient } from '@/lib/a2a/utils' +import { checkHybridAuth } from '@/lib/auth/hybrid' +import { generateRequestId } from '@/lib/core/utils/request' + +export const dynamic = 'force-dynamic' + +const logger = createLogger('A2AGetPushNotificationAPI') + +const A2AGetPushNotificationSchema = z.object({ + agentUrl: z.string().min(1, 'Agent URL is required'), + taskId: z.string().min(1, 'Task ID is required'), + apiKey: z.string().optional(), +}) + +export async function POST(request: NextRequest) { + const requestId = generateRequestId() + + try { + const authResult = await checkHybridAuth(request, { requireWorkflowId: false }) + + if (!authResult.success) { + logger.warn( + `[${requestId}] Unauthorized A2A get push notification attempt: ${authResult.error}` + ) + return NextResponse.json( + { + success: false, + error: authResult.error || 'Authentication required', + }, + { status: 401 } + ) + } + + logger.info( + `[${requestId}] Authenticated A2A get push notification request via ${authResult.authType}`, + { + userId: authResult.userId, + } + ) + + const body = await request.json() + const validatedData = A2AGetPushNotificationSchema.parse(body) + + logger.info(`[${requestId}] Getting push notification config`, { + agentUrl: validatedData.agentUrl, + taskId: validatedData.taskId, + }) + + const client = await createA2AClient(validatedData.agentUrl, validatedData.apiKey) + + const result = await client.getTaskPushNotificationConfig({ + id: validatedData.taskId, + }) + + if (!result || !result.pushNotificationConfig) { + logger.info(`[${requestId}] No push notification config found for task`, { + taskId: validatedData.taskId, + }) + return NextResponse.json({ + success: true, + output: { + exists: false, + }, + }) + } + + logger.info(`[${requestId}] Push notification config retrieved successfully`, { + taskId: validatedData.taskId, + }) + + return NextResponse.json({ + success: true, + output: { + url: result.pushNotificationConfig.url, + token: result.pushNotificationConfig.token, + exists: true, + }, + }) + } catch (error) { + if (error instanceof z.ZodError) { + logger.warn(`[${requestId}] Invalid request data`, { errors: error.errors }) + return NextResponse.json( + { + success: false, + error: 'Invalid request data', + details: error.errors, + }, + { status: 400 } + ) + } + + if (error instanceof Error && error.message.includes('not found')) { + logger.info(`[${requestId}] Task not found, returning exists: false`) + return NextResponse.json({ + success: true, + output: { + exists: false, + }, + }) + } + + logger.error(`[${requestId}] Error getting A2A push notification:`, error) + + return NextResponse.json( + { + success: false, + error: error instanceof Error ? error.message : 'Failed to get push notification', + }, + { status: 500 } + ) + } +} diff --git a/apps/sim/app/api/tools/a2a/get-task/route.ts b/apps/sim/app/api/tools/a2a/get-task/route.ts new file mode 100644 index 0000000000..35aa5e278d --- /dev/null +++ b/apps/sim/app/api/tools/a2a/get-task/route.ts @@ -0,0 +1,95 @@ +import type { Task } from '@a2a-js/sdk' +import { createLogger } from '@sim/logger' +import { type NextRequest, NextResponse } from 'next/server' +import { z } from 'zod' +import { createA2AClient } from '@/lib/a2a/utils' +import { checkHybridAuth } from '@/lib/auth/hybrid' +import { generateRequestId } from '@/lib/core/utils/request' + +export const dynamic = 'force-dynamic' + +const logger = createLogger('A2AGetTaskAPI') + +const A2AGetTaskSchema = z.object({ + agentUrl: z.string().min(1, 'Agent URL is required'), + taskId: z.string().min(1, 'Task ID is required'), + apiKey: z.string().optional(), + historyLength: z.number().optional(), +}) + +export async function POST(request: NextRequest) { + const requestId = generateRequestId() + + try { + const authResult = await checkHybridAuth(request, { requireWorkflowId: false }) + + if (!authResult.success) { + logger.warn(`[${requestId}] Unauthorized A2A get task attempt: ${authResult.error}`) + return NextResponse.json( + { + success: false, + error: authResult.error || 'Authentication required', + }, + { status: 401 } + ) + } + + logger.info(`[${requestId}] Authenticated A2A get task request via ${authResult.authType}`, { + userId: authResult.userId, + }) + + const body = await request.json() + const validatedData = A2AGetTaskSchema.parse(body) + + logger.info(`[${requestId}] Getting A2A task`, { + agentUrl: validatedData.agentUrl, + taskId: validatedData.taskId, + historyLength: validatedData.historyLength, + }) + + const client = await createA2AClient(validatedData.agentUrl, validatedData.apiKey) + + const task = (await client.getTask({ + id: validatedData.taskId, + historyLength: validatedData.historyLength, + })) as Task + + logger.info(`[${requestId}] Successfully retrieved A2A task`, { + taskId: task.id, + state: task.status.state, + }) + + return NextResponse.json({ + success: true, + output: { + taskId: task.id, + contextId: task.contextId, + state: task.status.state, + artifacts: task.artifacts, + history: task.history, + }, + }) + } catch (error) { + if (error instanceof z.ZodError) { + logger.warn(`[${requestId}] Invalid request data`, { errors: error.errors }) + return NextResponse.json( + { + success: false, + error: 'Invalid request data', + details: error.errors, + }, + { status: 400 } + ) + } + + logger.error(`[${requestId}] Error getting A2A task:`, error) + + return NextResponse.json( + { + success: false, + error: error instanceof Error ? error.message : 'Failed to get task', + }, + { status: 500 } + ) + } +} diff --git a/apps/sim/app/api/tools/a2a/resubscribe/route.ts b/apps/sim/app/api/tools/a2a/resubscribe/route.ts new file mode 100644 index 0000000000..75c0d24aec --- /dev/null +++ b/apps/sim/app/api/tools/a2a/resubscribe/route.ts @@ -0,0 +1,119 @@ +import type { + Artifact, + Message, + Task, + TaskArtifactUpdateEvent, + TaskState, + TaskStatusUpdateEvent, +} from '@a2a-js/sdk' +import { createLogger } from '@sim/logger' +import { type NextRequest, NextResponse } from 'next/server' +import { z } from 'zod' +import { createA2AClient, extractTextContent, isTerminalState } from '@/lib/a2a/utils' +import { checkHybridAuth } from '@/lib/auth/hybrid' +import { generateRequestId } from '@/lib/core/utils/request' + +const logger = createLogger('A2AResubscribeAPI') + +export const dynamic = 'force-dynamic' + +const A2AResubscribeSchema = z.object({ + agentUrl: z.string().min(1, 'Agent URL is required'), + taskId: z.string().min(1, 'Task ID is required'), + apiKey: z.string().optional(), +}) + +export async function POST(request: NextRequest) { + const requestId = generateRequestId() + + try { + const authResult = await checkHybridAuth(request, { requireWorkflowId: false }) + + if (!authResult.success) { + logger.warn(`[${requestId}] Unauthorized A2A resubscribe attempt`) + return NextResponse.json( + { + success: false, + error: authResult.error || 'Authentication required', + }, + { status: 401 } + ) + } + + const body = await request.json() + const validatedData = A2AResubscribeSchema.parse(body) + + const client = await createA2AClient(validatedData.agentUrl, validatedData.apiKey) + + const stream = client.resubscribeTask({ id: validatedData.taskId }) + + let taskId = validatedData.taskId + let contextId: string | undefined + let state: TaskState = 'working' + let content = '' + let artifacts: Artifact[] = [] + let history: Message[] = [] + + for await (const event of stream) { + if (event.kind === 'message') { + const msg = event as Message + content = extractTextContent(msg) + taskId = msg.taskId || taskId + contextId = msg.contextId || contextId + state = 'completed' + } else if (event.kind === 'task') { + const task = event as Task + taskId = task.id + contextId = task.contextId + state = task.status.state + artifacts = task.artifacts || [] + history = task.history || [] + const lastAgentMessage = history.filter((m) => m.role === 'agent').pop() + if (lastAgentMessage) { + content = extractTextContent(lastAgentMessage) + } + } else if ('status' in event) { + const statusEvent = event as TaskStatusUpdateEvent + state = statusEvent.status.state + } else if ('artifact' in event) { + const artifactEvent = event as TaskArtifactUpdateEvent + artifacts.push(artifactEvent.artifact) + } + } + + logger.info(`[${requestId}] Successfully resubscribed to A2A task ${taskId}`) + + return NextResponse.json({ + success: true, + output: { + taskId, + contextId, + state, + isRunning: !isTerminalState(state), + artifacts, + history, + }, + }) + } catch (error) { + if (error instanceof z.ZodError) { + logger.warn(`[${requestId}] Invalid A2A resubscribe data`, { errors: error.errors }) + return NextResponse.json( + { + success: false, + error: 'Invalid request data', + details: error.errors, + }, + { status: 400 } + ) + } + + logger.error(`[${requestId}] Error resubscribing to A2A task:`, error) + return NextResponse.json( + { + success: false, + error: error instanceof Error ? error.message : 'Failed to resubscribe', + }, + { status: 500 } + ) + } +} diff --git a/apps/sim/app/api/tools/a2a/send-message-stream/route.ts b/apps/sim/app/api/tools/a2a/send-message-stream/route.ts new file mode 100644 index 0000000000..e30689a801 --- /dev/null +++ b/apps/sim/app/api/tools/a2a/send-message-stream/route.ts @@ -0,0 +1,150 @@ +import type { + Artifact, + Message, + Task, + TaskArtifactUpdateEvent, + TaskState, + TaskStatusUpdateEvent, +} from '@a2a-js/sdk' +import { createLogger } from '@sim/logger' +import { type NextRequest, NextResponse } from 'next/server' +import { z } from 'zod' +import { createA2AClient, extractTextContent, isTerminalState } from '@/lib/a2a/utils' +import { checkHybridAuth } from '@/lib/auth/hybrid' +import { generateRequestId } from '@/lib/core/utils/request' + +export const dynamic = 'force-dynamic' + +const logger = createLogger('A2ASendMessageStreamAPI') + +const A2ASendMessageStreamSchema = z.object({ + agentUrl: z.string().min(1, 'Agent URL is required'), + message: z.string().min(1, 'Message is required'), + taskId: z.string().optional(), + contextId: z.string().optional(), + apiKey: z.string().optional(), +}) + +export async function POST(request: NextRequest) { + const requestId = generateRequestId() + + try { + const authResult = await checkHybridAuth(request, { requireWorkflowId: false }) + + if (!authResult.success) { + logger.warn( + `[${requestId}] Unauthorized A2A send message stream attempt: ${authResult.error}` + ) + return NextResponse.json( + { + success: false, + error: authResult.error || 'Authentication required', + }, + { status: 401 } + ) + } + + logger.info( + `[${requestId}] Authenticated A2A send message stream request via ${authResult.authType}`, + { + userId: authResult.userId, + } + ) + + const body = await request.json() + const validatedData = A2ASendMessageStreamSchema.parse(body) + + logger.info(`[${requestId}] Sending A2A streaming message`, { + agentUrl: validatedData.agentUrl, + hasTaskId: !!validatedData.taskId, + hasContextId: !!validatedData.contextId, + }) + + const client = await createA2AClient(validatedData.agentUrl, validatedData.apiKey) + + const message: Message = { + kind: 'message', + messageId: crypto.randomUUID(), + role: 'user', + parts: [{ kind: 'text', text: validatedData.message }], + ...(validatedData.taskId && { taskId: validatedData.taskId }), + ...(validatedData.contextId && { contextId: validatedData.contextId }), + } + + const stream = client.sendMessageStream({ message }) + + let taskId = '' + let contextId: string | undefined + let state: TaskState = 'working' + let content = '' + let artifacts: Artifact[] = [] + let history: Message[] = [] + + for await (const event of stream) { + if (event.kind === 'message') { + const msg = event as Message + content = extractTextContent(msg) + taskId = msg.taskId || taskId + contextId = msg.contextId || contextId + state = 'completed' + } else if (event.kind === 'task') { + const task = event as Task + taskId = task.id + contextId = task.contextId + state = task.status.state + artifacts = task.artifacts || [] + history = task.history || [] + const lastAgentMessage = history.filter((m) => m.role === 'agent').pop() + if (lastAgentMessage) { + content = extractTextContent(lastAgentMessage) + } + } else if ('status' in event) { + const statusEvent = event as TaskStatusUpdateEvent + state = statusEvent.status.state + } else if ('artifact' in event) { + const artifactEvent = event as TaskArtifactUpdateEvent + artifacts.push(artifactEvent.artifact) + } + } + + logger.info(`[${requestId}] A2A streaming message completed`, { + taskId, + state, + artifactCount: artifacts.length, + }) + + return NextResponse.json({ + success: isTerminalState(state) && state !== 'failed', + output: { + content, + taskId, + contextId, + state, + artifacts, + history, + }, + }) + } catch (error) { + if (error instanceof z.ZodError) { + logger.warn(`[${requestId}] Invalid request data`, { errors: error.errors }) + return NextResponse.json( + { + success: false, + error: 'Invalid request data', + details: error.errors, + }, + { status: 400 } + ) + } + + logger.error(`[${requestId}] Error in A2A streaming:`, error) + + return NextResponse.json( + { + success: false, + error: error instanceof Error ? error.message : 'Streaming failed', + }, + { status: 500 } + ) + } +} diff --git a/apps/sim/app/api/tools/a2a/send-message/route.ts b/apps/sim/app/api/tools/a2a/send-message/route.ts new file mode 100644 index 0000000000..4d52fc710c --- /dev/null +++ b/apps/sim/app/api/tools/a2a/send-message/route.ts @@ -0,0 +1,126 @@ +import type { Message, Task } from '@a2a-js/sdk' +import { createLogger } from '@sim/logger' +import { type NextRequest, NextResponse } from 'next/server' +import { z } from 'zod' +import { createA2AClient, extractTextContent, isTerminalState } from '@/lib/a2a/utils' +import { checkHybridAuth } from '@/lib/auth/hybrid' +import { generateRequestId } from '@/lib/core/utils/request' + +export const dynamic = 'force-dynamic' + +const logger = createLogger('A2ASendMessageAPI') + +const A2ASendMessageSchema = z.object({ + agentUrl: z.string().min(1, 'Agent URL is required'), + message: z.string().min(1, 'Message is required'), + taskId: z.string().optional(), + contextId: z.string().optional(), + apiKey: z.string().optional(), +}) + +export async function POST(request: NextRequest) { + const requestId = generateRequestId() + + try { + const authResult = await checkHybridAuth(request, { requireWorkflowId: false }) + + if (!authResult.success) { + logger.warn(`[${requestId}] Unauthorized A2A send message attempt: ${authResult.error}`) + return NextResponse.json( + { + success: false, + error: authResult.error || 'Authentication required', + }, + { status: 401 } + ) + } + + logger.info( + `[${requestId}] Authenticated A2A send message request via ${authResult.authType}`, + { + userId: authResult.userId, + } + ) + + const body = await request.json() + const validatedData = A2ASendMessageSchema.parse(body) + + logger.info(`[${requestId}] Sending A2A message`, { + agentUrl: validatedData.agentUrl, + hasTaskId: !!validatedData.taskId, + hasContextId: !!validatedData.contextId, + }) + + const client = await createA2AClient(validatedData.agentUrl, validatedData.apiKey) + + const message: Message = { + kind: 'message', + messageId: crypto.randomUUID(), + role: 'user', + parts: [{ kind: 'text', text: validatedData.message }], + ...(validatedData.taskId && { taskId: validatedData.taskId }), + ...(validatedData.contextId && { contextId: validatedData.contextId }), + } + + const result = await client.sendMessage({ message }) + + if (result.kind === 'message') { + const responseMessage = result as Message + + logger.info(`[${requestId}] A2A message sent successfully (message response)`) + + return NextResponse.json({ + success: true, + output: { + content: extractTextContent(responseMessage), + taskId: responseMessage.taskId || '', + contextId: responseMessage.contextId, + state: 'completed', + }, + }) + } + + const task = result as Task + const lastAgentMessage = task.history?.filter((m) => m.role === 'agent').pop() + const content = lastAgentMessage ? extractTextContent(lastAgentMessage) : '' + + logger.info(`[${requestId}] A2A message sent successfully (task response)`, { + taskId: task.id, + state: task.status.state, + }) + + return NextResponse.json({ + success: isTerminalState(task.status.state) && task.status.state !== 'failed', + output: { + content, + taskId: task.id, + contextId: task.contextId, + state: task.status.state, + artifacts: task.artifacts, + history: task.history, + }, + }) + } catch (error) { + if (error instanceof z.ZodError) { + logger.warn(`[${requestId}] Invalid request data`, { errors: error.errors }) + return NextResponse.json( + { + success: false, + error: 'Invalid request data', + details: error.errors, + }, + { status: 400 } + ) + } + + logger.error(`[${requestId}] Error sending A2A message:`, error) + + return NextResponse.json( + { + success: false, + error: error instanceof Error ? error.message : 'Internal server error', + }, + { status: 500 } + ) + } +} diff --git a/apps/sim/app/api/tools/a2a/set-push-notification/route.ts b/apps/sim/app/api/tools/a2a/set-push-notification/route.ts new file mode 100644 index 0000000000..d407609418 --- /dev/null +++ b/apps/sim/app/api/tools/a2a/set-push-notification/route.ts @@ -0,0 +1,93 @@ +import { createLogger } from '@sim/logger' +import { type NextRequest, NextResponse } from 'next/server' +import { z } from 'zod' +import { createA2AClient } from '@/lib/a2a/utils' +import { checkHybridAuth } from '@/lib/auth/hybrid' +import { generateRequestId } from '@/lib/core/utils/request' + +export const dynamic = 'force-dynamic' + +const logger = createLogger('A2ASetPushNotificationAPI') + +const A2ASetPushNotificationSchema = z.object({ + agentUrl: z.string().min(1, 'Agent URL is required'), + taskId: z.string().min(1, 'Task ID is required'), + webhookUrl: z.string().min(1, 'Webhook URL is required'), + token: z.string().optional(), + apiKey: z.string().optional(), +}) + +export async function POST(request: NextRequest) { + const requestId = generateRequestId() + + try { + const authResult = await checkHybridAuth(request, { requireWorkflowId: false }) + + if (!authResult.success) { + logger.warn(`[${requestId}] Unauthorized A2A set push notification attempt`, { + error: authResult.error || 'Authentication required', + }) + return NextResponse.json( + { + success: false, + error: authResult.error || 'Authentication required', + }, + { status: 401 } + ) + } + + const body = await request.json() + const validatedData = A2ASetPushNotificationSchema.parse(body) + + logger.info(`[${requestId}] A2A set push notification request`, { + agentUrl: validatedData.agentUrl, + taskId: validatedData.taskId, + webhookUrl: validatedData.webhookUrl, + }) + + const client = await createA2AClient(validatedData.agentUrl, validatedData.apiKey) + + const result = await client.setTaskPushNotificationConfig({ + taskId: validatedData.taskId, + pushNotificationConfig: { + url: validatedData.webhookUrl, + token: validatedData.token, + }, + }) + + logger.info(`[${requestId}] A2A set push notification successful`, { + taskId: validatedData.taskId, + }) + + return NextResponse.json({ + success: true, + output: { + url: result.pushNotificationConfig.url, + token: result.pushNotificationConfig.token, + success: true, + }, + }) + } catch (error) { + if (error instanceof z.ZodError) { + logger.warn(`[${requestId}] Invalid request data`, { errors: error.errors }) + return NextResponse.json( + { + success: false, + error: 'Invalid request data', + details: error.errors, + }, + { status: 400 } + ) + } + + logger.error(`[${requestId}] Error setting A2A push notification:`, error) + + return NextResponse.json( + { + success: false, + error: error instanceof Error ? error.message : 'Failed to set push notification', + }, + { status: 500 } + ) + } +} diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index ca35437e5a..e045e6eaba 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -215,10 +215,10 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: workflowStateOverride, } = validation.data - // For API key auth, the entire body is the input (except for our control fields) + // For API key and internal JWT auth, the entire body is the input (except for our control fields) // For session auth, the input is explicitly provided in the input field const input = - auth.authType === 'api_key' + auth.authType === 'api_key' || auth.authType === 'internal_jwt' ? (() => { const { selectedOutputs, @@ -226,6 +226,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: stream, useDraftState, workflowStateOverride, + workflowId: _workflowId, // Also exclude workflowId used for internal JWT auth ...rest } = body return Object.keys(rest).length > 0 ? rest : validatedInput diff --git a/apps/sim/app/api/workflows/route.ts b/apps/sim/app/api/workflows/route.ts index 7c905ab7e6..81d4c885b9 100644 --- a/apps/sim/app/api/workflows/route.ts +++ b/apps/sim/app/api/workflows/route.ts @@ -1,12 +1,12 @@ import { db } from '@sim/db' -import { workflow, workspace } from '@sim/db/schema' +import { workflow } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' import { getSession } from '@/lib/auth' import { generateRequestId } from '@/lib/core/utils/request' -import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils' +import { getUserEntityPermissions, workspaceExists } from '@/lib/workspaces/permissions/utils' import { verifyWorkspaceMembership } from '@/app/api/workflows/utils' const logger = createLogger('WorkflowAPI') @@ -36,13 +36,9 @@ export async function GET(request: Request) { const userId = session.user.id if (workspaceId) { - const workspaceExists = await db - .select({ id: workspace.id }) - .from(workspace) - .where(eq(workspace.id, workspaceId)) - .then((rows) => rows.length > 0) + const wsExists = await workspaceExists(workspaceId) - if (!workspaceExists) { + if (!wsExists) { logger.warn( `[${requestId}] Attempt to fetch workflows for non-existent workspace: ${workspaceId}` ) diff --git a/apps/sim/app/api/workspaces/[id]/api-keys/route.ts b/apps/sim/app/api/workspaces/[id]/api-keys/route.ts index 1232272366..c649972140 100644 --- a/apps/sim/app/api/workspaces/[id]/api-keys/route.ts +++ b/apps/sim/app/api/workspaces/[id]/api-keys/route.ts @@ -1,5 +1,5 @@ import { db } from '@sim/db' -import { apiKey, workspace } from '@sim/db/schema' +import { apiKey } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, eq, inArray } from 'drizzle-orm' import { nanoid } from 'nanoid' @@ -9,7 +9,7 @@ import { createApiKey, getApiKeyDisplayFormat } from '@/lib/api-key/auth' import { getSession } from '@/lib/auth' import { PlatformEvents } from '@/lib/core/telemetry' import { generateRequestId } from '@/lib/core/utils/request' -import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils' +import { getUserEntityPermissions, getWorkspaceById } from '@/lib/workspaces/permissions/utils' const logger = createLogger('WorkspaceApiKeysAPI') @@ -34,8 +34,8 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{ const userId = session.user.id - const ws = await db.select().from(workspace).where(eq(workspace.id, workspaceId)).limit(1) - if (!ws.length) { + const ws = await getWorkspaceById(workspaceId) + if (!ws) { return NextResponse.json({ error: 'Workspace not found' }, { status: 404 }) } diff --git a/apps/sim/app/api/workspaces/[id]/byok-keys/route.ts b/apps/sim/app/api/workspaces/[id]/byok-keys/route.ts index 246cc6b245..3078555350 100644 --- a/apps/sim/app/api/workspaces/[id]/byok-keys/route.ts +++ b/apps/sim/app/api/workspaces/[id]/byok-keys/route.ts @@ -1,5 +1,5 @@ import { db } from '@sim/db' -import { workspace, workspaceBYOKKeys } from '@sim/db/schema' +import { workspaceBYOKKeys } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, eq } from 'drizzle-orm' import { nanoid } from 'nanoid' @@ -8,7 +8,7 @@ import { z } from 'zod' import { getSession } from '@/lib/auth' import { decryptSecret, encryptSecret } from '@/lib/core/security/encryption' import { generateRequestId } from '@/lib/core/utils/request' -import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils' +import { getUserEntityPermissions, getWorkspaceById } from '@/lib/workspaces/permissions/utils' const logger = createLogger('WorkspaceBYOKKeysAPI') @@ -46,8 +46,8 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{ const userId = session.user.id - const ws = await db.select().from(workspace).where(eq(workspace.id, workspaceId)).limit(1) - if (!ws.length) { + const ws = await getWorkspaceById(workspaceId) + if (!ws) { return NextResponse.json({ error: 'Workspace not found' }, { status: 404 }) } diff --git a/apps/sim/app/api/workspaces/[id]/environment/route.ts b/apps/sim/app/api/workspaces/[id]/environment/route.ts index 9c1ee4eb04..f11da0ecc9 100644 --- a/apps/sim/app/api/workspaces/[id]/environment/route.ts +++ b/apps/sim/app/api/workspaces/[id]/environment/route.ts @@ -1,5 +1,5 @@ import { db } from '@sim/db' -import { environment, workspace, workspaceEnvironment } from '@sim/db/schema' +import { environment, workspaceEnvironment } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' @@ -7,7 +7,7 @@ import { z } from 'zod' import { getSession } from '@/lib/auth' import { decryptSecret, encryptSecret } from '@/lib/core/security/encryption' import { generateRequestId } from '@/lib/core/utils/request' -import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils' +import { getUserEntityPermissions, getWorkspaceById } from '@/lib/workspaces/permissions/utils' const logger = createLogger('WorkspaceEnvironmentAPI') @@ -33,8 +33,8 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{ const userId = session.user.id // Validate workspace exists - const ws = await db.select().from(workspace).where(eq(workspace.id, workspaceId)).limit(1) - if (!ws.length) { + const ws = await getWorkspaceById(workspaceId) + if (!ws) { return NextResponse.json({ error: 'Workspace not found' }, { status: 404 }) } diff --git a/apps/sim/app/playground/page.tsx b/apps/sim/app/playground/page.tsx index a1a6694cfb..4670b805e0 100644 --- a/apps/sim/app/playground/page.tsx +++ b/apps/sim/app/playground/page.tsx @@ -364,12 +364,30 @@ export default function PlaygroundPage() { + {}} /> + {}} /> {}} /> + +
+ true} + onRemove={() => {}} + placeholder='Add tags' + placeholderWithTags='Add another' + tagVariant='secondary' + triggerKeys={['Enter', ',']} + /> +
+
diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/components/output-select/output-select.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/components/output-select/output-select.tsx index 43fee221a3..cf6973216e 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/components/output-select/output-select.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/components/output-select/output-select.tsx @@ -1,16 +1,9 @@ 'use client' import type React from 'react' -import { useCallback, useEffect, useMemo, useRef, useState } from 'react' -import { Check, RepeatIcon, SplitIcon } from 'lucide-react' -import { - Badge, - Popover, - PopoverContent, - PopoverDivider, - PopoverItem, - PopoverTrigger, -} from '@/components/emcn' +import { useMemo } from 'react' +import { RepeatIcon, SplitIcon } from 'lucide-react' +import { Combobox, type ComboboxOptionGroup } from '@/components/emcn' import { extractFieldsFromSchema, parseResponseFormatSafely, @@ -21,7 +14,7 @@ import { useSubBlockStore } from '@/stores/workflows/subblock/store' import { useWorkflowStore } from '@/stores/workflows/workflow/store' /** - * Renders a tag icon with background color. + * Renders a tag icon with background color for block section headers. * * @param icon - Either a letter string or a Lucide icon component * @param color - Background color for the icon container @@ -62,14 +55,9 @@ interface OutputSelectProps { placeholder?: string /** Whether to emit output IDs or labels in onOutputSelect callback */ valueMode?: 'id' | 'label' - /** - * When true, renders the underlying popover content inline instead of in a portal. - * Useful when used inside dialogs or other portalled components that manage scroll locking. - */ - disablePopoverPortal?: boolean - /** Alignment of the popover relative to the trigger */ + /** Alignment of the dropdown relative to the trigger */ align?: 'start' | 'end' | 'center' - /** Maximum height of the popover content in pixels */ + /** Maximum height of the dropdown content in pixels */ maxHeight?: number } @@ -90,14 +78,9 @@ export function OutputSelect({ disabled = false, placeholder = 'Select outputs', valueMode = 'id', - disablePopoverPortal = false, align = 'start', maxHeight = 200, }: OutputSelectProps) { - const [open, setOpen] = useState(false) - const [highlightedIndex, setHighlightedIndex] = useState(-1) - const triggerRef = useRef(null) - const popoverRef = useRef(null) const blocks = useWorkflowStore((state) => state.blocks) const { isShowingDiff, isDiffReady, hasActiveDiff, baselineWorkflow } = useWorkflowDiffStore() const subBlockValues = useSubBlockStore((state) => @@ -206,21 +189,10 @@ export function OutputSelect({ shouldUseBaseline, ]) - /** - * Checks if an output is currently selected by comparing both ID and label - * @param o - The output object to check - * @returns True if the output is selected, false otherwise - */ - const isSelectedValue = useCallback( - (o: { id: string; label: string }) => - selectedOutputs.includes(o.id) || selectedOutputs.includes(o.label), - [selectedOutputs] - ) - /** * Gets display text for selected outputs */ - const selectedOutputsDisplayText = useMemo(() => { + const selectedDisplayText = useMemo(() => { if (!selectedOutputs || selectedOutputs.length === 0) { return placeholder } @@ -234,19 +206,27 @@ export function OutputSelect({ } if (validOutputs.length === 1) { - const output = workflowOutputs.find( - (o) => o.id === validOutputs[0] || o.label === validOutputs[0] - ) - return output?.label || placeholder + return '1 output' } return `${validOutputs.length} outputs` }, [selectedOutputs, workflowOutputs, placeholder]) /** - * Groups outputs by block and sorts by distance from starter block + * Gets the background color for a block output based on its type + * @param blockType - The type of the block + * @returns The hex color code for the block */ - const groupedOutputs = useMemo(() => { + const getOutputColor = (blockType: string) => { + const blockConfig = getBlock(blockType) + return blockConfig?.bgColor || '#2F55FF' + } + + /** + * Groups outputs by block and sorts by distance from starter block. + * Returns ComboboxOptionGroup[] for use with Combobox. + */ + const comboboxGroups = useMemo((): ComboboxOptionGroup[] => { const groups: Record = {} const blockDistances: Record = {} const edges = useWorkflowStore.getState().edges @@ -283,242 +263,75 @@ export function OutputSelect({ groups[output.blockName].push(output) }) - return Object.entries(groups) + const sortedGroups = Object.entries(groups) .map(([blockName, outputs]) => ({ blockName, outputs, distance: blockDistances[outputs[0]?.blockId] || 0, })) .sort((a, b) => b.distance - a.distance) - .reduce( - (acc, { blockName, outputs }) => { - acc[blockName] = outputs - return acc - }, - {} as Record - ) - }, [workflowOutputs, blocks]) - - /** - * Gets the background color for a block output based on its type - * @param blockId - The block ID (unused but kept for future extensibility) - * @param blockType - The type of the block - * @returns The hex color code for the block - */ - const getOutputColor = (blockId: string, blockType: string) => { - const blockConfig = getBlock(blockType) - return blockConfig?.bgColor || '#2F55FF' - } - - /** - * Flattened outputs for keyboard navigation - */ - const flattenedOutputs = useMemo(() => { - return Object.values(groupedOutputs).flat() - }, [groupedOutputs]) - - /** - * Handles output selection by toggling the selected state - * @param value - The output label to toggle - */ - const handleOutputSelection = useCallback( - (value: string) => { - const emittedValue = - valueMode === 'label' ? value : workflowOutputs.find((o) => o.label === value)?.id || value - const index = selectedOutputs.indexOf(emittedValue) - - const newSelectedOutputs = - index === -1 - ? [...new Set([...selectedOutputs, emittedValue])] - : selectedOutputs.filter((id) => id !== emittedValue) - - onOutputSelect(newSelectedOutputs) - }, - [valueMode, workflowOutputs, selectedOutputs, onOutputSelect] - ) - - /** - * Handles keyboard navigation within the output list - * Supports ArrowUp, ArrowDown, Enter, and Escape keys - */ - useEffect(() => { - if (!open || flattenedOutputs.length === 0) return - - const handleKeyboardEvent = (e: KeyboardEvent) => { - switch (e.key) { - case 'ArrowDown': - e.preventDefault() - e.stopPropagation() - setHighlightedIndex((prev) => { - if (prev === -1 || prev >= flattenedOutputs.length - 1) { - return 0 - } - return prev + 1 - }) - break - - case 'ArrowUp': - e.preventDefault() - e.stopPropagation() - setHighlightedIndex((prev) => { - if (prev <= 0) { - return flattenedOutputs.length - 1 - } - return prev - 1 - }) - break - - case 'Enter': - e.preventDefault() - e.stopPropagation() - setHighlightedIndex((currentIndex) => { - if (currentIndex >= 0 && currentIndex < flattenedOutputs.length) { - handleOutputSelection(flattenedOutputs[currentIndex].label) - } - return currentIndex - }) - break - case 'Escape': - e.preventDefault() - e.stopPropagation() - setOpen(false) - break + return sortedGroups.map(({ blockName, outputs }) => { + const firstOutput = outputs[0] + const blockConfig = getBlock(firstOutput.blockType) + const blockColor = getOutputColor(firstOutput.blockType) + + let blockIcon: string | React.ComponentType<{ className?: string }> = blockName + .charAt(0) + .toUpperCase() + + if (blockConfig?.icon) { + blockIcon = blockConfig.icon + } else if (firstOutput.blockType === 'loop') { + blockIcon = RepeatIcon + } else if (firstOutput.blockType === 'parallel') { + blockIcon = SplitIcon } - } - - window.addEventListener('keydown', handleKeyboardEvent, true) - return () => window.removeEventListener('keydown', handleKeyboardEvent, true) - }, [open, flattenedOutputs, handleOutputSelection]) - /** - * Reset highlighted index when popover opens/closes - */ - useEffect(() => { - if (open) { - const firstSelectedIndex = flattenedOutputs.findIndex((output) => isSelectedValue(output)) - setHighlightedIndex(firstSelectedIndex >= 0 ? firstSelectedIndex : -1) - } else { - setHighlightedIndex(-1) - } - }, [open, flattenedOutputs, isSelectedValue]) - - /** - * Scroll highlighted item into view - */ - useEffect(() => { - if (highlightedIndex >= 0 && popoverRef.current) { - const highlightedElement = popoverRef.current.querySelector( - `[data-option-index="${highlightedIndex}"]` - ) - if (highlightedElement) { - highlightedElement.scrollIntoView({ behavior: 'smooth', block: 'nearest' }) + return { + sectionElement: ( +
+ + {blockName} +
+ ), + items: outputs.map((output) => ({ + label: output.path, + value: valueMode === 'label' ? output.label : output.id, + })), } - } - }, [highlightedIndex]) + }) + }, [workflowOutputs, blocks, valueMode]) /** - * Closes popover when clicking outside + * Normalize selected values to match the valueMode */ - useEffect(() => { - if (!open) return - - const handleClickOutside = (event: MouseEvent) => { - const target = event.target as Node - const insideTrigger = triggerRef.current?.contains(target) - const insidePopover = popoverRef.current?.contains(target) - - if (!insideTrigger && !insidePopover) { - setOpen(false) - } - } - - document.addEventListener('mousedown', handleClickOutside) - return () => document.removeEventListener('mousedown', handleClickOutside) - }, [open]) + const normalizedSelectedValues = useMemo(() => { + return selectedOutputs + .map((val) => { + // Find the output that matches either id or label + const output = workflowOutputs.find((o) => o.id === val || o.label === val) + if (!output) return null + // Return in the format matching valueMode + return valueMode === 'label' ? output.label : output.id + }) + .filter((v): v is string => v !== null) + }, [selectedOutputs, workflowOutputs, valueMode]) return ( - - -
- { - if (disabled || workflowOutputs.length === 0) return - e.stopPropagation() - setOpen((prev) => !prev) - }} - > - {selectedOutputsDisplayText} - -
-
- -
- {Object.entries(groupedOutputs).map(([blockName, outputs], groupIndex, groupArray) => { - const startIndex = flattenedOutputs.findIndex((o) => o.blockName === blockName) - - const firstOutput = outputs[0] - const blockConfig = getBlock(firstOutput.blockType) - const blockColor = getOutputColor(firstOutput.blockId, firstOutput.blockType) - - let blockIcon: string | React.ComponentType<{ className?: string }> = blockName - .charAt(0) - .toUpperCase() - - if (blockConfig?.icon) { - blockIcon = blockConfig.icon - } else if (firstOutput.blockType === 'loop') { - blockIcon = RepeatIcon - } else if (firstOutput.blockType === 'parallel') { - blockIcon = SplitIcon - } - - return ( -
-
- - {blockName} -
- -
- {outputs.map((output, localIndex) => { - const globalIndex = startIndex + localIndex - const isHighlighted = globalIndex === highlightedIndex - - return ( - handleOutputSelection(output.label)} - onMouseEnter={() => setHighlightedIndex(globalIndex)} - > - {output.path} - {isSelectedValue(output) && } - - ) - })} -
- {groupIndex < groupArray.length - 1 && } -
- ) - })} -
-
-
+ ) } diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/deploy/components/deploy-modal/components/a2a/a2a.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/deploy/components/deploy-modal/components/a2a/a2a.tsx new file mode 100644 index 0000000000..f72c96bc00 --- /dev/null +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/deploy/components/deploy-modal/components/a2a/a2a.tsx @@ -0,0 +1,935 @@ +'use client' + +import { useCallback, useEffect, useMemo, useState } from 'react' +import { createLogger } from '@sim/logger' +import { Check, Clipboard } from 'lucide-react' +import { useParams } from 'next/navigation' +import { + Badge, + Button, + ButtonGroup, + ButtonGroupItem, + Checkbox, + Code, + Combobox, + type ComboboxOption, + Input, + Label, + TagInput, + Textarea, + Tooltip, +} from '@/components/emcn' +import { Skeleton } from '@/components/ui' +import type { AgentAuthentication, AgentCapabilities } from '@/lib/a2a/types' +import { getBaseUrl } from '@/lib/core/utils/urls' +import { normalizeInputFormatValue } from '@/lib/workflows/input-format-utils' +import { StartBlockPath, TriggerUtils } from '@/lib/workflows/triggers/triggers' +import { + useA2AAgentByWorkflow, + useCreateA2AAgent, + useDeleteA2AAgent, + usePublishA2AAgent, + useUpdateA2AAgent, +} from '@/hooks/queries/a2a/agents' +import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow' +import { useSubBlockStore } from '@/stores/workflows/subblock/store' +import { useWorkflowStore } from '@/stores/workflows/workflow/store' + +const logger = createLogger('A2ADeploy') + +interface InputFormatField { + id?: string + name?: string + type?: string + value?: unknown + collapsed?: boolean +} + +/** + * Check if a description is a default/placeholder value that should be filtered out + */ +function isDefaultDescription(desc: string | null | undefined, workflowName: string): boolean { + if (!desc) return true + const normalized = desc.toLowerCase().trim() + return ( + normalized === '' || normalized === 'new workflow' || normalized === workflowName.toLowerCase() + ) +} + +type CodeLanguage = 'curl' | 'python' | 'javascript' | 'typescript' + +const LANGUAGE_LABELS: Record = { + curl: 'cURL', + python: 'Python', + javascript: 'JavaScript', + typescript: 'TypeScript', +} + +const LANGUAGE_SYNTAX: Record = { + curl: 'javascript', + python: 'python', + javascript: 'javascript', + typescript: 'javascript', +} + +interface A2aDeployProps { + workflowId: string + workflowName: string + workflowDescription?: string | null + isDeployed: boolean + workflowNeedsRedeployment?: boolean + onSubmittingChange?: (submitting: boolean) => void + onCanSaveChange?: (canSave: boolean) => void + onAgentExistsChange?: (exists: boolean) => void + onPublishedChange?: (published: boolean) => void + onNeedsRepublishChange?: (needsRepublish: boolean) => void + onDeployWorkflow?: () => Promise +} + +type AuthScheme = 'none' | 'apiKey' + +export function A2aDeploy({ + workflowId, + workflowName, + workflowDescription, + isDeployed, + workflowNeedsRedeployment, + onSubmittingChange, + onCanSaveChange, + onAgentExistsChange, + onPublishedChange, + onNeedsRepublishChange, + onDeployWorkflow, +}: A2aDeployProps) { + const params = useParams() + const workspaceId = params.workspaceId as string + + const { data: existingAgent, isLoading } = useA2AAgentByWorkflow(workspaceId, workflowId) + + const createAgent = useCreateA2AAgent() + const updateAgent = useUpdateA2AAgent() + const deleteAgent = useDeleteA2AAgent() + const publishAgent = usePublishA2AAgent() + + const blocks = useWorkflowStore((state) => state.blocks) + const { collaborativeSetSubblockValue } = useCollaborativeWorkflow() + + const startBlockId = useMemo(() => { + if (!blocks || Object.keys(blocks).length === 0) return null + const candidate = TriggerUtils.findStartBlock(blocks, 'api') + if (!candidate || candidate.path !== StartBlockPath.UNIFIED) return null + return candidate.blockId + }, [blocks]) + + const startBlockInputFormat = useSubBlockStore((state) => { + if (!workflowId || !startBlockId) return null + const workflowValues = state.workflowValues[workflowId] + const fromStore = workflowValues?.[startBlockId]?.inputFormat + if (fromStore !== undefined) return fromStore + const startBlock = blocks[startBlockId] + return startBlock?.subBlocks?.inputFormat?.value ?? null + }) + + const missingFields = useMemo(() => { + if (!startBlockId) return { input: false, data: false, files: false, any: false } + const normalizedFields = normalizeInputFormatValue(startBlockInputFormat) + const existingNames = new Set( + normalizedFields + .map((field) => field.name) + .filter((n): n is string => typeof n === 'string' && n.trim() !== '') + .map((n) => n.trim().toLowerCase()) + ) + const missing = { + input: !existingNames.has('input'), + data: !existingNames.has('data'), + files: !existingNames.has('files'), + any: false, + } + missing.any = missing.input || missing.data || missing.files + return missing + }, [startBlockId, startBlockInputFormat]) + + const handleAddA2AInputs = useCallback(() => { + if (!startBlockId) return + + const normalizedExisting = normalizeInputFormatValue(startBlockInputFormat) + const newFields: InputFormatField[] = [] + + // Add input field if missing (for TextPart) + if (missingFields.input) { + newFields.push({ + id: crypto.randomUUID(), + name: 'input', + type: 'string', + value: '', + collapsed: false, + }) + } + + // Add data field if missing (for DataPart) + if (missingFields.data) { + newFields.push({ + id: crypto.randomUUID(), + name: 'data', + type: 'object', + value: '', + collapsed: false, + }) + } + + // Add files field if missing (for FilePart) + if (missingFields.files) { + newFields.push({ + id: crypto.randomUUID(), + name: 'files', + type: 'files', + value: '', + collapsed: false, + }) + } + + if (newFields.length > 0) { + const updatedFields = [...newFields, ...normalizedExisting] + collaborativeSetSubblockValue(startBlockId, 'inputFormat', updatedFields) + logger.info( + `Added A2A input fields to Start block: ${newFields.map((f) => f.name).join(', ')}` + ) + } + }, [startBlockId, startBlockInputFormat, missingFields, collaborativeSetSubblockValue]) + + const [name, setName] = useState('') + const [description, setDescription] = useState('') + const [authScheme, setAuthScheme] = useState('apiKey') + const [pushNotificationsEnabled, setPushNotificationsEnabled] = useState(false) + const [skillTags, setSkillTags] = useState(['workflow', 'automation']) + const [language, setLanguage] = useState('curl') + const [useStreamingExample, setUseStreamingExample] = useState(false) + const [copied, setCopied] = useState(false) + + useEffect(() => { + if (existingAgent) { + setName(existingAgent.name) + const savedDesc = existingAgent.description || '' + setDescription(isDefaultDescription(savedDesc, workflowName) ? '' : savedDesc) + setPushNotificationsEnabled(existingAgent.capabilities?.pushNotifications ?? false) + const schemes = existingAgent.authentication?.schemes || [] + if (schemes.includes('apiKey')) { + setAuthScheme('apiKey') + } else { + setAuthScheme('none') + } + const skills = existingAgent.skills as Array<{ tags?: string[] }> | undefined + const savedTags = skills?.[0]?.tags + setSkillTags(savedTags?.length ? savedTags : ['workflow', 'automation']) + } else { + setName(workflowName) + setDescription( + isDefaultDescription(workflowDescription, workflowName) ? '' : workflowDescription || '' + ) + setAuthScheme('apiKey') + setPushNotificationsEnabled(false) + setSkillTags(['workflow', 'automation']) + } + }, [existingAgent, workflowName, workflowDescription]) + + useEffect(() => { + onAgentExistsChange?.(!!existingAgent) + }, [existingAgent, onAgentExistsChange]) + + useEffect(() => { + onPublishedChange?.(existingAgent?.isPublished ?? false) + }, [existingAgent?.isPublished, onPublishedChange]) + + const hasFormChanges = useMemo(() => { + if (!existingAgent) return false + const savedSchemes = existingAgent.authentication?.schemes || [] + const savedAuthScheme = savedSchemes.includes('apiKey') ? 'apiKey' : 'none' + const savedDesc = existingAgent.description || '' + const normalizedSavedDesc = isDefaultDescription(savedDesc, workflowName) ? '' : savedDesc + const skills = existingAgent.skills as Array<{ tags?: string[] }> | undefined + const savedTags = skills?.[0]?.tags || ['workflow', 'automation'] + const tagsChanged = + skillTags.length !== savedTags.length || skillTags.some((t, i) => t !== savedTags[i]) + return ( + name !== existingAgent.name || + description !== normalizedSavedDesc || + pushNotificationsEnabled !== (existingAgent.capabilities?.pushNotifications ?? false) || + authScheme !== savedAuthScheme || + tagsChanged + ) + }, [ + existingAgent, + name, + description, + pushNotificationsEnabled, + authScheme, + skillTags, + workflowName, + ]) + + const hasWorkflowChanges = useMemo(() => { + if (!existingAgent) return false + return !!workflowNeedsRedeployment + }, [existingAgent, workflowNeedsRedeployment]) + + const needsRepublish = existingAgent && (hasFormChanges || hasWorkflowChanges) + + useEffect(() => { + onNeedsRepublishChange?.(!!needsRepublish) + }, [needsRepublish, onNeedsRepublishChange]) + + const authSchemeOptions: ComboboxOption[] = useMemo( + () => [ + { label: 'API Key', value: 'apiKey' }, + { label: 'None (Public)', value: 'none' }, + ], + [] + ) + + const canSave = name.trim().length > 0 && description.trim().length > 0 + useEffect(() => { + onCanSaveChange?.(canSave) + }, [canSave, onCanSaveChange]) + + const isSubmitting = + createAgent.isPending || + updateAgent.isPending || + deleteAgent.isPending || + publishAgent.isPending + + useEffect(() => { + onSubmittingChange?.(isSubmitting) + }, [isSubmitting, onSubmittingChange]) + + const handleCreateOrUpdate = useCallback(async () => { + const capabilities: AgentCapabilities = { + streaming: true, + pushNotifications: pushNotificationsEnabled, + stateTransitionHistory: true, + } + + const authentication: AgentAuthentication = { + schemes: authScheme === 'none' ? ['none'] : [authScheme], + } + + try { + if (existingAgent) { + await updateAgent.mutateAsync({ + agentId: existingAgent.id, + name: name.trim(), + description: description.trim() || undefined, + capabilities, + authentication, + skillTags, + }) + } else { + await createAgent.mutateAsync({ + workspaceId, + workflowId, + name: name.trim(), + description: description.trim() || undefined, + capabilities, + authentication, + skillTags, + }) + } + } catch (error) { + logger.error('Failed to save A2A agent:', error) + } + }, [ + existingAgent, + name, + description, + pushNotificationsEnabled, + authScheme, + skillTags, + workspaceId, + workflowId, + createAgent, + updateAgent, + ]) + + const handlePublish = useCallback(async () => { + if (!existingAgent) return + try { + await publishAgent.mutateAsync({ + agentId: existingAgent.id, + workspaceId, + action: 'publish', + }) + } catch (error) { + logger.error('Failed to publish A2A agent:', error) + } + }, [existingAgent, workspaceId, publishAgent]) + + const handleUnpublish = useCallback(async () => { + if (!existingAgent) return + try { + await publishAgent.mutateAsync({ + agentId: existingAgent.id, + workspaceId, + action: 'unpublish', + }) + } catch (error) { + logger.error('Failed to unpublish A2A agent:', error) + } + }, [existingAgent, workspaceId, publishAgent]) + + const handleDelete = useCallback(async () => { + if (!existingAgent) return + try { + await deleteAgent.mutateAsync({ + agentId: existingAgent.id, + workspaceId, + }) + setName(workflowName) + setDescription(workflowDescription || '') + } catch (error) { + logger.error('Failed to delete A2A agent:', error) + } + }, [existingAgent, workspaceId, deleteAgent, workflowName, workflowDescription]) + + const handlePublishNewAgent = useCallback(async () => { + const capabilities: AgentCapabilities = { + streaming: true, + pushNotifications: pushNotificationsEnabled, + stateTransitionHistory: true, + } + + const authentication: AgentAuthentication = { + schemes: authScheme === 'none' ? ['none'] : [authScheme], + } + + try { + if (!isDeployed && onDeployWorkflow) { + await onDeployWorkflow() + } + + const newAgent = await createAgent.mutateAsync({ + workspaceId, + workflowId, + name: name.trim(), + description: description.trim() || undefined, + capabilities, + authentication, + skillTags, + }) + + await publishAgent.mutateAsync({ + agentId: newAgent.id, + workspaceId, + action: 'publish', + }) + } catch (error) { + logger.error('Failed to publish A2A agent:', error) + } + }, [ + name, + description, + pushNotificationsEnabled, + authScheme, + skillTags, + workspaceId, + workflowId, + createAgent, + publishAgent, + isDeployed, + onDeployWorkflow, + ]) + + const handleUpdateAndRepublish = useCallback(async () => { + if (!existingAgent) return + + const capabilities: AgentCapabilities = { + streaming: true, + pushNotifications: pushNotificationsEnabled, + stateTransitionHistory: true, + } + + const authentication: AgentAuthentication = { + schemes: authScheme === 'none' ? ['none'] : [authScheme], + } + + try { + if (!isDeployed && onDeployWorkflow) { + await onDeployWorkflow() + } + + await updateAgent.mutateAsync({ + agentId: existingAgent.id, + name: name.trim(), + description: description.trim() || undefined, + capabilities, + authentication, + skillTags, + }) + + await publishAgent.mutateAsync({ + agentId: existingAgent.id, + workspaceId, + action: 'publish', + }) + } catch (error) { + logger.error('Failed to update and republish A2A agent:', error) + } + }, [ + existingAgent, + isDeployed, + onDeployWorkflow, + name, + description, + pushNotificationsEnabled, + authScheme, + skillTags, + workspaceId, + updateAgent, + publishAgent, + ]) + + const baseUrl = getBaseUrl() + const endpoint = existingAgent ? `${baseUrl}/api/a2a/serve/${existingAgent.id}` : null + + const additionalInputFields = useMemo(() => { + const allFields = normalizeInputFormatValue(startBlockInputFormat) + return allFields.filter( + (field): field is InputFormatField & { name: string } => + !!field.name && + field.name.toLowerCase() !== 'input' && + field.name.toLowerCase() !== 'data' && + field.name.toLowerCase() !== 'files' + ) + }, [startBlockInputFormat]) + + const getExampleInputData = useCallback((): Record => { + const data: Record = {} + for (const field of additionalInputFields) { + switch (field.type) { + case 'string': + data[field.name] = 'example' + break + case 'number': + data[field.name] = 42 + break + case 'boolean': + data[field.name] = true + break + case 'object': + data[field.name] = { key: 'value' } + break + case 'array': + data[field.name] = [1, 2, 3] + break + default: + data[field.name] = 'example' + } + } + return data + }, [additionalInputFields]) + + const getJsonRpcPayload = useCallback((): Record => { + const inputData = getExampleInputData() + const hasAdditionalData = Object.keys(inputData).length > 0 + + // Build parts array: TextPart for message text, DataPart for additional fields + const parts: Array> = [{ kind: 'text', text: 'Hello, agent!' }] + if (hasAdditionalData) { + parts.push({ kind: 'data', data: inputData }) + } + + return { + jsonrpc: '2.0', + id: '1', + method: useStreamingExample ? 'message/stream' : 'message/send', + params: { + message: { + role: 'user', + parts, + }, + }, + } + }, [getExampleInputData, useStreamingExample]) + + const getCurlCommand = useCallback((): string => { + if (!endpoint) return '' + const payload = getJsonRpcPayload() + const requiresAuth = authScheme !== 'none' + + switch (language) { + case 'curl': + return requiresAuth + ? `curl -X POST \\ + -H "X-API-Key: $SIM_API_KEY" \\ + -H "Content-Type: application/json" \\ + -d '${JSON.stringify(payload)}' \\ + ${endpoint}` + : `curl -X POST \\ + -H "Content-Type: application/json" \\ + -d '${JSON.stringify(payload)}' \\ + ${endpoint}` + + case 'python': + return requiresAuth + ? `import os +import requests + +response = requests.post( + "${endpoint}", + headers={ + "X-API-Key": os.environ.get("SIM_API_KEY"), + "Content-Type": "application/json" + }, + json=${JSON.stringify(payload, null, 4).replace(/\n/g, '\n ')} +) + +print(response.json())` + : `import requests + +response = requests.post( + "${endpoint}", + headers={"Content-Type": "application/json"}, + json=${JSON.stringify(payload, null, 4).replace(/\n/g, '\n ')} +) + +print(response.json())` + + case 'javascript': + return requiresAuth + ? `const response = await fetch("${endpoint}", { + method: "POST", + headers: { + "X-API-Key": process.env.SIM_API_KEY, + "Content-Type": "application/json" + }, + body: JSON.stringify(${JSON.stringify(payload)}) +}); + +const data = await response.json(); +console.log(data);` + : `const response = await fetch("${endpoint}", { + method: "POST", + headers: {"Content-Type": "application/json"}, + body: JSON.stringify(${JSON.stringify(payload)}) +}); + +const data = await response.json(); +console.log(data);` + + case 'typescript': + return requiresAuth + ? `const response = await fetch("${endpoint}", { + method: "POST", + headers: { + "X-API-Key": process.env.SIM_API_KEY, + "Content-Type": "application/json" + }, + body: JSON.stringify(${JSON.stringify(payload)}) +}); + +const data: Record = await response.json(); +console.log(data);` + : `const response = await fetch("${endpoint}", { + method: "POST", + headers: {"Content-Type": "application/json"}, + body: JSON.stringify(${JSON.stringify(payload)}) +}); + +const data: Record = await response.json(); +console.log(data);` + + default: + return '' + } + }, [endpoint, language, getJsonRpcPayload, authScheme]) + + const handleCopyCommand = useCallback(() => { + navigator.clipboard.writeText(getCurlCommand()) + setCopied(true) + setTimeout(() => setCopied(false), 2000) + }, [getCurlCommand]) + + if (isLoading) { + return ( +
+
+ + + +
+
+ + +
+
+ + +
+
+ + +
+
+ ) + } + + return ( +
{ + e.preventDefault() + handleCreateOrUpdate() + }} + className='-mx-1 space-y-[12px] overflow-y-auto px-1 pb-[16px]' + > + {/* Endpoint URL (shown when agent exists) */} + {existingAgent && endpoint && ( +
+ +
+
+ {baseUrl.replace(/^https?:\/\//, '')}/api/a2a/serve/ +
+
+ + + + + + + {copied ? 'Copied' : 'Copy'} + + +
+
+

+ The A2A endpoint URL where clients can discover and call your agent +

+
+ )} + + {/* Agent Name */} +
+ + setName(e.target.value)} + placeholder='Enter agent name' + required + /> +

+ Human-readable name shown in the Agent Card +

+
+ + {/* Description */} +
+ +