Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 49 additions & 79 deletions web/src/composite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@
// Claude Sonnet: $3/$15 per MTok (synthesize)
// Expected total: $0.01-0.03 per composite query

import type { LLMMessage, ToolResult as LLMToolResult } from '@stackbilt/llm-providers';
import { askGroqJson } from './groq.js';
import { buildContext, handleInProcessTool, callMcpWithRetry, resolveMcpTool } from './claude.js';
import { toOpenAiTools, extractText, extractToolCalls, extractUsage, type AiChatResponse } from './workers-ai-chat.js';
import { toOpenAiTools } from './workers-ai-chat.js';
import { McpClient, McpRegistry } from './mcp-client.js';
import { operatorConfig } from './operator/index.js';
import { buildPersonaPreamble } from './operator/prompt-builder.js';
import { getCognitiveState, formatCognitiveContext } from './kernel/cognition.js';
import { getAttachedBlocks, assembleBlockContext } from './kernel/memory/blocks.js';
import { getConversationHistory, budgetConversationHistory } from './kernel/memory/index.js';
import { classifyCourtCard, type CourtCard, type CourtCardProfile } from './kernel/court-cards.js';
import { buildLLMProviderFactory } from './kernel/provider-factory.js';
import type { KernelIntent } from './kernel/types.js';
import type { EdgeEnv } from './kernel/dispatch.js';

Expand All @@ -43,10 +45,6 @@ interface SubtaskResult {
// ─── Cost rates ─────────────────────────────────────────────

const GROQ_GPT_OSS_RATES = { input: 0.15, output: 0.60 };
const CF_GPT_OSS_RATES = { input: 0.35, output: 0.75 };
const CLAUDE_SONNET_RATES = { input: 3, output: 15 };
const CLAUDE_OPUS_RATES = { input: 15, output: 75 };

// ─── Phase 1: Orchestrate ───────────────────────────────────

const ORCHESTRATOR_SYSTEM = `You are a task decomposition engine. Given a user query and a list of available tools, decompose the query into subtasks that can be executed independently.
Expand Down Expand Up @@ -160,13 +158,7 @@ function validateDagIntent(userQuery: string, dag: ExecutionDAG): boolean {
return true;
}

// ─── Phase 2: Gather (CF Workers AI tool loop) ─────────────

type ChatMessage =
| { role: 'system'; content: string }
| { role: 'user'; content: string }
| { role: 'assistant'; content: string; tool_calls?: Array<{ id: string; type: string; function: { name: string; arguments: string } }> }
| { role: 'tool'; tool_call_id: string; content: string };
// ─── Phase 2: Gather (provider-backed Workers AI tool loop) ──

const MAX_GATHER_ROUNDS = 6;

Expand Down Expand Up @@ -210,7 +202,6 @@ async function synthesize(
courtCard?: CourtCardProfile,
): Promise<{ text: string; cost: number }> {
const model = useOpus ? env.opusModel : env.claudeModel;
const rates = useOpus ? CLAUDE_OPUS_RATES : CLAUDE_SONNET_RATES;

const subtaskSummary = subtaskResults.map(r => {
// Include raw gathered data so synthesis can recover structured values the analysis step may have dropped
Expand All @@ -233,38 +224,17 @@ async function synthesize(
}
} catch { /* non-fatal — synthesize without cognitive context */ }

const response = await fetch(`${env.anthropicBaseUrl}/v1/messages`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'x-api-key': env.anthropicApiKey,
'anthropic-version': '2023-06-01',
},
body: JSON.stringify({
model,
max_tokens: 4096,
system: `${buildPersonaPreamble()} Synthesize the analyzed subtask results into a coherent, actionable answer. Speak as AEGIS — the co-founder who knows the business inside-out. Be thorough but concise. Reference specific products, numbers, and context from the portfolio below. Never give generic consultant advice; give the answer a co-founder would give.${courtCard ? ` ${courtCard.synthesisVoice}` : ''}${contextSuffix}`,
messages: [{
role: 'user',
content: `Original query: ${intent.raw}\n\nSynthesis instruction: ${synthesisInstruction}\n\nSubtask results:\n${subtaskSummary}`,
}],
}),
const result = await buildLLMProviderFactory(env).generateResponse({
model,
maxTokens: 4096,
systemPrompt: `${buildPersonaPreamble()} Synthesize the analyzed subtask results into a coherent, actionable answer. Speak as AEGIS — the co-founder who knows the business inside-out. Be thorough but concise. Reference specific products, numbers, and context from the portfolio below. Never give generic consultant advice; give the answer a co-founder would give.${courtCard ? ` ${courtCard.synthesisVoice}` : ''}${contextSuffix}`,
messages: [{
role: 'user',
content: `Original query: ${intent.raw}\n\nSynthesis instruction: ${synthesisInstruction}\n\nSubtask results:\n${subtaskSummary}`,
}],
});

if (!response.ok) {
const err = await response.text();
throw new Error(`Anthropic API error ${response.status}: ${err}`);
}

const data = await response.json<{
content: Array<{ type: string; text?: string }>;
usage: { input_tokens: number; output_tokens: number };
}>();

const text = data.content.filter(b => b.type === 'text').map(b => b.text ?? '').join('');
const cost = (data.usage.input_tokens * rates.input + data.usage.output_tokens * rates.output) / 1_000_000;

return { text: text || '(no synthesis)', cost };
return { text: result.message || '(no synthesis)', cost: result.usage.cost };
}

// ─── Groq synthesis fallback ────────────────────────────────
Expand Down Expand Up @@ -363,7 +333,10 @@ export async function executeComposite(
githubRepo: env.githubRepo,
braveApiKey: env.braveApiKey,
roundtableDb: env.roundtableDb,
memoryBinding: env.memoryBinding,
resendApiKeys: { resendApiKey: env.resendApiKey, resendApiKeyPersonal: env.resendApiKeyPersonal },
userQuery: intent.raw,
edgeEnv: env,
}, env.roundtableDb);

// Load conversation history for context continuity
Expand Down Expand Up @@ -602,7 +575,7 @@ async function gatherSubtaskInstrumented(
? `Original request: ${originalQuery}\n\nYour subtask: ${subtask.description}\n\nIMPORTANT: Use exact identifiers (UUIDs, IDs, enum values) from the original request when calling tools.`
: subtask.description;

const messages: ChatMessage[] = [
const messages: LLMMessage[] = [
{ role: 'system', content: `${systemPrompt}\n\nFocus: ${subtask.description}\nGather the data needed and return your findings.` },
{ role: 'user', content: userContent },
];
Expand All @@ -613,29 +586,26 @@ async function gatherSubtaskInstrumented(
// Tool loop — up to MAX_GATHER_ROUNDS
for (let round = 0; round < MAX_GATHER_ROUNDS; round++) {
subrequestCount += 1; // 1 AI call per round
const result = await env.ai.run(env.gptOssModel as Parameters<Ai['run']>[0], {
messages,
...(openAiTools.length > 0 ? { tools: openAiTools } : {}),
max_tokens: 2048,
const result = await buildLLMProviderFactory(env).generateResponse({
messages: [...messages],
model: env.gptOssModel,
...(openAiTools.length > 0 ? { tools: openAiTools as Parameters<ReturnType<typeof buildLLMProviderFactory>['generateResponse']>[0]['tools'] } : {}),
maxTokens: 2048,
temperature: 0.2,
top_p: 0.9,
frequency_penalty: 0.3,
} as Record<string, unknown>) as AiChatResponse;

const usage = extractUsage(result);
if (usage) {
totalCost += (usage.prompt_tokens * CF_GPT_OSS_RATES.input
+ usage.completion_tokens * CF_GPT_OSS_RATES.output) / 1_000_000;
}
topP: 0.9,
frequencyPenalty: 0.3,
});

const toolCalls = extractToolCalls(result);
const responseText = extractText(result);
totalCost += result.usage.cost;
const toolCalls = result.toolCalls ?? [];
const responseText = result.message;

if (toolCalls.length === 0) {
return { gathered: responseText ?? '(no data gathered)', cost: totalCost, subrequestCount };
}

messages.push({ role: 'assistant', content: responseText ?? '', tool_calls: toolCalls });
messages.push({ role: 'assistant', content: responseText ?? '', toolCalls });
const toolResults: LLMToolResult[] = [];

for (const call of toolCalls) {
subrequestCount += 1; // 1 subrequest per tool call (external fetch or DB query)
Expand All @@ -650,6 +620,7 @@ async function gatherSubtaskInstrumented(
{ apiKey: env.anthropicApiKey, model: env.claudeModel, baseUrl: env.anthropicBaseUrl },
env.memoryBinding,
{ resendApiKey: env.resendApiKey, resendApiKeyPersonal: env.resendApiKeyPersonal },
env,
);

if (inProcess !== null) {
Expand All @@ -663,26 +634,29 @@ async function gatherSubtaskInstrumented(
}
}

messages.push({ role: 'tool', tool_call_id: call.id, content: toolResult });
toolResults.push({ id: call.id, output: toolResult });
}
messages.push({ role: 'user', content: '', toolResults });
}

// Force summary if tool rounds exhausted
// Condense messages: strip tool_calls metadata and truncate tool results
// to prevent context overflow when sending to GPT-OSS without tools definition
const condensedGather: ChatMessage[] = [messages[0]]; // system
const condensedGather: LLMMessage[] = [messages[0]]; // system
const gatherFindings: string[] = [];
for (let i = 1; i < messages.length; i++) {
const msg = messages[i];
if (msg.role === 'user' && !('tool_call_id' in msg)) {
condensedGather.push(msg);
if (msg.role === 'user' && !msg.toolResults) {
condensedGather.push({ role: 'user', content: msg.content });
} else if (msg.role === 'assistant' && msg.content) {
gatherFindings.push(msg.content);
} else if (msg.role === 'tool') {
const truncated = msg.content.length > 2000
? msg.content.slice(0, 2000) + '... [truncated]'
: msg.content;
gatherFindings.push(truncated);
} else if (msg.toolResults) {
for (const toolResult of msg.toolResults) {
const truncated = toolResult.output.length > 2000
? toolResult.output.slice(0, 2000) + '... [truncated]'
: toolResult.output;
gatherFindings.push(truncated);
}
}
}
if (gatherFindings.length > 0) {
Expand All @@ -699,17 +673,13 @@ async function gatherSubtaskInstrumented(
condensedGather.push({ role: 'user', content: 'Summarize all data gathered so far. Return the raw findings.' });

subrequestCount += 1;
const summaryResult = await env.ai.run(env.gptOssModel as Parameters<Ai['run']>[0], {
const summaryResult = await buildLLMProviderFactory(env).generateResponse({
messages: condensedGather,
max_tokens: 2048,
model: env.gptOssModel,
maxTokens: 2048,
temperature: 0.2,
} as Record<string, unknown>) as AiChatResponse;

const summaryUsage = extractUsage(summaryResult);
if (summaryUsage) {
totalCost += (summaryUsage.prompt_tokens * CF_GPT_OSS_RATES.input
+ summaryUsage.completion_tokens * CF_GPT_OSS_RATES.output) / 1_000_000;
}
});

return { gathered: extractText(summaryResult) ?? '(gather exhausted)', cost: totalCost, subrequestCount };
totalCost += summaryResult.usage.cost;
return { gathered: summaryResult.message ?? '(gather exhausted)', cost: totalCost, subrequestCount };
}
54 changes: 36 additions & 18 deletions web/tests/composite.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ vi.mock('../src/groq.js', () => ({
askGroqJson: (...args: unknown[]) => mockAskGroqJson(...args),
}));

// Mock provider factory for gather + Claude synthesis
const mockProviderGenerateResponse = vi.fn();
vi.mock('../src/kernel/provider-factory.js', () => ({
buildLLMProviderFactory: () => ({
generateResponse: mockProviderGenerateResponse,
}),
}));

// Mock Workers AI chat for gather fallback and DAG drift fallback
const mockExecuteWorkersAiChat = vi.fn().mockResolvedValue({ text: 'Workers AI fallback', cost: 0.002 });
vi.mock('../src/workers-ai-chat.js', () => ({
Expand All @@ -91,10 +99,6 @@ vi.mock('../src/workers-ai-chat.js', () => ({
extractUsage: vi.fn().mockReturnValue({ prompt_tokens: 100, completion_tokens: 50 }),
}));

// Mock global fetch for Claude synthesis
const mockFetch = vi.fn();
vi.stubGlobal('fetch', mockFetch);

const { executeComposite } = await import('../src/composite.js');
const { buildContext: _buildContext, handleInProcessTool: _handleInProcessTool, callMcpWithRetry: _callMcpWithRetry, resolveMcpTool: _resolveMcpTool } = await import('../src/claude.js');
const { extractText: _extractText, extractToolCalls: _extractToolCalls, extractUsage: _extractUsage } = await import('../src/workers-ai-chat.js');
Expand Down Expand Up @@ -151,6 +155,17 @@ function makeEnv(overrides?: Partial<EdgeEnv>): EdgeEnv {
};
}

function providerResponse(message: string, cost = 0.0001, toolCalls?: Array<{ id: string; function: { name: string; arguments: string } }>) {
return {
message,
usage: { cost, inputTokens: 100, outputTokens: 50, totalTokens: 150 },
model: 'test-model',
provider: 'test-provider',
responseTime: 10,
toolCalls: toolCalls?.map(call => ({ ...call, type: 'function' as const })),
};
}

describe('executeComposite', () => {
beforeEach(() => {
vi.resetAllMocks();
Expand All @@ -172,6 +187,7 @@ describe('executeComposite', () => {
vi.mocked(_budgetConversationHistory).mockReturnValue([]);
vi.mocked(_getCognitiveState).mockResolvedValue(null);
vi.mocked(_formatCognitiveContext).mockReturnValue('');
mockProviderGenerateResponse.mockResolvedValue(providerResponse('gathered data'));
});

it('falls back to gpt_oss when orchestration fails', async () => {
Expand Down Expand Up @@ -220,11 +236,13 @@ describe('executeComposite', () => {
usage: { prompt_tokens: 150, completion_tokens: 75 },
});

// Phase 4: Claude synthesis
mockFetch.mockResolvedValue(new Response(JSON.stringify({
content: [{ type: 'text', text: 'Everything looks good. No overdue items and routine activity.' }],
usage: { input_tokens: 500, output_tokens: 200 },
}), { status: 200, headers: { 'Content-Type': 'application/json' } }));
mockProviderGenerateResponse
// Phase 2: Gather subtask 1
.mockResolvedValueOnce(providerResponse('gathered data'))
// Phase 2: Gather subtask 2
.mockResolvedValueOnce(providerResponse('gathered data'))
// Phase 4: Claude synthesis
.mockResolvedValueOnce(providerResponse('Everything looks good. No overdue items and routine activity.'));

const result = await executeComposite(makeIntent('check compliance and review activity'), makeEnv());

Expand Down Expand Up @@ -269,8 +287,13 @@ describe('executeComposite', () => {
usage: { prompt_tokens: 200, completion_tokens: 100 },
});

// Claude synthesis fails with credit error
mockFetch.mockResolvedValue(new Response('Your credit balance is too low', { status: 400 }));
mockProviderGenerateResponse
// Gather subtask 1
.mockResolvedValueOnce(providerResponse('gathered data'))
// Gather subtask 2
.mockResolvedValueOnce(providerResponse('gathered data'))
// Claude synthesis fails with credit error
.mockRejectedValueOnce(new Error('Your credit balance is too low'));

const result = await executeComposite(makeIntent('analyze data and check metrics'), makeEnv());
expect(result.text).toBe('Groq synthesized result');
Expand Down Expand Up @@ -300,11 +323,6 @@ describe('executeComposite', () => {
usage: { prompt_tokens: 100, completion_tokens: 50 },
});

mockFetch.mockResolvedValue(new Response(JSON.stringify({
content: [{ type: 'text', text: 'Synthesized' }],
usage: { input_tokens: 300, output_tokens: 100 },
}), { status: 200, headers: { 'Content-Type': 'application/json' } }));

// Set maxCost very low to trigger budget exhaustion
const result = await executeComposite(makeIntent('do task one and task two'), makeEnv(), undefined, 0.0001);
expect(result.meta).toBeDefined();
Expand All @@ -330,7 +348,7 @@ describe('executeComposite', () => {
// No analyze or synthesize calls — just the orchestrate + 1 gather
// askGroqJson called only once (for orchestrate)
expect(mockAskGroqJson).toHaveBeenCalledOnce();
// No Claude fetch (no synthesis)
expect(mockFetch).not.toHaveBeenCalled();
// Provider called once for gather only; no synthesis call.
expect(mockProviderGenerateResponse).toHaveBeenCalledOnce();
});
});