Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

155 changes: 155 additions & 0 deletions scripts/demo/run-demo.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
#!/usr/bin/env -S deno run --allow-net --allow-env

/**
* Edge Agents Platform — End-to-End Demo
*
* Demonstrates the unified agent platform:
* 1. Health check on the gateway
* 2. Direct query (simple question handled by gateway)
* 3. Research query (routed to agent_alpha)
* 4. Database query (routed to agent_beta)
* 5. Multi-agent workflow (research → handoff to database)
*
* Usage:
* GATEWAY_URL=http://localhost:54321/functions/v1/gateway \
* AUTH_TOKEN=your-token \
* deno run --allow-net --allow-env scripts/demo/run-demo.ts
*/

const GATEWAY_URL = Deno.env.get("GATEWAY_URL") || "http://localhost:54321/functions/v1/gateway";
const AUTH_TOKEN = Deno.env.get("AUTH_TOKEN") || "test-token";

const headers = {
"Content-Type": "application/json",
"Authorization": `Bearer ${AUTH_TOKEN}`,
};

function log(label: string, data: unknown) {
console.log(`\n${"=".repeat(60)}`);
console.log(` ${label}`);
console.log("=".repeat(60));
console.log(typeof data === "string" ? data : JSON.stringify(data, null, 2));
}

async function demo() {
console.log("\n Edge Agents Platform — End-to-End Demo");
console.log(` Gateway: ${GATEWAY_URL}\n`);

// 1. Health Check
try {
log("1. Health Check (GET)", "Checking gateway status...");
const health = await fetch(GATEWAY_URL, { headers });
const healthData = await health.json();
log("1. Health Check — Response", healthData);
} catch (error) {
log("1. Health Check — Error", {
error: (error as Error).message,
hint: "Make sure the gateway is running. Try: supabase functions serve gateway",
});
console.log("\nDemo requires a running gateway. Showing example requests instead.\n");
showExampleRequests();
return;
}

// 2. Direct Query
try {
log("2. Direct Query", "Sending simple question (handled directly by gateway)...");
const direct = await fetch(GATEWAY_URL, {
method: "POST",
headers,
body: JSON.stringify({
message: "What is 2 + 2?",
model: "fast",
}),
});
const directData = await direct.json();
log("2. Direct Query — Response", directData);
} catch (error) {
log("2. Direct Query — Error", (error as Error).message);
}

// 3. Research Query
try {
log("3. Research Query", "Sending research query (routes to agent_alpha)...");
const research = await fetch(GATEWAY_URL, {
method: "POST",
headers,
body: JSON.stringify({
message: "Research the latest developments in edge computing and AI agents. Summarize key trends.",
model: "balanced",
}),
});
const researchData = await research.json();
log("3. Research Query — Response", researchData);
} catch (error) {
log("3. Research Query — Error", (error as Error).message);
}

// 4. Database Query
try {
log("4. Database Query", "Sending database query (routes to agent_beta)...");
const db = await fetch(GATEWAY_URL, {
method: "POST",
headers,
body: JSON.stringify({
message: "Query the agent_logs table and show me the most recent 5 entries.",
model: "balanced",
}),
});
const dbData = await db.json();
log("4. Database Query — Response", dbData);
} catch (error) {
log("4. Database Query — Error", (error as Error).message);
}

// 5. Multi-Step Workflow
try {
const workflowId = crypto.randomUUID();
log("5. Multi-Step Workflow", `Starting workflow ${workflowId}...`);
const workflow = await fetch(GATEWAY_URL, {
method: "POST",
headers,
body: JSON.stringify({
message: "Research the current state of serverless edge computing, then check our database for any related agent logs.",
model: "powerful",
workflow_id: workflowId,
}),
});
const workflowData = await workflow.json();
log("5. Multi-Step Workflow — Response", workflowData);
} catch (error) {
log("5. Multi-Step Workflow — Error", (error as Error).message);
}

console.log("\n" + "=".repeat(60));
console.log(" Demo Complete!");
console.log("=".repeat(60) + "\n");
}

function showExampleRequests() {
console.log("Example curl commands you can run once the gateway is up:\n");

console.log("# Health check");
console.log(`curl ${GATEWAY_URL}\n`);

console.log("# Simple query");
console.log(`curl -X POST ${GATEWAY_URL} \\`);
console.log(` -H "Content-Type: application/json" \\`);
console.log(` -H "Authorization: Bearer \${AUTH_TOKEN}" \\`);
console.log(` -d '{"message": "What is edge computing?", "model": "fast"}'\n`);

console.log("# Research query (routes to agent_alpha)");
console.log(`curl -X POST ${GATEWAY_URL} \\`);
console.log(` -H "Content-Type: application/json" \\`);
console.log(` -H "Authorization: Bearer \${AUTH_TOKEN}" \\`);
console.log(` -d '{"message": "Research the latest AI agent frameworks", "model": "balanced"}'\n`);

console.log("# Database query (routes to agent_beta)");
console.log(`curl -X POST ${GATEWAY_URL} \\`);
console.log(` -H "Content-Type: application/json" \\`);
console.log(` -H "Authorization: Bearer \${AUTH_TOKEN}" \\`);
console.log(` -d '{"message": "Show me recent entries from the agent_logs table", "model": "balanced"}'\n`);
}

// Run demo
demo().catch(console.error);
133 changes: 133 additions & 0 deletions supabase/functions/_shared/channel.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/**
* Shared Supabase Channel Helpers
*
* Standardized channel management for inter-agent communication.
* Extracted from agent-manager and agent_alpha patterns.
*/

import { createClient, SupabaseClient } from "https://esm.sh/@supabase/supabase-js@2.38.4";
import { AgentMessage, isValidAgentMessage } from "./protocol.ts";

// Track active channel subscriptions to prevent duplicates
const activeChannels: Map<string, ReturnType<SupabaseClient['channel']>> = new Map();

/**
* Get or create a Supabase client from environment variables
*/
export function getSupabaseClient(): SupabaseClient {
const url = Deno.env.get("SB_URL") || Deno.env.get("SUPABASE_URL") || "";
const key = Deno.env.get("SB_SERVICE_KEY") || Deno.env.get("SUPABASE_SERVICE_ROLE_KEY") || "";

if (!url || !key) {
throw new Error("Supabase credentials not configured. Set SB_URL and SB_SERVICE_KEY.");
}

return createClient(url, key);
}

/**
* Idempotent channel subscription — returns existing channel if already subscribed
*/
export async function createChannel(
supabase: SupabaseClient,
name: string
): Promise<ReturnType<SupabaseClient['channel']>> {
const existing = activeChannels.get(name);
if (existing) return existing;

const channel = supabase.channel(name);

return new Promise((resolve, reject) => {
Comment on lines +35 to +40
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

activeChannels caches channels indefinitely. Because many call sites construct per-request response channel names that include correlation IDs, this map can grow without bound in a warm Edge Function instance. Consider not caching ephemeral response channels, and/or ensure channels are removed via removeChannel once a request completes or times out.

Copilot uses AI. Check for mistakes.
const timeout = setTimeout(() => {
reject(new Error(`Channel subscription timeout: ${name}`));
}, 10000);

channel.subscribe((status: string, err?: Error) => {
if (status === "SUBSCRIBED") {
clearTimeout(timeout);
activeChannels.set(name, channel);
resolve(channel);
}
if (err) {
clearTimeout(timeout);
reject(err);
}
});
});
}

/**
* Send a validated AgentMessage to a channel
*/
export async function sendMessage(
supabase: SupabaseClient,
channelName: string,
message: AgentMessage
): Promise<boolean> {
if (!isValidAgentMessage(message)) {
console.error(`[channel] Invalid message format, refusing to send to ${channelName}`);
return false;
}

try {
const channel = await createChannel(supabase, channelName);
await channel.send({
type: "broadcast",
event: "message",
payload: message,
});
return true;
} catch (error) {
console.error(`[channel] Error sending to ${channelName}:`, error);
return false;
}
}

/**
* Wait for a response matching a correlationId with timeout
*/
export function waitForResponse(
supabase: SupabaseClient,
channelName: string,
correlationId: string,
timeoutMs = 25000
): Promise<AgentMessage> {
return new Promise(async (resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error(`Response timeout after ${timeoutMs}ms for correlationId: ${correlationId}`));
}, timeoutMs);

try {
const channel = await createChannel(supabase, channelName);

channel.on("broadcast", { event: "message" }, (payload: { payload?: unknown }) => {
const msg = payload?.payload;
if (
Comment on lines +101 to +105
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

waitForResponse registers a channel.on(...) handler but never removes it after resolve/timeout. Over time this can accumulate handlers for the same channel and lead to duplicated callbacks and memory leaks. Consider unsubscribing/removing the handler and removing ephemeral channels in a finally block when the promise settles.

Copilot uses AI. Check for mistakes.
isValidAgentMessage(msg) &&
msg.correlationId === correlationId &&
(msg.type === "response" || msg.type === "error")
) {
clearTimeout(timeout);
resolve(msg);
}
});
} catch (error) {
clearTimeout(timeout);
reject(error);
}
});
}

/**
* Clean up a channel subscription
*/
export async function removeChannel(
supabase: SupabaseClient,
name: string
): Promise<void> {
const channel = activeChannels.get(name);
if (channel) {
await supabase.removeChannel(channel);
activeChannels.delete(name);
}
}
Loading