-
Notifications
You must be signed in to change notification settings - Fork 56
Upgrade agent platform to unified orchestrator with shared protocol #33
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
8fafe92
ced5466
e4da148
72cdda3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,155 @@ | ||
| #!/usr/bin/env -S deno run --allow-net --allow-env | ||
|
|
||
| /** | ||
| * Edge Agents Platform — End-to-End Demo | ||
| * | ||
| * Demonstrates the unified agent platform: | ||
| * 1. Health check on the gateway | ||
| * 2. Direct query (simple question handled by gateway) | ||
| * 3. Research query (routed to agent_alpha) | ||
| * 4. Database query (routed to agent_beta) | ||
| * 5. Multi-agent workflow (research → handoff to database) | ||
| * | ||
| * Usage: | ||
| * GATEWAY_URL=http://localhost:54321/functions/v1/gateway \ | ||
| * AUTH_TOKEN=your-token \ | ||
| * deno run --allow-net --allow-env scripts/demo/run-demo.ts | ||
| */ | ||
|
|
||
| const GATEWAY_URL = Deno.env.get("GATEWAY_URL") || "http://localhost:54321/functions/v1/gateway"; | ||
| const AUTH_TOKEN = Deno.env.get("AUTH_TOKEN") || "test-token"; | ||
|
|
||
| const headers = { | ||
| "Content-Type": "application/json", | ||
| "Authorization": `Bearer ${AUTH_TOKEN}`, | ||
| }; | ||
|
|
||
| function log(label: string, data: unknown) { | ||
| console.log(`\n${"=".repeat(60)}`); | ||
| console.log(` ${label}`); | ||
| console.log("=".repeat(60)); | ||
| console.log(typeof data === "string" ? data : JSON.stringify(data, null, 2)); | ||
| } | ||
|
|
||
| async function demo() { | ||
| console.log("\n Edge Agents Platform — End-to-End Demo"); | ||
| console.log(` Gateway: ${GATEWAY_URL}\n`); | ||
|
|
||
| // 1. Health Check | ||
| try { | ||
| log("1. Health Check (GET)", "Checking gateway status..."); | ||
| const health = await fetch(GATEWAY_URL, { headers }); | ||
| const healthData = await health.json(); | ||
| log("1. Health Check — Response", healthData); | ||
| } catch (error) { | ||
| log("1. Health Check — Error", { | ||
| error: (error as Error).message, | ||
| hint: "Make sure the gateway is running. Try: supabase functions serve gateway", | ||
| }); | ||
| console.log("\nDemo requires a running gateway. Showing example requests instead.\n"); | ||
| showExampleRequests(); | ||
| return; | ||
| } | ||
|
|
||
| // 2. Direct Query | ||
| try { | ||
| log("2. Direct Query", "Sending simple question (handled directly by gateway)..."); | ||
| const direct = await fetch(GATEWAY_URL, { | ||
| method: "POST", | ||
| headers, | ||
| body: JSON.stringify({ | ||
| message: "What is 2 + 2?", | ||
| model: "fast", | ||
| }), | ||
| }); | ||
| const directData = await direct.json(); | ||
| log("2. Direct Query — Response", directData); | ||
| } catch (error) { | ||
| log("2. Direct Query — Error", (error as Error).message); | ||
| } | ||
|
|
||
| // 3. Research Query | ||
| try { | ||
| log("3. Research Query", "Sending research query (routes to agent_alpha)..."); | ||
| const research = await fetch(GATEWAY_URL, { | ||
| method: "POST", | ||
| headers, | ||
| body: JSON.stringify({ | ||
| message: "Research the latest developments in edge computing and AI agents. Summarize key trends.", | ||
| model: "balanced", | ||
| }), | ||
| }); | ||
| const researchData = await research.json(); | ||
| log("3. Research Query — Response", researchData); | ||
| } catch (error) { | ||
| log("3. Research Query — Error", (error as Error).message); | ||
| } | ||
|
|
||
| // 4. Database Query | ||
| try { | ||
| log("4. Database Query", "Sending database query (routes to agent_beta)..."); | ||
| const db = await fetch(GATEWAY_URL, { | ||
| method: "POST", | ||
| headers, | ||
| body: JSON.stringify({ | ||
| message: "Query the agent_logs table and show me the most recent 5 entries.", | ||
| model: "balanced", | ||
| }), | ||
| }); | ||
| const dbData = await db.json(); | ||
| log("4. Database Query — Response", dbData); | ||
| } catch (error) { | ||
| log("4. Database Query — Error", (error as Error).message); | ||
| } | ||
|
|
||
| // 5. Multi-Step Workflow | ||
| try { | ||
| const workflowId = crypto.randomUUID(); | ||
| log("5. Multi-Step Workflow", `Starting workflow ${workflowId}...`); | ||
| const workflow = await fetch(GATEWAY_URL, { | ||
| method: "POST", | ||
| headers, | ||
| body: JSON.stringify({ | ||
| message: "Research the current state of serverless edge computing, then check our database for any related agent logs.", | ||
| model: "powerful", | ||
| workflow_id: workflowId, | ||
| }), | ||
| }); | ||
| const workflowData = await workflow.json(); | ||
| log("5. Multi-Step Workflow — Response", workflowData); | ||
| } catch (error) { | ||
| log("5. Multi-Step Workflow — Error", (error as Error).message); | ||
| } | ||
|
|
||
| console.log("\n" + "=".repeat(60)); | ||
| console.log(" Demo Complete!"); | ||
| console.log("=".repeat(60) + "\n"); | ||
| } | ||
|
|
||
| function showExampleRequests() { | ||
| console.log("Example curl commands you can run once the gateway is up:\n"); | ||
|
|
||
| console.log("# Health check"); | ||
| console.log(`curl ${GATEWAY_URL}\n`); | ||
|
|
||
| console.log("# Simple query"); | ||
| console.log(`curl -X POST ${GATEWAY_URL} \\`); | ||
| console.log(` -H "Content-Type: application/json" \\`); | ||
| console.log(` -H "Authorization: Bearer \${AUTH_TOKEN}" \\`); | ||
| console.log(` -d '{"message": "What is edge computing?", "model": "fast"}'\n`); | ||
|
|
||
| console.log("# Research query (routes to agent_alpha)"); | ||
| console.log(`curl -X POST ${GATEWAY_URL} \\`); | ||
| console.log(` -H "Content-Type: application/json" \\`); | ||
| console.log(` -H "Authorization: Bearer \${AUTH_TOKEN}" \\`); | ||
| console.log(` -d '{"message": "Research the latest AI agent frameworks", "model": "balanced"}'\n`); | ||
|
|
||
| console.log("# Database query (routes to agent_beta)"); | ||
| console.log(`curl -X POST ${GATEWAY_URL} \\`); | ||
| console.log(` -H "Content-Type: application/json" \\`); | ||
| console.log(` -H "Authorization: Bearer \${AUTH_TOKEN}" \\`); | ||
| console.log(` -d '{"message": "Show me recent entries from the agent_logs table", "model": "balanced"}'\n`); | ||
| } | ||
|
|
||
| // Run demo | ||
| demo().catch(console.error); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,133 @@ | ||
| /** | ||
| * Shared Supabase Channel Helpers | ||
| * | ||
| * Standardized channel management for inter-agent communication. | ||
| * Extracted from agent-manager and agent_alpha patterns. | ||
| */ | ||
|
|
||
| import { createClient, SupabaseClient } from "https://esm.sh/@supabase/supabase-js@2.38.4"; | ||
| import { AgentMessage, isValidAgentMessage } from "./protocol.ts"; | ||
|
|
||
| // Track active channel subscriptions to prevent duplicates | ||
| const activeChannels: Map<string, ReturnType<SupabaseClient['channel']>> = new Map(); | ||
|
|
||
| /** | ||
| * Get or create a Supabase client from environment variables | ||
| */ | ||
| export function getSupabaseClient(): SupabaseClient { | ||
| const url = Deno.env.get("SB_URL") || Deno.env.get("SUPABASE_URL") || ""; | ||
| const key = Deno.env.get("SB_SERVICE_KEY") || Deno.env.get("SUPABASE_SERVICE_ROLE_KEY") || ""; | ||
|
|
||
| if (!url || !key) { | ||
| throw new Error("Supabase credentials not configured. Set SB_URL and SB_SERVICE_KEY."); | ||
| } | ||
|
|
||
| return createClient(url, key); | ||
| } | ||
|
|
||
| /** | ||
| * Idempotent channel subscription — returns existing channel if already subscribed | ||
| */ | ||
| export async function createChannel( | ||
| supabase: SupabaseClient, | ||
| name: string | ||
| ): Promise<ReturnType<SupabaseClient['channel']>> { | ||
| const existing = activeChannels.get(name); | ||
| if (existing) return existing; | ||
|
|
||
| const channel = supabase.channel(name); | ||
|
|
||
| return new Promise((resolve, reject) => { | ||
| const timeout = setTimeout(() => { | ||
| reject(new Error(`Channel subscription timeout: ${name}`)); | ||
| }, 10000); | ||
|
|
||
| channel.subscribe((status: string, err?: Error) => { | ||
| if (status === "SUBSCRIBED") { | ||
| clearTimeout(timeout); | ||
| activeChannels.set(name, channel); | ||
| resolve(channel); | ||
| } | ||
| if (err) { | ||
| clearTimeout(timeout); | ||
| reject(err); | ||
| } | ||
| }); | ||
| }); | ||
| } | ||
|
|
||
| /** | ||
| * Send a validated AgentMessage to a channel | ||
| */ | ||
| export async function sendMessage( | ||
| supabase: SupabaseClient, | ||
| channelName: string, | ||
| message: AgentMessage | ||
| ): Promise<boolean> { | ||
| if (!isValidAgentMessage(message)) { | ||
| console.error(`[channel] Invalid message format, refusing to send to ${channelName}`); | ||
| return false; | ||
| } | ||
|
|
||
| try { | ||
| const channel = await createChannel(supabase, channelName); | ||
| await channel.send({ | ||
| type: "broadcast", | ||
| event: "message", | ||
| payload: message, | ||
| }); | ||
| return true; | ||
| } catch (error) { | ||
| console.error(`[channel] Error sending to ${channelName}:`, error); | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Wait for a response matching a correlationId with timeout | ||
| */ | ||
| export function waitForResponse( | ||
| supabase: SupabaseClient, | ||
| channelName: string, | ||
| correlationId: string, | ||
| timeoutMs = 25000 | ||
| ): Promise<AgentMessage> { | ||
| return new Promise(async (resolve, reject) => { | ||
| const timeout = setTimeout(() => { | ||
| reject(new Error(`Response timeout after ${timeoutMs}ms for correlationId: ${correlationId}`)); | ||
| }, timeoutMs); | ||
|
|
||
| try { | ||
| const channel = await createChannel(supabase, channelName); | ||
|
|
||
| channel.on("broadcast", { event: "message" }, (payload: { payload?: unknown }) => { | ||
| const msg = payload?.payload; | ||
| if ( | ||
|
Comment on lines
+101
to
+105
|
||
| isValidAgentMessage(msg) && | ||
| msg.correlationId === correlationId && | ||
| (msg.type === "response" || msg.type === "error") | ||
| ) { | ||
| clearTimeout(timeout); | ||
| resolve(msg); | ||
| } | ||
| }); | ||
| } catch (error) { | ||
| clearTimeout(timeout); | ||
| reject(error); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| /** | ||
| * Clean up a channel subscription | ||
| */ | ||
| export async function removeChannel( | ||
| supabase: SupabaseClient, | ||
| name: string | ||
| ): Promise<void> { | ||
| const channel = activeChannels.get(name); | ||
| if (channel) { | ||
| await supabase.removeChannel(channel); | ||
| activeChannels.delete(name); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
activeChannelscaches channels indefinitely. Because many call sites construct per-request response channel names that include correlation IDs, this map can grow without bound in a warm Edge Function instance. Consider not caching ephemeral response channels, and/or ensure channels are removed viaremoveChannelonce a request completes or times out.