From cc377e231635259679995f90877ad5ca063a3cd7 Mon Sep 17 00:00:00 2001 From: Aegis Date: Tue, 2 Jun 2026 10:29:06 -0500 Subject: [PATCH] fix: route composite executor through providers --- web/src/composite.ts | 128 ++++++++++++++---------------------- web/tests/composite.test.ts | 54 ++++++++++----- 2 files changed, 85 insertions(+), 97 deletions(-) diff --git a/web/src/composite.ts b/web/src/composite.ts index 2be9c82..be8e15f 100755 --- a/web/src/composite.ts +++ b/web/src/composite.ts @@ -7,9 +7,10 @@ // Claude Sonnet: $3/$15 per MTok (synthesize) // Expected total: $0.01-0.03 per composite query +import type { LLMMessage, ToolResult as LLMToolResult } from '@stackbilt/llm-providers'; import { askGroqJson } from './groq.js'; import { buildContext, handleInProcessTool, callMcpWithRetry, resolveMcpTool } from './claude.js'; -import { toOpenAiTools, extractText, extractToolCalls, extractUsage, type AiChatResponse } from './workers-ai-chat.js'; +import { toOpenAiTools } from './workers-ai-chat.js'; import { McpClient, McpRegistry } from './mcp-client.js'; import { operatorConfig } from './operator/index.js'; import { buildPersonaPreamble } from './operator/prompt-builder.js'; @@ -17,6 +18,7 @@ import { getCognitiveState, formatCognitiveContext } from './kernel/cognition.js import { getAttachedBlocks, assembleBlockContext } from './kernel/memory/blocks.js'; import { getConversationHistory, budgetConversationHistory } from './kernel/memory/index.js'; import { classifyCourtCard, type CourtCard, type CourtCardProfile } from './kernel/court-cards.js'; +import { buildLLMProviderFactory } from './kernel/provider-factory.js'; import type { KernelIntent } from './kernel/types.js'; import type { EdgeEnv } from './kernel/dispatch.js'; @@ -43,10 +45,6 @@ interface SubtaskResult { // ─── Cost rates ───────────────────────────────────────────── const GROQ_GPT_OSS_RATES = { input: 0.15, output: 0.60 }; -const CF_GPT_OSS_RATES = { input: 0.35, output: 0.75 }; -const CLAUDE_SONNET_RATES = { input: 3, output: 15 }; -const CLAUDE_OPUS_RATES = { input: 15, output: 75 }; - // ─── Phase 1: Orchestrate ─────────────────────────────────── const ORCHESTRATOR_SYSTEM = `You are a task decomposition engine. Given a user query and a list of available tools, decompose the query into subtasks that can be executed independently. @@ -160,13 +158,7 @@ function validateDagIntent(userQuery: string, dag: ExecutionDAG): boolean { return true; } -// ─── Phase 2: Gather (CF Workers AI tool loop) ───────────── - -type ChatMessage = - | { role: 'system'; content: string } - | { role: 'user'; content: string } - | { role: 'assistant'; content: string; tool_calls?: Array<{ id: string; type: string; function: { name: string; arguments: string } }> } - | { role: 'tool'; tool_call_id: string; content: string }; +// ─── Phase 2: Gather (provider-backed Workers AI tool loop) ── const MAX_GATHER_ROUNDS = 6; @@ -210,7 +202,6 @@ async function synthesize( courtCard?: CourtCardProfile, ): Promise<{ text: string; cost: number }> { const model = useOpus ? env.opusModel : env.claudeModel; - const rates = useOpus ? CLAUDE_OPUS_RATES : CLAUDE_SONNET_RATES; const subtaskSummary = subtaskResults.map(r => { // Include raw gathered data so synthesis can recover structured values the analysis step may have dropped @@ -233,38 +224,17 @@ async function synthesize( } } catch { /* non-fatal — synthesize without cognitive context */ } - const response = await fetch(`${env.anthropicBaseUrl}/v1/messages`, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'x-api-key': env.anthropicApiKey, - 'anthropic-version': '2023-06-01', - }, - body: JSON.stringify({ - model, - max_tokens: 4096, - system: `${buildPersonaPreamble()} Synthesize the analyzed subtask results into a coherent, actionable answer. Speak as AEGIS — the co-founder who knows the business inside-out. Be thorough but concise. Reference specific products, numbers, and context from the portfolio below. Never give generic consultant advice; give the answer a co-founder would give.${courtCard ? ` ${courtCard.synthesisVoice}` : ''}${contextSuffix}`, - messages: [{ - role: 'user', - content: `Original query: ${intent.raw}\n\nSynthesis instruction: ${synthesisInstruction}\n\nSubtask results:\n${subtaskSummary}`, - }], - }), + const result = await buildLLMProviderFactory(env).generateResponse({ + model, + maxTokens: 4096, + systemPrompt: `${buildPersonaPreamble()} Synthesize the analyzed subtask results into a coherent, actionable answer. Speak as AEGIS — the co-founder who knows the business inside-out. Be thorough but concise. Reference specific products, numbers, and context from the portfolio below. Never give generic consultant advice; give the answer a co-founder would give.${courtCard ? ` ${courtCard.synthesisVoice}` : ''}${contextSuffix}`, + messages: [{ + role: 'user', + content: `Original query: ${intent.raw}\n\nSynthesis instruction: ${synthesisInstruction}\n\nSubtask results:\n${subtaskSummary}`, + }], }); - if (!response.ok) { - const err = await response.text(); - throw new Error(`Anthropic API error ${response.status}: ${err}`); - } - - const data = await response.json<{ - content: Array<{ type: string; text?: string }>; - usage: { input_tokens: number; output_tokens: number }; - }>(); - - const text = data.content.filter(b => b.type === 'text').map(b => b.text ?? '').join(''); - const cost = (data.usage.input_tokens * rates.input + data.usage.output_tokens * rates.output) / 1_000_000; - - return { text: text || '(no synthesis)', cost }; + return { text: result.message || '(no synthesis)', cost: result.usage.cost }; } // ─── Groq synthesis fallback ──────────────────────────────── @@ -363,7 +333,10 @@ export async function executeComposite( githubRepo: env.githubRepo, braveApiKey: env.braveApiKey, roundtableDb: env.roundtableDb, + memoryBinding: env.memoryBinding, + resendApiKeys: { resendApiKey: env.resendApiKey, resendApiKeyPersonal: env.resendApiKeyPersonal }, userQuery: intent.raw, + edgeEnv: env, }, env.roundtableDb); // Load conversation history for context continuity @@ -602,7 +575,7 @@ async function gatherSubtaskInstrumented( ? `Original request: ${originalQuery}\n\nYour subtask: ${subtask.description}\n\nIMPORTANT: Use exact identifiers (UUIDs, IDs, enum values) from the original request when calling tools.` : subtask.description; - const messages: ChatMessage[] = [ + const messages: LLMMessage[] = [ { role: 'system', content: `${systemPrompt}\n\nFocus: ${subtask.description}\nGather the data needed and return your findings.` }, { role: 'user', content: userContent }, ]; @@ -613,29 +586,26 @@ async function gatherSubtaskInstrumented( // Tool loop — up to MAX_GATHER_ROUNDS for (let round = 0; round < MAX_GATHER_ROUNDS; round++) { subrequestCount += 1; // 1 AI call per round - const result = await env.ai.run(env.gptOssModel as Parameters[0], { - messages, - ...(openAiTools.length > 0 ? { tools: openAiTools } : {}), - max_tokens: 2048, + const result = await buildLLMProviderFactory(env).generateResponse({ + messages: [...messages], + model: env.gptOssModel, + ...(openAiTools.length > 0 ? { tools: openAiTools as Parameters['generateResponse']>[0]['tools'] } : {}), + maxTokens: 2048, temperature: 0.2, - top_p: 0.9, - frequency_penalty: 0.3, - } as Record) as AiChatResponse; - - const usage = extractUsage(result); - if (usage) { - totalCost += (usage.prompt_tokens * CF_GPT_OSS_RATES.input - + usage.completion_tokens * CF_GPT_OSS_RATES.output) / 1_000_000; - } + topP: 0.9, + frequencyPenalty: 0.3, + }); - const toolCalls = extractToolCalls(result); - const responseText = extractText(result); + totalCost += result.usage.cost; + const toolCalls = result.toolCalls ?? []; + const responseText = result.message; if (toolCalls.length === 0) { return { gathered: responseText ?? '(no data gathered)', cost: totalCost, subrequestCount }; } - messages.push({ role: 'assistant', content: responseText ?? '', tool_calls: toolCalls }); + messages.push({ role: 'assistant', content: responseText ?? '', toolCalls }); + const toolResults: LLMToolResult[] = []; for (const call of toolCalls) { subrequestCount += 1; // 1 subrequest per tool call (external fetch or DB query) @@ -650,6 +620,7 @@ async function gatherSubtaskInstrumented( { apiKey: env.anthropicApiKey, model: env.claudeModel, baseUrl: env.anthropicBaseUrl }, env.memoryBinding, { resendApiKey: env.resendApiKey, resendApiKeyPersonal: env.resendApiKeyPersonal }, + env, ); if (inProcess !== null) { @@ -663,26 +634,29 @@ async function gatherSubtaskInstrumented( } } - messages.push({ role: 'tool', tool_call_id: call.id, content: toolResult }); + toolResults.push({ id: call.id, output: toolResult }); } + messages.push({ role: 'user', content: '', toolResults }); } // Force summary if tool rounds exhausted // Condense messages: strip tool_calls metadata and truncate tool results // to prevent context overflow when sending to GPT-OSS without tools definition - const condensedGather: ChatMessage[] = [messages[0]]; // system + const condensedGather: LLMMessage[] = [messages[0]]; // system const gatherFindings: string[] = []; for (let i = 1; i < messages.length; i++) { const msg = messages[i]; - if (msg.role === 'user' && !('tool_call_id' in msg)) { - condensedGather.push(msg); + if (msg.role === 'user' && !msg.toolResults) { + condensedGather.push({ role: 'user', content: msg.content }); } else if (msg.role === 'assistant' && msg.content) { gatherFindings.push(msg.content); - } else if (msg.role === 'tool') { - const truncated = msg.content.length > 2000 - ? msg.content.slice(0, 2000) + '... [truncated]' - : msg.content; - gatherFindings.push(truncated); + } else if (msg.toolResults) { + for (const toolResult of msg.toolResults) { + const truncated = toolResult.output.length > 2000 + ? toolResult.output.slice(0, 2000) + '... [truncated]' + : toolResult.output; + gatherFindings.push(truncated); + } } } if (gatherFindings.length > 0) { @@ -699,17 +673,13 @@ async function gatherSubtaskInstrumented( condensedGather.push({ role: 'user', content: 'Summarize all data gathered so far. Return the raw findings.' }); subrequestCount += 1; - const summaryResult = await env.ai.run(env.gptOssModel as Parameters[0], { + const summaryResult = await buildLLMProviderFactory(env).generateResponse({ messages: condensedGather, - max_tokens: 2048, + model: env.gptOssModel, + maxTokens: 2048, temperature: 0.2, - } as Record) as AiChatResponse; - - const summaryUsage = extractUsage(summaryResult); - if (summaryUsage) { - totalCost += (summaryUsage.prompt_tokens * CF_GPT_OSS_RATES.input - + summaryUsage.completion_tokens * CF_GPT_OSS_RATES.output) / 1_000_000; - } + }); - return { gathered: extractText(summaryResult) ?? '(gather exhausted)', cost: totalCost, subrequestCount }; + totalCost += summaryResult.usage.cost; + return { gathered: summaryResult.message ?? '(gather exhausted)', cost: totalCost, subrequestCount }; } diff --git a/web/tests/composite.test.ts b/web/tests/composite.test.ts index 8c22b45..e96d9a2 100755 --- a/web/tests/composite.test.ts +++ b/web/tests/composite.test.ts @@ -78,6 +78,14 @@ vi.mock('../src/groq.js', () => ({ askGroqJson: (...args: unknown[]) => mockAskGroqJson(...args), })); +// Mock provider factory for gather + Claude synthesis +const mockProviderGenerateResponse = vi.fn(); +vi.mock('../src/kernel/provider-factory.js', () => ({ + buildLLMProviderFactory: () => ({ + generateResponse: mockProviderGenerateResponse, + }), +})); + // Mock Workers AI chat for gather fallback and DAG drift fallback const mockExecuteWorkersAiChat = vi.fn().mockResolvedValue({ text: 'Workers AI fallback', cost: 0.002 }); vi.mock('../src/workers-ai-chat.js', () => ({ @@ -91,10 +99,6 @@ vi.mock('../src/workers-ai-chat.js', () => ({ extractUsage: vi.fn().mockReturnValue({ prompt_tokens: 100, completion_tokens: 50 }), })); -// Mock global fetch for Claude synthesis -const mockFetch = vi.fn(); -vi.stubGlobal('fetch', mockFetch); - const { executeComposite } = await import('../src/composite.js'); const { buildContext: _buildContext, handleInProcessTool: _handleInProcessTool, callMcpWithRetry: _callMcpWithRetry, resolveMcpTool: _resolveMcpTool } = await import('../src/claude.js'); const { extractText: _extractText, extractToolCalls: _extractToolCalls, extractUsage: _extractUsage } = await import('../src/workers-ai-chat.js'); @@ -151,6 +155,17 @@ function makeEnv(overrides?: Partial): EdgeEnv { }; } +function providerResponse(message: string, cost = 0.0001, toolCalls?: Array<{ id: string; function: { name: string; arguments: string } }>) { + return { + message, + usage: { cost, inputTokens: 100, outputTokens: 50, totalTokens: 150 }, + model: 'test-model', + provider: 'test-provider', + responseTime: 10, + toolCalls: toolCalls?.map(call => ({ ...call, type: 'function' as const })), + }; +} + describe('executeComposite', () => { beforeEach(() => { vi.resetAllMocks(); @@ -172,6 +187,7 @@ describe('executeComposite', () => { vi.mocked(_budgetConversationHistory).mockReturnValue([]); vi.mocked(_getCognitiveState).mockResolvedValue(null); vi.mocked(_formatCognitiveContext).mockReturnValue(''); + mockProviderGenerateResponse.mockResolvedValue(providerResponse('gathered data')); }); it('falls back to gpt_oss when orchestration fails', async () => { @@ -220,11 +236,13 @@ describe('executeComposite', () => { usage: { prompt_tokens: 150, completion_tokens: 75 }, }); - // Phase 4: Claude synthesis - mockFetch.mockResolvedValue(new Response(JSON.stringify({ - content: [{ type: 'text', text: 'Everything looks good. No overdue items and routine activity.' }], - usage: { input_tokens: 500, output_tokens: 200 }, - }), { status: 200, headers: { 'Content-Type': 'application/json' } })); + mockProviderGenerateResponse + // Phase 2: Gather subtask 1 + .mockResolvedValueOnce(providerResponse('gathered data')) + // Phase 2: Gather subtask 2 + .mockResolvedValueOnce(providerResponse('gathered data')) + // Phase 4: Claude synthesis + .mockResolvedValueOnce(providerResponse('Everything looks good. No overdue items and routine activity.')); const result = await executeComposite(makeIntent('check compliance and review activity'), makeEnv()); @@ -269,8 +287,13 @@ describe('executeComposite', () => { usage: { prompt_tokens: 200, completion_tokens: 100 }, }); - // Claude synthesis fails with credit error - mockFetch.mockResolvedValue(new Response('Your credit balance is too low', { status: 400 })); + mockProviderGenerateResponse + // Gather subtask 1 + .mockResolvedValueOnce(providerResponse('gathered data')) + // Gather subtask 2 + .mockResolvedValueOnce(providerResponse('gathered data')) + // Claude synthesis fails with credit error + .mockRejectedValueOnce(new Error('Your credit balance is too low')); const result = await executeComposite(makeIntent('analyze data and check metrics'), makeEnv()); expect(result.text).toBe('Groq synthesized result'); @@ -300,11 +323,6 @@ describe('executeComposite', () => { usage: { prompt_tokens: 100, completion_tokens: 50 }, }); - mockFetch.mockResolvedValue(new Response(JSON.stringify({ - content: [{ type: 'text', text: 'Synthesized' }], - usage: { input_tokens: 300, output_tokens: 100 }, - }), { status: 200, headers: { 'Content-Type': 'application/json' } })); - // Set maxCost very low to trigger budget exhaustion const result = await executeComposite(makeIntent('do task one and task two'), makeEnv(), undefined, 0.0001); expect(result.meta).toBeDefined(); @@ -330,7 +348,7 @@ describe('executeComposite', () => { // No analyze or synthesize calls — just the orchestrate + 1 gather // askGroqJson called only once (for orchestrate) expect(mockAskGroqJson).toHaveBeenCalledOnce(); - // No Claude fetch (no synthesis) - expect(mockFetch).not.toHaveBeenCalled(); + // Provider called once for gather only; no synthesis call. + expect(mockProviderGenerateResponse).toHaveBeenCalledOnce(); }); });